#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# See LICENSE.txt included in this distribution for the specific
# language governing permissions and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at LICENSE.txt.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#

#
# Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
#

import logging
import os
import signal
import subprocess
import threading
import time


class TimeoutException(Exception):
    """
    Exception raised when a command exceeded its timeout.
    """
    pass


class Command:
    """
    wrapper for synchronous execution of commands via subprocess.Popen()
    and getting their output (stderr is redirected to stdout by default)
    and exit value
    """

    # state definitions
    FINISHED = "finished"
    INTERRUPTED = "interrupted"
    ERRORED = "errored"
    TIMEDOUT = "timed out"

    def __init__(self, cmd, args_subst=None, args_append=None, logger=None,
                 excl_subst=False, work_dir=None, env_vars=None, timeout=None,
                 redirect_stderr=True, resource_limits=None, doprint=False):
        """
        :param cmd: command vector; every element is converted to str
        :param args_subst: dictionary of pattern -> value substitutions
                           applied to the command arguments (see fill_arg())
        :param args_append: list of arguments to append to the command
        :param logger: logger to use (defaults to the module logger)
        :param excl_subst: if True, append args_append only when no
                           substitution was performed
        :param work_dir: directory to run the command in
        :param env_vars: dictionary of environment variables added to
                         a copy of the current environment
        :param timeout: timeout in seconds, after which the command is killed
        :param redirect_stderr: if True, stderr is merged into stdout,
                                otherwise it is captured separately
        :param resource_limits: dictionary of resource name -> limit value
        :param doprint: if True, log every output line as it is read
        """

        if doprint is None:
            doprint = False

        if isinstance(doprint, list):
            # Presumably a value coming from a list-valued option; an empty
            # list is treated as "not set" instead of raising IndexError.
            doprint = doprint[0] if doprint else False

        self.cmd = list(map(str, cmd))
        self.state = "notrun"
        self.excl_subst = excl_subst
        self.work_dir = work_dir
        self.env_vars = env_vars
        self.timeout = timeout
        self.pid = None
        self.redirect_stderr = redirect_stderr
        self.limits = resource_limits
        self.doprint = doprint
        self.err = None
        self.returncode = None

        self.logger = logger or logging.getLogger(__name__)

        if args_subst or args_append:
            self.fill_arg(args_append, args_subst)

        self.out = None

    def __str__(self):
        return " ".join(self.cmd)

    def execute(self):
        """
        Execute the command and capture its output and return code.

        Afterwards getstate() returns one of FINISHED, INTERRUPTED, ERRORED
        or TIMEDOUT. The working directory of the calling process is
        restored even if an unexpected exception propagates from here.
        """

        class TimeoutThread(threading.Thread):
            """
            Wait until the timeout specified in seconds expires and kill
            the process specified by the Popen object after that.
            If timeout expires, TimeoutException is stored in the object
            and can be retrieved by the caller.

            Completion is signalled through an Event (rather than by
            notifying a Condition) so that a signal sent before this thread
            started waiting is not lost.
            """

            def __init__(self, logger, timeout, finished, p):
                super(TimeoutThread, self).__init__()
                self.timeout = timeout
                self.popen = p
                self.finished = finished
                self.logger = logger
                # Initialize before start(); otherwise run() could store
                # the exception first and have it overwritten here.
                self.exception = None
                self.start()

            def terminate(self, p):
                """
                Make sure the process goes away.
                """
                self.logger.info("Terminating PID {}".format(p.pid))
                p.terminate()

                # The following code tries more methods to terminate
                # the process and is specific to Unix.
                if os.name == 'posix':
                    timeout = self.timeout
                    # disable E1101 - non existent attribute SIGKILL on windows
                    # pylint: disable=E1101
                    term_signals = [signal.SIGINT, signal.SIGKILL]
                    # pylint: enable=E1101
                    for sig in term_signals:
                        timeout = timeout / 2  # exponential back-off
                        self.logger.info("Sleeping for {} seconds".
                                         format(timeout))
                        time.sleep(timeout)

                        if p.poll() is None:
                            self.logger.info("Command with PID {} still alive,"
                                             " killing with signal {}".
                                             format(p.pid, sig))
                            p.send_signal(sig)
                        else:
                            self.logger.info("Command with PID {} is gone".
                                             format(p.pid))
                            break

            def run(self):
                # Event.wait() returns True if the command finished in time.
                if self.finished.wait(self.timeout):
                    return None

                p = self.popen
                self.logger.info("Terminating command {} with PID {} "
                                 "after timeout of {} seconds".
                                 format(p.args, p.pid, self.timeout))
                self.exception = TimeoutException("Command {} with pid"
                                                  " {} timed out".
                                                  format(p.args,
                                                         p.pid))
                self.terminate(p)
                return None

            def get_exception(self):
                return self.exception

        class OutputThread(threading.Thread):
            """
            Capture data from subprocess.Popen(). This avoids hangs when
            stdout/stderr buffers fill up.
            """

            def __init__(self, event, logger, doprint=False):
                super(OutputThread, self).__init__()
                self.read_fd, self.write_fd = os.pipe()
                # errors='replace' keeps the reader alive on output that is
                # not valid UTF-8; a decoding error would otherwise kill this
                # thread before the event is set and block execute() forever.
                self.pipe_fobj = os.fdopen(self.read_fd, encoding='utf8',
                                           errors='replace')
                self.out = []
                self.event = event
                self.logger = logger
                self.doprint = doprint

                # Start the thread now.
                self.start()

            def run(self):
                """
                It might happen that after the process is gone, the thread
                still has data to read from the pipe. Hence, event is used
                to synchronize with the caller.
                """
                try:
                    while True:
                        line = self.pipe_fobj.readline()
                        if not line:
                            self.logger.debug("end of output")
                            return

                        self.out.append(line)

                        if self.doprint:
                            # Even if logging below fails, the thread has to
                            # keep running to avoid hangups of the executed
                            # command.
                            try:
                                self.logger.info(line.rstrip())
                            except Exception as print_exc:
                                self.logger.error(print_exc)
                finally:
                    # Always release the caller waiting on the event.
                    self.pipe_fobj.close()
                    self.event.set()

            def getoutput(self):
                return self.out

            def fileno(self):
                return self.write_fd

            def close(self):
                self.logger.debug("closed")
                os.close(self.write_fd)

        orig_work_dir = None
        if self.work_dir:
            try:
                orig_work_dir = os.getcwd()
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot get working directory",
                                  exc_info=True)
                return

            try:
                os.chdir(self.work_dir)
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot change working directory to {}".
                                  format(self.work_dir), exc_info=True)
                return

        timeout_thread = None
        timeout_event = None
        output_event = threading.Event()
        output_thread = OutputThread(output_event, self.logger,
                                     doprint=self.doprint)

        # If stderr redirection is off, setup a thread that will capture
        # stderr data.
        stderr_thread = None
        stderr_event = None
        if self.redirect_stderr:
            stderr_dest = subprocess.STDOUT
        else:
            stderr_event = threading.Event()
            stderr_thread = OutputThread(stderr_event, self.logger,
                                         doprint=self.doprint)
            stderr_dest = stderr_thread

        start_time = None
        try:
            start_time = time.time()
            try:
                self.logger.debug("working directory = {}".format(os.getcwd()))
            except PermissionError:
                pass
            self.logger.debug("command = '{}'".format(self))
            my_args = {'stderr': stderr_dest,
                       'stdout': output_thread}
            if self.env_vars:
                my_env = os.environ.copy()
                my_env.update(self.env_vars)
                self.logger.debug("environment variables: {}".format(my_env))
                my_args['env'] = my_env
            if self.limits:
                # NOTE(review): the subprocess documentation warns that
                # preexec_fn is not safe in the presence of threads (the
                # child may deadlock before exec); the output reader threads
                # are already running at this point.
                my_args['preexec_fn'] = \
                    lambda: self.set_resource_limits(self.limits)

            # Actually run the command.
            p = subprocess.Popen(self.cmd, **my_args)

            self.pid = p.pid

            if self.timeout:
                timeout_event = threading.Event()
                self.logger.debug("Setting timeout to {} seconds".
                                  format(self.timeout))
                timeout_thread = TimeoutThread(self.logger, self.timeout,
                                               timeout_event, p)

            self.logger.debug("Waiting for process with PID {}".format(p.pid))
            p.wait()
            self.logger.debug("Done waiting")

            if timeout_thread is not None:
                e = timeout_thread.get_exception()
                if e:
                    raise e  # pylint: disable=E0702

        except KeyboardInterrupt:
            self.logger.info("Got KeyboardException while processing ",
                             exc_info=True)
            self.state = Command.INTERRUPTED
        except OSError:
            self.logger.error("Got OS error", exc_info=True)
            self.state = Command.ERRORED
        except TimeoutException:
            self.logger.error("Timed out")
            self.state = Command.TIMEDOUT
        else:
            self.state = Command.FINISHED
            self.returncode = int(p.returncode)
            self.logger.debug("'{}' -> {}".format(self, self.getretcode()))
        finally:
            # Tell the timeout thread the command is done. Unlike notifying
            # a Condition, setting an Event is not lost if the thread has not
            # started waiting yet. Joining waits for a termination sequence
            # that may be in progress to complete.
            if timeout_event is not None:
                timeout_event.set()
            if timeout_thread is not None:
                timeout_thread.join()

            # The subprocess module does not close the write pipe descriptor
            # it fetched via OutputThread's fileno() so in order to gracefully
            # exit the read loop we have to close it here ourselves.
            output_thread.close()
            self.logger.debug("Waiting on output thread to finish reading")
            output_event.wait()
            self.out = output_thread.getoutput()

            if not self.redirect_stderr and stderr_thread and stderr_event:
                stderr_thread.close()
                self.logger.debug("Waiting on stderr thread to finish reading")
                stderr_event.wait()
                self.err = stderr_thread.getoutput()

            if start_time:
                elapsed_time = time.time() - start_time
                self.logger.debug("Command '{}' took {} seconds".
                                  format(self, int(elapsed_time)))

            # Restore the working directory here (not after the try
            # statement) so that it happens even when an exception that is
            # not handled above propagates to the caller.
            if orig_work_dir:
                try:
                    os.chdir(orig_work_dir)
                except OSError:
                    self.state = Command.ERRORED
                    self.logger.error("Cannot change working directory back to {}".
                                      format(orig_work_dir), exc_info=True)

    def fill_arg(self, args_append=None, args_subst=None):
        """
        Replace argument names with actual values or append arguments
        to the command vector.

        The action depends whether exclusive substitution is on.
        If yes, arguments will be appended only if no substitution was
        performed.

        :param args_append: list of arguments to append
        :param args_subst: dictionary of pattern -> replacement; patterns
                           whose replacement is empty/falsy are skipped
        """

        newcmd = []
        subst_done = -1
        for i, cmdarg in enumerate(self.cmd):
            newarg = cmdarg
            if args_subst:
                for pattern, value in args_subst.items():
                    if pattern in newarg and value:
                        self.logger.debug("replacing '{}' in '{}' with '{}'".
                                          format(pattern, newarg, value))
                        newarg = newarg.replace(pattern, value)
                        self.logger.debug("replaced argument with {}".
                                          format(newarg))
                        subst_done = i
            # newarg is unchanged when no pattern matched this argument.
            newcmd.append(newarg)

        if args_append and (not self.excl_subst or subst_done == -1):
            self.logger.debug("appending {}".format(args_append))
            newcmd.extend(args_append)

        self.cmd = newcmd

    def get_resource(self, name):
        """
        Translate a resource name such as "RLIMIT_NOFILE" to the
        corresponding constant of the resource module.

        :raises NotImplementedError: if the resource module is not available
                                     or the name is not an RLIMIT_* constant
                                     known to it
        """
        try:
            import resource
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

        if isinstance(name, str) and name.startswith("RLIMIT_"):
            value = getattr(resource, name, None)
            if value is not None:
                return value

        raise NotImplementedError("unknown resource")

    def set_resource_limit(self, name, value):
        """
        Set both soft and hard limit of the named resource to value.

        :raises NotImplementedError: if resources cannot be manipulated or
                                     the name is unknown
        """
        try:
            import resource
            self.logger.debug("Setting resource {} to {}"
                              .format(name, value))
            resource.setrlimit(self.get_resource(name), (value, value))
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

    def set_resource_limits(self, limits):
        """
        Apply all limits from the dictionary of resource name -> value.
        """
        self.logger.debug("Setting resource limits")
        for name, value in limits.items():
            self.set_resource_limit(name, value)

    def getretcode(self):
        """
        :return: exit code of the command or None if it did not finish
        """
        if self.state != Command.FINISHED:
            return None
        else:
            return self.returncode

    def getoutputstr(self):
        """
        :return: captured output as a stripped string, or None if the
                 command did not finish
        """
        if self.state == Command.FINISHED:
            return "".join(self.out).strip()
        else:
            return None

    def getoutput(self):
        """
        :return: list of captured output lines, or None if the command
                 did not finish
        """
        if self.state == Command.FINISHED:
            return self.out
        else:
            return None

    def geterroutput(self):
        """
        :return: list of captured stderr lines (None unless stderr
                 redirection was off)
        """
        return self.err

    def geterroutputstr(self):
        """
        :return: captured stderr as a stripped string ("" if none)
        """
        if self.err:
            return "".join(self.err).strip()
        else:
            return ""

    def getstate(self):
        return self.state

    def getpid(self):
        return self.pid

    def log_error(self, msg):
        """
        Log an error message describing how the command ended.
        """
        if self.state == Command.FINISHED:
            self.logger.error("{}: command {} in directory {} exited with {}".
                              format(msg, self.cmd, self.work_dir,
                                     self.getretcode()))
        else:
            self.logger.error("{}: command {} in directory {} ended with "
                              "invalid state {}".
                              format(msg, self.cmd, self.work_dir, self.state))