Dagster 作为新一代的调度引擎,可以覆盖整个数据开发的全部流程。阅读完本文,你会知道 Dagster 究竟是啥,为什么是新一代,和以前的调度有什么不同,以及核心概念和特点。关于具体如何做一个项目,我们会在后续的文章中继续介绍。
更新历史
- 2023.06.16: 完成初稿
- 2023.07.05: 更新源代码仓库
安装与创建第一个项目
首先我们需要进入 venv 的虚拟环境,关于虚拟环境的配置参考 这里,版本有些差异但是问题不大,需要注意的是至少需要 python 3.7
1 | 安装 |
然后我们在浏览器里访问 127.0.0.1:3000,看到如下图所示页面,就说明配置成功了。
基本概念
资产 Assets
Dagster 中有一个很大的变化,就是我们在开发过程中要从任务的视角转换到资产的视角。
所谓任务视角,如下图所示,就是有一个/一系列数据源作为输入,经过一系列处理,然后再写入一个/一系列数据作为输出。当任务不太复杂的时候,我们基本上通过任务的名称就可以知道,稍微复杂一点或名称不清晰的时候,就很容易理解有偏差,只能去阅读源码/注释/文档,其中注释和文档很可能是不全或者过时的,久而久之维护成本越来越高。
当任务数目越来越多,涉及到的表越来越多,处理逻辑越来越复杂的时候,很多时候我们只能花费大量时间去梳理,一般来说随着代码迭代次数增加,文档会慢慢落后,之后整个数据仓库就基本无法维护了。
但是在 Dagster 中,有另一种数据资产视角的调度模式,声明式定义工作流,也就是说我们需要关注的是有哪些数据资产,以及这些数据资产的关系,如下图所示:
定义一个数据资产需要确定下面三件事:
- 声明是个数据资产:
@asset
注解 - 声明上游的数据资产:
@asset
注解中的ins
或non_argument_deps
属性(不用担心看不懂,下面会有更详细说明) - 如何利用上游产生当前的数据资产:在
@asset
声明的方法中通过 python 实现具体的处理逻辑,比如如何从 input DataFrame 变成 output DataFrame,也就是数据处理的核心逻辑
接下来我们就可以通过 define_asset_job
方法,根据依赖关系自动构建 DAG 工作流。更多关于 asset 的说明我负载文末的参考材料中。
IO Manager
通过 asset 的定义我们可以知道,其实 asset 可以理解为数据的核心业务逻辑。这是 dagster 的另一个不同,将数据处理逻辑和读写分类了,数据的读写就在 IO Manager 中定义。具体的逻辑如下图所示:
IO Manager 可以直接用官方内置的,如果没有的话也可以自定义,继承 dagster.IOManager
,然后实现 handle_output
和 load_input
这两个方法即可。
Ops/Graph/Job
Op 可以简单理解为一个“操作”,是 dagster 中最小的计算单元,而 Graph 则可以理解为是 op 或 graph 的组合(即支持嵌套),graph 是通过 python 代码来定义的。
Job 则是用来执行和监控的单元,可以理解为工作流,一般由 Graph, Op(通过 Python 代码)连接而成
Schedule/Sensor
Schedule 顾名思义就是调度,一般来说需要注意配置好时区:在 ScheduleDefinition
的 execution_timezone
属性中配置,也可以在 environment_vars
属性定义执行 Job 时的环境变量。
Sensor 可以简单理解为 Job 运行结束或 asset 物化结束之后的操作,可以做任何事情,一般来说我们会用于发送通知、清理无用数据等等。
Schedule 和 Sensor 的运行都需要 dagster-daemon 进程,一般来说可以把 dagster-daemon 和 dagit 放到一个 pod 中。
Repository/Workspace
Repository 包含一个项目所有的 asset, op, graph, job, schedule, sensor 等资源。Dagster UI 左侧栏一次只能显示一个 repository,通过 @repository
注解定义。
Workspace 是 Dagit 实例级别的工作去,可以包含多个 Repository,通过 workspace.yaml
文件配置要从哪里加载 repository和启动 gRPC 服务等。具体参考 Workspace files
Dagster 实例
在环境变量 DAGSTER_HOME
目录中的 dagster.yaml
文件进行配置,包含存储位置和日志位置等等。具体参考 Dagster Instance
命令使用
我们在命令行中输入 dagster -h
,就可以看到我们刚才学的相关概念了:
1 | Usage: dagster [OPTIONS] COMMAND [ARGS]... |
输入具体的 commands 就可以看到相关操作了。大家可以自己探索和尝试一下。关于更多的使用,我们将在下一篇文章中进行讲解。
参考材料
- 官方 Getting Started
- Fully Featured Project
- Dagster + dbt starter kit
- Dagster + Modern Data Stack starter kit
Asset 相关(其他更多的看左侧栏即可)
IO Manager 相关(其他更多的看左侧栏即可)
Ops 相关(其他更多的看左侧栏即可)
Graph 相关(其他更多的看左侧栏即可)
Job 相关(其他更多的看左侧栏即可)