架构设计
source connector从debezium发送消息到kafka
sink connector将记录从kafka中发送到其他系统接收器
Debezium Server
使用Debezium Server部署Debezium,这个Server是可配置的。
Configuration
Avro序列化
Debezium的序列化根据Kafka配置的序列化方式
复制 key.converter= org.apache.kafka.connect.json.JsonConverter
value.converter= org.apache.kafka.connect.json.JsonConverter
关闭schemas的传递
key.converter.schemas.enable
:设置为false关闭key的schema传递
value.converter.schemas.enable
:设置为false关闭value的schema传递
Avro序列化的优势
Avro二进制格式是紧凑和有效的,并且可以保证每条记录的数据格式的正确。
这对于Debezium连接器非常重要,它将动态生成每条记录的模式,以匹配已更改的数据库表的结构。
变更的事件记录写入相同的topic可能有相同的schema不同的版本,avro更容易适用于变更的记录schema
APIicurio API Schema Registry使用
使用avro序列化必须部署一个schema registry为了管理Avro消息schemas和他们的版本。
配置一个Debezium connector实例去使用Avro Schema
复制 key.converter= io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url= http://apicurio:8080/api
key.converter.apicurio.registry.global-id= io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
value.converter= io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url= http://apicurio:8080/api
value.converter.apicurio.registry.global-id= io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
Confluent Schema Registry
复制 key.converter= io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url= http://localhost:8081
value.converter= io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url= http://localhost:8081
复制 docker run -it --rm --name schema-registry \
--link zookeeper \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
-p 8181:8181 confluentinc/cp-schema-registry
Topic路由
每个kafka记录包含一个数据变更事件有一个默认定义的topic。如果需要重新发送到其他topic需要在在记录到kafkaConnector之前指定一个topic。
debezium提供的topic路由是单独消息转换,配置这个转换器在debezium的kafkaConnect配置中
Use case
默认debezium提供的topic的名字为:debeziumname.database.tablename
对于PG的分区表关闭添加唯一键行为:key.enforce.uniqueness=false
reroute配置
复制 transforms= Reroute
transforms.Reroute.type= io.debezium.transforms.ByLogicalTableRouter
# 匹配*customers_shard*相关的表,比如myserver.mydb.customers_shard1、myserver.mydb.customers_shard2
transforms.Reroute.topic.regex= (.*)customers_shard(.*)
transforms.Reroute.topic.replacement= $1customers_all_shards
topic.regex
指定将转换应用于每个更改事件记录的正则表达式,以确定是否应将其路由到特定主题。
topic.replacement
指定表示目标主题名称的正则表达式。转换将每个匹配的记录路由到由该表达式标识的主题。
满足唯一键
一个debezium变更事件的key使用的是表的列作为表的主键,对于分库的表来说可能debezium的key是重复的。为了满足每个相同的key发送到相同的partition,topic路由转换插入一个字段__dbz__physicalTableIdentifier
来保证,其默认为目标topic名称。
transforms.Reroute.key.field.name=shard_id
设置其他唯一key名称。
如果表包含全局唯一键,并且不需要更改键结构,则可以设置key.enforcement.uniqueness
选项为false
新记录状态提取
改变时间结构
Debezium生成的数据变更事件是一个复杂的结构,每个事件包含三个部分
Metadata:进行更改的操作\数据源信息比如database、table名称/变更时间/可选的转换信息等
复制 {
"op" : "u" ,
"source" : {
...
} ,
"ts_ms" : "..." ,
"before" : {
"field1" : "oldvalue1" ,
"field2" : "oldvalue2"
} ,
"after" : {
"field1" : "newvalue1" ,
"field2" : "newvalue2"
}
}
配置
复制 transforms= unwrap,...
# 指定提取类
transforms.unwrap.type= io.debezium.transforms.ExtractNewRecordState
# 在事件流中保留删除操作的逻辑删除记录。
transforms.unwrap.drop.tombstones= false
# 为删除操作提供"__deleted"标示
transforms.unwrap.delete.handling.mode= rewrite
# 添加表和lsn字段的更改事件元数据。
transforms.unwrap.add.fields= table,lsn
transforms.unwrap.delete.handling.mode=rewrite
,添加__deleted
标示
复制 "value" : {
"pk" : 2 ,
"cola" : null ,
"__deleted" : "true"
}
transforms.unwrap.add.fields
在metadata中添加字段,包括table
/lsn
等
复制 {
...
"__op" : "c" ,
"__table" : "MY_TABLE" ,
"__lsn" : "123456789" ,
"__source_ts_ms" : "123456789" ,
...
}
自定义Topic自动创建
topic动态为offsets,connector status,config storge和history topics创建内部topics。目标topics为了捕获表将会动态创建一个默认的配置当kafka brokers的配置auto.create.topics.enable
设置为true
时
配置Kafka Connect
复制 # 开启动态topic创建
auto.topic.creation.enable = true
默认group配置
复制 {
...
"topic.creation.default.replication.factor" : 3 ,
"topic.creation.default.partitions" : 10 ,
"topic.creation.default.cleanup.policy" : "compact" ,
"topic.creation.default.compression.type" : "lz4"
...
}
自定义group配置
复制 {
...
// 不同的库定义不同的group策略
"topic.creation.inventory.include" : "dbserver1\\.inventory\\.*" ,
"topic.creation.inventory.partitions" : 20 ,
"topic.creation.inventory.cleanup.policy" : "compact" ,
"topic.creation.inventory.delete.retention.ms" : 7776000000 ,
"topic.creation.applicationlogs.include" : "dbserver1\\.logs\\.applog-.*" ,
"topic.creation.applicationlogs.exclude" : "dbserver1\\.logs\\.applog-old-.*" ,
"topic.creation.applicationlogs.replication.factor" : 1 ,
"topic.creation.applicationlogs.partitions" : 20 ,
"topic.creation.applicationlogs.cleanup.policy" : "delete" ,
"topic.creation.applicationlogs.retention.ms" : 7776000000 ,
"topic.creation.applicationlogs.compression.type" : "lz4" ,
...
}
注册自定义group
复制 {
...
"topic.creation.groups" : "inventory,applicationlogs" ,
...
}
完整的配置
复制 {
"topic.creation.default.replication.factor" : 3 ,
"topic.creation.default.partitions" : 10 ,
"topic.creation.default.cleanup.policy" : "compact" ,
"topic.creation.default.compression.type" : "lz4"
"topic.creation.groups" : "inventory,applicationlogs" ,
"topic.creation.inventory.include" : "dbserver1\\.inventory\\.*" ,
"topic.creation.inventory.replication.factor" : 3 ,
"topic.creation.inventory.partitions" : 20 ,
"topic.creation.inventory.cleanup.policy" : "compact" ,
"topic.creation.inventory.delete.retention.ms" : 7776000000 ,
"topic.creation.applicationlogs.include" : "dbserver1\\.logs\\.applog-.*" ,
"topic.creation.applicationlogs.exclude" : "dbserver1\\.logs\\.applog-old-.*" ,
"topic.creation.applicationlogs.replication.factor" : 1 ,
"topic.creation.applicationlogs.partitions" : 20 ,
"topic.creation.applicationlogs.cleanup.policy" : "delete" ,
"topic.creation.applicationlogs.retention.ms" : 7776000000 ,
"topic.creation.applicationlogs.compression.type" : "lz4"
}
Connector
MySQL Connector
由于通常将MySQL设置为在指定的时间段后清除binlog,因此MySQL连接器会对每个数据库执行初始的一致快照。 MySQL连接器从创建快照的位置读取binlog。
当连接器崩溃或正常停止后重新启动时,连接器从特定位置(即从特定时间点)开始读取binlog。连接器通过读取数据库历史Kafka topic
并解析所有DDL语句
,重新构建了此时存在的表结构,直到binlog中连接器开始的位置。
database的history topic仅提供给connector使用,connnctor可以选择性的生成schema变更事件对于不同的topic提供给应用程序使用。
执行database快照
当连接器第一次启动时,它会对你的数据库执行一个初始一致的快照。
初始化快照执行流程
使用可重复读语义启动事务,以确保事务内的所有后续读取都针对一致快照执行。
释放全局读锁。这现在允许其他数据库客户端写入数据库。
将DDL更改写入schema变更topic,包括所有必要的删除和创建DDL语句。
扫描数据库表并为每一行生成表特定Kafka topic上的CREATE事件。
connector初始化过程失败
如果连接器发生故障、停止或在生成初始快照时重新平衡,则连接器在重新启动后将创建一个新快照。一旦初始快照完成,Debezium MySQL连接器就会从binlog中的相同位置重新启动,这样它就不会错过任何更新。
如果connector停止或者中断过程,mysql的binlog被清空,那么就会在发生一次初始化快照的过程。
全局读锁
有些环境不允许全局读锁。如果Debezium MySQL连接器检测到不允许全局读锁,连接器将使用表级锁,并使用该方法执行快照。
暴露Schema变更
通过配置Debezium MySQL连接器去提供schema变更事件,其中包括应用于MySQL服务器数据库的所有DDL语句这个连接器写入这些事件到名为"serverName"的kafka topic中,serverName通过database.server.name
配置
schema变更topic结构
复制 {
"schema" : {
"type" : "struct" ,
"name" : "io.debezium.connector.mysql.SchemaChangeKey" ,
"optional" : false ,
"fields" : [
{
"field" : "databaseName" ,
"type" : "string" ,
"optional" : false
}
]
} ,
"payload" : {
"databaseName" : "inventory"
}
}
复制 {
"schema" : {
"type" : "struct" ,
"name" : "io.debezium.connector.mysql.SchemaChangeValue" ,
"optional" : false ,
"fields" : [
{
"field" : "databaseName" ,
"type" : "string" ,
"optional" : false
} ,
{
"field" : "ddl" ,
"type" : "string" ,
"optional" : false
} ,
{
"field" : "source" ,
"type" : "struct" ,
"name" : "io.debezium.connector.mysql.Source" ,
"optional" : false ,
"fields" : [
{
"type" : "string" ,
"optional" : true ,
"field" : "version"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "name"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "server_id"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "ts_sec"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "gtid"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "file"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "pos"
} ,
{
"type" : "int32" ,
"optional" : false ,
"field" : "row"
} ,
{
"type" : "boolean" ,
"optional" : true ,
"default" : false ,
"field" : "snapshot"
} ,
{
"type" : "int64" ,
"optional" : true ,
"field" : "thread"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "db"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "table"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "query"
}
]
}
]
} ,
"payload" : {
"databaseName" : "inventory" ,
"ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
"source" : {
"version" : "0.10.0.Beta4" ,
"name" : "mysql-server-1" ,
"server_id" : 0 ,
"ts_sec" : 0 ,
"gtid" : null ,
"file" : "mysql-bin.000003" ,
"pos" : 154 ,
"row" : 0 ,
"snapshot" : true ,
"thread" : null ,
"db" : null ,
"table" : null ,
"query" : null
}
}
}
事件
create event
复制 {
"schema" : {
"type" : "struct" ,
"fields" : [
{
"type" : "struct" ,
"fields" : [
{
"type" : "int32" ,
"optional" : false ,
"field" : "id"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "first_name"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "last_name"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "email"
}
] ,
"optional" : true ,
"name" : "mysql-server-1.inventory.customers.Value" ,
"field" : "before"
} ,
{
"type" : "struct" ,
"fields" : [
{
"type" : "int32" ,
"optional" : false ,
"field" : "id"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "first_name"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "last_name"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "email"
}
] ,
"optional" : true ,
"name" : "mysql-server-1.inventory.customers.Value" ,
"field" : "after"
} ,
{
"type" : "struct" ,
"fields" : [
{
"type" : "string" ,
"optional" : false ,
"field" : "version"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "connector"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "name"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "ts_sec"
} ,
{
"type" : "boolean" ,
"optional" : true ,
"default" : false ,
"field" : "snapshot"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "db"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "table"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "server_id"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "gtid"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "file"
} ,
{
"type" : "int64" ,
"optional" : false ,
"field" : "pos"
} ,
{
"type" : "int32" ,
"optional" : false ,
"field" : "row"
} ,
{
"type" : "int64" ,
"optional" : true ,
"field" : "thread"
} ,
{
"type" : "string" ,
"optional" : true ,
"field" : "query"
}
] ,
"optional" : false ,
"name" : "io.debezium.connector.mysql.Source" ,
"field" : "source"
} ,
{
"type" : "string" ,
"optional" : false ,
"field" : "op"
} ,
{
"type" : "int64" ,
"optional" : true ,
"field" : "ts_ms"
}
] ,
"optional" : false ,
"name" : "mysql-server-1.inventory.customers.Envelope"
} ,
"payload" : {
"op" : "c" ,
"ts_ms" : 1465491411815 ,
"before" : null ,
"after" : {
"id" : 1004 ,
"first_name" : "Anne" ,
"last_name" : "Kretchmar" ,
"email" : "annek@noanswer.org"
} ,
"source" : {
"version" : "1.3.0.Final" ,
"connector" : "mysql" ,
"name" : "mysql-server-1" ,
"ts_sec" : 0 ,
"snapshot" : false ,
"db" : "inventory" ,
"table" : "customers" ,
"server_id" : 0 ,
"gtid" : null ,
"file" : "mysql-bin.000003" ,
"pos" : 154 ,
"row" : 0 ,
"thread" : 7 ,
"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
}
}
}
update event
复制 {
"schema" : { ... } ,
"payload" : {
"before" : {
"id" : 1004 ,
"first_name" : "Anne" ,
"last_name" : "Kretchmar" ,
"email" : "annek@noanswer.org"
} ,
"after" : {
"id" : 1004 ,
"first_name" : "Anne Marie" ,
"last_name" : "Kretchmar" ,
"email" : "annek@noanswer.org"
} ,
"source" : {
"version" : "1.3.0.Final" ,
"name" : "mysql-server-1" ,
"connector" : "mysql" ,
"name" : "mysql-server-1" ,
"ts_sec" : 1465581 ,
"snapshot" : false ,
"db" : "inventory" ,
"table" : "customers" ,
"server_id" : 223344 ,
"gtid" : null ,
"file" : "mysql-bin.000003" ,
"pos" : 484 ,
"row" : 0 ,
"thread" : 7 ,
"query" : "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
} ,
"op" : "u" ,
"ts_ms" : 1465581029523
}
}
delete event
复制 {
"schema" : { ... } ,
"payload" : {
"before" : {
"id" : 1004 ,
"first_name" : "Anne Marie" ,
"last_name" : "Kretchmar" ,
"email" : "annek@noanswer.org"
} ,
"after" : null ,
"source" : {
"version" : "1.3.0.Final" ,
"connector" : "mysql" ,
"name" : "mysql-server-1" ,
"ts_sec" : 1465581 ,
"snapshot" : false ,
"db" : "inventory" ,
"table" : "customers" ,
"server_id" : 223344 ,
"gtid" : null ,
"file" : "mysql-bin.000003" ,
"pos" : 805 ,
"row" : 0 ,
"thread" : 7 ,
"query" : "DELETE FROM customers WHERE id=1004"
} ,
"op" : "d" ,
"ts_ms" : 1465581902461
}
}
墓碑event
当一行被删除时,delete事件值仍然在日志压缩中工作,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka删除所有具有相同密钥的消息,消息值必须为空。为了实现这一点,Debezium MySQL连接器发出delete事件后,连接器发出一个特殊的tombstone事件,该事件具有相同的键,但为空值。
映射数据类型
literal type : 值如何表示使用Kafka连接schema类型
semantic type : Kafka连接模式如何捕获字段(schema名)的含义
MySQL type Literal type Semantic type io.debezium.data.Bits
The length
schema parameter contains an integer that represents the number of bits. The byte[]
contains the bits in little-endian form and is sized to contain the specified number of bits.example (where n is bits)numBytes = n/8 + (n%8== 0 ? 0 : 1)
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
n/a Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the binary handling mode setting
io.debezium.data.Json
Contains the string representation of a JSON
document, array, or scalar.
io.debezium.data.Enum
The allowed
schema parameter contains the comma-separated list of allowed values.
io.debezium.data.EnumSet
The allowed
schema parameter contains the comma-separated list of allowed values.
io.debezium.time.ZonedTimestamp
In ISO 8601 format with microsecond precision. MySQL allows M
to be in the range of 0-6
.
表对应的topic
一个表对应一个topic,格式为serverName.databaseName.tableName
支持的MySQL拓扑
Standalone
当使用一个MySQL服务器时,该服务器必须启用binlog(可选启用GTIDs),以便Debezium MySQL连接器可以监视服务器。这通常是可以接受的,因为二进制日志还可以用作增量备份。在这种情况下,MySQL连接器总是连接并遵循这个独立的MySQL服务器实例。
Primary and replica
Debezium MySQL连接器可以跟随一个主服务器或一个副本(如果该副本启用了binlog),但是连接器只能看到集群中对该服务器可见的更改。通常,除了多主拓扑之外,这不是问题。
连接器记录其在服务器binlog中的位置,这在集群中的每个服务器上都是不同的。因此,连接器将只需要遵循一个MySQL服务器实例。如果该服务器发生故障,必须重新启动或恢复该服务器,连接器才能继续运行。
High available clusters
MySQL有各种各样的高可用性解决方案,它们使其更容易容忍,几乎可以立即从问题和故障中恢复。大多数HA MySQL集群使用GTIDs,以便副本能够跟踪任何主服务器上的所有更改。
Multi-primary
Hosted
配置MYSQL服务端
为Debezium创建一个MySQL用户
复制 -- 创建用户
CREATE USER ' user '@ 'localhost' IDENTIFIED BY 'password' ;
-- 授予权限,查询,RELOAD:FLUSH语句来清除或重新加载内部缓存、刷新表或获取锁,
--REPLICATION SLAVE:启用连接器连接和读取MySQL服务器binlog。
-- REPLICATION CLIENT:允许使用SHOW MASTER STATUS/SHOW SLAVE STATUS/SHOW BINARY LOGS语句
GRANT SELECT , RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON * . * TO 'user' IDENTIFIED BY 'password' ;
-- 刷新权限
FLUSH PRIVILEGES;
开启binlog
复制 -- 检查log-bin是否开启
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
复制 server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
开启全局事务标识
全局事务标识符(GTIDs)唯一地标识集群内服务器上发生的事务。虽然Debezium MySQL连接器不需要GTIDs,但使用GTIDs可以简化复制,并允许您更容易地确认主服务器和副本服务器是否一致。
复制 gtid_mode =ON
enforce_gtid_consistency =ON
-- 校验
show global variables like '%GTID%' ;
设置会话超时时间
复制 interactive_timeout= <duration-in-seconds>
wait_timeout= <duration-in-seconds>
开启查询log event
复制 binlog_rows_query_log_events= ON
部署Mysql connector
安装Mysql Connector
必需环境
zk、kafka、kafka-connect、Mysql Server
提供
添加kafka connect配置plugin.path=/kafka/connect
配置Mysql connector
复制 {
"name" : "inventory-connector" ,
"config" : {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector" ,
"database.hostname" : "192.168.99.100" ,
"database.port" : "3306" ,
"database.user" : "debezium-user" ,
"database.password" : "debezium-user-pw" ,
"database.server.id" : "184054" ,
"database.server.name" : "fullfillment" ,
"database.include.list" : "inventory" ,
"database.history.kafka.bootstrap.servers" : "kafka:9092" ,
"database.history.kafka.topic" : "dbhistory.fullfillment" ,
"include.schema.changes" : "true"
}
}
复制 1. Method:POST,URL:http://ip:port/connectors 提交connector
2. Method:GET,URL:http://ip:port/connectors 获取所有connector
3. Method:DELETE,URL:http://ip:port/connectors/{connector name} 删除指定的connector name的connector
4. Method:GET,URL:http://ip:port/connectors/{connector name}/status 获取指定connecto name的运行状态
5. Method:POST,URL:http://ip:port/connectors/{connector name}/restart 重启指定connector name的connector
6. Method:GET,URL:http://ip:port/connectors/{connector name}/tasks/{task id}/status 获取指定task的运行状态
7. Method:GET,URL:http://ip:port/connector-plugins/ 获取kafka connect环境中的所有可执行connector plugins
复制 {
"name" : "for_os_connector" ,
"config" : {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector" ,
"tasks.max" : "1" ,
"database.hostname" : "common.mysql.test.local" ,
"database.port" : "3306" ,
"database.user" : "debezium_mysql" ,
"database.password" : "TT5mUKA1P78nl6EVPb" ,
"database.server.id" : "184042" ,
"database.server.name" : "ao_2020_binlog" ,
"time.precision.mode" : "connect" ,
"database.include.list" : "ao_2020" ,
"database.history.kafka.bootstrap.servers" : "cdh04:9092,cdh05:9092,cdh06:9092" ,
"database.history.kafka.topic" : "ao2020ddlhistory" ,
"tombstones.on.delete" : "false" ,
// 添加新表示动态取拉取其历史记录快照
"snapshot.new.tables" : "parallel" ,
// 数据库时区
"database.serverTimezone" : "UTC" ,
// 是否记录全部DDL到history topic中
"database.history.store.only.monitored.tables.ddl" : "true" ,
// 设置snapshot时是否需要计算总count
"min.row.count.to.stream.results" : 0 ,
// 每批快照处理的速度
"max.batch.size" : 4096 ,
// 正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。 默认为2048。
"max.queue.size" : 8192
}
}