源码解读Saltstack运行机制之Job Runtime
鄙人深切以为 “在开源的世界里阅读源码才是深层次理解软件运行机制的不二法门”,本系列即是旨在通过解读(当然,鄙人实力有限,只是粗浅的解读)Saltstack源码的方式,了解Saltstack内部的运行机制,对排障和更好理解和二次开发Saltstack有一定助力。
本篇文章即解决”salt 如何从启动minion到执行一个job“的困惑,鄙人才疏学浅,有欠缺的地方还请海涵。
under salt master & minion 0.17.5
一个Salt Minion的启动过程
从init.d下的启动脚本开始追溯可以定位启动代码位于salt下的__init__.py初始脚本里的Minion类,它会执行prepare方法来完成一系列的准备工作:
- 读取配置文件,获取一系列的参数变量;
- 调用Minion或MultiMinion方法,在这一步获取grains数据,并尝试和Master认证,Pillar的编译(?) ; (这里会做一次判断,如果minion配置文件里指向了多个master,则会调用salt.minion.MultiMinion方法来启动多个Minion Server instance,这里暂时只讨论调用Minion方法的情况)
- check_user判断当前调起Minion user是否符合conf文件描述,满足则进行Minion Server的tune in,也就是从初始化完成到真正Online working的微调过程;
- tune_in 是只有Minion Server或者说主Minion进程才能调用的微调方法,其建立Minion Pub\Pull socket,连接到Master的消息总线,并fire event声明该Minion已启动,并开始执行设定的scheduler(如果有),并进入一个无限循环,长期的监听event并去完成对应任务,至此Minion服务算是启动完毕。
def tune_in(self):
'''
Lock onto the publisher. This is the main event
loop for the minion
'''
……
# On first startup execute a state run if configured to do so
self._state_run()
time.sleep(.5)
#wjx add
#here start the loop!!!
loop_interval = int(self.opts['loop_interval'])
while True:
基于Zeromq的消息总线
Salt Master和Minion的交互主要借助Zeromq完成,所有的Job也是通过Zeromq消息总线来下发和接收。Zeromq通信这块相应的被掩盖在底层接口下,目前鄙人暂时未能搞清具体的运作机理,也就相应带过,后面可能专门写文理解一下。
清道夫——Minion Worker
_handle_payload 方法
正如上文所介绍的那样,Salt Minion Server(Main Minion)在每次收到Master下发的Job时均会对应调起一个新的Worker进程(或线程,具体可以在/etc/salt/minion参数里予以设置,默认为 #multiprocessing: True
),这个操作便是在__handle_payload 方法里完成。
#wjx add
#In main loop, Main Minion will call _handle_payload
#if there is some poll data from Master
socks = dict(self.poller.poll(
loop_interval * 1000)
)
if self.socket in socks and socks[self.socket] == zmq.POLLIN:
payload = self.serial.loads(self.socket.recv())
self._handle_payload(payload)
How to handle payload ?
那么,_handle_payload方法如何去处理Master端下发的Job呢?答案看源码便明了一二:
def _handle_payload(self, payload):
'''
Takes a payload from the master publisher
and does whatever the master wants done.
'''
{'aes': self._handle_aes,
'pub': self._handle_pub,
'clear': self._handle_clear}[payload['enc']](payload['load'],
payload['sig'] if 'sig' in payload else None)
实际上是根据传入的payload dict的enc键值来对应调用方法,最常见的应该是_handle_aes方法。然后,Minion便根据本地的minion_master.pub来解密数据,得到data字典(这中间有段代码使用到pub key,但是目前没怎么看懂意思):
try:
data = self.crypticle.loads(load)
解密之后的数据使用_handle_decoded_payload予以对应处理,在这里,Main Minion调用Multiprocessing模块或者Threading模块来唤醒一个新的Worker予以处理对应的job任务:
#wjx add
#There is some difference between Linux and Win
#Multiprocess or Multi-Thread is def in /etc/salt/minion
#also, it will be diff if Master called multi-func like :
#import salt.client
#local = salt.client.LocalClient()
#local.cmd( '*', [ 'test.ping','cmd.run' ], [ [],['w'] ] )
instance = self
if self.opts['multiprocessing']:
if sys.platform.startswith('win'):
# let python reconstruct the minion
# on the other side if we're
# running on windows
instance = None
process = multiprocessing.Process(
target=target, args=(instance, self.opts, data)
)
else:
process = threading.Thread(
target=target, args=(instance, self.opts, data)
)
在Main Minion唤起一个Worker去完成对应Job后,实际上这个Job就只跟Worker Minion有关了,Main Minion重新回到监听event->处理Job这样的一个循环。新的Worker由以下入口(或者_thread_multi_return)开始:
@classmethod
def _thread_return(cls, minion_instance, opts, data):
'''
This method should be used as a threading target,
start the actual minion side execution.
'''
# this seems awkward at first, but it's a workaround for Windows
# multiprocessing communication.
……
这里注意一点,在Minion完成decode过程时便由__load_modules导入了Minion端的 functions(也可以理解为modules)以及 returners 数据,所以此时便可以直接调用Master下发的job数据中指定的function:
def __load_modules(self):
'''
Return the functions and the returners loaded up from the loader
module
'''
self.opts['grains'] = salt.loader.grains(self.opts)
functions = salt.loader.minion_mods(self.opts)
returners = salt.loader.returners(self.opts, functions)
return functions, returners
Minion Worker 调用function执行Job:
#wjx add
#use success tag to judge the job ret
ret = {'success': False}
function_name = data['fun']
if function_name in minion_instance.functions:
try:
func = minion_instance.functions[data['fun']]
args, kwargs = parse_args_and_kwargs(func, data['arg'], data)
sys.modules[func.__module__].__context__['retcode'] = 0
return_data = func(*args, **kwargs)
之后便是Minion Worker反馈数据到Master以及Returner的Job归档:
#wjx add
#send data to master by _return_pub function
ret['jid'] = data['jid']
ret['fun'] = data['fun']
minion_instance._return_pub(ret)
if data['ret']:
ret['id'] = opts['id']
for returner in set(data['ret'].split(',')):
try:
minion_instance.returners['{0}.returner'.format(
returner
)](ret)
except Exception as exc:
log.error(
'The return failed for job {0} {1}'.format(
data['jid'],
exc
)
)
如果一切顺利的话,_thread_return方法成功完成,Worker进程或线程便会自然消亡(到达process.join(),终止)。
总结
除开细节上可能存在的一些模糊和错误之处需要完善,本文算是告一段落,主要就Minion角度考虑从启动到处理Job的整个运转过程,理解这样一个机制可能对一些Minion Worker的异常僵死的处理有一定助力。
Salt的源码本身是一个优秀的Python编程样例,阅读其源码不单有助于理解Salt的运转机制,而且对Python开发也会有一定帮助,可以的话,鄙人会持续进行源码的挖掘。
下篇文章可能会就salt\salt-run\salt-call等模块亦或是zeromq机制展开讨论,具体内容就看鄙人能不能写出来了……