xref: /OpenGrok/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java (revision b93c48943c452edcd01170fc7cff56485276e385)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * See LICENSE.txt included in this distribution for the specific
9  * language governing permissions and limitations under the License.
10  *
11  * When distributing Covered Code, include this CDDL HEADER in each
12  * file and include the License file at LICENSE.txt.
13  * If applicable, add the following below this CDDL HEADER, with the
14  * fields enclosed by brackets "[]" replaced with your own identifying
15  * information: Portions Copyright [yyyy] [name of copyright owner]
16  *
17  * CDDL HEADER END
18  */
19 
20 /*
21  * Copyright (c) 2017, 2020, Chris Fraire <cfraire@me.com>.
22  * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
23  */
24 package org.opengrok.indexer.index;
25 
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ForkJoinPool;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 
31 import org.opengrok.indexer.analysis.Ctags;
32 import org.opengrok.indexer.analysis.CtagsValidator;
33 import org.opengrok.indexer.configuration.RuntimeEnvironment;
34 import org.opengrok.indexer.util.BoundedBlockingObjectPool;
35 import org.opengrok.indexer.util.CtagsUtil;
36 import org.opengrok.indexer.util.LazilyInstantiate;
37 import org.opengrok.indexer.util.ObjectFactory;
38 import org.opengrok.indexer.util.ObjectPool;
39 
40 /**
41  * Represents a container for executors that enable parallelism for indexing
42  * across projects and repositories and also within any {@link IndexDatabase}
43  * instance -- with global limits for all execution.
44  * <p>A fixed-thread pool is used for parallelism across repositories, and a
45  * work-stealing {@link ForkJoinPool} is used for parallelism within any
46  * {@link IndexDatabase}. Threads in the former pool are customers of the
47  * latter, and the bulk of work is done in the latter pool. The work-stealing
48  * {@link ForkJoinPool} makes use of a corresponding fixed pool of {@link Ctags}
49  * instances.
50  * <p>Additionally there are pools for executing for history, for renamings in
51  * history, and for watching the {@link Ctags} instances for timing purposes.
52  */
53 public class IndexerParallelizer implements AutoCloseable {
54 
55     private final RuntimeEnvironment env;
56     private final int indexingParallelism;
57 
58     private LazilyInstantiate<ForkJoinPool> lzForkJoinPool;
59     private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool;
60     private LazilyInstantiate<ExecutorService> lzFixedExecutor;
61     private LazilyInstantiate<ExecutorService> lzHistoryExecutor;
62     private LazilyInstantiate<ExecutorService> lzHistoryFileExecutor;
63     private LazilyInstantiate<ExecutorService> lzCtagsWatcherExecutor;
64     private LazilyInstantiate<ExecutorService> lzXrefWatcherExecutor;
65 
66     /**
67      * Initializes a new instance using settings from the specified environment
68      * instance.
69      * @param env a defined instance
70      */
IndexerParallelizer(RuntimeEnvironment env)71     public IndexerParallelizer(RuntimeEnvironment env) {
72         if (env == null) {
73             throw new IllegalArgumentException("env is null");
74         }
75         this.env = env;
76         /*
77          * Save the following value explicitly because it must not change for
78          * an IndexerParallelizer instance.
79          */
80         this.indexingParallelism = env.getIndexingParallelism();
81 
82         createLazyForkJoinPool();
83         createLazyCtagsPool();
84         createLazyFixedExecutor();
85         createLazyHistoryExecutor();
86         createLazyHistoryFileExecutor();
87         createLazyCtagsWatcherExecutor();
88         createLazyXrefWatcherExecutor();
89     }
90 
91     /**
92      * @return the fixedExecutor
93      */
getFixedExecutor()94     public ExecutorService getFixedExecutor() {
95         return lzFixedExecutor.get();
96     }
97 
98     /**
99      * @return the forkJoinPool
100      */
getForkJoinPool()101     public ForkJoinPool getForkJoinPool() {
102         return lzForkJoinPool.get();
103     }
104 
105     /**
106      * @return the ctagsPool
107      */
getCtagsPool()108     public ObjectPool<Ctags> getCtagsPool() {
109         return lzCtagsPool.get();
110     }
111 
112     /**
113      * @return the ExecutorService used for history parallelism (repository level)
114      */
getHistoryExecutor()115     public ExecutorService getHistoryExecutor() {
116         return lzHistoryExecutor.get();
117     }
118 
119     /**
120      * @return the ExecutorService used for history parallelism (file level)
121      */
getHistoryFileExecutor()122     public ExecutorService getHistoryFileExecutor() {
123         return lzHistoryFileExecutor.get();
124     }
125 
126     /**
127      * @return the Executor used for ctags parallelism
128      */
getCtagsWatcherExecutor()129     public ExecutorService getCtagsWatcherExecutor() {
130         return lzCtagsWatcherExecutor.get();
131     }
132 
133     /**
134      * @return the Executor used for enforcing xref timeouts.
135      */
getXrefWatcherExecutor()136     public ExecutorService getXrefWatcherExecutor() {
137         return lzXrefWatcherExecutor.get();
138     }
139 
140     /**
141      * Calls {@link #bounce()}, which prepares for -- but does not start -- new
142      * pools.
143      */
144     @Override
close()145     public void close() {
146         bounce();
147     }
148 
149     /**
150      * Shuts down the instance's executors if any of the getters were called,
151      * releasing all resources; and prepares them to be called again to return
152      * new instances.
153      * <p>
154      * N.b. this method is not thread-safe w.r.t. the getters, so care must be
155      * taken that any scheduled work has been completed and that no other
156      * thread might call those methods simultaneously with this method.
157      * <p>
158      * The JVM will await any instantiated thread pools until they are
159      * explicitly shut down. A principle intention of this method is to
160      * facilitate OpenGrok test classes that run serially. The non-test
161      * processes using {@link IndexerParallelizer} -- i.e. {@code opengrok.jar}
162      * indexer or opengrok-web -- would only need a one-way shutdown; but they
163      * call this method satisfactorily too.
164      */
bounce()165     public void bounce() {
166         bounceForkJoinPool();
167         bounceFixedExecutor();
168         bounceCtagsPool();
169         bounceHistoryExecutor();
170         bounceHistoryRenamedExecutor();
171         bounceCtagsWatcherExecutor();
172         bounceXrefWatcherExecutor();
173     }
174 
bounceForkJoinPool()175     private void bounceForkJoinPool() {
176         if (lzForkJoinPool.isActive()) {
177             ForkJoinPool formerForkJoinPool = lzForkJoinPool.get();
178             createLazyForkJoinPool();
179             formerForkJoinPool.shutdown();
180         }
181     }
182 
bounceFixedExecutor()183     private void bounceFixedExecutor() {
184         if (lzFixedExecutor.isActive()) {
185             ExecutorService formerFixedExecutor = lzFixedExecutor.get();
186             createLazyFixedExecutor();
187             formerFixedExecutor.shutdown();
188         }
189     }
190 
bounceCtagsPool()191     private void bounceCtagsPool() {
192         if (lzCtagsPool.isActive()) {
193             ObjectPool<Ctags> formerCtagsPool = lzCtagsPool.get();
194             createLazyCtagsPool();
195             formerCtagsPool.shutdown();
196         }
197     }
198 
bounceHistoryExecutor()199     private void bounceHistoryExecutor() {
200         if (lzHistoryExecutor.isActive()) {
201             ExecutorService formerHistoryExecutor = lzHistoryExecutor.get();
202             createLazyHistoryExecutor();
203             formerHistoryExecutor.shutdown();
204         }
205     }
206 
bounceHistoryRenamedExecutor()207     private void bounceHistoryRenamedExecutor() {
208         if (lzHistoryFileExecutor.isActive()) {
209             ExecutorService formerHistoryRenamedExecutor = lzHistoryFileExecutor.get();
210             createLazyHistoryFileExecutor();
211             formerHistoryRenamedExecutor.shutdown();
212         }
213     }
214 
bounceCtagsWatcherExecutor()215     private void bounceCtagsWatcherExecutor() {
216         if (lzCtagsWatcherExecutor.isActive()) {
217             ExecutorService formerCtagsWatcherExecutor = lzCtagsWatcherExecutor.get();
218             createLazyCtagsWatcherExecutor();
219             formerCtagsWatcherExecutor.shutdown();
220         }
221     }
222 
bounceXrefWatcherExecutor()223     private void bounceXrefWatcherExecutor() {
224         if (lzXrefWatcherExecutor.isActive()) {
225             ExecutorService formerXrefWatcherExecutor = lzXrefWatcherExecutor.get();
226             createLazyXrefWatcherExecutor();
227             formerXrefWatcherExecutor.shutdown();
228         }
229     }
230 
createLazyForkJoinPool()231     private void createLazyForkJoinPool() {
232         lzForkJoinPool = LazilyInstantiate.using(() ->
233                 new ForkJoinPool(indexingParallelism));
234     }
235 
createLazyCtagsPool()236     private void createLazyCtagsPool() {
237         lzCtagsPool = LazilyInstantiate.using(() ->
238                 new BoundedBlockingObjectPool<>(indexingParallelism,
239                         new CtagsValidator(), new CtagsObjectFactory()));
240     }
241 
createLazyCtagsWatcherExecutor()242     private void createLazyCtagsWatcherExecutor() {
243         lzCtagsWatcherExecutor = LazilyInstantiate.using(() ->
244                 new ScheduledThreadPoolExecutor(1, runnable -> {
245                     Thread thread = Executors.defaultThreadFactory().newThread(runnable);
246                     thread.setName("ctags-watcher-" + thread.getId());
247                     return thread;
248                 }));
249     }
250 
createLazyXrefWatcherExecutor()251     private void createLazyXrefWatcherExecutor() {
252         lzXrefWatcherExecutor = LazilyInstantiate.using(() ->
253                 new ScheduledThreadPoolExecutor(1, runnable -> {
254                     Thread thread = Executors.defaultThreadFactory().newThread(runnable);
255                     thread.setName("xref-watcher-" + thread.getId());
256                     return thread;
257                 }));
258     }
259 
createLazyFixedExecutor()260     private void createLazyFixedExecutor() {
261         lzFixedExecutor = LazilyInstantiate.using(() ->
262                 Executors.newFixedThreadPool(indexingParallelism));
263     }
264 
createLazyHistoryExecutor()265     private void createLazyHistoryExecutor() {
266         lzHistoryExecutor = LazilyInstantiate.using(() ->
267                 Executors.newFixedThreadPool(env.getHistoryParallelism(), runnable -> {
268                         Thread thread = Executors.defaultThreadFactory().newThread(runnable);
269                         thread.setName("history-" + thread.getId());
270                         return thread;
271                 }));
272     }
273 
createLazyHistoryFileExecutor()274     private void createLazyHistoryFileExecutor() {
275         lzHistoryFileExecutor = LazilyInstantiate.using(() ->
276                 Executors.newFixedThreadPool(env.getHistoryFileParallelism(), runnable -> {
277                     Thread thread = Executors.defaultThreadFactory().newThread(runnable);
278                     thread.setName("history-file-" + thread.getId());
279                     return thread;
280                 }));
281     }
282 
283     private class CtagsObjectFactory implements ObjectFactory<Ctags> {
284 
createNew()285         public Ctags createNew() {
286             return CtagsUtil.newInstance(env);
287         }
288     }
289 }
290