xref: /OpenGrok/opengrok-indexer/src/main/java/org/opengrok/indexer/util/BoundedBlockingObjectPool.java (revision c6f0939b1c668e9f8e1e276424439c3106b3a029)
1 /*
2  * The contents of this file are Copyright (c) 2012, Swaranga Sarma, DZone MVB
3  * made available under free license,
4  * http://javawithswaranga.blogspot.com/2011/10/generic-and-concurrent-object-pool.html
5  * https://dzone.com/articles/generic-and-concurrent-object : "Feel free to use
6  * it, change it, add more implementations. Happy coding!"
7  * Portions Copyright (c) 2017-2018, Chris Fraire <cfraire@me.com>.
8  */
9 
10 package org.opengrok.indexer.util;
11 
12 import java.util.concurrent.Callable;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.LinkedBlockingDeque;
16 import java.util.concurrent.TimeUnit;
17 import java.util.logging.Level;
18 import java.util.logging.Logger;
19 import org.opengrok.indexer.logger.LoggerFactory;
20 
21 /**
22  * Represents a subclass of {@link AbstractObjectPool} and implementation of
23  * {@link BlockingObjectPool} with a defined limit of objects and a helper
24  * to validate instances on {@link #release(java.lang.Object)}.
25  * <p>An object failing validation is discarded, and a new one is created and
26  * made available.
27  * @author Swaranga
28  * @param <T> the type of objects to pool.
29  */
30 public final class BoundedBlockingObjectPool<T> extends AbstractObjectPool<T>
31         implements BlockingObjectPool<T> {
32 
33     private static final Logger LOGGER = LoggerFactory.getLogger(
34         BoundedBlockingObjectPool.class);
35 
36     private final int size;
37     private final LinkedBlockingDeque<T> objects;
38     private final ObjectValidator<T> validator;
39     private final ObjectFactory<T> objectFactory;
40     private final ExecutorService executor = Executors.newCachedThreadPool();
41     private volatile boolean puttingLast;
42     private volatile boolean shutdownCalled;
43 
BoundedBlockingObjectPool(int size, ObjectValidator<T> validator, ObjectFactory<T> objectFactory)44     public BoundedBlockingObjectPool(int size, ObjectValidator<T> validator,
45         ObjectFactory<T> objectFactory) {
46 
47         this.objectFactory = objectFactory;
48         this.size = size;
49         this.validator = validator;
50 
51         objects = new LinkedBlockingDeque<>(size);
52         initializeObjects();
53     }
54 
55     @Override
get(long timeOut, TimeUnit unit)56     public T get(long timeOut, TimeUnit unit) {
57         if (!shutdownCalled) {
58             T ret = null;
59             try {
60                 ret = objects.pollFirst(timeOut, unit);
61                 /*
62                  * When the queue first empties, switch to a strategy of putting
63                  * returned objects last instead of first.
64                  */
65                 if (!puttingLast && objects.size() < 1) {
66                     puttingLast = true;
67                 }
68             } catch (InterruptedException ie) {
69                 Thread.currentThread().interrupt();
70             }
71             return ret;
72         }
73         throw new IllegalStateException("Object pool is already shutdown");
74     }
75 
76     @Override
get()77     public T get() {
78         if (!shutdownCalled) {
79             T ret = null;
80             try {
81                 ret = objects.takeFirst();
82                 /*
83                  * When the queue first empties, switch to a strategy of putting
84                  * returned objects last instead of first.
85                  */
86                 if (!puttingLast && objects.size() < 1) {
87                     puttingLast = true;
88                 }
89             } catch (InterruptedException ie) {
90                 Thread.currentThread().interrupt();
91             }
92             return ret;
93         }
94         throw new IllegalStateException("Object pool is already shutdown");
95     }
96 
97     @Override
shutdown()98     public void shutdown() {
99         shutdownCalled = true;
100         executor.shutdownNow();
101         clearResources();
102     }
103 
clearResources()104     private void clearResources() {
105         for (T t : objects) {
106             validator.invalidate(t);
107         }
108     }
109 
110     @Override
returnToPool(T t)111     protected void returnToPool(T t) {
112         if (validator.isValid(t)) {
113             executor.submit(new ObjectReturner<>(objects, t, puttingLast));
114         }
115     }
116 
117     /*
118      * Creates a new instance, and returns that instead to the pool.
119      */
120     @Override
handleInvalidReturn(T t)121     protected void handleInvalidReturn(T t) {
122         if (LOGGER.isLoggable(Level.FINE)) {
123             LOGGER.log(Level.FINE, "createNew() to handle invalid {0}",
124                 t.getClass());
125         }
126 
127         t = objectFactory.createNew();
128         executor.submit(new ObjectReturner<>(objects, t, puttingLast));
129     }
130 
131     @Override
isValid(T t)132     protected boolean isValid(T t) {
133         return validator.isValid(t);
134     }
135 
initializeObjects()136     private void initializeObjects() {
137         for (int i = 0; i < size; i++) {
138             objects.add(objectFactory.createNew());
139         }
140     }
141 
142     private static class ObjectReturner<E> implements Callable<Void> {
143         private final LinkedBlockingDeque<E> queue;
144         private final E e;
145         private final boolean puttingLast;
146 
ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast)147         ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast) {
148             this.queue = queue;
149             this.e = e;
150             this.puttingLast = puttingLast;
151         }
152 
153         @Override
call()154         public Void call() {
155             while (true) {
156                 try {
157                     if (puttingLast) {
158                         queue.putLast(e);
159                     } else {
160                         queue.putFirst(e);
161                     }
162                     break;
163                 } catch (InterruptedException ie) {
164                     Thread.currentThread().interrupt();
165                 }
166             }
167 
168             return null;
169         }
170     }
171 }
172