1 /* 2 * The contents of this file are Copyright (c) 2012, Swaranga Sarma, DZone MVB 3 * made available under free license, 4 * http://javawithswaranga.blogspot.com/2011/10/generic-and-concurrent-object-pool.html 5 * https://dzone.com/articles/generic-and-concurrent-object : "Feel free to use 6 * it, change it, add more implementations. Happy coding!" 7 * Portions Copyright (c) 2017-2018, Chris Fraire <cfraire@me.com>. 8 */ 9 10 package org.opengrok.indexer.util; 11 12 import java.util.concurrent.Callable; 13 import java.util.concurrent.ExecutorService; 14 import java.util.concurrent.Executors; 15 import java.util.concurrent.LinkedBlockingDeque; 16 import java.util.concurrent.TimeUnit; 17 import java.util.logging.Level; 18 import java.util.logging.Logger; 19 import org.opengrok.indexer.logger.LoggerFactory; 20 21 /** 22 * Represents a subclass of {@link AbstractObjectPool} and implementation of 23 * {@link BlockingObjectPool} with a defined limit of objects and a helper 24 * to validate instances on {@link #release(java.lang.Object)}. 25 * <p>An object failing validation is discarded, and a new one is created and 26 * made available. 27 * @author Swaranga 28 * @param <T> the type of objects to pool. 29 */ 30 public final class BoundedBlockingObjectPool<T> extends AbstractObjectPool<T> 31 implements BlockingObjectPool<T> { 32 33 private static final Logger LOGGER = LoggerFactory.getLogger( 34 BoundedBlockingObjectPool.class); 35 36 private final int size; 37 private final LinkedBlockingDeque<T> objects; 38 private final ObjectValidator<T> validator; 39 private final ObjectFactory<T> objectFactory; 40 private final ExecutorService executor = Executors.newCachedThreadPool(); 41 private volatile boolean puttingLast; 42 private volatile boolean shutdownCalled; 43 BoundedBlockingObjectPool(int size, ObjectValidator<T> validator, ObjectFactory<T> objectFactory)44 public BoundedBlockingObjectPool(int size, ObjectValidator<T> validator, 45 ObjectFactory<T> objectFactory) { 46 47 this.objectFactory = objectFactory; 48 this.size = size; 49 this.validator = validator; 50 51 objects = new LinkedBlockingDeque<>(size); 52 initializeObjects(); 53 } 54 55 @Override get(long timeOut, TimeUnit unit)56 public T get(long timeOut, TimeUnit unit) { 57 if (!shutdownCalled) { 58 T ret = null; 59 try { 60 ret = objects.pollFirst(timeOut, unit); 61 /* 62 * When the queue first empties, switch to a strategy of putting 63 * returned objects last instead of first. 64 */ 65 if (!puttingLast && objects.size() < 1) { 66 puttingLast = true; 67 } 68 } catch (InterruptedException ie) { 69 Thread.currentThread().interrupt(); 70 } 71 return ret; 72 } 73 throw new IllegalStateException("Object pool is already shutdown"); 74 } 75 76 @Override get()77 public T get() { 78 if (!shutdownCalled) { 79 T ret = null; 80 try { 81 ret = objects.takeFirst(); 82 /* 83 * When the queue first empties, switch to a strategy of putting 84 * returned objects last instead of first. 85 */ 86 if (!puttingLast && objects.size() < 1) { 87 puttingLast = true; 88 } 89 } catch (InterruptedException ie) { 90 Thread.currentThread().interrupt(); 91 } 92 return ret; 93 } 94 throw new IllegalStateException("Object pool is already shutdown"); 95 } 96 97 @Override shutdown()98 public void shutdown() { 99 shutdownCalled = true; 100 executor.shutdownNow(); 101 clearResources(); 102 } 103 clearResources()104 private void clearResources() { 105 for (T t : objects) { 106 validator.invalidate(t); 107 } 108 } 109 110 @Override returnToPool(T t)111 protected void returnToPool(T t) { 112 if (validator.isValid(t)) { 113 executor.submit(new ObjectReturner<>(objects, t, puttingLast)); 114 } 115 } 116 117 /* 118 * Creates a new instance, and returns that instead to the pool. 119 */ 120 @Override handleInvalidReturn(T t)121 protected void handleInvalidReturn(T t) { 122 if (LOGGER.isLoggable(Level.FINE)) { 123 LOGGER.log(Level.FINE, "createNew() to handle invalid {0}", 124 t.getClass()); 125 } 126 127 t = objectFactory.createNew(); 128 executor.submit(new ObjectReturner<>(objects, t, puttingLast)); 129 } 130 131 @Override isValid(T t)132 protected boolean isValid(T t) { 133 return validator.isValid(t); 134 } 135 initializeObjects()136 private void initializeObjects() { 137 for (int i = 0; i < size; i++) { 138 objects.add(objectFactory.createNew()); 139 } 140 } 141 142 private static class ObjectReturner<E> implements Callable<Void> { 143 private final LinkedBlockingDeque<E> queue; 144 private final E e; 145 private final boolean puttingLast; 146 ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast)147 ObjectReturner(LinkedBlockingDeque<E> queue, E e, boolean puttingLast) { 148 this.queue = queue; 149 this.e = e; 150 this.puttingLast = puttingLast; 151 } 152 153 @Override call()154 public Void call() { 155 while (true) { 156 try { 157 if (puttingLast) { 158 queue.putLast(e); 159 } else { 160 queue.putFirst(e); 161 } 162 break; 163 } catch (InterruptedException ie) { 164 Thread.currentThread().interrupt(); 165 } 166 } 167 168 return null; 169 } 170 } 171 } 172