- Published on
【联邦学习之旅】C1 FATE Flow 流程源码解析
FATE 作为目前最受欢迎的联邦学习开源项目,直接从源码来进行学习是非常好的途径。本文将从代码的角度来介绍 FATE 中的调度器 FATE Flow 的工作原理。
注:本文基于 FATE 1.5.0 版本,后续版本的代码将另外标注出变化。
分析与总结
因为文章太长,所以写在前面(笑)
不出意外的话,这篇文章截止发布时应该是全网最详尽的 FATE Flow 介绍和源码解析的文章。因为之前也自己实现过基于 DAG 的分布式调度器,所以看到很多当年的设计出现在 FATE 这样一个广受欢迎的开源框架中,感觉还是挺自豪的。当然了,也发现了不少可以优化和改进的地方,比如可以支持单步执行、执行到某个节点等更丰富的执行模式;比如可以更加优化代码的组织逻辑,而不是像现在这样纸包鸡包纸包鸡;比如可以采用更加统一的错误码及报错说明便于调试和排查错误;比如可以增加更丰富的调度器配置兼容不同的使用场景;诸如此类,不一而足。
不过话说回来,Talk is Cheap, Show Me The Code。FATE 能在这么短的时间内拿出这么大工程量的项目,且完成度很高,真的说明微众我的前同事们工作真的非常辛苦且卓有成效。道路是曲折的,前途是光明的。
后面我还会继续写一下联邦学习算法是如何被调度执行的(这部分在本文中非常简略),希望能对大家有所帮助。
FATE 总体架构
本文主要介绍 FATE Flow 的核心流程代码,作为核心调度器模块,会和系统中其他各个组件有较多交互,所以我们先来简单介绍一下整体的系统架构,方便后面说明和理解。

具体各个模块的说明如下:
- FATE Flow: 联邦学习的任务流水线管理模块(通俗理解就是调度器)
- FederatedML: 联邦机器学习的 Python 实现包(类比 scikit-learn)
- Cluster Manager: 集群管理器
- Node Manager: 节点管理器,管理每台机器的计算资源
- RollSite: 跨 Party 通讯组件,以前的版本里叫 Proxy+Federation
- Mysql: 数据库,FATE Flow 和 Cluster Manager 的数据在此存储
组件比较多,可以先有一个简单的了解,后面会跟随代码介绍各个模块的在代码中的交互关系。
FATE Flow 架构
FATE Flow 在 1.5 版本中有了一定的优化和增强,这里我们循序渐进介绍一下,下图是较早版本的架构,但仍然有参考意义:

各个模块的功能如下(来自 构建端到端的联邦学习 Pipeline 生产服务):
- DSL Parser:是调度的核心,通过 DSL parser 解析到一个计算任务的上下游关系及依赖等。
- Job Scheduler:是 DAG 层面的调度,把 DAG 作为一个 Job,DAG 里面的节点执行称为 task,也就是说一个 Job 会包含若干个 task
- Federated Task Scheduler:最小调度粒度就是 task,需要调度多方运行同一个组件但参数算法不同的 task,结束后,继续调度下一个组件,这里就会涉及到协同调度
- Job Controller:联邦任务控制器
- Executor:联邦任务执行节点,支持不同的 Operator 容器,现在支持 Python 和 Script 的 Operator。Executor,在我们目前的应用中拉起 FederatedML 定义的一些组件,如 data io 数据输入输出,特征选择等模块,每次调起一个组件去执行,然后,这些组件会调用基础架构的 API,如 Storage 和 Federation Service (API 的抽象) ,再经过 Proxy 就可以和对端的 FATE-Flow 进行协同调度。
- 注:这里还用老版本的说明,即 Proxy+Federation,最新版本统一为 RollSite
- Tracking Manager:任务输入输出的实时追踪,包括每个 task 输出的 data 和 model。
- Model Manager:联邦模型管理器

联邦学习任务多方协同调度的流程:
首先,是以任务提交的一种方式,提交任务到 Queue,然后 JobScheduler 会把这个任务拿出来给到 Federated TaskScheduler 调度,Federated TaskScheduler 通过 Parser 取得下游 N 个无依赖的 Component,再调度 Executor (由两部分组成:Tracking Manager 和 Task) 执行,同时这个任务会分发到联邦学习的各个参与方 Host。联邦参与方取得任务,如果是 New Job,则放入队列(参与方会定期调度队列中的 Job),否则启动多个 Executor 执行,Executor 在 run 的过程中,会利用 Federation API 进行联邦学习中的参数交互,对一个联邦学习任务,每一方的 Job id 是保持一致的,每跑一个 Component,它的 Task id 也是一致的。每个 Task 跑完 Initiator TaskScheduler 会收集各方的状态,进行下一步的调度。对于下一步的调度策略我们支持:all_succss,all_done,one_succss 等策略。由于基于 Task 为最小的调度单位,所以很容易实现 rerun,specified_task_run 等特定运行。

分以下几个部分:
- Task stat:Task 状态信息,如启动时间、运行状态、结束时间、超时时间等
- Task run process:Task 运行进程
- Life cron checker:Task 生命周期定时检测
- Job controller:联邦任务控制器
- Shutdown:kill process、清理任务以及同步指令到所有联邦参与方,保证联邦任务状态一致性
启动 Shutdown 的条件:
- 若 Task 运行时间超过配置超时时间或默认超时时间(一般较长),启动 Shutdown
- 若 Task 运行进程异常终止,启动 Shutdown
- 若 Task 正常运行终止,启动 Shutdown
最后,在 1.5.0 版本中的优化如下:

上面主要是基于官方的各类说明材料,比较抽象的架构图我们就介绍到这里,接下来我们就从代码入手,看看具体的实现吧。
源码框架流程
源码框架流程部分相对来说比较硬核和枯燥,我会尽量简化非必要的细节,点出关键要点,方便大家理解。首先,我们来看看代码的入口。
注:大部分相关代码均位于 python/fate_flow 文件夹中,少部分会位于 python/fate_arch 文件夹中。
FATE Flow Server
- 代码文件:
python/fate_flow/fate_flow_server.py - 所用 Web 框架:Flask
熟悉 Flask 的朋友都知道,这是一个轻量级的 Python Web 框架,底层是基于 werkzeug,实际上也是通过 werkzeug 来提供并发支持的,具体启动的代码位于 113-121 行:
try:
run_simple(hostname=IP, port=HTTP_PORT, application=app, threaded=True)
stat_logger.info("FATE Flow server start Successfully")
except OSError as e:
traceback.print_exc()
os.kill(os.getpid(), signal.SIGKILL)
except Exception as e:
traceback.print_exc()
os.kill(os.getpid(), signal.SIGKILL)
这里我们尤其要关注 run_simple 这个函数,这里采用 threaded=True 这个配置,说明 server 是以单进程多线程的方式启动,来处理并发的请求的。更多关于 werkzeug 的材料可以在文章最后的参考链接中找到,这里就不展开了。
接下来我们详细看看这个 Web Server 提供哪些功能,具体的功能都分散在不同的模块中,在 app 这个变量初始化是统一引入(这也是 Flask 的常用写法),具体代码位于 71-87 行:
app = DispatcherMiddleware(
manager,
{
'/{}/data'.format(API_VERSION): data_access_app_manager,
'/{}/model'.format(API_VERSION): model_app_manager,
'/{}/job'.format(API_VERSION): job_app_manager,
'/{}/table'.format(API_VERSION): table_app_manager,
'/{}/tracking'.format(API_VERSION): tracking_app_manager,
'/{}/pipeline'.format(API_VERSION): pipeline_app_manager,
'/{}/permission'.format(API_VERSION): permission_app_manager,
'/{}/version'.format(API_VERSION): version_app_manager,
'/{}/party'.format(API_VERSION): party_app_manager,
'/{}/initiator'.format(API_VERSION): initiator_app_manager,
'/{}/tracker'.format(API_VERSION): tracker_app_manager,
'/{}/forward'.format(API_VERSION): proxy_app_manager
}
)
这样的代码组织也使得我们只需要看对应不同 manager 的代码就能了解不同模块的功能,很好很合理。具体各个模块的功能如下(后面会分别详细说明):
- apps 文件夹
data_access_app_manager提供数据集上传、下载、查询等功能job_app_manager【核心模块】提供 Job 和 Task 的提交、执行、查询、配置等功能model_app_manager提供模型的载入、迁移、发布等功能,主要用于在线预测permission_app_manager提供权限验证相关功能pipeline_app_manager提供解析 DAG 各个组件依赖关系的功能proxy_app_manager提供各 Party 间通信及调用功能table_app_manager提供数据表的新增、删除等功能tracking_app_manager提供 component 相关状态、数据等查询、下载功能version_app_manager提供 FATE 相关版本查询功能
scheduler_apps文件夹initiator_app_manager提供在角色为 initiator 的 Party 方进行 Job 重新执行、停止和更新状态等功能party_app_manager【核心模块】提供在各个 Party 执行 Job 相关动作的功能tracker_app_manager提供查询 component 执行状态、结果和输出数据等功能
这里大家可能有一点疑惑,这里的 job_app_manager 和 party_app_manager 提供的功能似乎是相似的,其实不一样,一个是对外的接口,一个是对内的接口,具体如下:
job_app_manager提供的是对外使用的接口,可以被 flow client 或 http api 直接调用party_app_manager提供的是对内使用的接口,主要被内部调度器调用
接下来就是各类配置和服务的初始化及后台启动,具体代码位于 98-111 行,这里直接通过注释来说明代码功能:
# 运行时配置初始化
RuntimeConfig.init_env()
RuntimeConfig.set_process_role(ProcessRole.DRIVER)
# 鉴权模块初始化
PrivilegeAuth.init()
# 服务注册
ServiceUtils.register()
# 资源管理器初始化
ResourceManager.initialize()
# 任务探测器启动,每 5 秒执行一次
Detector(interval=5 * 1000).start()
# DAG 调度器启动,每 2 秒执行一次
DAGScheduler(interval=2 * 1000).start()
# 启动 grpc server,用于联邦任务调度和执行
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[(cygrpc.ChannelArgKey.max_send_message_length, -1),
(cygrpc.ChannelArgKey.max_receive_message_length, -1)])
proxy_pb2_grpc.add_DataTransferServiceServicer_to_server(UnaryService(), server)
server.add_insecure_port("{}:{}".format(IP, GRPC_PORT))
server.start()
这里我们需要解开最后一个疑惑,为什么要另外启动一个 GRPC server?简单来说,FATE 会通过这个 GRPC Server 完成各个 Party 之间的函数调用,也就是说所有的 http 接口都通过本地调用,不同 Party 间的函数调用统一通过 grpc 的方式进行,而这个 grpc 的调用逻辑也很简单,实际上是通过解析 grpc 请求中的参数,对应再次调用上述 http 接口(主要是 party_app_manager)中的接口。关于 GRPC Server 的具体说明会在 文件夹 utils 一章详细介绍,这里只需要有粗略了解即可。
为了更便于大家理解具体的执行流程,接下来一节会以一个 Job 从提交到完成执行来进行说明各个模块的执行顺序和逻辑,因为步骤比较多,所以更具体的源码分析请参考后面 源码分析 章节。
Job 的生命周期
通过了解一个 Job 从提交到完成执行的各个步骤,基本可以掌握 FATE Flow 的关键所在,就像一条中轴线,其他的所有模块都是配合这条线而存在的。废话不多说,我们直接开始:
- 【任务提交】无论是通过 flow client 还是 http api 提交任务,实际上执行的都是
apps/job_app.py中第 45 行的submit_job函数,在检查完 job 运行配置后,调用DAGScheduler.submit()完成任务提交(具体逻辑参考后文dag_scheduler.py的说明。具体执行的任务简单来说就是:生成 JobID -> 通知各 Party 创建 Job -> 各方均创建成功后,任务提交成功。提交成功后将由 DAGSchudler 进行调度执行,具体的调度逻辑在下一节会详细说明,这里主要围绕 Job 本身的流程进行介绍。 - 【等待 Job 调度】创建完成之后,Job 的状态为 waiting,在
DAGScheduler.run_do函数中会从数据库中找到状态为 waiting 的任务,并通过DAGScheduler.schedule_waiting_jobs函数进行调度 - 【尝试启动 Job】若该 Job 开始被调度,则首先会向各 Party 通过
FederatedScheduler.resource_for_job函数进行计算资源申请,若申请成功则通过DAGScheduler.start_job函数启动 Job;若某方没有足够的计算资源(注意:这个和申请失败不一样),则已经申请的资源需要退回;若资源申请失败,则通过DAGScheduler.stop_job函数停止 Job。如果start_job成功完成,Job 的状态将变为 running - 【等待 Task 调度】Job 状态变为 running 之后,就会被
DAGScheduler.schedule_running_jobs函数进行调度,实际调用的是TaskScheduler.schedule函数 - 【尝试调度 Task】若该 Task 开始被调度,则会通过
FederatedScheduler.start_task函数在各方启动该 Task,实际上就是发起 grpc 调用,主要被调用的就是party_app_manager所提供的内部接口 - 【尝试启动 Task】被上一步 grpc 调用的接口是
party_app.py中的start_task函数,实际执行的是TaskController.start_task函数,在底层是 EGGROLL 的情况下,就是通过 shell 执行对应的 python 脚本(通过job_utils.run_subprocess函数执行) - 【执行 Task】具体 Task 的执行是通过
TaskExecutor.run_task函数进行的,在这里我们将解析 job 和 task 的上下文和配置,并通过run_object.run函数进行执行。在这里因为是另外一个进程,所以是同步执行的,等 Task 执行完成后,会保存 data 和 model 并更新 Task 状态,便于调度器继续执行。注:具体任务的执行就不在这里展开讲了,后面会结合算法的开发另写一篇。 - 【完成 Job】每次进行 task 调度时,都会通过
DAGScheduler.calculate_job_status函数来确定 Job 的状态,如果全部 Task 都 Success,那么 Job 的状态也变为 Success。至此,Job 执行完成。
注:这里因为篇幅关系,省略了部分细节,如状态更新、任务取消等,感兴趣的同学可以自行研究。
源码分析
这部分是具体各个文件的代码逻辑,建议配合上面 Job 的生命周期 阅读,当做细节查看手册,方便理解。
重点需要关注 federated_scheduler.py,只要理解了如何在多方同步调度,那么其他部分应该都可以迎刃而解了。
(此处省略详细的源码分析,原文非常长,主要包含以下文件的代码逻辑说明:controller/job_controller.py、controller/task_controller.py、scheduler/dag_scheduler.py、scheduler/task_scheduler.py、scheduler/federated_scheduler.py、scheduler_apps/party_app.py、operation/job_saver.py、utils/grpc_utils.py、utils/api_utils.py、utils/job_utils.py)
参考链接
FATE 相关
- 构建端到端的联邦学习 Pipeline 生产服务
- 精华合集 | 联邦学习 FATE 从入门到精通
- FATE 1.5.0 版本源码
- Fate AllinOne部署指南
- 圆桌会 | 第五期:深度解读FATE 1.5 LTS版本系统架构
- FATE 算法运行核心流程--fateflow(老版本)
Werkzeug 相关