0%

【大数据架构之旅】3 从零起步学 dagster

Dagster 作为新一代的调度引擎,可以覆盖整个数据开发的全部流程。阅读完本文,你会知道 Dagster 究竟是啥,为什么是新一代,和以前的调度有什么不同,以及核心概念和特点。关于具体如何做一个项目,我们会在后续的文章中继续介绍。


更新历史

安装与创建第一个项目

首先我们需要进入 venv 的虚拟环境,关于虚拟环境的配置参考 这里,版本有些差异但是问题不大,需要注意的是至少需要 python 3.7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple dagster
# 创建第一个项目
dagster project scaffold --name dagster_hello_world

# 进入项目安装依赖
cd dagster_hello_world
pip install -e ".[dev]"
# 如果网络不给力,加上 -i 参数
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -e ".[dev]"

# 安装成功后,启动 dagster 服务
dagit
# 成功启动的标志,屏幕上输出类似下面的内容
# 2023-06-15 11:30:51 +0800 - dagit - INFO - Serving dagit on http://127.0.0.1:3000 in process 41156

然后我们在浏览器里访问 127.0.0.1:3000,看到如下图所示页面,就说明配置成功了。

基本概念

资产 Assets

Dagster 中有一个很大的变化,就是我们在开发过程中要从任务的视角转换到资产的视角。

所谓任务视角,如下图所示,就是有一个/一系列数据源作为输入,经过一系列处理,然后再写入一个/一系列数据作为输出。当任务不太复杂的时候,我们基本上通过任务的名称就可以知道,稍微复杂一点或名称不清晰的时候,就很容易理解有偏差,只能去阅读源码/注释/文档,其中注释和文档很可能是不全或者过时的,久而久之维护成本越来越高。

当任务数目越来越多,涉及到的表越来越多,处理逻辑越来越复杂的时候,很多时候我们只能花费大量时间去梳理,一般来说随着代码迭代次数增加,文档会慢慢落后,之后整个数据仓库就基本无法维护了。

但是在 Dagster 中,有另一种数据资产视角的调度模式,声明式定义工作流,也就是说我们需要关注的是有哪些数据资产,以及这些数据资产的关系,如下图所示:

定义一个数据资产需要确定下面三件事:

  1. 声明是个数据资产:@asset 注解
  2. 声明上游的数据资产:@asset 注解中的 insnon_argument_deps 属性(不用担心看不懂,下面会有更详细说明)
  3. 如何利用上游产生当前的数据资产:在 @asset 声明的方法中通过 python 实现具体的处理逻辑,比如如何从 input DataFrame 变成 output DataFrame,也就是数据处理的核心逻辑

接下来我们就可以通过 define_asset_job 方法,根据依赖关系自动构建 DAG 工作流。更多关于 asset 的说明我负载文末的参考材料中。

IO Manager

通过 asset 的定义我们可以知道,其实 asset 可以理解为数据的核心业务逻辑。这是 dagster 的另一个不同,将数据处理逻辑和读写分类了,数据的读写就在 IO Manager 中定义。具体的逻辑如下图所示:

IO Manager 可以直接用官方内置的,如果没有的话也可以自定义,继承 dagster.IOManager,然后实现 handle_outputload_input 这两个方法即可。

Ops/Graph/Job

Op 可以简单理解为一个“操作”,是 dagster 中最小的计算单元,而 Graph 则可以理解为是 op 或 graph 的组合(即支持嵌套),graph 是通过 python 代码来定义的。

Job 则是用来执行和监控的单元,可以理解为工作流,一般由 Graph, Op(通过 Python 代码)连接而成

Schedule/Sensor

Schedule 顾名思义就是调度,一般来说需要注意配置好时区:在 ScheduleDefinitionexecution_timezone 属性中配置,也可以在 environment_vars 属性定义执行 Job 时的环境变量。

Sensor 可以简单理解为 Job 运行结束或 asset 物化结束之后的操作,可以做任何事情,一般来说我们会用于发送通知、清理无用数据等等。

Schedule 和 Sensor 的运行都需要 dagster-daemon 进程,一般来说可以把 dagster-daemon 和 dagit 放到一个 pod 中。

Repository/Workspace

Repository 包含一个项目所有的 asset, op, graph, job, schedule, sensor 等资源。Dagster UI 左侧栏一次只能显示一个 repository,通过 @repository 注解定义。

Workspace 是 Dagit 实例级别的工作去,可以包含多个 Repository,通过 workspace.yaml 文件配置要从哪里加载 repository和启动 gRPC 服务等。具体参考 Workspace files

Dagster 实例

在环境变量 DAGSTER_HOME 目录中的 dagster.yaml 文件进行配置,包含存储位置和日志位置等等。具体参考 Dagster Instance

命令使用

我们在命令行中输入 dagster -h,就可以看到我们刚才学的相关概念了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Usage: dagster [OPTIONS] COMMAND [ARGS]...

CLI tools for working with Dagster.

Options:
-v, --version Show the version and exit.
-h, --help Show this message and exit.

Commands:
asset Commands for working with Dagster assets.
code-server Commands for working with Dagster code servers.
debug Commands for helping debug Dagster issues by dumping or...
dev Start a local deployment of Dagster, including dagit...
instance Commands for working with the current Dagster instance.
job Commands for working with Dagster jobs.
project Commands for bootstrapping new Dagster projects and code...
run Commands for working with Dagster job runs.
schedule Commands for working with Dagster schedules.
sensor Commands for working with Dagster sensors.

输入具体的 commands 就可以看到相关操作了。大家可以自己探索和尝试一下。关于更多的使用,我们将在下一篇文章中进行讲解。

参考材料

Asset 相关(其他更多的看左侧栏即可)

IO Manager 相关(其他更多的看左侧栏即可)

Ops 相关(其他更多的看左侧栏即可)

Graph 相关(其他更多的看左侧栏即可)

Job 相关(其他更多的看左侧栏即可)