
数据连接
为了满足不同业务场景对数据源的接入需求,系统提供了“数据连接”菜单。本文将介绍如何通过该功能,自主配置扩展所需的数据库或接口数据连接。
一、🔍 数据链接-SQL
表 DA_DATASOURCE
主要用于记录系统中配置的数据源连接信息,其中包含了如数据源名称(DATASOURCE_NAME
)、类型(DATASOURCE_TYPE
)、IP 地址(IP
)、端口号(PORT
)等常规连接参数,用于支撑大多数关系型数据库的连接管理。
然而,在实际应用中,尤其是在接入大数据生态中的一些数据源(如 Hive、HBase、Kafka 等)时,其连接方式往往更为复杂,所需的配置项也较为灵活,无法通过固定字段完全覆盖。因此,表中预留了 DATASOURCE_CONFIG
字段,用于以 JSON 字符串的形式存储扩展参数,如用户名、密码、Kerberos 认证信息、Zookeeper 地址等。该字段为系统提供了更强的可扩展性与适配能力,使其能够支持多种异构数据源的动态接入。
-- ----------------------------
-- 数据源表-dm示例脚本
-- ----------------------------
CREATE TABLE "QDATA"."DA_DATASOURCE"
(
"ID" BIGINT IDENTITY(49, 1) NOT NULL,
"DATASOURCE_NAME" VARCHAR2(128) NOT NULL,
"DATASOURCE_TYPE" VARCHAR2(32) NOT NULL,
"DATASOURCE_CONFIG" VARCHAR2(3072),
"IP" VARCHAR2(32) NOT NULL,
"PORT" INTEGER NOT NULL,
"LIST_COUNT" INTEGER DEFAULT 0,
"SYNC_COUNT" INTEGER DEFAULT 0,
"DATA_SIZE" INTEGER DEFAULT 0,
"DESCRIPTION" VARCHAR2(512),
"VALID_FLAG" VARCHAR2(1) DEFAULT 1 NOT NULL,
"CREATE_BY" VARCHAR2(32),
"CREATOR_ID" BIGINT,
"CREATE_TIME" DATETIME(6),
"UPDATE_BY" VARCHAR2(32),
"UPDATER_ID" BIGINT,
"UPDATE_TIME" DATETIME(6),
"REMARK" VARCHAR2(512),
"DEL_FLAG" VARCHAR(1) DEFAULT '0' NOT NULL,
NOT CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
COMMENT ON TABLE "QDATA"."DA_DATASOURCE" IS '数据源表';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATE_BY" IS '创建人';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATE_TIME" IS '创建时间';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATOR_ID" IS '创建人id';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATA_SIZE" IS '同步数据量大小(预留)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_CONFIG" IS '数据源配置(json字符串)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_NAME" IS '数据源名称';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_TYPE" IS '数据源类型';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DESCRIPTION" IS '描述';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."ID" IS 'ID';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."IP" IS 'IP';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."LIST_COUNT" IS '数据库表数(预留)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."PORT" IS '端口号';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."REMARK" IS '备注';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."SYNC_COUNT" IS '同步记录数(预留)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATE_BY" IS '更新人';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATE_TIME" IS '更新时间';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATER_ID" IS '更新人id';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."VALID_FLAG" IS '是否有效;0:无效,1:有效';
二、🔍 数据连接-业务代码
从数据连接表名 DA_DATASOURCE
即可看出,数据源连接相关的业务代码归属于 qdata-module-da
模块。在该模块中,控制器层的入口类位于:
tech.qiantong.qdata.module.da.controller.admin.datasource
具体地,核心控制器为 DaDatasourceController
,负责处理数据连接的相关接口请求,包括数据源的新增、编辑、测试连接等操作。
如果希望扩展数据连接的业务逻辑(如新增支持的数据库类型、调整连接参数结构、扩展校验逻辑等),可从该类入手进行功能扩展。同时,其下层的 service
层 等也需进行相应的联动改造。
三、🔍 数据连接-工具方法
当你深入到数据连接模块的 service
层,就会发现其底层与 JDBC 的交互逻辑,并不是在每个业务类中独立实现的,而是统一封装并指向了一个公共方法包。
该公共包位于 qdata-framework
模块下的子模块 qdata-common
中,具体路径如下:
tech.qiantong.qdata.common.database
在该包中,系统将所有与数据库交互相关的能力进行了统一封装,包括但不限于:
- 建立数据源连接(如 DataSourceFactory)
- 根据数据源类型选择合适的 SQL 方言(DbDialect、DialectFactory)
- 获取数据库结构信息(DbQuery、DbQueryFactoryBean)
- 执行 SQL 查询、元数据提取实体(DbTable、DbColumn)
- 支持缓存机制、异常封装、字段类型定义等
📂 包结构如下(核心类说明):
datasource
: 提供抽象工厂与默认工厂类,用于动态创建连接对象query
: 定义了查询执行逻辑接口与具体实现dialect
: 按数据库类型封装 SQL 方言适配策略constants
: 定义数据库类型、字段类型、解析方式枚举等core
: 抽象表、字段、分页对象utils
: 工具类如MD5Util
该设计极大地增强了系统的数据库适配能力,使新增数据源类型只需实现对应方言及查询工厂,便可无缝接入整个数据连接模块,保证了扩展性与代码解耦性。
四、🚀 数据链接-扩展新的数据源类型(开发流程)
如果你已经仔细阅读了上文对数据连接公共方法体系的介绍,此时应对其整体结构有了初步理解。接下来,我们将以扩展 MySQL
数据源为例,完整走一遍 新增数据源类型的流程,便于你后续接入其他数据库如 Hive、ClickHouse 等。
1️⃣ 在枚举类 DbType
中新增数据源类型
路径:tech.qiantong.qdata.common.database.constants.DbType
/**
* MYSQL
*/
MYSQL("MySql",
"MySql数据库",
"jdbc:mysql://${host}:${port}/${dbName}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8",
"LENGTH",
"SELECT COUNT(1) FROM {tableName}",
"SELECT {tableFieldName} FROM {tableName} ORDER BY {orderBy} DESC LIMIT ({pageNo}-1)*{pageSize},{pageSize}");
每个数据源枚举项包含:
- 数据源标识
- 显示名称
- JDBC URL 模板
- 获取字段长度函数
- 查询记录数 SQL 模板
- 分页查询 SQL 模板
2️⃣ 创建对应 Dialect 类(方言解析器)
路径:tech.qiantong.qdata.common.database.dialect
创建类名:如 MySqlDialect
,需继承抽象类:
public class MySqlDialect extends AbstractDbDialect {
// 实现父类中抽象的方法
}
3️⃣ 实现方言逻辑
在 MySqlDialect
中实现 AbstractDbDialect
定义的核心方法,如:
tables
columns
buildQuerySqlFields
someInternalSqlGenerator
可参考已有数据库实现或查阅接口 DbQuery
的说明。
4️⃣ 注册新方言到方言工厂
路径:DialectRegistry.java
位置:tech.qiantong.qdata.common.database.dialect.DialectRegistry
在构造方法中将新方言注册到映射表:
public DialectRegistry() {
...
dialect_enum_map.put(DbType.MYSQL, new MySqlDialect());
...
}
确保你的 DbType
枚举值和注册项一致。
✅ 至此,新增数据源的主要工作已完成。
系统在处理数据源相关功能(如建表语句生成、字段提取、分页查询等)时,将根据 DbType
自动调用你实现的 Dialect
类,从而实现不同数据库的接入适配。
五、数据链接-额外扩展小知识
1️⃣ 特殊数据连接(HDFS、HBase、Kafka 等)
对于 HDFS、HBase、Kafka 等无法通过 JDBC 建立标准连接的数据源,其接入方式与传统数据库略有不同。此类数据源在数据连接模块中需做以下特殊处理:
🔹 1. 配置存储方式
这类特殊数据源所需的连接参数(如 Zookeeper 地址、认证信息、主题名等)统一保存在 DATASOURCE_CONFIG
字段中,该字段为 JSON 格式,用于支撑灵活扩展。
🔹 2. 构建连接上下文对象(DbQueryProperty)
在调用数据库工厂方法 createDbQuery()
之前,必须构造 DbQueryProperty
,其中包含主机地址、端口、数据库名等常规字段。
对于这类特殊数据源,需将额外的连接参数兼容性写入 config
字段:
DbQueryProperty dbQueryProperty = new DbQueryProperty();
dbQueryProperty.setHost("kafka-host");
dbQueryProperty.setPort(9092);
dbQueryProperty.setConfig(Map.of(
"security.protocol", "SASL_PLAINTEXT",
"sasl.mechanism", "PLAIN"
));
🔹 3. 实现自定义连接验证逻辑(以 Kafka 为例)
Kafka 连接测试需自定义 validConnection
方法,不再通过 JDBC 建立连接,而是通过 Kafka AdminClient 方式测试:
@Override
public Boolean validConnection(DataSource dataSource, DbQueryProperty dbQueryProperty) {
Properties props = new Properties();
props.put("bootstrap.servers", dbQueryProperty.getHost() + ":" + dbQueryProperty.getPort());
props.put("default.api.timeout.ms", 10000);
props.put("request.timeout.ms", 10000);
props.put("admin.request.timeout.ms", 10000);
if (dbQueryProperty.getConfig() != null && !dbQueryProperty.getConfig().isEmpty()) {
dbQueryProperty.getConfig().forEach(props::put);
}
String topic = "TEST_TOPIC_" + UUID.randomUUID();
AdminClient admin = AdminClient.create(props);
try {
admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1))).all().get();
admin.deleteTopics(Collections.singleton(topic)).all().get();
return true;
} catch (Exception e) {
throw new DataQueryException("Kafka 连接失败,请稍后重试");
} finally {
try {
admin.close();
} catch (Exception e) {
throw new DataQueryException("关闭 Kafka 连接失败");
}
}
}
🔹 4. 其他类型数据源的适配
对于 HDFS、HBase 等其他非关系型数据源,可仿照 Kafka 的方式,在对应的 DbQuery
实现类中:
- 从
DbQueryProperty.config
中获取必要参数 - 自定义连接初始化逻辑(如 FileSystem、HBaseAdmin 等)
- 重写
valid()
或其他相关方法
📝 小结:
特殊数据源的适配原则是:配置信息 JSON 化、连接流程策略化、接口调用统一化,从而确保各类数据源都能通过统一工厂机制注册、初始化与验证。
2️⃣ 页面配置
当前我们已完成了新数据源类型的后端扩展,但在前端页面中,如果没有进行配置,仍然无法选择该数据源类型。此时需在系统管理 → 字典管理菜单中进行相应配置。
具体操作如下:
- 进入系统管理模块 → 字典管理页面
- 找到字典类型:
datasource_type
- 在该字典项中新增一个字典值,字典编码需与后端
DbType
枚举名称保持一致,例如:KAFKA
、CLICKHOUSE
配置完成后,即可在前端页面的新增数据源表单中看到对应类型选项,支持页面联动与参数输入。
✅ 开发文档总结
至此,数据连接模块的开发文档已全部介绍完毕,包含了模块结构、核心功能、底层封装逻辑、以及新增数据源类型的完整扩展流程。通过本指南,你将能够快速理解并掌握该模块的整体设计理念与开发方法,为后续的数据源适配与能力扩展打下基础。