ETL组件
提示
为了满足不同业务场景对 ETL 数据处理的需求,系统提供了 Reader(输入)、Transition(转换)、Writer(输出)三类组件。
本文将介绍 ETL 组件的整体结构、源码目录及扩展方式,帮助开发者快速完成新组件的接入与参数封装。
一、 定义(ETL)
本模块用于配置与执行数据集成(ETL)任务,支持多源数据的接入、转换与写出,是构建数据流、数据处理链路的核心组件。通过节点式图形化配置,用户可完成从数据读取、清洗、转换到落地的完整流程。
二、 整体结构
运行机制
当前 ETL 组件依托 DolphinScheduler 调度执行,整体运行机制如下:
- 用户在前端页面通过拖拽方式配置输入、转换、输出。
- 系统将配置参数进行封装与打包。
- 与 DolphinScheduler 交互,创建并存储任务。
- 任务信息双存:本地表 & DolphinScheduler 调度器。
- 执行阶段由 DolphinScheduler 触发,调用集成的 ETL JAR 包 执行任务。
扩展转换组件
若要新增一个转换组件,需要修改三个层面:
- 前端页面:新增节点配置 UI。
- 后端代码封装:支持参数的序列化和传递。
- ETL JAR 包源码:新增具体的转换组件实现。
扩展输入与输出组件
(一)扩展输入数据源
- 参考 《数据源扩展文档》 完成新数据源扩展。
- 修改 页面、参数封装逻辑、ETL JAR 包源码,保持与转换扩展一致。
(二)扩展非数据源输入组件(示例:Excel 文件)
- 文件获取:先完成 Excel 文件的上传与存储。
- 参数封装:记录文件元信息,并传递给组件。
- 新建 Reader:编写 Excel 读取逻辑,将数据转换为 DataSet。
- 进入 ETL 流程:与常规 Reader 组件流程保持一致。
三、 源码目录结构
模块位置
ETL 组件源码位于 qdata-etl
模块下:
tech.qiantong.qdata.spark.etl
目录结构
- reader:输入组件(数据源读取逻辑)
- transition:转换组件(数据清洗与转换逻辑)
- writer:输出组件(数据写入逻辑)
- utils:工具类(通用方法、辅助函数)
- EtlApplication:ETL 任务执行入口
定位说明
- 扩展 输入 → 修改
reader
- 扩展 转换 → 修改
transition
- 扩展 输出 → 修改
writer
- 公共工具方法 → 使用
utils
- 执行入口逻辑 → 查看
EtlApplication
四、 输入组件(Reader)
位置
输入组件位于:
tech.qiantong.qdata.spark.etl.reader
包含核心类:
- Reader:输入组件接口,定义统一的读取方法。
- ReaderFactory:输入组件工厂,根据组件
code
获取具体实现。 - ReaderRegistry:输入组件注册表,维护已注册的 Reader 映射。
Reader 接口
public interface Reader {
Dataset<Row> read(SparkSession spark, JSONObject reader, List<String> readerColumns, String logPath);
String code();
}
• read:核心方法,负责从输入源读取数据并返回 Dataset<Row>
• code:标识输入组件类型
ReaderFactory 工厂类
通过组件编码获取具体 Reader 实例:
public static Reader getReader(String code) {
return Optional.ofNullable(COMPONENT_ITEM_REGISTRY.getReader(code))
.orElseThrow(() -> new ServiceException(String.format("%s not supported.", code)));
}
ReaderRegistry 注册表
系统已注册的 Reader:
- DBReader(数据库输入)
- ExcelReader(Excel 文件输入)
- CsvReader(CSV 文件输入)
- KafkaReader(Kafka 消息流输入)
public class ReaderRegistry {
private final Map<String, Reader> readerMap = new HashMap<>();
public ReaderRegistry() {
this.readerMap.put(TaskComponentTypeEnum.DB_READER.getCode(), new DBReader());
this.readerMap.put(TaskComponentTypeEnum.EXCEL_READER.getCode(), new ExcelReader());
this.readerMap.put(TaskComponentTypeEnum.CSV_READER.getCode(), new CsvReader());
this.readerMap.put(TaskComponentTypeEnum.KAFKA_READER.getCode(), new KafkaReader());
}
public Reader getReader(String code) {
return this.readerMap.get(code);
}
}
扩展方式
扩展 Reader 有两种典型方式:
扩展数据源(DBReader)
- 在
DBReader
内增加对应数据库的 JDBC 兼容即可。 - 适合 MySQL、Oracle、SQL Server、达梦等关系型数据库。
- 在
扩展非数据源(如 Excel 文件)
- 新建类
XXXReader
实现Reader
接口。 - 在
ReaderRegistry
中注册新 Reader。 - 实现
read
方法,完成数据加载。
- 新建类
示例:ExcelReader
@Override
public Dataset<Row> read(SparkSession spark, JSONObject reader, List<String> readerColumns, String logPath) {
LogUtils.writeLog(logPath, "********************************* Initialize task context ***********************************");
LogUtils.writeLog(logPath, "开始Excel输入节点");
LogUtils.writeLog(logPath, "开始任务时间: " + DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
LogUtils.writeLog(logPath, "任务参数:" + reader.toJSONString(PrettyFormat));
JSONObject parameter = reader.getJSONObject("parameter");
List<Object> column = parameter.getJSONArray("column");
String path = parameter.getString("path");
Dataset<Row> dataset = spark.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(path);
dataset = dataset.select(column.stream()
.map(c -> new Column(((JSONObject) c).getString("columnName")))
.toArray(Column[]::new));
readerColumns.addAll(column.stream()
.map(c -> ((JSONObject) c).getString("columnName"))
.collect(Collectors.toList()));
LogUtils.writeLog(logPath, "输入数据量为:" + dataset.count());
dataset.na().fill("Unknown").show(10);
LogUtils.writeLog(logPath, "部分数据:\n" + dataset.na().fill("Unknown").showString(10, 0, false));
return dataset;
}
五、 转换组件(Transition)
位置
转换组件位于:
tech.qiantong.qdata.spark.etl.transition
目前系统内置示例:
- CleanTransition(清洗组件),扩展方式可参考 《清洗开发文档》。
当前实现的限制
在 EtlApplication
中,转换逻辑被写死为:
- 仅取 第一个转换组件:
transitionArr.get(0)
- 默认调用 CleanTransition 执行
示例代码:
if (taskParams.getJSONArray("transition") != null && taskParams.getJSONArray("transition").size() > 0) {
JSONArray transitionArr = taskParams.getJSONArray("transition");
JSONObject transition = (JSONObject) transitionArr.get(0);
String transitionLogPath = LogUtils.createLogPath(resourceUrl, transition);
TaskInstance transitionTaskInstance = createTask(processInstance, transitionLogPath, transition, now, rabbitmq);
try {
data = CleanTransition.transition(data, transition, transitionLogPath);
LogUtils.writeLog(transitionLogPath, "任务成功");
} catch (Exception e) {
updateProcess(processInstance, WorkflowExecutionStatus.FAILURE, rabbitmq);
updateTask(transitionTaskInstance, TaskExecutionStatus.FAILURE, rabbitmq);
spark.stop();
LogUtils.writeLog(transitionLogPath, "任务失败");
return;
}
updateTask(transitionTaskInstance, TaskExecutionStatus.SUCCESS, rabbitmq);
}
扩展方式
在实际业务中,转换组件不能都堆积在 CleanTransition 中,应当通过扩展机制支持多组件。
具体步骤如下:
抽象 Transition 接口
定义统一的转换规范,便于扩展。public interface Transition { Dataset<Row> transform(Dataset<Row> dataset, JSONObject transition, String logPath); String code(); }
新增 Transition 实现类
- 例如:
MappingTransition
(字段映射)、SplitTransition
(字段拆分)、DerivationTransition
(派生字段)等。 - 每个实现类负责单一的转换逻辑,保持职责清晰。
- 引入工厂 + 注册表模式
- 类似 Reader / Writer,建立
TransitionFactory
与TransitionRegistry
。 - 根据配置中的
code
,动态获取对应的转换组件实例。
- 改造 EtlApplication
- 将原来写死的 CleanTransition 替换为工厂模式。
- 遍历
transitionArr
,依次调用不同的转换组件。 - 保持任务日志、异常捕获、任务状态更新与原有逻辑一致。
六、 输出组件(Writer)
位置
输出组件位于:
tech.qiantong.qdata.spark.etl.writer
主要类说明
Writer 接口(Writer.java)
定义输出组件的统一规范,约束所有 Writer 实现类必须提供写出数据的方法,以及组件类型标识。WriterFactory 工厂类(WriterFactory.java)
根据组件code
获取具体的 Writer 实现,保证外部调用时无需关心具体类名。WriterRegistry 注册表(WriterRegistry.java)
维护已注册的 Writer 组件实例映射,目前只注册了 DBWriter(数据库写入)。
扩展方式
- 新增类,实现
Writer
接口,完成对应输出逻辑(例如写入 Hive、HDFS、Kafka 等)。 - 在
WriterRegistry
中注册新组件。 - 通过
WriterFactory
统一获取并使用。
小结
Writer 与 Reader 结构保持一致,遵循 接口 + 工厂 + 注册表 的模式,方便扩展和管理。
当前版本仅开放了数据库写入,开发者可根据业务需求扩展新的输出组件。
七、 扩展业务代码与参数封装
位置
业务代码的参数封装逻辑位于:
tech.qiantong.qdata.module.dpp.service.etl.impl.DppEtlTaskServiceImpl
核心方法:
- createEtlTask:ETL 任务创建入口
- TaskConverter.buildEtlTaskParams:构建 ETL 程序所需参数
参数流转路径
- 前端页面:用户配置输入、转换、输出节点,生成参数。
- 后端业务服务:接收参数,调用
TaskConverter
进行封装。 - DolphinScheduler:调度器存储并调度任务。
- ETL 组件:根据封装参数执行具体 Reader / Transition / Writer。
TaskConverter.buildEtlTaskParams
该方法根据组件类型对参数进行分发与封装:
- Reader 节点:DB_READER、EXCEL_READER、CSV_READER 等放入
result.reader
。 - Transition 节点:SPARK_CLEAN、SORT_RECORD、FIELD_DERIVATION 等加入
transitionList
。 - Writer 节点:DB_WRITER 放入
result.writer
。 - Config 节点:保存任务基本信息、资源路径、消息队列配置等。
最终返回的结果结构包含:
reader
:输入组件参数transition
:转换组件参数列表writer
:输出组件参数config
:公共配置
扩展方式
当新增一个组件时,需要:
- 在前端:支持参数配置与传递。
- 在后端:
- 在
TaskComponentTypeEnum
中定义新组件类型。 - 在
TaskConverter.buildEtlTaskParams
的switch-case
中增加分支逻辑。 - 调用对应工厂方法进行参数封装。
- 在
- 在调度与执行层:保证 DolphinScheduler 能正确调度任务,ETL 组件能解析参数并执行。
小结
业务代码部分是 组件与调度之间的桥梁。
只有 Reader / Transition / Writer 三端与 TaskConverter 的参数封装逻辑保持一致,才能保证组件被正确调用。
八、 注意事项
参数一致性
- 前端配置的参数字段必须与后端
TaskConverter
封装逻辑保持一致。 - 新增组件时,务必确认参数命名与解析方式统一,避免执行时参数缺失或解析错误。
- 前端配置的参数字段必须与后端
组件注册
- Reader、Transition、Writer 均采用 接口 + 工厂 + 注册表 模式。
- 新增组件必须在对应的
Registry
中注册,否则工厂无法获取。
任务链路完整性
- ETL 任务必须包含输入(Reader)、输出(Writer),转换(Transition)可选。
- 在封装参数时,确保三部分数据结构齐全,保证任务执行链路顺畅。
异常处理
- 转换和写出环节容易出错,扩展组件时需要添加日志记录与异常捕获。
- 确保在异常发生时能更新任务状态,避免调度器出现“卡死”状态。
性能与资源控制
- 读取、转换、写入过程中涉及 Spark Dataset 操作,应注意内存和分区处理。
- 对大文件或高并发任务,需要合理设置 Spark 参数,避免 OOM。
扩展文档
- 新增数据源时,请优先参考 《数据链接-开发文档》。
- 新增清洗或转换组件时,请参考 《清洗规则-开发文档》,保持扩展规范一致。
总结
ETL 组件扩展不仅仅是新增代码实现,更重要的是保证 前端参数 → 后端封装 → 调度器调度 → 组件执行 全链路的一致性与健壮性。