清洗规则
提示
为了满足不同业务场景下的数据治理需求,系统提供了“清洗规则”功能。本文将介绍如何通过该功能,自主配置并扩展所需的数据清洗规则,以支持多样化的数据处理场景。
一、清洗规则表定义
清洗规则的元信息统一存储在表 ATT_CLEAN_RULE
中。该表用于维护清洗规则的基本信息、使用场景以及策略标识等内容。
建表语句
-- 创建表
CREATE TABLE QDATA_TEST.ATT_CLEAN_RULE (
ID BIGINT IDENTITY(1,1) NOT NULL,
NAME VARCHAR2(128) NOT NULL,
"LEVEL" CHAR(1) DEFAULT '0' NOT NULL,
DESCRIPTION VARCHAR2(512) NULL,
VALID_FLAG VARCHAR2(1) DEFAULT '1' NOT NULL,
DEL_FLAG VARCHAR2(1) DEFAULT '0' NOT NULL,
CREATE_BY VARCHAR2(32) NULL,
CREATOR_ID BIGINT NULL,
CREATE_TIME DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
UPDATE_BY VARCHAR2(32) NULL,
UPDATER_ID BIGINT NULL,
UPDATE_TIME DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
REMARK VARCHAR2(512) NULL,
EXAMPLE VARCHAR2(512) NULL,
"TYPE" VARCHAR2(10) NULL,
USE_CASE VARCHAR2(512) NULL,
STRATEGY_KEY VARCHAR2(32) NULL,
CODE VARCHAR2(32) NULL,
CAT_CODE VARCHAR2(128) NULL,
CONSTRAINT PK_ATT_CLEAN_RULE PRIMARY KEY (ID)
);
-- 表注释:
COMMENT ON TABLE QDATA_TEST.ATT_CLEAN_RULE IS '清洗规则表';
字段说明
字段名 | 类型 | 约束/默认值 | 说明 |
---|---|---|---|
ID | BIGINT | 主键,自增 | 规则ID |
NAME | VARCHAR2(128) | NOT NULL | 规则名称 |
LEVEL | CHAR(1) | 默认 '0',NOT NULL | 规则级别;1:字段级,2:表级 |
DESCRIPTION | VARCHAR2(512) | 规则描述 | |
VALID_FLAG | VARCHAR2(1) | 默认 '1',NOT NULL | 是否有效;0:无效,1:有效 |
DEL_FLAG | VARCHAR2(1) | 默认 '0',NOT NULL | 删除标志;1:已删除,0:未删除 |
CREATE_BY | VARCHAR2(32) | 创建人 | |
CREATOR_ID | BIGINT | 创建人ID | |
CREATE_TIME | DATETIME | 默认 CURRENT_TIMESTAMP | 创建时间 |
UPDATE_BY | VARCHAR2(32) | 更新人 | |
UPDATER_ID | BIGINT | 更新人ID | |
UPDATE_TIME | DATETIME | 默认 CURRENT_TIMESTAMP | 更新时间 |
REMARK | VARCHAR2(512) | 备注 | |
EXAMPLE | VARCHAR2(512) | 示例 | |
TYPE | VARCHAR2(10) | 清洗规则类目ID | |
USE_CASE | VARCHAR2(512) | 使用场景 | |
STRATEGY_KEY | VARCHAR2(32) | 策略标识,用于清洗组件执行时识别 | |
CODE | VARCHAR2(32) | 规则编码 | |
CAT_CODE | VARCHAR2(128) | 类目编码 |
关键字段:
STRATEGY_KEY
该字段在清洗组件执行时用于规则识别和分发,是扩展新规则时的必填项。
二、前端参数化接入
在 清洗规则表 中完成基本信息与标识(含 STRATEGY_KEY
)配置后,清洗规则会出现在 ETL 的清洗组件列表中。
但此时点击无反应,需要进行前端扩展:
开发说明
前端页面开发
- 为新增的清洗规则编写对应的配置页面。
- 页面用于录入和校验规则所需参数。
参数封装
- 将前端配置的参数封装到 ETL 组件的
taskParams
中。 - 参数结构需与后端
parse2
方法的解析逻辑保持一致。
- 将前端配置的参数封装到 ETL 组件的
传递至后端
- 前端保存时,参数随节点信息一并写入 ETL 的任务定义。
- 在后续任务执行时,通过
taskParams
传递到执行组件。
注意:
- 若新增清洗规则没有额外参数,可直接继承默认结构;
- 若有扩展需求,需在
taskParams
中定义新的参数字段,并同步更新解析逻辑。
三、ETL 参数封装与清洗组件落参
在前端点击保存 ETL 组件信息后,系统会进入参数封装阶段。
相关逻辑位于 qdata-module-dpp/qdata-module-dpp-biz
模块下的:
tech.qiantong.qdata.module.dpp.utils.TaskConverter#buildEtlTaskParams
方法说明
/**
* 构建etl参数数据
*/
public static Map<String, Object> buildEtlTaskParams(
String taskDefinitionList,
Map<String, DppEtlNodeSaveReqVO> nodeMap,
Map<String, Object> taskInfo,
List<DsResource> resourceList) {
Map<String, Object> result = new HashMap<>();
List<Map<String, Object>> transitionList = new ArrayList<>();
List<DppEtlNodeSaveReqVO> nodeList = JSON.parseArray(taskDefinitionList, DppEtlNodeSaveReqVO.class);
for (DppEtlNodeSaveReqVO dppEtlNodeSaveReqVO : nodeList) {
Integer version = 1;
if (nodeMap.containsKey(dppEtlNodeSaveReqVO.getCode())) {
version = nodeMap.get(dppEtlNodeSaveReqVO.getCode()).getVersion();
}
// 组件类型
String componentType = dppEtlNodeSaveReqVO.getComponentType();
TaskComponentTypeEnum taskComponentTypeEnum = TaskComponentTypeEnum.findEnumByType(componentType);
Map<String, Object> data = ComponentFactory.getComponentItem(componentType)
.parse2(dppEtlNodeSaveReqVO.getCode(), version, taskComponentTypeEnum,
dppEtlNodeSaveReqVO.getTaskParams(), resourceUrl, resourceList);
data.put("nodeName", dppEtlNodeSaveReqVO.getName());
data.put("projectCode", taskInfo.get("projectCode"));
switch (taskComponentTypeEnum) {
case DB_READER:
case EXCEL_READER:
case CSV_READER:
result.put("reader", data);
break;
case SPARK_CLEAN:
case SORT_RECORD:
case FIELD_DERIVATION:
transitionList.add(data);
break;
case DB_WRITER:
result.put("writer", data);
break;
}
}
// 配置 config
Map<String, Object> config = new HashMap<>();
config.put("taskInfo", taskInfo);
config.put("rabbitmq", rabbitmqConfig);
config.put("resourceUrl", resourceUrl);
result.put("transition", transitionList);
result.put("config", config);
return result;
}
关键点
组件分类处理
reader
:DB / Excel / CSV 读取器transition
:中间处理链路(清洗、排序、字段衍生等)writer
:DB 写入器
清洗组件类型
SPARK_CLEAN
即为清洗组件的type
,在此阶段会被加入到transitionList
。
参数传递
- 清洗规则页面封装的参数会进入
taskParams
。 - 在
parse2
方法中,这些参数会被解析并写入transition
。 - 通用元数据(如
nodeName
、projectCode
、version
)由buildEtlTaskParams
方法统一补齐。
- 清洗规则页面封装的参数会进入
注意
- 开发新的清洗规则时,不需要修改
buildEtlTaskParams
方法。- 只需保证前端传入的
taskParams
参数结构 与parse2
方法解析逻辑 一致即可。
四、清洗组件参数封装与扩展
清洗组件的参数在 buildEtlTaskParams
中被调用 parse2
方法完成封装:
Map<String, Object> data = ComponentFactory.getComponentItem(componentType)
.parse2(dppEtlNodeSaveReqVO.getCode(),
version,
taskComponentTypeEnum,
dppEtlNodeSaveReqVO.getTaskParams(),
resourceUrl,
resourceList);
方法示例:
@Override
public Map<String, Object> parse2(String nodeCode,
Integer nodeVersion,
TaskComponentTypeEnum componentType,
Map<String, Object> taskParams,
String resourceUrl,
List<DsResource> resourceList) {
// reader 配置
Map<String, Object> reader = new HashMap<>();
reader.put("nodeCode", nodeCode);
reader.put("nodeVersion", nodeVersion);
reader.put("componentType", componentType.getCode());
// 参数
Map<String, Object> parameter = new HashMap<>();
Map<String, Object> mainArgs = (Map<String, Object>) taskParams.get("mainArgs");
parameter.put("cleanRuleList", mainArgs.get("cleanRuleList"));
parameter.put("tableFields", taskParams.get("tableFields"));
parameter.put("where", taskParams.get("where"));
reader.put("parameter", parameter);
return reader;
}
关键点
参数封装
- 清洗组件的所有参数统一存储在
taskParams
中。 - 在
parse2
方法中,这些参数会被解析并转入parameter
字段。
- 清洗组件的所有参数统一存储在
默认结构
cleanRuleList
:清洗规则集合tableFields
:当前表字段信息where
:条件过滤表达式
扩展说明
- 一般情况下无需修改
parse2
的默认逻辑。 - 如果有扩展需求,可以在
taskParams
中增加新的参数字段,并在parse2
中进行存储。
- 一般情况下无需修改
注意
- 前端页面的参数模型必须与
parse2
方法解析逻辑保持一致。- 新增参数字段时,应保持向后兼容,避免影响现有规则执行。
五、执行端实现(Spark 清洗执行)
清洗规则的执行位于 qdata-etl
模块
类:tech.qiantong.qdata.spark.etl.transition.CleanTransition
关键流程(节选)
JSONObject parameter = transition.getJSONObject("parameter");
// 1) 参数获取
List<Map<String, Object>> tableFieldList = (List<Map<String, Object>>) parameter.get("tableFields");
if (tableFieldList == null || tableFieldList.isEmpty()) {
return transitionOld(dataset, transition, logPath);
}
// 2) 全局 where
String where = parameter.getString("where");
if (StringUtils.isNotEmpty(where)) {
dataset = safeFilter(dataset, where, logPath);
}
// 3) 逐条规则处理
for (Map<String, Object> rule : tableFieldList) {
String ruleCode = MapUtils.getString(rule, "ruleCode");
String ruleType = MapUtils.getString(rule, "ruleType");
JSONObject ruleConfig = JSONObject.parseObject((String) rule.get("ruleConfig"));
String whereClause = MapUtils.getString(rule, "whereClause");
if (StringUtils.isNotEmpty(whereClause)) {
dataset = safeFilter(dataset, whereClause, logPath);
}
// 字段存在性校验
if (!checkColumnsExist(dataset, ruleConfig)) {
LogUtils.writeLog(logPath, String.format("跳过规则 %s(字段不存在)", ruleCode));
continue;
}
// 4) 分发执行
switch (ruleType) {
case "WITHIN_BOUNDARY": // 数值边界调整
dataset = applyNumericBoundary(dataset, ruleConfig);
break;
case "REMOVE_EMPTY_COMBINATION": // 组合字段为空删除
dataset = applyDeleteIfAllNull(dataset, ruleConfig);
break;
case "ADD_PREFIX_SUFFIX": // 字段前/后缀统一
dataset = applyPrefixSuffix(dataset, ruleConfig);
break;
case "MENU_CUSTOM": // 枚举值映射标准化
dataset = normalizeEnumMapping(dataset, ruleConfig);
break;
case "KEEP_LATEST_OR_FIRST": // 按组合字段去重(保留最新或首条)
dataset = deduplicateByFieldsKeepFirst(dataset, ruleConfig);
break;
default:
LogUtils.writeLog(logPath, "未知规则:" + ruleCode);
}
}
参数来源
parameter.tableFields
- 包含清洗规则清单
- 每条规则包含:
ruleCode
/ruleType
/ruleConfig
/whereClause
parameter.where
- 全局过滤条件
- 优先于规则级
whereClause
执行
扩展指引
规则登记
- 在清洗规则表中新增记录。
- 设置唯一
STRATEGY_KEY
(需与前端的ruleType
对应)。
前端建模
- 在清洗组件配置页面补充该规则的参数模型。
- 将参数写入
taskParams.mainArgs.cleanRuleList[*].ruleConfig
。
执行分发
- 在
switch (ruleType)
中新增分支。 - 分支命名建议与
STRATEGY_KEY
保持一致。 - 调用对应的实现方法。
- 在
规则实现
- 在
CleanTransition
类中新增具体实现方法(例如applyYourRuleName(dataset, ruleConfig)
)。 - 方法内部需完成:
- 字段存在性校验
- 数据变换逻辑
- 日志记录
- 在
注意事项
规则执行顺序
- 按
tableFields
数组顺序依次执行。 - 若规则之间存在依赖,需在前端或保存时明确排序。
- 按
字段校验
- 每条规则执行前必须通过
checkColumnsExist
校验。 - 对于动态列,应定义缺失列处理策略(跳过 / 默认值 / 报错)。
- 每条规则执行前必须通过
条件过滤
- 全局
where
与规则级whereClause
都通过safeFilter
执行。 - 建议执行顺序:先全局过滤,再局部过滤。
- 统一日志输出,避免注入风险。
- 全局
扩展规范
- 新增规则必须保证
STRATEGY_KEY
与ruleType
对应且唯一。 - 保持参数结构与前端
taskParams
定义一致,避免解析异常。
- 新增规则必须保证
六、注意事项总览
在扩展和实现清洗规则时,需要特别关注以下几点:
参数扩展
- 新增参数统一放入
ruleConfig
中,避免修改通用结构。
- 新增参数统一放入
数据库兼容
- 若涉及跨库执行,需参考数据连接模块的方言适配,确保类型与语法兼容。
唯一标识
STRATEGY_KEY
必须全局唯一,用于清洗组件执行时的规则识别。- 变更时需做好向后兼容和历史规则的平滑迁移。
前后端一致性
- 前端参数模型必须与后端
parse2
的解析逻辑保持一致。 - 参数校验应前后端双层把关,避免“脏参数”进入执行端。
- 前端参数模型必须与后端
执行顺序
transition
中的规则按数组顺序依次执行。- 若规则存在依赖关系,需在保存时调整顺序。
条件过滤
- 全局
where
与规则级whereClause
可叠加,执行顺序为全局优先,局部其次。
- 全局
字段存在性
- 执行前建议进行字段校验。
- 动态列需定义缺失处理策略(跳过 / 默认值 / 报错)。
空值与类型处理
- 对
null
值要有明确策略(忽略 / 默认值)。 - 数值和日期类字段需统一
cast
,避免隐式转换。
- 对
性能优化
- 优先执行过滤条件,减少后续数据量。
- 谨慎使用会触发
shuffle
的操作,必要时加缓存。
日志与可观测性
- 规则执行需记录:开始/结束/耗时/影响行数。
- 异常日志建议包含
ruleCode
与节点名,便于排查。
扩展规范
- 新增规则需提供单元/集成测试样例。
- 规则文档需同步更新,保证研发与运维可查。