import os
import re
import stat
import time
import json
import errno
import signal
import shutil
import codecs
import collections
import six
import pexpect
import psutil
import ansible_runner.plugins
from .utils import OutputEventFilter, cleanup_artifact_dir
from .exceptions import CallbackError, AnsibleRunnerException
from ansible_runner.output import debug
class Runner(object):
    '''
    Drives a single invocation of the command described by a config object.

    The command is spawned through ``pexpect``; its output is routed through an
    ``OutputEventFilter`` which calls :meth:`event_callback` for each event.
    Artifacts (``stdout``, ``command``, ``status``, ``rc`` and the
    ``job_events`` directory) are written below ``config.artifact_dir``.

    :param config: object carrying the run settings read by this class
        (``artifact_dir``, ``command``, ``cwd``, ``env``, ``ident``, ...).
    :param cancel_callback: optional callable polled while the child runs; a
        truthy return value cancels the run.
    :param remove_partials: when true, ``<uuid>-partial.json`` files are
        deleted once merged into their event.
    :param event_handler: optional callable receiving each event dict; its
        return value decides whether the event is written to disk.
    :param finished_callback: optional callable invoked with this runner once
        :meth:`run` has finished.
    :param status_handler: optional callable invoked on every status change
        with the status dict and ``runner_config=config``.
    '''

    def __init__(self, config, cancel_callback=None, remove_partials=True,
                 event_handler=None, finished_callback=None, status_handler=None):
        self.config = config
        self.cancel_callback = cancel_callback
        self.event_handler = event_handler
        self.finished_callback = finished_callback
        self.status_handler = status_handler
        self.canceled = False
        self.timed_out = False
        self.errored = False
        self.status = "unstarted"
        self.rc = None
        self.remove_partials = remove_partials
        # Timestamp of the most recent output/event; refreshed by
        # event_callback() and run(), used for the idle timeout check.
        self.last_stdout_update = None

    def event_callback(self, event_data):
        '''
        Invoked for every Ansible event to collect stdout with the event data and store it for
        later use

        :param event_data: dict describing the event. It is updated in place
            with ``runner_ident`` and with the content of the matching
            ``<uuid>-partial.json`` file, when that file can be read.
        '''
        self.last_stdout_update = time.time()
        # Events without a uuid cannot be matched to a file and are not stored.
        if 'uuid' not in event_data:
            return

        events_dir = os.path.join(self.config.artifact_dir, 'job_events')
        partial_filename = os.path.join(
            events_dir, '{}-partial.json'.format(event_data['uuid']))
        full_filename = os.path.join(
            events_dir,
            '{}-{}.json'.format(event_data['counter'], event_data['uuid']))
        try:
            event_data.update(dict(runner_ident=str(self.config.ident)))
            try:
                with codecs.open(partial_filename, 'r', encoding='utf-8') as read_file:
                    partial_event_data = json.load(read_file)
                event_data.update(partial_event_data)
                if self.remove_partials:
                    os.remove(partial_filename)
            except IOError:
                # A missing/unreadable partial file is not fatal: the event is
                # still handled with whatever data it already carries.
                debug("Failed to open ansible stdout callback plugin partial data file {}".format(partial_filename))
            if self.event_handler is not None:
                should_write = self.event_handler(event_data)
            else:
                should_write = True
            for plugin in ansible_runner.plugins:
                ansible_runner.plugins[plugin].event_handler(self.config, event_data)
            if should_write:
                with codecs.open(full_filename, 'w', encoding='utf-8') as write_file:
                    # Restrict the event file to owner read/write before any data lands in it.
                    os.chmod(full_filename, stat.S_IRUSR | stat.S_IWUSR)
                    json.dump(event_data, write_file)
        except IOError as e:
            debug("Failed writing event data: {}".format(e))

    def status_callback(self, status):
        '''
        Record a status change and notify the plugins and the optional status handler.

        :param status: the new status string; stored on ``self.status``.
        '''
        self.status = status
        status_data = dict(status=status, runner_ident=str(self.config.ident))
        for plugin in ansible_runner.plugins:
            ansible_runner.plugins[plugin].status_handler(self.config, status_data)
        if self.status_handler is not None:
            self.status_handler(status_data, runner_config=self.config)

    def run(self):
        '''
        Launch the Ansible task configured in self.config (A RunnerConfig object), returns once the
        invocation is complete

        :returns: tuple ``(status, rc)``; ``rc`` is 254 when the run timed out
            and 127 when the process could not be spawned.
        :raises CallbackError: if the cancel or finished callback raises.
        '''
        self.status_callback('starting')
        stdout_filename = os.path.join(self.config.artifact_dir, 'stdout')
        command_filename = os.path.join(self.config.artifact_dir, 'command')

        try:
            os.makedirs(self.config.artifact_dir, mode=0o700)
        except OSError as exc:
            # An already existing artifact directory is fine; anything else is not.
            if not (exc.errno == errno.EEXIST and os.path.isdir(self.config.artifact_dir)):
                raise
        # Create the stdout file up front with owner-only permissions.
        os.close(os.open(stdout_filename, os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR))

        with codecs.open(command_filename, 'w', encoding='utf-8') as f:
            os.chmod(command_filename, stat.S_IRUSR | stat.S_IWUSR)
            json.dump(
                {'command': self.config.command,
                 'cwd': self.config.cwd,
                 'env': self.config.env}, f, ensure_ascii=False
            )

        if self.config.ident is not None:
            cleanup_artifact_dir(os.path.join(self.config.artifact_dir, ".."), self.config.rotate_artifacts)

        stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
        stdout_handle = OutputEventFilter(stdout_handle, self.event_callback, self.config.suppress_ansible_output, output_json=self.config.json_mode)

        # We iterate over `expect_passwords.keys()` and
        # `expect_passwords.values()` separately to map matched inputs to
        # patterns and choose the proper string to send to the subprocess;
        # enforce usage of an OrderedDict so that the ordering of elements in
        # `keys()` matches `values()`.
        # The local is bound unconditionally so that a config which already
        # supplies an OrderedDict does not hit an unbound name below.
        expect_passwords = self.config.expect_passwords
        if not isinstance(expect_passwords, collections.OrderedDict):
            expect_passwords = collections.OrderedDict(expect_passwords)
        password_patterns = list(expect_passwords.keys())
        password_values = list(expect_passwords.values())

        # pexpect needs all env vars to be utf-8 encoded bytes
        # https://github.com/pexpect/pexpect/issues/512
        # Use a copy so as not to cause problems when serializing the job_env.
        env = {
            k: v.encode('utf-8') if k != 'PATH' and isinstance(v, six.text_type) else v
            for k, v in self.config.env.items()
        }

        self.status_callback('running')
        self.last_stdout_update = time.time()
        try:
            child = pexpect.spawn(
                self.config.command[0],
                self.config.command[1:],
                cwd=self.config.cwd,
                env=env,
                ignore_sighup=True,
                encoding='utf-8',
                echo=False,
                use_poll=self.config.pexpect_use_poll,
            )
            child.logfile_read = stdout_handle
        except pexpect.exceptions.ExceptionPexpect as e:
            # Spawning failed: substitute a stand-in that reports "not alive"
            # with exit status 127 so the code below can run unchanged.
            child = collections.namedtuple(
                'MissingProcess', 'exitstatus isalive'
            )(
                exitstatus=127,
                isalive=lambda: False
            )

            def _decode(x):
                return x.decode('utf-8') if six.PY2 else x

            # create the events directory (the callback plugin won't run, so it
            # won't get created)
            events_directory = os.path.join(self.config.artifact_dir, 'job_events')
            if not os.path.exists(events_directory):
                os.mkdir(events_directory, 0o700)
            stdout_handle.write(_decode(str(e)))
            stdout_handle.write(_decode('\n'))

        job_start = time.time()
        while child.isalive():
            result_id = child.expect(password_patterns,
                                     timeout=self.config.pexpect_timeout,
                                     searchwindowsize=100)
            # The matched pattern index selects the response to send, if any.
            password = password_values[result_id]
            if password is not None:
                child.sendline(password)
                self.last_stdout_update = time.time()
            if self.cancel_callback:
                try:
                    self.canceled = self.cancel_callback()
                except Exception as e:
                    # TODO: logger.exception('Could not check cancel callback - cancelling immediately')
                    raise CallbackError("Exception in Cancel Callback: {}".format(e))
            if self.config.job_timeout and not self.canceled and (time.time() - job_start) > self.config.job_timeout:
                self.timed_out = True
            if self.canceled or self.timed_out or self.errored:
                Runner.handle_termination(child.pid, is_cancel=self.canceled)
            if self.config.idle_timeout and (time.time() - self.last_stdout_update) > self.config.idle_timeout:
                child.close(True)
                self.timed_out = True

        stdout_handle.flush()
        stdout_handle.close()

        if self.canceled:
            self.status_callback('canceled')
        elif child.exitstatus == 0 and not self.timed_out:
            self.status_callback('successful')
        elif self.timed_out:
            self.status_callback('timeout')
        else:
            self.status_callback('failed')
        self.rc = child.exitstatus if not self.timed_out else 254

        # Persist the final status and return code as owner-only artifact files.
        for filename, data in [
            ('status', self.status),
            ('rc', self.rc),
        ]:
            artifact_path = os.path.join(self.config.artifact_dir, filename)
            if not os.path.exists(artifact_path):
                os.close(os.open(artifact_path, os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR))
            with open(artifact_path, 'w') as f:
                f.write(str(data))

        if self.config.directory_isolation_path and self.config.directory_isolation_cleanup:
            shutil.rmtree(self.config.directory_isolation_path)
        if self.finished_callback is not None:
            try:
                self.finished_callback(self)
            except Exception as e:
                raise CallbackError("Exception in Finished Callback: {}".format(e))
        return self.status, self.rc

    @property
    def stdout(self):
        '''
        Returns an open file handle to the stdout representing the Ansible run

        The caller is responsible for closing the returned handle.

        :raises AnsibleRunnerException: if the stdout artifact does not exist.
        '''
        stdout_path = os.path.join(self.config.artifact_dir, 'stdout')
        if not os.path.exists(stdout_path):
            raise AnsibleRunnerException("stdout missing")
        return open(stdout_path, 'r')

    @property
    def events(self):
        '''
        A generator that will return all ansible job events in the order that they were emitted from Ansible

        Events are read from the ``job_events`` artifact directory and ordered
        by the numeric counter prefix of their file names. Leftover
        ``<uuid>-partial.json`` files are never treated as events.

        :raises AnsibleRunnerException: if the ``job_events`` directory does not exist.

        Example:

            {
               "event":"runner_on_ok",
               "uuid":"00a50d9c-161a-4b74-b978-9f60becaf209",
               "stdout":"ok: [localhost] => {\\r\\n \\" msg\\":\\"Test!\\"\\r\\n}",
               "counter":6,
               "pid":740,
               "created":"2018-04-05T18:24:36.096725",
               "end_line":10,
               "start_line":7,
               "event_data":{
                  "play_pattern":"all",
                  "play":"all",
                  "task":"debug",
                  "task_args":"msg=Test!",
                  "remote_addr":"localhost",
                  "res":{
                     "msg":"Test!",
                     "changed":false,
                     "_ansible_verbose_always":true,
                     "_ansible_no_log":false
                  },
                  "pid":740,
                  "play_uuid":"0242ac11-0002-443b-cdb1-000000000006",
                  "task_uuid":"0242ac11-0002-443b-cdb1-000000000008",
                  "event_loop":null,
                  "playbook_uuid":"634edeee-3228-4c17-a1b4-f010fdd42eb2",
                  "playbook":"test.yml",
                  "task_action":"debug",
                  "host":"localhost",
                  "task_path":"/tmp/demo/project/test.yml:3"
               }
           }
        '''
        event_path = os.path.join(self.config.artifact_dir, 'job_events')
        if not os.path.exists(event_path):
            raise AnsibleRunnerException("events missing")
        # Event files are named "<counter>-<uuid>.json". A partial file
        # ("<uuid>-partial.json") whose first uuid group happens to be all
        # digits also matches the pattern, so partials are excluded explicitly.
        dir_events_actual = [
            each_file for each_file in os.listdir(event_path)
            if re.match("^[0-9]+-.+json$", each_file)
            and not each_file.endswith('-partial.json')
        ]
        dir_events_actual.sort(key=lambda filenm: int(filenm.split("-", 1)[0]))
        for event_file in dir_events_actual:
            with codecs.open(os.path.join(event_path, event_file), 'r', encoding='utf-8') as event_file_actual:
                event = json.load(event_file_actual)
            yield event

    @property
    def stats(self):
        '''
        Returns the final high level stats from the Ansible run

        :returns: dict with the ``skipped``, ``ok``, ``dark``, ``failures``
            and ``processed`` entries of the first ``playbook_on_stats``
            event, or ``None`` when no such event was recorded.

        Example:
            {'dark': {}, 'failures': {}, 'skipped': {}, 'ok': {u'localhost': 2}, 'processed': {u'localhost': 1}}
        '''
        stats_events = [
            x for x in self.events
            if 'event' in x and x['event'] == 'playbook_on_stats'
        ]
        if not stats_events:
            return None
        stats_data = stats_events[0]['event_data']
        return dict(skipped=stats_data['skipped'],
                    ok=stats_data['ok'],
                    dark=stats_data['dark'],
                    failures=stats_data['failures'],
                    processed=stats_data['processed'])

    def host_events(self, host):
        '''
        Given a host name, this will return all task events executed on that host

        :param host: host name compared against ``event_data['host']``.
        :returns: the result of ``filter()`` over :attr:`events` (a lazy
            iterator on Python 3, a list on Python 2).
        '''
        all_host_events = filter(lambda x: 'event_data' in x and 'host' in x['event_data'] and x['event_data']['host'] == host,
                                 self.events)
        return all_host_events

    @classmethod
    def handle_termination(cls, pid, is_cancel=True):
        '''
        Internal method to terminate a subprocess spawned by `pexpect` representing an invocation of runner.

        :param pid: the process id of the process running the job.
        :param is_cancel: flag showing whether this termination is caused by
                          instance's cancel_flag. Currently not used by the
                          implementation.
        '''
        try:
            main_proc = psutil.Process(pid=pid)
            child_procs = main_proc.children(recursive=True)
            # Kill the descendants first, then the main process itself.
            for child_proc in child_procs:
                try:
                    os.kill(child_proc.pid, signal.SIGKILL)
                except (TypeError, OSError):
                    pass
            os.kill(main_proc.pid, signal.SIGKILL)
        except (TypeError, psutil.Error, OSError):
            # Process inspection failed; fall back to a best-effort kill of
            # the given pid and ignore the error if that fails as well.
            try:
                os.kill(pid, signal.SIGKILL)
            except (OSError):
                pass

    def get_fact_cache(self, host):
        '''
        Get the entire fact cache only if the fact_cache_type is 'jsonfile'

        :param host: name of the cache file below ``config.fact_cache``.
        :returns: the decoded JSON content of the host's cache file, or an
            empty dict when the file does not exist.
        :raises Exception: if ``config.fact_cache_type`` is not ``'jsonfile'``.
        '''
        if self.config.fact_cache_type != 'jsonfile':
            raise Exception('Unsupported fact cache type.  Only "jsonfile" is supported for reading and writing facts from ansible-runner')
        fact_cache = os.path.join(self.config.fact_cache, host)
        if os.path.exists(fact_cache):
            with open(fact_cache) as f:
                return json.loads(f.read())
        return {}

    def set_fact_cache(self, host, data):
        '''
        Set the entire fact cache data only if the fact_cache_type is 'jsonfile'

        :param host: name of the cache file below ``config.fact_cache``.
        :param data: JSON-serializable object replacing the file's content.
        :returns: the return value of the underlying file ``write()`` call.
        :raises Exception: if ``config.fact_cache_type`` is not ``'jsonfile'``.
        '''
        if self.config.fact_cache_type != 'jsonfile':
            raise Exception('Unsupported fact cache type.  Only "jsonfile" is supported for reading and writing facts from ansible-runner')
        fact_cache = os.path.join(self.config.fact_cache, host)
        # Create the cache directory (owner-only) on first use.
        if not os.path.exists(os.path.dirname(fact_cache)):
            os.makedirs(os.path.dirname(fact_cache), mode=0o700)
        with open(fact_cache, 'w') as f:
            return f.write(json.dumps(data))