xref: /OpenGrok/tools/src/main/python/opengrok_tools/utils/command.py (revision ffda442a9997660b969cc7b39d82f07b4236d98f)
#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# See LICENSE.txt included in this distribution for the specific
# language governing permissions and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at LICENSE.txt.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#

#
# Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
#
23
24import logging
25import os
26import signal
27import subprocess
28import threading
29import time
30
31
class TimeoutException(Exception):
    """
    Exception raised when a command exceeded its timeout.
    """
    pass


class Command:
    """
    Wrapper for synchronous execution of commands via subprocess.Popen()
    and getting their output (stderr is redirected to stdout by default)
    and exit value.
    """

    # state definitions
    FINISHED = "finished"
    INTERRUPTED = "interrupted"
    ERRORED = "errored"
    TIMEDOUT = "timed out"

    class _TimeoutThread(threading.Thread):
        """
        Wait until the timeout specified in seconds expires and kill
        the process specified by the Popen object after that, unless the
        event is set first (the caller sets it once the process is done).
        If the timeout expires, TimeoutException is stored in the object
        and can be retrieved by the caller via get_exception().
        """

        def __init__(self, logger, timeout, event, p):
            super().__init__()
            self.timeout = timeout
            self.popen = p
            self.event = event
            self.logger = logger
            # Must be initialized before start(): otherwise a timeout that
            # fires immediately could have its exception overwritten here.
            self.exception = None
            self.start()

        def terminate(self, p):
            """
            Make sure the process goes away.
            """
            self.logger.info("Terminating PID {}".format(p.pid))
            p.terminate()

            # The following code tries more methods to terminate
            # the process and is specific to Unix.
            if os.name == 'posix':
                timeout = self.timeout
                # disable E1101 - non existent attribute SIGKILL on windows
                # pylint: disable=E1101
                term_signals = [signal.SIGINT, signal.SIGKILL]
                # pylint: enable=E1101
                for sig in term_signals:
                    timeout = timeout / 2  # exponential back-off
                    self.logger.info("Sleeping for {} seconds".
                                     format(timeout))
                    time.sleep(timeout)

                    if p.poll() is None:
                        self.logger.info("Command with PID {} still alive,"
                                         " killing with signal {}".
                                         format(p.pid, sig))
                        p.send_signal(sig)
                    else:
                        self.logger.info("Command with PID {} is gone".
                                         format(p.pid))
                        break

        def run(self):
            # Event.wait() returns True right away if the event was set
            # before this thread got here, so a process that finishes very
            # quickly is never missed (a Condition notification sent before
            # anybody waits is lost). No lock is held while terminating.
            if self.event.wait(self.timeout):
                return

            p = self.popen
            self.logger.info("Terminating command {} with PID {} "
                             "after timeout of {} seconds".
                             format(p.args, p.pid, self.timeout))
            self.exception = TimeoutException("Command {} with pid"
                                              " {} timed out".
                                              format(p.args, p.pid))
            self.terminate(p)

        def get_exception(self):
            return self.exception

    class _OutputThread(threading.Thread):
        """
        Capture data from subprocess.Popen(). This avoids hangs when
        stdout/stderr buffers fill up.

        The object is passed to Popen as a file-like object; Popen uses
        fileno(), i.e. the write end of a private pipe, while this thread
        reads lines from the read end.
        """

        def __init__(self, event, logger, doprint=False):
            super().__init__()
            self.read_fd, self.write_fd = os.pipe()
            # errors='replace' keeps the reader alive when the command emits
            # bytes that are not valid UTF-8; a decode error would otherwise
            # kill this thread and leave the caller waiting on the event.
            self.pipe_fobj = os.fdopen(self.read_fd, encoding='utf8',
                                       errors='replace')
            self.out = []
            self.event = event
            self.logger = logger
            self.doprint = doprint

            # Start the thread now.
            self.start()

        def run(self):
            """
            It might happen that after the process is gone, the thread
            still has data to read from the pipe. Hence, event is used
            to synchronize with the caller. The event is set (and the
            read end closed) no matter how the loop ends.
            """
            try:
                while True:
                    line = self.pipe_fobj.readline()
                    if not line:
                        self.logger.debug("end of output")
                        return

                    self.out.append(line)

                    if self.doprint:
                        # Even if logging below fails, the thread has to keep
                        # running to avoid hangups of the executed command.
                        try:
                            self.logger.info(line.rstrip())
                        except Exception as print_exc:
                            self.logger.error(print_exc)
            finally:
                self.pipe_fobj.close()
                self.event.set()

        def getoutput(self):
            return self.out

        def fileno(self):
            return self.write_fd

        def close(self):
            self.logger.debug("closed")
            os.close(self.write_fd)

    def __init__(self, cmd, args_subst=None, args_append=None, logger=None,
                 excl_subst=False, work_dir=None, env_vars=None, timeout=None,
                 redirect_stderr=True, resource_limits=None, doprint=False):
        """
        :param cmd: command and its arguments; every item is converted to str
        :param args_subst: dictionary of pattern -> replacement applied to
                           each argument (see fill_arg())
        :param args_append: list of arguments to append (see fill_arg())
        :param logger: logger to use, defaults to the module logger
        :param excl_subst: if true, append args_append only when no
                           substitution was performed
        :param work_dir: directory to run the command in
        :param env_vars: dictionary of variables added on top of os.environ
        :param timeout: timeout in seconds; a false value disables it
        :param redirect_stderr: merge stderr into stdout (default) or
                                capture it separately (see geterroutput())
        :param resource_limits: dictionary of RLIMIT_* name -> value,
                                applied in the child before exec
        :param doprint: log every output line; a list (e.g. from argparse)
                        is reduced to its first item, None means False
        """

        if doprint is None:
            doprint = False

        if isinstance(doprint, list):
            doprint = doprint[0] if doprint else False

        self.cmd = list(map(str, cmd))
        self.state = "notrun"
        self.excl_subst = excl_subst
        self.work_dir = work_dir
        self.env_vars = env_vars
        self.timeout = timeout
        self.pid = None
        self.redirect_stderr = redirect_stderr
        self.limits = resource_limits
        self.doprint = doprint
        self.err = None
        self.returncode = None

        self.logger = logger or logging.getLogger(__name__)

        if args_subst or args_append:
            self.fill_arg(args_append, args_subst)

        self.out = None

    def __str__(self):
        return " ".join(self.cmd)

    def execute(self):
        """
        Execute the command and capture its output and return code.

        Afterwards getstate() reports FINISHED, INTERRUPTED, ERRORED or
        TIMEDOUT. If work_dir is set, the current working directory is
        switched for the duration of the call and restored afterwards,
        even if an unexpected exception propagates out of this method.
        """
        orig_work_dir = None
        if self.work_dir:
            try:
                orig_work_dir = os.getcwd()
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot get working directory",
                                  exc_info=True)
                return

            try:
                os.chdir(self.work_dir)
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot change working directory to {}".
                                  format(self.work_dir), exc_info=True)
                return

        try:
            self._run_process()
        finally:
            if orig_work_dir:
                self._restore_work_dir(orig_work_dir)

    def _restore_work_dir(self, orig_work_dir):
        """
        Change back to the working directory saved by execute().
        A failure marks the command as ERRORED but is not raised.
        """
        try:
            os.chdir(orig_work_dir)
        except OSError:
            self.state = Command.ERRORED
            self.logger.error("Cannot change working directory back to {}".
                              format(orig_work_dir), exc_info=True)

    def _run_process(self):
        """
        Start the process, wait for it (optionally with timeout) and collect
        its output. Sets state, returncode, out and err.
        """
        timeout_thread = None
        timeout_event = None
        stderr_thread = None
        stderr_event = None
        start_time = None

        output_event = threading.Event()
        output_thread = Command._OutputThread(output_event, self.logger,
                                              doprint=self.doprint)
        try:
            # If stderr redirection is off, setup a thread that will capture
            # stderr data. Created inside the try so that the stdout reader
            # is always shut down even if this fails.
            if self.redirect_stderr:
                stderr_dest = subprocess.STDOUT
            else:
                stderr_event = threading.Event()
                stderr_thread = Command._OutputThread(stderr_event,
                                                      self.logger,
                                                      doprint=self.doprint)
                stderr_dest = stderr_thread

            start_time = time.time()
            try:
                self.logger.debug("working directory = {}".format(os.getcwd()))
            except OSError:
                pass
            self.logger.debug("command = '{}'".format(self))
            my_args = {'stderr': stderr_dest,
                       'stdout': output_thread}
            if self.env_vars:
                my_env = os.environ.copy()
                my_env.update(self.env_vars)
                self.logger.debug("environment variables: {}".format(my_env))
                my_args['env'] = my_env
            if self.limits:
                my_args['preexec_fn'] = \
                    lambda: self.set_resource_limits(self.limits)

            # Actually run the command.
            p = subprocess.Popen(self.cmd, **my_args)

            self.pid = p.pid

            if self.timeout:
                timeout_event = threading.Event()
                self.logger.debug("Setting timeout to {} seconds".
                                  format(self.timeout))
                timeout_thread = Command._TimeoutThread(self.logger,
                                                        self.timeout,
                                                        timeout_event, p)

            self.logger.debug("Waiting for process with PID {}".format(p.pid))
            p.wait()
            self.logger.debug("Done waiting")

            if timeout_thread:
                e = timeout_thread.get_exception()
                if e:
                    raise e  # pylint: disable=E0702

        except KeyboardInterrupt:
            self.logger.info("Got KeyboardInterrupt while processing "
                             "command '{}'".format(self), exc_info=True)
            self.state = Command.INTERRUPTED
        except OSError:
            self.logger.error("Got OS error", exc_info=True)
            self.state = Command.ERRORED
        except TimeoutException:
            self.logger.error("Timed out")
            self.state = Command.TIMEDOUT
        else:
            self.state = Command.FINISHED
            self.returncode = int(p.returncode)
            self.logger.debug("'{}' -> {}".format(self, self.getretcode()))
        finally:
            # Wake the timeout thread so it exits without killing anything.
            if timeout_event is not None:
                timeout_event.set()

            # The subprocess module does not close the write pipe descriptor
            # it fetched via _OutputThread's fileno() so in order to
            # gracefully exit the read loop we have to close it here.
            output_thread.close()
            self.logger.debug("Waiting on output thread to finish reading")
            output_event.wait()
            self.out = output_thread.getoutput()

            if stderr_thread is not None and stderr_event is not None:
                stderr_thread.close()
                self.logger.debug("Waiting on stderr thread to finish reading")
                stderr_event.wait()
                self.err = stderr_thread.getoutput()

            if start_time:
                elapsed_time = time.time() - start_time
                self.logger.debug("Command '{}' took {} seconds".
                                  format(self, int(elapsed_time)))

    def fill_arg(self, args_append=None, args_subst=None):
        """
        Replace argument names with actual values or append arguments
        to the command vector.

        The action depends whether exclusive substitution is on.
        If yes, arguments will be appended only if no substitution was
        performed. Substitution values and appended arguments are converted
        to str, consistently with how the command vector is built.
        """

        newcmd = []
        subst_done = -1
        for i, cmdarg in enumerate(self.cmd):
            newarg = cmdarg
            if args_subst:
                for pattern, value in args_subst.items():
                    # Empty/None replacement values are skipped on purpose.
                    if pattern in newarg and value:
                        value = str(value)
                        self.logger.debug("replacing '{}' in '{}' with '{}'".
                                          format(pattern, newarg, value))
                        newarg = newarg.replace(pattern, value)
                        self.logger.debug("replaced argument with {}".
                                          format(newarg))
                        subst_done = i
            newcmd.append(newarg)

        if args_append and (not self.excl_subst or subst_done == -1):
            self.logger.debug("appending {}".format(args_append))
            newcmd.extend(str(arg) for arg in args_append)

        self.cmd = newcmd

    def get_resource(self, name):
        """
        Translate resource name (e.g. "RLIMIT_NOFILE") to the value
        of the corresponding constant in the resource module.

        :raises NotImplementedError: if the resource module is unavailable
                                     or the name is not a known RLIMIT_*
        """
        try:
            import resource
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

        if isinstance(name, str) and name.startswith("RLIMIT_"):
            value = getattr(resource, name, None)
            if value is not None:
                return value

        raise NotImplementedError("unknown resource")

    def set_resource_limit(self, name, value):
        """
        Set both soft and hard limit of given resource to the value.
        """
        try:
            import resource
            self.logger.debug("Setting resource {} to {}"
                              .format(name, value))
            resource.setrlimit(self.get_resource(name), (value, value))
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

    def set_resource_limits(self, limits):
        """
        Apply dictionary of resource name -> value limits.
        """
        self.logger.debug("Setting resource limits")
        for name, value in limits.items():
            self.set_resource_limit(name, value)

    def getretcode(self):
        if self.state != Command.FINISHED:
            return None
        else:
            return self.returncode

    def getoutputstr(self):
        if self.state == Command.FINISHED:
            return "".join(self.out).strip()
        else:
            return None

    def getoutput(self):
        if self.state == Command.FINISHED:
            return self.out
        else:
            return None

    def geterroutput(self):
        return self.err

    def geterroutputstr(self):
        if self.err:
            return "".join(self.err).strip()
        else:
            return ""

    def getstate(self):
        return self.state

    def getpid(self):
        return self.pid

    def log_error(self, msg):
        # Compare by value: state strings must not rely on identity.
        if self.state == Command.FINISHED:
            self.logger.error("{}: command {} in directory {} exited with {}".
                              format(msg, self.cmd, self.work_dir,
                                     self.getretcode()))
        else:
            self.logger.error("{}: command {} in directory {} ended with "
                              "invalid state {}".
                              format(msg, self.cmd, self.work_dir, self.state))
436