Data Connection
Note: To meet diverse business requirements for connecting data sources, the system provides a "Data Connection" menu. This article explains how to independently configure and extend database or API data connections through this feature.
1. Data Connection – SQL
The table `DA_DATASOURCE` stores the data source connection information configured in the system. It includes standard connection parameters such as the data source name (`DATASOURCE_NAME`), type (`DATASOURCE_TYPE`), IP address (`IP`), and port (`PORT`), which cover connection management for most relational databases.

In practice, however, connection methods are often more complex, especially when integrating data sources from the big data ecosystem (e.g., Hive, HBase, Kafka), and require flexible configuration that fixed columns cannot fully express. The table therefore includes the `DATASOURCE_CONFIG` field, which stores extended parameters (username, password, Kerberos authentication, Zookeeper address, etc.) as a JSON string. This field gives the system the extensibility and adaptability needed to integrate heterogeneous data sources dynamically.
```sql
-- ----------------------------
-- Data Source Table - DM Sample Script
-- ----------------------------
CREATE TABLE "QDATA"."DA_DATASOURCE"
(
    "ID" BIGINT IDENTITY(49, 1) NOT NULL,
    "DATASOURCE_NAME" VARCHAR2(128) NOT NULL,
    "DATASOURCE_TYPE" VARCHAR2(32) NOT NULL,
    "DATASOURCE_CONFIG" VARCHAR2(3072),
    "IP" VARCHAR2(32) NOT NULL,
    "PORT" INTEGER NOT NULL,
    "LIST_COUNT" INTEGER DEFAULT 0,
    "SYNC_COUNT" INTEGER DEFAULT 0,
    "DATA_SIZE" INTEGER DEFAULT 0,
    "DESCRIPTION" VARCHAR2(512),
    "VALID_FLAG" VARCHAR2(1) DEFAULT '1' NOT NULL,
    "CREATE_BY" VARCHAR2(32),
    "CREATOR_ID" BIGINT,
    "CREATE_TIME" DATETIME(6),
    "UPDATE_BY" VARCHAR2(32),
    "UPDATER_ID" BIGINT,
    "UPDATE_TIME" DATETIME(6),
    "REMARK" VARCHAR2(512),
    "DEL_FLAG" VARCHAR2(1) DEFAULT '0' NOT NULL,
    NOT CLUSTER PRIMARY KEY("ID")
) STORAGE(ON "MAIN", CLUSTERBTR);

COMMENT ON TABLE "QDATA"."DA_DATASOURCE" IS 'Data source table';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATE_BY" IS 'Creator';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATE_TIME" IS 'Creation time';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."CREATOR_ID" IS 'Creator ID';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATA_SIZE" IS 'Synchronized data size (reserved)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_CONFIG" IS 'Data source configuration (JSON string)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_NAME" IS 'Data source name';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DATASOURCE_TYPE" IS 'Data source type';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."DESCRIPTION" IS 'Description';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."ID" IS 'ID';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."IP" IS 'IP';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."LIST_COUNT" IS 'Number of database tables (reserved)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."PORT" IS 'Port';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."REMARK" IS 'Remarks';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."SYNC_COUNT" IS 'Synchronized record count (reserved)';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATE_BY" IS 'Updater';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATE_TIME" IS 'Update time';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."UPDATER_ID" IS 'Updater ID';
COMMENT ON COLUMN "QDATA"."DA_DATASOURCE"."VALID_FLAG" IS 'Validity flag; 0: invalid, 1: valid';
```
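To make the `DATASOURCE_CONFIG` field concrete, the snippet below shows what a Kafka-style configuration payload might look like before it is stored. This is a minimal sketch: the key names are illustrative assumptions, not a fixed schema defined by the module.

```java
public class DatasourceConfigExample {
    // Hypothetical DATASOURCE_CONFIG payload for a Kafka data source.
    // Key names are illustrative assumptions, not a schema fixed by the module.
    static final String KAFKA_CONFIG = """
            {
              "username": "etl_user",
              "password": "******",
              "zookeeper": "zk-host:2181",
              "topic": "orders",
              "security.protocol": "SASL_PLAINTEXT"
            }
            """;
}
```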
2. Data Connection – Business Code
From the table name `DA_DATASOURCE`, it is clear that the business logic for data source connections resides in the `qdata-module-da` module. Within this module, the entry point for the controller layer is:

`tech.qiantong.qdata.module.da.controller.admin.datasource`

The core controller is `DaDatasourceController`, which handles the API requests related to data connections, including adding, editing, and testing data sources.

To extend the business logic (e.g., to support new database types, adjust parameter structures, or enhance validation), start from this class; corresponding changes in the underlying `service` layer may also be required.
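As a rough orientation, an endpoint in this controller might take the shape sketched below. The mapping paths and request body type are assumptions for illustration; check `DaDatasourceController` itself for the actual signatures.

```java
import java.util.Map;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical sketch; the real controller's mappings and DTOs may differ.
@RestController
@RequestMapping("/admin/da/datasource")
class DatasourceControllerSketch {

    // Tests connectivity for the submitted connection parameters.
    @PostMapping("/test")
    public Boolean testConnection(@RequestBody Map<String, Object> params) {
        // In the real module this would delegate to the service layer, which
        // builds a DbQueryProperty and calls the shared database utilities.
        return Boolean.TRUE;
    }
}
```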
3. Data Connection – Utility Methods
When you dive into the `service` layer of the data connection module, you will find that the underlying JDBC interaction logic is not implemented separately in each business class. Instead, it is uniformly encapsulated in a shared utility package.

This shared package lives in the `qdata-common` submodule under the `qdata-framework` module, at:

`tech.qiantong.qdata.common.database`
This package centrally encapsulates all database interaction capabilities, including but not limited to:
- Establishing data source connections (e.g., `DataSourceFactory`)
- Selecting the appropriate SQL dialect for each data source type (`DbDialect`, `DialectFactory`)
- Retrieving database schema information (`DbQuery`, `DbQueryFactoryBean`)
- Executing SQL queries and extracting metadata entities (`DbTable`, `DbColumn`)
- Supporting caching mechanisms, exception handling, and field type definitions
📂 Package structure (key classes):

- `datasource`: abstract and default factory classes for dynamically creating connection objects
- `query`: query execution interfaces and implementations
- `dialect`: SQL dialect adaptation strategies, organized by database type
- `constants`: enumerations for database types, field types, and parsing methods
- `core`: abstractions for tables, columns, and pagination objects
- `utils`: utility classes such as `MD5Util`
This design greatly improves the system's database adaptability: adding a new data source type only requires implementing the corresponding dialect and query factory, which then plugs into the entire data connection module while keeping the code extensible and decoupled.
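To make the flow concrete, a typical call sequence through these utilities might look like the sketch below. `DbQueryProperty` and `createDbQuery()` appear later in this article; the factory instance and the metadata accessor name here are assumptions for illustration.

```java
// Minimal sketch of the intended call flow; the factory wiring and the
// metadata accessor name are assumptions based on the classes listed above.
DbQueryProperty property = new DbQueryProperty();
property.setHost("127.0.0.1");
property.setPort(3306);

// The factory selects the dialect for the data source type and returns
// a DbQuery bound to that connection.
DbQuery dbQuery = dataSourceFactory.createDbQuery(property);

// Schema metadata is then read through the unified DbQuery interface.
List<DbTable> tables = dbQuery.getTables("demo_db"); // hypothetical accessor
```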
4. Adding a New Data Source Type (Development Workflow)
If you have read the previous sections on the data connection utilities carefully, you should now have a basic picture of the overall architecture. Next, we walk through the complete process of adding a new data source type, using MySQL as an example, so that you can later integrate other databases such as Hive or ClickHouse.

1. Add a New Data Source Type in the `DbType` Enum

Path: `tech.qiantong.qdata.common.database.constants.DbType`
```java
/**
 * MYSQL
 */
MYSQL("MySql",
        "MySql Database",
        "jdbc:mysql://${host}:${port}/${dbName}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8",
        "LENGTH",
        "SELECT COUNT(1) FROM {tableName}",
        "SELECT {tableFieldName} FROM {tableName} ORDER BY {orderBy} DESC LIMIT ({pageNo}-1)*{pageSize},{pageSize}");
```
Each enum entry includes:
- Data source identifier
- Display name
- JDBC URL template
- Field length function
- Record count query SQL template
- Paginated query SQL template
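Following the same pattern, a hypothetical entry for ClickHouse might look like the snippet below. The URL template and SQL templates are assumptions for illustration; verify them against the target database's JDBC driver and SQL syntax before use.

```java
/**
 * CLICKHOUSE (hypothetical example; verify templates against ClickHouse syntax)
 */
CLICKHOUSE("ClickHouse",
        "ClickHouse Database",
        "jdbc:clickhouse://${host}:${port}/${dbName}",
        "LENGTH",
        "SELECT COUNT(1) FROM {tableName}",
        "SELECT {tableFieldName} FROM {tableName} ORDER BY {orderBy} DESC LIMIT {pageSize} OFFSET ({pageNo}-1)*{pageSize}");
```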
2. Create the Corresponding Dialect Class

Path: `tech.qiantong.qdata.common.database.dialect`

Create a class, e.g., `MySqlDialect`, extending the abstract class:

```java
public class MySqlDialect extends AbstractDbDialect {
    // Implement the abstract methods from the parent class
}
```
3. Implement Dialect Logic

In `MySqlDialect`, implement the core methods defined in `AbstractDbDialect`, such as:

- `tables`
- `columns`
- `buildQuerySqlFields`
- `someInternalSqlGenerator`

Refer to existing implementations or the `DbQuery` interface documentation; a hedged sketch of one such method follows below.
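For orientation, a `columns` implementation for MySQL would typically build a metadata query against `information_schema.COLUMNS`. The method signature below is an assumption about `AbstractDbDialect` (many dialect layers return the metadata SQL as a string); check the actual abstract class before copying.

```java
// Hypothetical sketch: the real signature in AbstractDbDialect may differ.
// MySQL exposes column metadata through information_schema.COLUMNS.
public String columns(String dbName, String tableName) {
    // In production code, bind dbName/tableName as parameters instead of
    // concatenating them, to avoid SQL injection.
    return "SELECT COLUMN_NAME AS colName, DATA_TYPE AS dataType, "
         + "COLUMN_COMMENT AS colComment, IS_NULLABLE AS nullable "
         + "FROM information_schema.COLUMNS "
         + "WHERE TABLE_SCHEMA = '" + dbName + "' AND TABLE_NAME = '" + tableName + "'";
}
```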
4. Register the New Dialect in the Dialect Factory

File: DialectRegistry.java
Location: `tech.qiantong.qdata.common.database.dialect.DialectRegistry`

Register the new dialect in the constructor's mapping table:

```java
public DialectRegistry() {
    ...
    dialect_enum_map.put(DbType.MYSQL, new MySqlDialect());
    ...
}
```

Make sure the `DbType` enum value matches the registration key.
5. The main work for adding a new data source is now complete.
When performing data source operations (e.g., generating DDL, extracting fields, running paginated queries), the system automatically invokes your `Dialect` implementation based on the `DbType`, so different databases integrate seamlessly.
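Conceptually, that lookup works like the sketch below. The accessor name on `DialectRegistry` is an assumption; the `DbType` to `DbDialect` mapping itself is exactly what the registry's `dialect_enum_map` shown above maintains.

```java
// Hypothetical lookup sketch: the accessor name is assumed.
DialectRegistry registry = new DialectRegistry();
DbDialect dialect = registry.getDialect(DbType.MYSQL); // assumed accessor
```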
5. Additional Extension Tips
1. Special Data Connections (HDFS, HBase, Kafka, etc.)
For data sources like HDFS, HBase, and Kafka, which cannot establish standard connections via JDBC, the integration approach differs slightly from traditional databases. Special handling is required in the data connection module:
(1) Configuration Storage
Connection parameters for such special data sources (e.g., Zookeeper address, authentication info, topic names) are stored uniformly in the `DATASOURCE_CONFIG` field. This JSON-formatted field supports flexible extension.
(2) Construct the Connection Context Object (`DbQueryProperty`)

Before calling the database factory method `createDbQuery()`, you must construct a `DbQueryProperty` object containing the standard fields such as host, port, and database name.

For special data sources, the additional connection parameters must be written into the `config` field:
```java
DbQueryProperty dbQueryProperty = new DbQueryProperty();
dbQueryProperty.setHost("kafka-host");
dbQueryProperty.setPort(9092);
dbQueryProperty.setConfig(Map.of(
        "security.protocol", "SASL_PLAINTEXT",
        "sasl.mechanism", "PLAIN"
));
```
(3) Implement Custom Connection Validation Logic (Kafka Example)
Kafka connection testing requires a custom `validConnection` method that uses the Kafka AdminClient instead of JDBC:
```java
@Override
public Boolean validConnection(DataSource dataSource, DbQueryProperty dbQueryProperty) {
    Properties props = new Properties();
    props.put("bootstrap.servers", dbQueryProperty.getHost() + ":" + dbQueryProperty.getPort());
    props.put("default.api.timeout.ms", 10000);
    props.put("request.timeout.ms", 10000);
    props.put("admin.request.timeout.ms", 10000);
    if (dbQueryProperty.getConfig() != null && !dbQueryProperty.getConfig().isEmpty()) {
        dbQueryProperty.getConfig().forEach(props::put);
    }
    // Create and delete a throwaway topic to verify both connectivity and permissions.
    String topic = "TEST_TOPIC_" + UUID.randomUUID();
    // try-with-resources closes the AdminClient even on failure, without
    // masking the result the way a throwing finally block would.
    try (AdminClient admin = AdminClient.create(props)) {
        admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1))).all().get();
        admin.deleteTopics(Collections.singleton(topic)).all().get();
        return true;
    } catch (Exception e) {
        throw new DataQueryException("Kafka connection failed. Please try again later.");
    }
}
```
(4) Adapting Other Data Source Types

For other non-relational data sources such as HDFS and HBase, follow the Kafka example by:

- Retrieving the required parameters from `DbQueryProperty.config`
- Implementing custom connection initialization (e.g., `FileSystem`, `HBaseAdmin`), as in the HDFS sketch after this list
- Overriding `valid()` or other relevant methods in the corresponding `DbQuery` implementation
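As one possible shape, an HDFS connectivity check might use the Hadoop `FileSystem` API as below. This is a minimal sketch assuming the NameNode host and port come from `DbQueryProperty`; the surrounding method signature mirrors the Kafka example and is not the module's confirmed interface.

```java
// Hypothetical HDFS connectivity check, mirroring the Kafka example above.
// Uses org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.FileSystem;
// assumes host/port point at the NameNode. Kerberos setups need extra config.
@Override
public Boolean validConnection(DataSource dataSource, DbQueryProperty dbQueryProperty) {
    Configuration conf = new Configuration();
    if (dbQueryProperty.getConfig() != null) {
        // Pass extended parameters from DATASOURCE_CONFIG straight to Hadoop.
        dbQueryProperty.getConfig().forEach((k, v) -> conf.set(String.valueOf(k), String.valueOf(v)));
    }
    String uri = "hdfs://" + dbQueryProperty.getHost() + ":" + dbQueryProperty.getPort();
    try (FileSystem fs = FileSystem.get(java.net.URI.create(uri), conf)) {
        return fs.exists(new Path("/")); // reaching the root path proves connectivity
    } catch (Exception e) {
        throw new DataQueryException("HDFS connection failed. Please check the address and configuration.");
    }
}
```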
📝 Summary:
The adaptation principle for special data sources is: JSON-based configuration, strategy-driven connection flow, and unified interface calls, ensuring all data sources can be registered, initialized, and validated through a unified factory mechanism.
2. Frontend Configuration
After completing the backend extension for a new data source type, it still won't appear in the frontend unless configured. You must update the configuration in System Management → Dictionary Management.
Steps:

- Navigate to System Management → Dictionary Management
- Locate the dictionary type: `datasource_type`
- Add a new dictionary item, making sure the dictionary code matches the backend `DbType` enum name, e.g., `KAFKA`, `CLICKHOUSE`

After configuration, the new data source type appears in the "Add Data Source" form on the frontend, enabling UI selection and parameter input.
Summary
This concludes the development documentation for the data connection module, covering module structure, core functionality, underlying encapsulation, and the complete workflow for extending new data source types. With this guide, you can quickly understand the module’s design philosophy and development approach, laying a solid foundation for future data source integration and capability expansion.