7.13 基于 ansible-api 二次开发¶
长久以来,IT 运维在企业内部一直是个耗人耗力的事情。随着虚拟化的大量应用、私有云、容器的不断普及,数据中心内部的压力愈发增加。传统的自动化工具,往往是面向于数据中心特定的一类对象,例如操作系统、虚拟化、网络设备的自动化运维工具往往是不同的。那么,有没有一种数据中心级别的统一的自动化运维工具呢?
答案就是Ansible。和传统的自动化工具 (如 Puppet)相比,Ansible 尤其明显的优势:
简单,是一种高级的脚本类语言,而非标准语言。
不需要安装 agent, 分为管理节点和远程被管节点,通过 SSH 认证。
纳管范围广泛,不仅仅是操作系统,还包括各种虚拟化、公有云,甚至网络设备。
用过 ansible 的人都知道,ansible 一般都是通过命令行执行的:ansible、ansible-playbook、ansible-doc、ansible-galaxy、ansible-console 等。用执行命令的方式,整个过程的所有信息都会实时的打印在终端屏幕。这样的好处是,反馈实时,可以观察整个部署过程。缺点就是信息过多,难以获取有效的信息。
这时候,我们自然想到可以直接调用 ansible 提供的接口进行开发。
通过搜索引擎查询有关 ansible 二次开发的相关资料,发现这方面的文章非常少,而且由于 ansible 一直在进行代码的重构优化,有很多网友提供的 demo 已不适用我当前使用的版本,一直跑不通。
本想从官方文档中查得一些资料,结果发现官网中对于这块也没有很详细的介绍。
后来想想,ansible、ansible-playbook、ansible-doc 这些命令不也是基于 ansible 的 api 开发出来的吗?
我不是可以模仿他们,来调用 ansible api 接口。
以下通过阅读 ansible cli 的源码,总结出来的。
如何调用 Playbook¶
我习惯性地将调用 playbook 的代码封装为一个函数。
def playbook_task():
try:
# 执行 playbook 的各项参数
cli = PlaybookCLI(args=["", "you_task.yml"])
cli.parse()
super(PlaybookCLI, cli).run()
loader, inventory, variable_manager = cli._play_prereqs(cli.options)
CLI.get_host_list(inventory, cli.options.subset)
pbex = PlaybookExecutor(playbooks=cli.args, inventory=inventory,
variable_manager=variable_manager,
loader=loader,
options=cli.options,
passwords=None)
# 【重点】指定回调实例
pbex._tqm._stdout_callback = ResultCallback()
exit_code = pbex.run()
except Exception as e:
LOG.exception('Failed to run playbook:{}'.format(e))
以上的代码相对固定,你可以直接拷贝直接用。除了两个位置你要注意一下
第一个注释处:按需要对应添加 playbook 的参数
第二个注释处:指定你自定义的回调实例
定义回调类
from ansible.plugins.callback.default import CallbackModule
class ResultCallback(CallbackModule):
def __init__(self):
self.display_failed_stderr = False
self.display_ok_hosts = True
super(ResultCallback, self).__init__()
def v2_runner_on_failed(self, result, ignore_errors=False, **kwargs):
super(ResultCallback, self).v2_runner_on_failed(result, ignore_errors, **kwargs)
host = result._host.get_name()
self.runner_on_failed(host, result._result, ignore_errors)
if not ignore_errors:
if "ws" not in host:
host = "ws_" + host.replace("_host", "")
if host in task_result:
task_result[host]["detailStatus"] = "FAIL"
task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")
else:
for host, _ in task_result.items():
task_result[host]["detailStatus"] = "FAIL"
task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")
def v2_runner_on_unreachable(self, result):
super(ResultCallback, self).v2_runner_on_unreachable(result)
host = result._host.get_name()
self.runner_on_unreachable(host, result._result)
if "ws" not in host:
host = "ws_" + host.replace("_host", "")
task_result[host]["detailStatus"] = "FAIL"
task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")
def v2_runner_on_skipped(self, result):
super(ResultCallback, self).v2_runner_on_skipped(result)
if C.DISPLAY_SKIPPED_HOSTS:
host = result._host.get_name()
self.runner_on_skipped(host, self._get_item(getattr(result._result, 'results', {})))
并实现其中几个关键的方法
v2_runner_on_ok:单个部署任务成功时,会调用
v2_runner_on_failed:节点的部署任务失败时,会调用
v2_runner_on_unreachable:节点不可达时,会调用
v2_runner_on_skipped:部署任务跳过时,会调用
v2_playbook_on_stats:所有的部署任务完成时,调用