Apache DolphinScheduler 深度技术文档

一、架构深度解析

1.1 去中心化 Master-Worker 架构

DolphinScheduler 采用完全去中心化的 Master-Worker 架构,所有服务通过 ZooKeeper 注册与发现,消除了传统调度系统(如 Airflow 的 CeleryExecutor)的单点瓶颈。

架构示意图

 ┌──────────────────────────────────────────────┐
 │                   UI (前端)                  │
 └──────────────────────┬───────────────────────┘
                        │ HTTP
 ┌──────────────────────▼───────────────────────┐
 │              API Server (多节点)             │
 └──────────────────────┬───────────────────────┘
                        │ 写入指令
 ┌──────────────────────▼──────────────┐
 │           ZooKeeper 集群            │
 │   (Master/Worker注册、分布式锁)     │
 └──────┬────────────────────▲─────────┘
        │ 监听任务分配       │ 上报状态
 ┌──────▼─────────┐   ┌─────┴───────────────┐
 │ Master (多节点) │   │   Worker (多节点)   │
 │  DAG 解析、切分 │   │   任务实际执行       │
 │  负载均衡	   │   │   (用户进程)         │
 └────────────────┘   └─────────────────────┘

Master 核心职责

Worker 核心职责

高可用保障


二、核心概念与数据模型

2.1 概念层级关系

租户 (Tenant) 	—— 对应 Linux 用户
  └─ 项目 (Project) 		—— 业务线隔离单元
       └─ 工作流定义 (Process Definition) 	—— DAG 图与版本
            └─ 任务定义 (Task Definition) 	—— 节点元数据
       └─ 工作流实例 (Process Instance) 	—— 一次调度触发
            └─ 任务实例 (Task Instance) 		—— 单个节点的执行记录

数据库关键表组

表组 表名 关键字段 说明
工作流定义 t_ds_process_definition id, name, version, release_state, project_code 存储 DAG 元数据,发布后 release_state 标记上线
任务定义 t_ds_task_definition code, name, task_type, task_params (JSON) 任务参数(类型、SQL 语句、脚本等)以 JSON 存储
工作流实例 t_ds_process_instance id, name, state, start_time, end_time 状态包括:0-提交成功, 1-运行中, 2-暂停, 3-停止, 4-重试, 5-失败, 6-成功, 7-需要容错, 8-杀死, 9-等待线程, 10-等待依赖
任务实例 t_ds_task_instance id, task_code, state, retry_times, max_retry_times 任务状态标识:0-提交, 1-运行中, 2-准备暂停, 3-暂停, 4-准备停止, 5-停止, 6-失败, 7-成功, 8-需要容错, 9-杀死, 10-等待线程, 11-等待依赖
指令表 t_ds_command id, command_type, process_definition_code, command_param Master 拉取指令:START_PROCESS, RECOVER_PROCESS 等
调度 t_ds_schedules id, process_definition_code, crontab, start_time, end_time Cron 表达式和生效时间

2.2 任务状态机(关键)

任务实例从创建到结束的完整状态流转,直接影响调度逻辑和容错机制:

                        ┌─────────┐
                        │  SUBMIT │  (初始)
                        └────┬────┘
                             │ Master 分发
                        ┌────▼────┐
                        │ DISPATCH│ -> Worker 领取
                        └────┬────┘
                             │ Worker 开始执行
                        ┌────▼────┐
                        │ RUNNING │
                        └─┬──┬──┬─┘
              成功───────┘  │  │  └───────失败(未超重试)
              │             │  │             │
         ┌────▼───┐   ┌───▼──▼───┐   ┌──────▼──────┐
         │SUCCESS│   │ PAUSE/STOP│   │ FAILURE/RETRY│
         └────────┘   └──────────┘   └──────┬──────┘
                                             │ 可重试且未达上限
                                        ┌────▼────┐
                                        │  DELAY  │ (等待重试)
                                        └─────────┘

容错机制


三、参数体系详解

3.1 参数类型与优先级

类型 作用域 示例 优先级
系统内置参数 全局 ${system.biz.date} 最低(可被覆盖)
全局参数 工作流定义 day=${system.biz.date} 较低
局部参数 (IN) 当前任务节点 input_path=/data/raw
局部参数 (OUT) 向下游传递 运行时由任务生成
上游传递参数 下游节点接收 由 OUT 参数自动注入 高于全局参数

3.2 衍生内置参数 $[...] 深度用法

衍生参数是解决非 T-1 调度跨天依赖的核心工具,支持任意日期偏移和格式定制。

语法规则

$[format_string 偏移量]

高级示例

# 上个月的今天(处理月末差异)
$[add_months(yyyyMMdd, -1)]

# 上周同一天(7天前)
$[yyyyMMdd-7]

# 明天凌晨2点的时间戳(自定义组合)
$[yyyy-MM-dd+1] 02:00:00

# 上一小时的整点(用于小时级调度)
$[yyyyMMddHH-1/24]

应用场景

3.3 工作流内参数传递

限制规则

任务类型 能否向下游传递 (OUT) 能否接收上游 (IN)
SQL
Shell
Python
Spark/Flink

Shell 传递示例

# 计算目标分区并传递给下游
target_date=$(date -d "yesterday" +%Y%m%d)
echo "分区: ${target_date}"
# 关键语法:${setValue(key=value)}
echo '${setValue(partition=${target_date})}'

此时下游 SQL 节点可直接使用 ${partition} 作为参数。


四、与 Hive 的集成实战

4.1 配置 Hive 数据源

在 DolphinScheduler 中使用 Hive 任务,需要先配置 Hive 数据源

步骤

  1. 进入 数据源中心创建数据源

  2. 选择 HIVE 类型。

  3. 填写连接信息:

    数据源名称:hive_production
    IP/主机名:hiveserver2-host
    端口:10000
    用户名:hive
    密码:(若启用认证)
    数据库名:default
    
  4. 其他参数(可选): hive.server2.proxy.user=hadoop (代理用户,解决 Kerberos 认证)

  5. 点击 测试连接 通过后保存。

Kerberos 集成: 若 Hive 启用了 Kerberos,需在 api-server/conf/common.properties 中添加:

# 启用 Kerberos
security.authentication.type=kerberos
# 提供 keytab 和 principal
hive.kerberos.principal=hive/_HOST@REALM
hive.kerberos.keytab.path=/opt/dolphinscheduler/conf/hive.keytab

Worker 节点需同步 keytab 文件,并确保 hive 用户有权限。

4.2 Hive 任务节点配置

在工作流画布中添加 HIVE 节点:

基本配置

资源级联: 如果 SQL 较大,可上传 .sql 文件至 资源中心,然后在节点中引用 <filename>。DolphinScheduler 会自动读取文件内容替换占位符。

4.3 Hive 集成常见问题

问题 原因 解决方案
Could not open client transport with JDBC HiveServer2 未启动或连接信息错误 检查 HiveServer2 端口,测试 beeline 连接:beeline -u jdbc:hive2://host:10000
User: hive is not allowed to impersonate hive Hadoop 代理用户配置问题 core-site.xml 中设置 hadoop.proxyuser.hive.hostshadoop.proxyuser.hive.groups*
Table not found SQL 中未指定库名或数据源默认库错误 在 SQL 中使用全限定表名 database.table,或在数据源中明确默认数据库
Kerberos 认证失败 keytab 路径错误、时间不同步 确保 Worker 时间与 KDC 同步(NTP),验证 kinit -kt keytab principal 可执行
任务长时间卡在 RUNNING Hive 任务卡在 YARN,未产生输出 检查 YARN Application 是否 ACCEPTED(资源不足),或 MapReduce 任务阻塞
中文乱码 元数据编码或 JDBC 连接字符集问题 HiveServer2 JDBC URL 加 ?useUnicode=true&characterEncoding=UTF-8
Failed to run task: java.lang.OutOfMemoryError: PermGen space Worker JVM 参数不足 增加 Worker 的 -XX:MaxMetaspaceSize=256m(Java 8+)或调整 worker.exec.threads 减少并发

4.4 补数示例:Hive ETL 历史数据处理

场景:需要重新计算 2025-01-01 至 2025-01-15 的历史数据,工作流定义使用 $[yyyyMMdd-1] 动态分区。

操作流程

  1. 进入 工作流实例 页面,点击 补数

  2. 设置起始日期 2025-01-01,结束日期 2025-01-15

  3. 选择 串行执行并行执行(注意 Hive 资源,避免同时过多并发)。

  4. 点击 执行,系统自动为每天生成一个工作流实例,每个实例中的 ${schedule_time} 会被替换为对应的调度时间,从而 $[yyyyMMdd-1] 计算出的分区即为正确的历史日期。

  5. 可在 工作流实例 列表查看每一天的执行状态和日志,失败实例可单独重跑或批量重跑。

注意事项


五、核心源码深度解读

本节选取 DolphinScheduler 3.x 分支中的关键类,揭示调度核心流程。

5.1 调度入口:MasterSchedulerService

所属模块:dolphinscheduler-master职责:Master 的核心循环,负责扫描 Command 并提交工作流实例。

// 伪代码简化逻辑
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
public void scheduled() {
    // 1. 使用槽位机制从 t_ds_command 表获取属于自己的指令
    List<Command> commands = commandDao.queryCommandBySlot(currentSlot, masterCount);
    for (Command command : commands) {
        // 2. 根据指令类型执行不同逻辑:启动、恢复、重试、补数等
        switch (command.getType()) {
            case START_PROCESS:
                // 创建工作流实例,构建 DAG,切分任务
                processInstance = createProcessInstance(command);
                dag = buildDag(processInstance);
                submitTask(dag.getStartNodes());
                break;
            case RECOVER_PROCESS:
                // 从失败节点恢复
                recoverProcessInstance(command);
                break;
            // ...
        }
    }
}

关键设计:槽位机制 id % master_count == current_index 避免了多个 Master 抢占同一个 Command,同时减少了对 ZooKeeper 分布式锁的依赖。

5.2 任务分发模型:WorkflowExecuteRunnable

所属模块:dolphinscheduler-master职责:实际控制单个工作流实例的执行状态机。

public class WorkflowExecuteRunnable implements Callable<Void> {
    private Map<String, TaskInstance> activeTaskProcess = new HashMap<>();
    private ProcessInstance processInstance;

    public void run() {
        while (!processInstance.isFinished()) {
            // 1. 检查可提交的任务(所有前置任务已完成)
            List<TaskInstance> readyTasks = getReadyToSubmitTasks();
            for (TaskInstance task : readyTasks) {
                // 2. 通过 Netty 将任务发送给 Worker
                TaskDispatchCommand dispatch = new TaskDispatchCommand(task);
                NettyClientManager.getInstance().send(dispatch);
            }
            // 3. 处理已完成任务的状态上报
            for (TaskInstance completed : completedQueue) {
                // 更新 DAG 状态,标记后续节点可执行
                updateTaskState(completed);
            }
        }
    }
}

设计要点

5.3 Worker 执行器:TaskExecuteProcessor

所属模块:dolphinscheduler-worker职责:接收任务命令,创建具体执行器,启动进程并监控。

public class TaskExecuteProcessor implements NettyRequestProcessor {
    @Override
    public void process(Channel channel, Command command) {
        TaskExecutionContext taskRequest = JSONUtils.parseObject(command.getBody(), TaskExecutionContext.class);
        // 根据任务类型获取对应的执行器,如 ShellTask, SqlTask
        AbstractTask task = TaskFactory.createTask(taskRequest);
        // 提交到线程池执行
        workerManagerService.submit(new TaskExecutionThread(task));
    }
}

AbstractTask 的核心方法:

public abstract class AbstractTask {
    public void handle() {
        // 1. 构建进程执行命令
        String command = buildCommand();
        // 2. 创建进程
        Process process = Runtime.getRuntime().exec(command);
        // 3. 读取标准输出和错误输出
        // 4. 等待进程结束并获取退出码
        int exitCode = process.waitFor();
        // 5. 根据退出码判定成功或失败
        if (exitCode == 0) {
            setResult(SUCCESS);
        } else {
            setResult(FAILURE);
        }
    }
}

5.4 依赖处理:DependentTaskProcessor

职责:实现跨项目、跨工作流的依赖等待。

核心逻辑:

5.5 告警机制:AlertSender

职责:发送告警通知。采用插件化设计,支持邮件、钉钉、企业微信、HTTP 等。

public class AlertSender {
    public void sendAlert(AlertInfo alertInfo, List<AlertPluginInstance> pluginInstances) {
        for (AlertPluginInstance instance : pluginInstances) {
            AlertChannel channel = AlertPluginManager.getChannel(instance.getPluginDefineId());
            channel.process(alertInfo); // 发送告警
        }
    }
}

告警规则在 t_ds_alert 表中定义,通过 t_ds_alert_plugin_instance 关联具体通道。


六、2025 年大厂面试高频题

Q1:DolphinScheduler 和 Airflow / Azkaban 的架构区别?为什么选择 DS?

考点:去中心化架构、Push 模型、多租户。 回答要点

Q2:Master 如何保证高可用?多个 Master 同时运行如何避免冲突?

考点:ZooKeeper 选主、槽位机制。 回答要点

Q3:工作流实例执行过程中 Master 宕机会怎样?任务状态是否丢失?

考点:状态持久化、故障转移。 回答要点

Q4:你怎么设计一个支持动态分区的 Hive 调度任务?如何处理补数?

考点:参数系统、补数机制。 回答要点

Q5:Shell 任务如何将参数传递给下游的 SQL 任务?

考点:参数传递 OUT/IN。 回答要点

Q6:如果 Hive 任务执行超长或卡住,你如何在 DS 中排查?

考点:日志定位、问题隔离。 回答要点

Q7:DolphinScheduler 如何集成 Kerberos 认证的 Hive?

考点:Kerberos 配置、keytab 使用。 回答要点

Q8:你觉得 DS 的 Worker 分组有什么用?什么场景下要用?

考点:资源隔离、业务分组。 回答要点