/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * See LICENSE.txt included in this distribution for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at LICENSE.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2017, 2020, Chris Fraire <cfraire@me.com>.
 * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
 */
package org.opengrok.indexer.index;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.opengrok.indexer.analysis.Ctags;
import org.opengrok.indexer.analysis.CtagsValidator;
import org.opengrok.indexer.configuration.RuntimeEnvironment;
import org.opengrok.indexer.util.BoundedBlockingObjectPool;
import org.opengrok.indexer.util.CtagsUtil;
import org.opengrok.indexer.util.LazilyInstantiate;
import org.opengrok.indexer.util.ObjectFactory;
import org.opengrok.indexer.util.ObjectPool;

/**
 * Holds the executors and pools that provide parallelism for indexing, both
 * across projects and repositories and within any {@link IndexDatabase}
 * instance, so that all execution shares the same global limits.
 * <p>
 * A fixed-thread pool serves parallelism across repositories, while a
 * work-stealing {@link ForkJoinPool} serves parallelism within an
 * {@link IndexDatabase}. Threads of the fixed pool are customers of the
 * {@link ForkJoinPool}, which does the bulk of the work and is paired with
 * an equally sized pool of {@link Ctags} instances.
 * <p>
 * Further executors exist for history (repository level), for history
 * (file level), and for watching {@link Ctags} and xref operations for
 * timing purposes.
 * <p>
 * Every pool is wrapped in a {@link LazilyInstantiate} holder that is
 * consulted by the corresponding getter.
 */
public class IndexerParallelizer implements AutoCloseable {

    private final RuntimeEnvironment env;
    private final int indexingParallelism;

    /*
     * The holders are deliberately non-final: bounce() replaces a holder
     * with a fresh one whenever the previous one had become active.
     */
    private LazilyInstantiate<ForkJoinPool> lzForkJoinPool;
    private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool;
    private LazilyInstantiate<ExecutorService> lzFixedExecutor;
    private LazilyInstantiate<ExecutorService> lzHistoryExecutor;
    private LazilyInstantiate<ExecutorService> lzHistoryFileExecutor;
    private LazilyInstantiate<ExecutorService> lzCtagsWatcherExecutor;
    private LazilyInstantiate<ExecutorService> lzXrefWatcherExecutor;

    /**
     * Initializes a new instance using settings from the specified
     * environment instance.
     * @param env a defined instance
     * @throws IllegalArgumentException if {@code env} is {@code null}
     */
    public IndexerParallelizer(RuntimeEnvironment env) {
        if (env == null) {
            throw new IllegalArgumentException("env is null");
        }
        this.env = env;
        /*
         * Capture the indexing parallelism once; it must stay constant for
         * the lifetime of an IndexerParallelizer instance.
         */
        this.indexingParallelism = env.getIndexingParallelism();

        createLazyForkJoinPool();
        createLazyCtagsPool();
        createLazyFixedExecutor();
        createLazyHistoryExecutor();
        createLazyHistoryFileExecutor();
        createLazyCtagsWatcherExecutor();
        createLazyXrefWatcherExecutor();
    }

    /**
     * @return the fixed-thread executor, sized by the indexing parallelism
     */
    public ExecutorService getFixedExecutor() {
        return lzFixedExecutor.get();
    }

    /**
     * @return the {@link ForkJoinPool}, sized by the indexing parallelism
     */
    public ForkJoinPool getForkJoinPool() {
        return lzForkJoinPool.get();
    }

    /**
     * @return the pool of {@link Ctags} instances
     */
    public ObjectPool<Ctags> getCtagsPool() {
        return lzCtagsPool.get();
    }

    /**
     * @return the ExecutorService used for history parallelism (repository level)
     */
    public ExecutorService getHistoryExecutor() {
        return lzHistoryExecutor.get();
    }

    /**
     * @return the ExecutorService used for history parallelism (file level)
     */
    public ExecutorService getHistoryFileExecutor() {
        return lzHistoryFileExecutor.get();
    }

    /**
     * @return the single-threaded scheduled executor used for watching
     * {@link Ctags} instances
     */
    public ExecutorService getCtagsWatcherExecutor() {
        return lzCtagsWatcherExecutor.get();
    }

    /**
     * @return the single-threaded scheduled executor used for enforcing
     * xref timeouts
     */
    public ExecutorService getXrefWatcherExecutor() {
        return lzXrefWatcherExecutor.get();
    }

    /**
     * Calls {@link #bounce()}, which prepares for -- but does not start --
     * new pools.
     */
    @Override
    public void close() {
        bounce();
    }

    /**
     * Shuts down the instance's executors if any of the getters were called,
     * releasing all resources; and prepares them to be called again to
     * return new instances.
     * <p>
     * N.b. this method is not thread-safe w.r.t. the getters, so care must
     * be taken that any scheduled work has been completed and that no other
     * thread might call those methods simultaneously with this method.
     * <p>
     * The JVM will await any instantiated thread pools until they are
     * explicitly shut down. A principal intention of this method is to
     * facilitate OpenGrok test classes that run serially. The non-test
     * processes using {@link IndexerParallelizer} -- i.e. the
     * {@code opengrok.jar} indexer or opengrok-web -- would only need a
     * one-way shutdown; but they call this method satisfactorily too.
     */
    public void bounce() {
        // The order below is kept exactly as it has always been.
        retire(lzForkJoinPool, this::createLazyForkJoinPool);
        retire(lzFixedExecutor, this::createLazyFixedExecutor);
        retireCtagsPool();
        retire(lzHistoryExecutor, this::createLazyHistoryExecutor);
        retire(lzHistoryFileExecutor, this::createLazyHistoryFileExecutor);
        retire(lzCtagsWatcherExecutor, this::createLazyCtagsWatcherExecutor);
        retire(lzXrefWatcherExecutor, this::createLazyXrefWatcherExecutor);
    }

    /**
     * Retires one executor: when the holder is active, fetches its executor,
     * installs a replacement holder, and only then shuts the former
     * executor down. An inactive holder is left untouched.
     * @param lazy the holder currently stored in the field being bounced
     * @param recreate action that assigns a fresh holder to that field
     */
    private static void retire(LazilyInstantiate<? extends ExecutorService> lazy,
            Runnable recreate) {
        if (!lazy.isActive()) {
            return;
        }
        ExecutorService former = lazy.get();
        recreate.run();
        former.shutdown();
    }

    /**
     * Retires the {@link Ctags} pool with the same sequence used for the
     * executors: fetch, replace the holder, then shut down the former pool.
     */
    private void retireCtagsPool() {
        if (!lzCtagsPool.isActive()) {
            return;
        }
        ObjectPool<Ctags> former = lzCtagsPool.get();
        createLazyCtagsPool();
        former.shutdown();
    }

    /**
     * Creates a thread from the default thread factory and names it with
     * the specified prefix followed by the thread's ID.
     * @param runnable the work the new thread is to run
     * @param prefix the leading part of the thread name
     * @return the named thread
     */
    private static Thread newNamedThread(Runnable runnable, String prefix) {
        Thread thread = Executors.defaultThreadFactory().newThread(runnable);
        thread.setName(prefix + thread.getId());
        return thread;
    }

    private void createLazyForkJoinPool() {
        lzForkJoinPool = LazilyInstantiate.using(() ->
                new ForkJoinPool(indexingParallelism));
    }

    private void createLazyCtagsPool() {
        lzCtagsPool = LazilyInstantiate.using(() ->
                new BoundedBlockingObjectPool<>(indexingParallelism,
                        new CtagsValidator(), new CtagsObjectFactory()));
    }

    private void createLazyCtagsWatcherExecutor() {
        lzCtagsWatcherExecutor = LazilyInstantiate.using(() ->
                new ScheduledThreadPoolExecutor(1,
                        runnable -> newNamedThread(runnable, "ctags-watcher-")));
    }

    private void createLazyXrefWatcherExecutor() {
        lzXrefWatcherExecutor = LazilyInstantiate.using(() ->
                new ScheduledThreadPoolExecutor(1,
                        runnable -> newNamedThread(runnable, "xref-watcher-")));
    }

    private void createLazyFixedExecutor() {
        lzFixedExecutor = LazilyInstantiate.using(() ->
                Executors.newFixedThreadPool(indexingParallelism));
    }

    private void createLazyHistoryExecutor() {
        // The parallelism is read from env inside the supplier, not here.
        lzHistoryExecutor = LazilyInstantiate.using(() ->
                Executors.newFixedThreadPool(env.getHistoryParallelism(),
                        runnable -> newNamedThread(runnable, "history-")));
    }

    private void createLazyHistoryFileExecutor() {
        // The parallelism is read from env inside the supplier, not here.
        lzHistoryFileExecutor = LazilyInstantiate.using(() ->
                Executors.newFixedThreadPool(env.getHistoryFileParallelism(),
                        runnable -> newNamedThread(runnable, "history-file-")));
    }

    /**
     * Supplies new {@link Ctags} instances, configured from this
     * parallelizer's environment, to the {@link Ctags} pool.
     */
    private class CtagsObjectFactory implements ObjectFactory<Ctags> {

        public Ctags createNew() {
            return CtagsUtil.newInstance(env);
        }
    }
}