xref: /OpenGrok/opengrok-indexer/src/main/java/org/opengrok/indexer/util/BoundedBlockingObjectPool.java (revision c6f0939b1c668e9f8e1e276424439c3106b3a029)
1b5840353SAdam Hornáček /*
2b5840353SAdam Hornáček  * The contents of this file are Copyright (c) 2012, Swaranga Sarma, DZone MVB
3b5840353SAdam Hornáček  * made available under free license,
4b5840353SAdam Hornáček  * http://javawithswaranga.blogspot.com/2011/10/generic-and-concurrent-object-pool.html
5b5840353SAdam Hornáček  * https://dzone.com/articles/generic-and-concurrent-object : "Feel free to use
6b5840353SAdam Hornáček  * it, change it, add more implementations. Happy coding!"
7e829566cSChris Fraire  * Portions Copyright (c) 2017-2018, Chris Fraire <cfraire@me.com>.
8b5840353SAdam Hornáček  */
9b5840353SAdam Hornáček 
109805b761SAdam Hornáček package org.opengrok.indexer.util;
11b5840353SAdam Hornáček 
12b5840353SAdam Hornáček import java.util.concurrent.Callable;
13b5840353SAdam Hornáček import java.util.concurrent.ExecutorService;
14b5840353SAdam Hornáček import java.util.concurrent.Executors;
15e829566cSChris Fraire import java.util.concurrent.LinkedBlockingDeque;
16b5840353SAdam Hornáček import java.util.concurrent.TimeUnit;
17b5840353SAdam Hornáček import java.util.logging.Level;
18b5840353SAdam Hornáček import java.util.logging.Logger;
199805b761SAdam Hornáček import org.opengrok.indexer.logger.LoggerFactory;
20b5840353SAdam Hornáček 
21b5840353SAdam Hornáček /**
22b5840353SAdam Hornáček  * Represents a subclass of {@link AbstractObjectPool} and implementation of
23b5840353SAdam Hornáček  * {@link BlockingObjectPool} with a defined limit of objects and a helper
24b5840353SAdam Hornáček  * to validate instances on {@link #release(java.lang.Object)}.
25b5840353SAdam Hornáček  * <p>An object failing validation is discarded, and a new one is created and
26b5840353SAdam Hornáček  * made available.
27b5840353SAdam Hornáček  * @author Swaranga
28b5840353SAdam Hornáček  * @param <T> the type of objects to pool.
29b5840353SAdam Hornáček  */
30b5840353SAdam Hornáček public final class BoundedBlockingObjectPool<T> extends AbstractObjectPool<T>
31b5840353SAdam Hornáček         implements BlockingObjectPool<T> {
32b5840353SAdam Hornáček 
33b5840353SAdam Hornáček     private static final Logger LOGGER = LoggerFactory.getLogger(
34b5840353SAdam Hornáček         BoundedBlockingObjectPool.class);
35b5840353SAdam Hornáček 
36b5840353SAdam Hornáček     private final int size;
37e829566cSChris Fraire     private final LinkedBlockingDeque<T> objects;
38b5840353SAdam Hornáček     private final ObjectValidator<T> validator;
39b5840353SAdam Hornáček     private final ObjectFactory<T> objectFactory;
40b5840353SAdam Hornáček     private final ExecutorService executor = Executors.newCachedThreadPool();
41e829566cSChris Fraire     private volatile boolean puttingLast;
42b5840353SAdam Hornáček     private volatile boolean shutdownCalled;
43b5840353SAdam Hornáček 
BoundedBlockingObjectPool(int size, ObjectValidator<T> validator, ObjectFactory<T> objectFactory)44b5840353SAdam Hornáček     public BoundedBlockingObjectPool(int size, ObjectValidator<T> validator,
45b5840353SAdam Hornáček         ObjectFactory<T> objectFactory) {
46b5840353SAdam Hornáček 
47b5840353SAdam Hornáček         this.objectFactory = objectFactory;
48b5840353SAdam Hornáček         this.size = size;
49b5840353SAdam Hornáček         this.validator = validator;
50b5840353SAdam Hornáček 
51e829566cSChris Fraire         objects = new LinkedBlockingDeque<>(size);
52b5840353SAdam Hornáček         initializeObjects();
53b5840353SAdam Hornáček     }
54b5840353SAdam Hornáček 
55b5840353SAdam Hornáček     @Override
get(long timeOut, TimeUnit unit)56b5840353SAdam Hornáček     public T get(long timeOut, TimeUnit unit) {
57b5840353SAdam Hornáček         if (!shutdownCalled) {
58e829566cSChris Fraire             T ret = null;
59b5840353SAdam Hornáček             try {
60e829566cSChris Fraire                 ret = objects.pollFirst(timeOut, unit);
61e829566cSChris Fraire                 /*
62e829566cSChris Fraire                  * When the queue first empties, switch to a strategy of putting
63e829566cSChris Fraire                  * returned objects last instead of first.
64e829566cSChris Fraire                  */
65e829566cSChris Fraire                 if (!puttingLast && objects.size() < 1) {
66e829566cSChris Fraire                     puttingLast = true;
67e829566cSChris Fraire                 }
68b5840353SAdam Hornáček             } catch (InterruptedException ie) {
69b5840353SAdam Hornáček                 Thread.currentThread().interrupt();
70b5840353SAdam Hornáček             }
71e829566cSChris Fraire             return ret;
72b5840353SAdam Hornáček         }
73b5840353SAdam Hornáček         throw new IllegalStateException("Object pool is already shutdown");
74b5840353SAdam Hornáček     }
75b5840353SAdam Hornáček 
76b5840353SAdam Hornáček     @Override
get()77b5840353SAdam Hornáček     public T get() {
78b5840353SAdam Hornáček         if (!shutdownCalled) {
79e829566cSChris Fraire             T ret = null;
80b5840353SAdam Hornáček             try {
81e829566cSChris Fraire                 ret = objects.takeFirst();
82e829566cSChris Fraire                 /*
83e829566cSChris Fraire                  * When the queue first empties, switch to a strategy of putting
84e829566cSChris Fraire                  * returned objects last instead of first.
85e829566cSChris Fraire                  */
86e829566cSChris Fraire                 if (!puttingLast && objects.size() < 1) {
87e829566cSChris Fraire                     puttingLast = true;
88e829566cSChris Fraire                 }
89b5840353SAdam Hornáček             } catch (InterruptedException ie) {
90b5840353SAdam Hornáček                 Thread.currentThread().interrupt();
91b5840353SAdam Hornáček             }
92e829566cSChris Fraire             return ret;
93b5840353SAdam Hornáček         }
94b5840353SAdam Hornáček         throw new IllegalStateException("Object pool is already shutdown");
95b5840353SAdam Hornáček     }
96b5840353SAdam Hornáček 
97b5840353SAdam Hornáček     @Override
shutdown()98b5840353SAdam Hornáček     public void shutdown() {
99b5840353SAdam Hornáček         shutdownCalled = true;
100b5840353SAdam Hornáček         executor.shutdownNow();
101b5840353SAdam Hornáček         clearResources();
102b5840353SAdam Hornáček     }
103b5840353SAdam Hornáček 
clearResources()104b5840353SAdam Hornáček     private void clearResources() {
105b5840353SAdam Hornáček         for (T t : objects) {
106b5840353SAdam Hornáček             validator.invalidate(t);
107b5840353SAdam Hornáček         }
108b5840353SAdam Hornáček     }
109b5840353SAdam Hornáček 
110b5840353SAdam Hornáček     @Override
returnToPool(T t)111b5840353SAdam Hornáček     protected void returnToPool(T t) {
112b5840353SAdam Hornáček         if (validator.isValid(t)) {
113e829566cSChris Fraire             executor.submit(new ObjectReturner<>(objects, t, puttingLast));
114b5840353SAdam Hornáček         }
115b5840353SAdam Hornáček     }
116b5840353SAdam Hornáček 
11781b586e6SVladimir Kotal     /*
118b5840353SAdam Hornáček      * Creates a new instance, and returns that instead to the pool.
119b5840353SAdam Hornáček      */
120b5840353SAdam Hornáček     @Override
handleInvalidReturn(T t)121b5840353SAdam Hornáček     protected void handleInvalidReturn(T t) {
122b5840353SAdam Hornáček         if (LOGGER.isLoggable(Level.FINE)) {
123b5840353SAdam Hornáček             LOGGER.log(Level.FINE, "createNew() to handle invalid {0}",
124b5840353SAdam Hornáček                 t.getClass());
125b5840353SAdam Hornáček         }
126b5840353SAdam Hornáček 
127b5840353SAdam Hornáček         t = objectFactory.createNew();
128e829566cSChris Fraire         executor.submit(new ObjectReturner<>(objects, t, puttingLast));
129b5840353SAdam Hornáček     }
130b5840353SAdam Hornáček 
131b5840353SAdam Hornáček     @Override
isValid(T t)132b5840353SAdam Hornáček     protected boolean isValid(T t) {
133b5840353SAdam Hornáček         return validator.isValid(t);
134b5840353SAdam Hornáček     }
135b5840353SAdam Hornáček 
initializeObjects()136b5840353SAdam Hornáček     private void initializeObjects() {
137b5840353SAdam Hornáček         for (int i = 0; i < size; i++) {
138b5840353SAdam Hornáček             objects.add(objectFactory.createNew());
139b5840353SAdam Hornáček         }
140b5840353SAdam Hornáček     }
141b5840353SAdam Hornáček 
142*c6f0939bSAdam Hornacek     private static class ObjectReturner<E> implements Callable<Void> {
143e829566cSChris Fraire         private final LinkedBlockingDeque<E> queue;
144b5840353SAdam Hornáček         private final E e;
145e829566cSChris Fraire         private final boolean puttingLast;
146b5840353SAdam Hornáček 
ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast)147e829566cSChris Fraire         ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast) {
148b5840353SAdam Hornáček             this.queue = queue;
149b5840353SAdam Hornáček             this.e = e;
150e829566cSChris Fraire             this.puttingLast = puttingLast;
151b5840353SAdam Hornáček         }
152b5840353SAdam Hornáček 
153b5840353SAdam Hornáček         @Override
call()154b5840353SAdam Hornáček         public Void call() {
155b5840353SAdam Hornáček             while (true) {
156b5840353SAdam Hornáček                 try {
157e829566cSChris Fraire                     if (puttingLast) {
158e829566cSChris Fraire                         queue.putLast(e);
159e829566cSChris Fraire                     } else {
160e829566cSChris Fraire                         queue.putFirst(e);
161e829566cSChris Fraire                     }
162b5840353SAdam Hornáček                     break;
163b5840353SAdam Hornáček                 } catch (InterruptedException ie) {
164b5840353SAdam Hornáček                     Thread.currentThread().interrupt();
165b5840353SAdam Hornáček                 }
166b5840353SAdam Hornáček             }
167b5840353SAdam Hornáček 
168b5840353SAdam Hornáček             return null;
169b5840353SAdam Hornáček         }
170b5840353SAdam Hornáček     }
171b5840353SAdam Hornáček }
172