清洗规则
提示
为了满足不同业务场景下的数据治理需求,系统提供了“清洗规则”功能。本文将介绍如何通过该功能,自主配置并扩展所需的数据清洗规则,以支持多样化的数据处理场景。
一、清洗规则表定义
清洗规则的元信息统一存储在表 ATT_CLEAN_RULE 中。该表用于维护清洗规则的基本信息、使用场景以及策略标识等内容。
建表语句
-- 创建表
CREATE TABLE QDATA_TEST.ATT_CLEAN_RULE (
ID BIGINT IDENTITY(1,1) NOT NULL,
NAME VARCHAR2(128) NOT NULL,
"LEVEL" CHAR(1) DEFAULT '0' NOT NULL,
DESCRIPTION VARCHAR2(512) NULL,
VALID_FLAG VARCHAR2(1) DEFAULT '1' NOT NULL,
DEL_FLAG VARCHAR2(1) DEFAULT '0' NOT NULL,
CREATE_BY VARCHAR2(32) NULL,
CREATOR_ID BIGINT NULL,
CREATE_TIME DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
UPDATE_BY VARCHAR2(32) NULL,
UPDATER_ID BIGINT NULL,
UPDATE_TIME DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
REMARK VARCHAR2(512) NULL,
EXAMPLE VARCHAR2(512) NULL,
"TYPE" VARCHAR2(10) NULL,
USE_CASE VARCHAR2(512) NULL,
STRATEGY_KEY VARCHAR2(32) NULL,
CODE VARCHAR2(32) NULL,
CAT_CODE VARCHAR2(128) NULL,
CONSTRAINT PK_ATT_CLEAN_RULE PRIMARY KEY (ID)
);
-- 表注释:
COMMENT ON TABLE QDATA_TEST.ATT_CLEAN_RULE IS '清洗规则表';字段说明
| 字段名 | 类型 | 约束/默认值 | 说明 |
|---|---|---|---|
| ID | BIGINT | 主键,自增 | 规则ID |
| NAME | VARCHAR2(128) | NOT NULL | 规则名称 |
| LEVEL | CHAR(1) | 默认 '0',NOT NULL | 规则级别;1:字段级,2:表级 |
| DESCRIPTION | VARCHAR2(512) | 规则描述 | |
| VALID_FLAG | VARCHAR2(1) | 默认 '1',NOT NULL | 是否有效;0:无效,1:有效 |
| DEL_FLAG | VARCHAR2(1) | 默认 '0',NOT NULL | 删除标志;1:已删除,0:未删除 |
| CREATE_BY | VARCHAR2(32) | 创建人 | |
| CREATOR_ID | BIGINT | 创建人ID | |
| CREATE_TIME | DATETIME | 默认 CURRENT_TIMESTAMP | 创建时间 |
| UPDATE_BY | VARCHAR2(32) | 更新人 | |
| UPDATER_ID | BIGINT | 更新人ID | |
| UPDATE_TIME | DATETIME | 默认 CURRENT_TIMESTAMP | 更新时间 |
| REMARK | VARCHAR2(512) | 备注 | |
| EXAMPLE | VARCHAR2(512) | 示例 | |
| TYPE | VARCHAR2(10) | 清洗规则类目ID | |
| USE_CASE | VARCHAR2(512) | 使用场景 | |
| STRATEGY_KEY | VARCHAR2(32) | 策略标识,用于清洗组件执行时识别 | |
| CODE | VARCHAR2(32) | 规则编码 | |
| CAT_CODE | VARCHAR2(128) | 类目编码 |
关键字段:
STRATEGY_KEY
该字段在清洗组件执行时用于规则识别和分发,是扩展新规则时的必填项。
二、前端参数化接入
在 清洗规则表 中完成基本信息与标识(含 STRATEGY_KEY)配置后,清洗规则会出现在 ETL 的清洗组件列表中。
但此时点击无反应,需要进行前端扩展:
开发说明
前端页面开发
- 为新增的清洗规则编写对应的配置页面。
- 页面用于录入和校验规则所需参数。
参数封装
- 将前端配置的参数封装到 ETL 组件的
taskParams中。 - 参数结构需与后端
parse2方法的解析逻辑保持一致。
- 将前端配置的参数封装到 ETL 组件的
传递至后端
- 前端保存时,参数随节点信息一并写入 ETL 的任务定义。
- 在后续任务执行时,通过
taskParams传递到执行组件。
注意:
- 若新增清洗规则没有额外参数,可直接继承默认结构;
- 若有扩展需求,需在
taskParams中定义新的参数字段,并同步更新解析逻辑。
三、ETL 参数封装与清洗组件落参
在前端点击保存 ETL 组件信息后,系统会进入参数封装阶段。
相关逻辑位于 qdata-module-dpp/qdata-module-dpp-biz 模块下的:
tech.qiantong.qdata.module.dpp.utils.TaskConverter#buildEtlTaskParams
方法说明
/**
* 构建etl参数数据
*/
public static Map<String, Object> buildEtlTaskParams(
String taskDefinitionList,
Map<String, DppEtlNodeSaveReqVO> nodeMap,
Map<String, Object> taskInfo,
List<DsResource> resourceList) {
Map<String, Object> result = new HashMap<>();
List<Map<String, Object>> transitionList = new ArrayList<>();
List<DppEtlNodeSaveReqVO> nodeList = JSON.parseArray(taskDefinitionList, DppEtlNodeSaveReqVO.class);
for (DppEtlNodeSaveReqVO dppEtlNodeSaveReqVO : nodeList) {
Integer version = 1;
if (nodeMap.containsKey(dppEtlNodeSaveReqVO.getCode())) {
version = nodeMap.get(dppEtlNodeSaveReqVO.getCode()).getVersion();
}
// 组件类型
String componentType = dppEtlNodeSaveReqVO.getComponentType();
TaskComponentTypeEnum taskComponentTypeEnum = TaskComponentTypeEnum.findEnumByType(componentType);
Map<String, Object> data = ComponentFactory.getComponentItem(componentType)
.parse2(dppEtlNodeSaveReqVO.getCode(), version, taskComponentTypeEnum,
dppEtlNodeSaveReqVO.getTaskParams(), resourceUrl, resourceList);
data.put("nodeName", dppEtlNodeSaveReqVO.getName());
data.put("projectCode", taskInfo.get("projectCode"));
switch (taskComponentTypeEnum) {
case DB_READER:
case EXCEL_READER:
case CSV_READER:
result.put("reader", data);
break;
case SPARK_CLEAN:
case SORT_RECORD:
case FIELD_DERIVATION:
transitionList.add(data);
break;
case DB_WRITER:
result.put("writer", data);
break;
}
}
// 配置 config
Map<String, Object> config = new HashMap<>();
config.put("taskInfo", taskInfo);
config.put("rabbitmq", rabbitmqConfig);
config.put("resourceUrl", resourceUrl);
result.put("transition", transitionList);
result.put("config", config);
return result;
}关键点
组件分类处理
reader:DB / Excel / CSV 读取器transition:中间处理链路(清洗、排序、字段衍生等)writer:DB 写入器
清洗组件类型
SPARK_CLEAN即为清洗组件的type,在此阶段会被加入到transitionList。
参数传递
- 清洗规则页面封装的参数会进入
taskParams。 - 在
parse2方法中,这些参数会被解析并写入transition。 - 通用元数据(如
nodeName、projectCode、version)由buildEtlTaskParams方法统一补齐。
- 清洗规则页面封装的参数会进入
注意
- 开发新的清洗规则时,不需要修改
buildEtlTaskParams方法。- 只需保证前端传入的
taskParams参数结构 与parse2方法解析逻辑 一致即可。
四、清洗组件参数封装与扩展
清洗组件的参数在 buildEtlTaskParams 中被调用 parse2 方法完成封装:
Map<String, Object> data = ComponentFactory.getComponentItem(componentType)
.parse2(dppEtlNodeSaveReqVO.getCode(),
version,
taskComponentTypeEnum,
dppEtlNodeSaveReqVO.getTaskParams(),
resourceUrl,
resourceList);方法示例:
@Override
public Map<String, Object> parse2(String nodeCode,
Integer nodeVersion,
TaskComponentTypeEnum componentType,
Map<String, Object> taskParams,
String resourceUrl,
List<DsResource> resourceList) {
// reader 配置
Map<String, Object> reader = new HashMap<>();
reader.put("nodeCode", nodeCode);
reader.put("nodeVersion", nodeVersion);
reader.put("componentType", componentType.getCode());
// 参数
Map<String, Object> parameter = new HashMap<>();
Map<String, Object> mainArgs = (Map<String, Object>) taskParams.get("mainArgs");
parameter.put("cleanRuleList", mainArgs.get("cleanRuleList"));
parameter.put("tableFields", taskParams.get("tableFields"));
parameter.put("where", taskParams.get("where"));
reader.put("parameter", parameter);
return reader;
}关键点
参数封装
- 清洗组件的所有参数统一存储在
taskParams中。 - 在
parse2方法中,这些参数会被解析并转入parameter字段。
- 清洗组件的所有参数统一存储在
默认结构
cleanRuleList:清洗规则集合tableFields:当前表字段信息where:条件过滤表达式
扩展说明
- 一般情况下无需修改
parse2的默认逻辑。 - 如果有扩展需求,可以在
taskParams中增加新的参数字段,并在parse2中进行存储。
- 一般情况下无需修改
注意
- 前端页面的参数模型必须与
parse2方法解析逻辑保持一致。- 新增参数字段时,应保持向后兼容,避免影响现有规则执行。
五、执行端实现(Spark 清洗执行)
清洗规则的执行位于 qdata-etl 模块
类:tech.qiantong.qdata.spark.etl.transition.CleanTransition
关键流程(节选)
JSONObject parameter = transition.getJSONObject("parameter");
// 1) 参数获取
List<Map<String, Object>> tableFieldList = (List<Map<String, Object>>) parameter.get("tableFields");
if (tableFieldList == null || tableFieldList.isEmpty()) {
return transitionOld(dataset, transition, logPath);
}
// 2) 全局 where
String where = parameter.getString("where");
if (StringUtils.isNotEmpty(where)) {
dataset = safeFilter(dataset, where, logPath);
}
// 3) 逐条规则处理
for (Map<String, Object> rule : tableFieldList) {
String ruleCode = MapUtils.getString(rule, "ruleCode");
String ruleType = MapUtils.getString(rule, "ruleType");
JSONObject ruleConfig = JSONObject.parseObject((String) rule.get("ruleConfig"));
String whereClause = MapUtils.getString(rule, "whereClause");
if (StringUtils.isNotEmpty(whereClause)) {
dataset = safeFilter(dataset, whereClause, logPath);
}
// 字段存在性校验
if (!checkColumnsExist(dataset, ruleConfig)) {
LogUtils.writeLog(logPath, String.format("跳过规则 %s(字段不存在)", ruleCode));
continue;
}
// 4) 分发执行
switch (ruleType) {
case "WITHIN_BOUNDARY": // 数值边界调整
dataset = applyNumericBoundary(dataset, ruleConfig);
break;
case "REMOVE_EMPTY_COMBINATION": // 组合字段为空删除
dataset = applyDeleteIfAllNull(dataset, ruleConfig);
break;
case "ADD_PREFIX_SUFFIX": // 字段前/后缀统一
dataset = applyPrefixSuffix(dataset, ruleConfig);
break;
case "MENU_CUSTOM": // 枚举值映射标准化
dataset = normalizeEnumMapping(dataset, ruleConfig);
break;
case "KEEP_LATEST_OR_FIRST": // 按组合字段去重(保留最新或首条)
dataset = deduplicateByFieldsKeepFirst(dataset, ruleConfig);
break;
default:
LogUtils.writeLog(logPath, "未知规则:" + ruleCode);
}
}参数来源
parameter.tableFields- 包含清洗规则清单
- 每条规则包含:
ruleCode/ruleType/ruleConfig/whereClause
parameter.where- 全局过滤条件
- 优先于规则级
whereClause执行
扩展指引
规则登记
- 在清洗规则表中新增记录。
- 设置唯一
STRATEGY_KEY(需与前端的ruleType对应)。
前端建模
- 在清洗组件配置页面补充该规则的参数模型。
- 将参数写入
taskParams.mainArgs.cleanRuleList[*].ruleConfig。
执行分发
- 在
switch (ruleType)中新增分支。 - 分支命名建议与
STRATEGY_KEY保持一致。 - 调用对应的实现方法。
- 在
规则实现
- 在
CleanTransition类中新增具体实现方法(例如applyYourRuleName(dataset, ruleConfig))。 - 方法内部需完成:
- 字段存在性校验
- 数据变换逻辑
- 日志记录
- 在
注意事项
规则执行顺序
- 按
tableFields数组顺序依次执行。 - 若规则之间存在依赖,需在前端或保存时明确排序。
- 按
字段校验
- 每条规则执行前必须通过
checkColumnsExist校验。 - 对于动态列,应定义缺失列处理策略(跳过 / 默认值 / 报错)。
- 每条规则执行前必须通过
条件过滤
- 全局
where与规则级whereClause都通过safeFilter执行。 - 建议执行顺序:先全局过滤,再局部过滤。
- 统一日志输出,避免注入风险。
- 全局
扩展规范
- 新增规则必须保证
STRATEGY_KEY与ruleType对应且唯一。 - 保持参数结构与前端
taskParams定义一致,避免解析异常。
- 新增规则必须保证
六、注意事项总览
在扩展和实现清洗规则时,需要特别关注以下几点:
参数扩展
- 新增参数统一放入
ruleConfig中,避免修改通用结构。
- 新增参数统一放入
数据库兼容
- 若涉及跨库执行,需参考数据连接模块的方言适配,确保类型与语法兼容。
唯一标识
STRATEGY_KEY必须全局唯一,用于清洗组件执行时的规则识别。- 变更时需做好向后兼容和历史规则的平滑迁移。
前后端一致性
- 前端参数模型必须与后端
parse2的解析逻辑保持一致。 - 参数校验应前后端双层把关,避免“脏参数”进入执行端。
- 前端参数模型必须与后端
执行顺序
transition中的规则按数组顺序依次执行。- 若规则存在依赖关系,需在保存时调整顺序。
条件过滤
- 全局
where与规则级whereClause可叠加,执行顺序为全局优先,局部其次。
- 全局
字段存在性
- 执行前建议进行字段校验。
- 动态列需定义缺失处理策略(跳过 / 默认值 / 报错)。
空值与类型处理
- 对
null值要有明确策略(忽略 / 默认值)。 - 数值和日期类字段需统一
cast,避免隐式转换。
- 对
性能优化
- 优先执行过滤条件,减少后续数据量。
- 谨慎使用会触发
shuffle的操作,必要时加缓存。
日志与可观测性
- 规则执行需记录:开始/结束/耗时/影响行数。
- 异常日志建议包含
ruleCode与节点名,便于排查。
扩展规范
- 新增规则需提供单元/集成测试样例。
- 规则文档需同步更新,保证研发与运维可查。
