Published on

【联邦学习之旅】C1 FATE Flow 流程源码解析

FATE 作为目前最受欢迎的联邦学习开源项目,直接从源码来进行学习是非常好的途径。本文将从代码的角度来介绍 FATE 中的调度器 FATE Flow 的工作原理。

注:本文基于 FATE 1.5.0 版本,后续版本的代码将另外标注出变化。

分析与总结

因为文章太长,所以写在前面(笑)

不出意外的话,这篇文章截止发布时应该是全网最详尽的 FATE Flow 介绍和源码解析的文章。因为之前也自己实现过基于 DAG 的分布式调度器,所以看到很多当年的设计出现在 FATE 这样一个广受欢迎的开源框架中,感觉还是挺自豪的。当然了,也发现了不少可以优化和改进的地方,比如可以支持单步执行、执行到某个节点等更丰富的执行模式;比如可以更加优化代码的组织逻辑,而不是像现在这样纸包鸡包纸包鸡;比如可以采用更加统一的错误码及报错说明便于调试和排查错误;比如可以增加更丰富的调度器配置兼容不同的使用场景;诸如此类,不一而足。

不过话说回来,Talk is Cheap, Show Me The Code。FATE 能在这么短的时间内拿出这么大工程量的项目,且完成度很高,真的说明微众我的前同事们工作真的非常辛苦且卓有成效。道路是曲折的,前途是光明的。

后面我还会继续写一下联邦学习算法是如何被调度执行的(这部分在本文中非常简略),希望能对大家有所帮助。

FATE 总体架构

本文主要介绍 FATE Flow 的核心流程代码,作为核心调度器模块,会和系统中其他各个组件有较多交互,所以我们先来简单介绍一下整体的系统架构,方便后面说明和理解。

具体各个模块的说明如下:

  • FATE Flow: 联邦学习的任务流水线管理模块(通俗理解就是调度器)
    • FederatedML: 联邦机器学习的 Python 实现包(类比 scikit-learn)
  • Cluster Manager: 集群管理器
  • Node Manager: 节点管理器,管理每台机器的计算资源
  • RollSite: 跨 Party 通讯组件,以前的版本里叫 Proxy+Federation
  • Mysql: 数据库,FATE Flow 和 Cluster Manager 的数据在此存储

组件比较多,可以先有一个简单的了解,后面会跟随代码介绍各个模块的在代码中的交互关系。

FATE Flow 架构

FATE Flow 在 1.5 版本中有了一定的优化和增强,这里我们循序渐进介绍一下,下图是较早版本的架构,但仍然有参考意义:

各个模块的功能如下(来自 构建端到端的联邦学习 Pipeline 生产服务):

  • DSL Parser:是调度的核心,通过 DSL parser 解析到一个计算任务的上下游关系及依赖等。
  • Job Scheduler:是 DAG 层面的调度,把 DAG 作为一个 Job,DAG 里面的节点执行称为 task,也就是说一个 Job 会包含若干个 task
  • Federated Task Scheduler:最小调度粒度就是 task,需要调度多方运行同一个组件但参数算法不同的 task,结束后,继续调度下一个组件,这里就会涉及到协同调度
  • Job Controller:联邦任务控制器
  • Executor:联邦任务执行节点,支持不同的 Operator 容器,现在支持 Python 和 Script 的 Operator。Executor,在我们目前的应用中拉起 FederatedML 定义的一些组件,如 data io 数据输入输出,特征选择等模块,每次调起一个组件去执行,然后,这些组件会调用基础架构的 API,如 Storage 和 Federation Service (API 的抽象) ,再经过 Proxy 就可以和对端的 FATE-Flow 进行协同调度。
    • 注:这里还用老版本的说明,即 Proxy+Federation,最新版本统一为 RollSite
  • Tracking Manager:任务输入输出的实时追踪,包括每个 task 输出的 data 和 model。
  • Model Manager:联邦模型管理器

联邦学习任务多方协同调度的流程:

首先,是以任务提交的一种方式,提交任务到 Queue,然后 JobScheduler 会把这个任务拿出来给到 Federated TaskScheduler 调度,Federated TaskScheduler 通过 Parser 取得下游 N 个无依赖的 Component,再调度 Executor (由两部分组成:Tracking Manager 和 Task) 执行,同时这个任务会分发到联邦学习的各个参与方 Host。联邦参与方取得任务,如果是 New Job,则放入队列(参与方会定期调度队列中的 Job),否则启动多个 Executor 执行,Executor 在 run 的过程中,会利用 Federation API 进行联邦学习中的参数交互,对一个联邦学习任务,每一方的 Job id 是保持一致的,每跑一个 Component,它的 Task id 也是一致的。每个 Task 跑完 Initiator TaskScheduler 会收集各方的状态,进行下一步的调度。对于下一步的调度策略我们支持:all_succssall_doneone_succss 等策略。由于基于 Task 为最小的调度单位,所以很容易实现 rerunspecified_task_run 等特定运行。

分以下几个部分:

  • Task stat:Task 状态信息,如启动时间、运行状态、结束时间、超时时间等
  • Task run process:Task 运行进程
  • Life cron checker:Task 生命周期定时检测
  • Job controller:联邦任务控制器
  • Shutdown:kill process、清理任务以及同步指令到所有联邦参与方,保证联邦任务状态一致性

启动 Shutdown 的条件:

  • 若 Task 运行时间超过配置超时时间或默认超时时间(一般较长),启动 Shutdown
  • 若 Task 运行进程异常终止,启动 Shutdown
  • 若 Task 正常运行终止,启动 Shutdown

最后,在 1.5.0 版本中的优化如下:

上面主要是基于官方的各类说明材料,比较抽象的架构图我们就介绍到这里,接下来我们就从代码入手,看看具体的实现吧。

源码框架流程

源码框架流程部分相对来说比较硬核和枯燥,我会尽量简化非必要的细节,点出关键要点,方便大家理解。首先,我们来看看代码的入口。

注:大部分相关代码均位于 python/fate_flow 文件夹中,少部分会位于 python/fate_arch 文件夹中。

FATE Flow Server

  • 代码文件:python/fate_flow/fate_flow_server.py
  • 所用 Web 框架:Flask

熟悉 Flask 的朋友都知道,这是一个轻量级的 Python Web 框架,底层是基于 werkzeug,实际上也是通过 werkzeug 来提供并发支持的,具体启动的代码位于 113-121 行:

try:
    run_simple(hostname=IP, port=HTTP_PORT, application=app, threaded=True)
    stat_logger.info("FATE Flow server start Successfully")
except OSError as e:
    traceback.print_exc()
    os.kill(os.getpid(), signal.SIGKILL)
except Exception as e:
    traceback.print_exc()
    os.kill(os.getpid(), signal.SIGKILL)

这里我们尤其要关注 run_simple 这个函数,这里采用 threaded=True 这个配置,说明 server 是以单进程多线程的方式启动,来处理并发的请求的。更多关于 werkzeug 的材料可以在文章最后的参考链接中找到,这里就不展开了。

接下来我们详细看看这个 Web Server 提供哪些功能,具体的功能都分散在不同的模块中,在 app 这个变量初始化是统一引入(这也是 Flask 的常用写法),具体代码位于 71-87 行:

app = DispatcherMiddleware(
    manager,
    {
        '/{}/data'.format(API_VERSION): data_access_app_manager,
        '/{}/model'.format(API_VERSION): model_app_manager,
        '/{}/job'.format(API_VERSION): job_app_manager,
        '/{}/table'.format(API_VERSION): table_app_manager,
        '/{}/tracking'.format(API_VERSION): tracking_app_manager,
        '/{}/pipeline'.format(API_VERSION): pipeline_app_manager,
        '/{}/permission'.format(API_VERSION): permission_app_manager,
        '/{}/version'.format(API_VERSION): version_app_manager,
        '/{}/party'.format(API_VERSION): party_app_manager,
        '/{}/initiator'.format(API_VERSION): initiator_app_manager,
        '/{}/tracker'.format(API_VERSION): tracker_app_manager,
        '/{}/forward'.format(API_VERSION): proxy_app_manager
    }
)

这样的代码组织也使得我们只需要看对应不同 manager 的代码就能了解不同模块的功能,很好很合理。具体各个模块的功能如下(后面会分别详细说明):

  • apps 文件夹
    • data_access_app_manager 提供数据集上传、下载、查询等功能
    • job_app_manager【核心模块】提供 Job 和 Task 的提交、执行、查询、配置等功能
    • model_app_manager 提供模型的载入、迁移、发布等功能,主要用于在线预测
    • permission_app_manager 提供权限验证相关功能
    • pipeline_app_manager 提供解析 DAG 各个组件依赖关系的功能
    • proxy_app_manager 提供各 Party 间通信及调用功能
    • table_app_manager 提供数据表的新增、删除等功能
    • tracking_app_manager 提供 component 相关状态、数据等查询、下载功能
    • version_app_manager 提供 FATE 相关版本查询功能
  • scheduler_apps 文件夹
    • initiator_app_manager 提供在角色为 initiator 的 Party 方进行 Job 重新执行、停止和更新状态等功能
    • party_app_manager【核心模块】提供在各个 Party 执行 Job 相关动作的功能
    • tracker_app_manager提供查询 component 执行状态、结果和输出数据等功能

这里大家可能有一点疑惑,这里的 job_app_managerparty_app_manager 提供的功能似乎是相似的,其实不一样,一个是对外的接口,一个是对内的接口,具体如下:

  • job_app_manager 提供的是对外使用的接口,可以被 flow client 或 http api 直接调用
  • party_app_manager 提供的是对内使用的接口,主要被内部调度器调用

接下来就是各类配置和服务的初始化及后台启动,具体代码位于 98-111 行,这里直接通过注释来说明代码功能:

# 运行时配置初始化
RuntimeConfig.init_env()
RuntimeConfig.set_process_role(ProcessRole.DRIVER)
# 鉴权模块初始化
PrivilegeAuth.init()
# 服务注册
ServiceUtils.register()
# 资源管理器初始化
ResourceManager.initialize()
# 任务探测器启动,每 5 秒执行一次
Detector(interval=5 * 1000).start()
# DAG 调度器启动,每 2 秒执行一次
DAGScheduler(interval=2 * 1000).start()
# 启动 grpc server,用于联邦任务调度和执行
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=[(cygrpc.ChannelArgKey.max_send_message_length, -1),
                              (cygrpc.ChannelArgKey.max_receive_message_length, -1)])

proxy_pb2_grpc.add_DataTransferServiceServicer_to_server(UnaryService(), server)
server.add_insecure_port("{}:{}".format(IP, GRPC_PORT))
server.start()

这里我们需要解开最后一个疑惑,为什么要另外启动一个 GRPC server?简单来说,FATE 会通过这个 GRPC Server 完成各个 Party 之间的函数调用,也就是说所有的 http 接口都通过本地调用,不同 Party 间的函数调用统一通过 grpc 的方式进行,而这个 grpc 的调用逻辑也很简单,实际上是通过解析 grpc 请求中的参数,对应再次调用上述 http 接口(主要是 party_app_manager)中的接口。关于 GRPC Server 的具体说明会在 文件夹 utils 一章详细介绍,这里只需要有粗略了解即可。

为了更便于大家理解具体的执行流程,接下来一节会以一个 Job 从提交到完成执行来进行说明各个模块的执行顺序和逻辑,因为步骤比较多,所以更具体的源码分析请参考后面 源码分析 章节。

Job 的生命周期

通过了解一个 Job 从提交到完成执行的各个步骤,基本可以掌握 FATE Flow 的关键所在,就像一条中轴线,其他的所有模块都是配合这条线而存在的。废话不多说,我们直接开始:

  1. 【任务提交】无论是通过 flow client 还是 http api 提交任务,实际上执行的都是 apps/job_app.py 中第 45 行的 submit_job 函数,在检查完 job 运行配置后,调用 DAGScheduler.submit() 完成任务提交(具体逻辑参考后文 dag_scheduler.py 的说明。具体执行的任务简单来说就是:生成 JobID -> 通知各 Party 创建 Job -> 各方均创建成功后,任务提交成功。提交成功后将由 DAGSchudler 进行调度执行,具体的调度逻辑在下一节会详细说明,这里主要围绕 Job 本身的流程进行介绍。
  2. 【等待 Job 调度】创建完成之后,Job 的状态为 waiting,在 DAGScheduler.run_do 函数中会从数据库中找到状态为 waiting 的任务,并通过 DAGScheduler.schedule_waiting_jobs 函数进行调度
  3. 【尝试启动 Job】若该 Job 开始被调度,则首先会向各 Party 通过 FederatedScheduler.resource_for_job 函数进行计算资源申请,若申请成功则通过 DAGScheduler.start_job 函数启动 Job;若某方没有足够的计算资源(注意:这个和申请失败不一样),则已经申请的资源需要退回;若资源申请失败,则通过 DAGScheduler.stop_job 函数停止 Job。如果 start_job 成功完成,Job 的状态将变为 running
  4. 【等待 Task 调度】Job 状态变为 running 之后,就会被 DAGScheduler.schedule_running_jobs 函数进行调度,实际调用的是 TaskScheduler.schedule 函数
  5. 【尝试调度 Task】若该 Task 开始被调度,则会通过 FederatedScheduler.start_task 函数在各方启动该 Task,实际上就是发起 grpc 调用,主要被调用的就是 party_app_manager 所提供的内部接口
  6. 【尝试启动 Task】被上一步 grpc 调用的接口是 party_app.py 中的 start_task 函数,实际执行的是 TaskController.start_task 函数,在底层是 EGGROLL 的情况下,就是通过 shell 执行对应的 python 脚本(通过 job_utils.run_subprocess 函数执行)
  7. 【执行 Task】具体 Task 的执行是通过 TaskExecutor.run_task 函数进行的,在这里我们将解析 job 和 task 的上下文和配置,并通过 run_object.run 函数进行执行。在这里因为是另外一个进程,所以是同步执行的,等 Task 执行完成后,会保存 data 和 model 并更新 Task 状态,便于调度器继续执行。注:具体任务的执行就不在这里展开讲了,后面会结合算法的开发另写一篇。
  8. 【完成 Job】每次进行 task 调度时,都会通过 DAGScheduler.calculate_job_status 函数来确定 Job 的状态,如果全部 Task 都 Success,那么 Job 的状态也变为 Success。至此,Job 执行完成。

注:这里因为篇幅关系,省略了部分细节,如状态更新、任务取消等,感兴趣的同学可以自行研究。

源码分析

这部分是具体各个文件的代码逻辑,建议配合上面 Job 的生命周期 阅读,当做细节查看手册,方便理解。

重点需要关注 federated_scheduler.py,只要理解了如何在多方同步调度,那么其他部分应该都可以迎刃而解了。

(此处省略详细的源码分析,原文非常长,主要包含以下文件的代码逻辑说明:controller/job_controller.py、controller/task_controller.py、scheduler/dag_scheduler.py、scheduler/task_scheduler.py、scheduler/federated_scheduler.py、scheduler_apps/party_app.py、operation/job_saver.py、utils/grpc_utils.py、utils/api_utils.py、utils/job_utils.py)

参考链接

FATE 相关

Werkzeug 相关