RocketMQ Connect in Practice 3
Preparation
Start RocketMQ
- Linux/Unix/Mac
- 64-bit JDK 1.8+
- Maven 3.2.x+
- Start RocketMQ
Tip: where ${ROCKETMQ_HOME} points
bin-release.zip: /rocketmq-all-4.9.4-bin-release
source-release.zip: /rocketmq-all-4.9.4-source-release/distribution
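The two archives differ only in where the distribution root sits. Below is a minimal sketch of a hypothetical helper, resolve_rocketmq_home, that picks the matching ${ROCKETMQ_HOME}. It assumes the zip was unpacked as-is into the directory passed as $1. The commented usage follows the RocketMQ quick start for starting the NameServer and Broker. For the source release, it presumes the distribution has already been built.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print ${ROCKETMQ_HOME} for either 4.9.4 archive layout.
# $1 = directory the zip was extracted into (assumption: unpacked as-is).
resolve_rocketmq_home() {
  local base="$1"
  if [ -d "$base/rocketmq-all-4.9.4-bin-release/bin" ]; then
    echo "$base/rocketmq-all-4.9.4-bin-release"
  elif [ -d "$base/rocketmq-all-4.9.4-source-release/distribution/bin" ]; then
    echo "$base/rocketmq-all-4.9.4-source-release/distribution"
  else
    echo "no RocketMQ 4.9.4 layout found under $base" >&2
    return 1
  fi
}

# Example usage (not executed here):
# export ROCKETMQ_HOME="$(resolve_rocketmq_home "$HOME")"
# cd "$ROCKETMQ_HOME"
# nohup sh bin/mqnamesrv &
# nohup sh bin/mqbroker -n localhost:9876 &
```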
Start Connect
Build the connector plugins
Debezium RocketMQ connectors
$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
Copy the built Debezium MySQL and PostgreSQL connector packages into the plugin directory that the runtime loads from:
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
JDBC connector
Starting again from the directory that contains the rocketmq-connect checkout, build the JDBC connector and copy its package into the same plugin directory:
$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
$ mvn clean package -Dmaven.test.skip=true
$ cp target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
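Before you start the runtime, confirm that the jars actually landed in the plugin directory. Below is a minimal sketch of a hypothetical check_plugins function. It assumes three jars are expected in this walkthrough: Debezium MySQL, Debezium PostgreSQL, and JDBC.

```shell
#!/usr/bin/env bash
# Hypothetical check: list the connector jars in a plugin directory and fail
# when fewer than the expected number are present.
# $1 = plugin directory (pluginPaths, /usr/local/connector-plugins here)
# $2 = minimum number of *.jar files expected (assumed 3 in this walkthrough)
check_plugins() {
  local dir="$1" want="$2" count=0 jar
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue   # the glob matched nothing
    basename "$jar"
    count=$((count + 1))
  done
  if [ "$count" -lt "$want" ]; then
    echo "expected at least $want jars in $dir, found $count" >&2
    return 1
  fi
}

# Example usage (not executed here):
# check_plugins /usr/local/connector-plugins 3
```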
Start the Connect Runtime
Build the runtime from the directory that contains the rocketmq-connect checkout:
cd rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U
Edit the configuration file conf/connect-standalone.conf. The main settings are:
$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
workerId=standalone-worker
storePathRootDir=/tmp/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678
autoCreateGroupEnable=false
clusterName="DefaultCluster"
# Core setting: the directory holding the connector plugins built above (Debezium and JDBC)
# Directory of source or sink connector jar files. The default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins
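The REST calls later in this walkthrough must target the httpPort configured here. Below is a minimal sketch of a hypothetical conf_get helper that reads a single key from this properties-style file. It assumes plain key=value lines with no spaces around the = sign.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of one key=value entry from a file such
# as conf/connect-standalone.conf. Lines starting with # are skipped, the last
# matching entry wins, and the value is returned verbatim (quotes included).
# Exits non-zero when the key is absent or empty.
conf_get() {
  local file="$1" key="$2"
  awk -F= -v k="$key" '
    /^[ \t]*#/ { next }
    $1 == k { sub(/^[^=]*=/, ""); v = $0; found = 1 }
    END { if (!found || v == "") exit 1; print v }
  ' "$file"
}

# Example usage (not executed here):
# HTTP_PORT="$(conf_get conf/connect-standalone.conf httpPort)"
```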
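Start the runtime. The cd path is relative to the rocketmq-connect directory; skip it if you are still in the distribution directory from the previous step:
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT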
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
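The runtime may need a moment before its REST API on httpPort (8082 above) accepts requests. Below is a minimal sketch of a hypothetical wait_for_http poller, assuming curl is installed. It treats any HTTP response, even an error status, as a sign that the server is up, so it does not depend on any particular REST endpoint.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll a URL until the server answers. Without -f, curl
# exits 0 on any HTTP response (even 404), so no specific REST endpoint is
# assumed. $1 = base URL, $2 = maximum attempts (about 1 s apart).
wait_for_http() {
  local url="$1" tries="${2:-30}" i
  for ((i = 1; i <= tries; i++)); do
    if curl -s -o /dev/null --max-time 2 "$url"; then
      return 0
    fi
    [ "$i" -lt "$tries" ] && sleep 1
  done
  echo "no HTTP response from $url after $tries attempts" >&2
  return 1
}

# Example usage (not executed here):
# wait_for_http http://127.0.0.1:8082/ 30
```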
Postgres image
Create the Postgres database with Debezium's Postgres Docker image:
# starting a pg instance
docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14
# bash into postgres instance
docker exec -ti postgres /bin/bash
Postgres details
Port: 5432
Account: start_data_engineer/password
Source table to sync: bank.user
MySQL image
Create the MySQL database with Debezium's example MySQL Docker image:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
MySQL details
Port: 3306
Account: root/debezium
Source table to sync: bank.user
Target table: bank1.user
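A published container port can accept TCP connections before the database inside has finished initializing. It is therefore safer to wait explicitly before you create tables or connectors. Below is a minimal sketch of a hypothetical retry helper. The commented usage assumes that the pg_isready and mysqladmin tools from the official postgres and mysql images are present in these Debezium images; check this against the images you pull.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command until it succeeds or the attempts run out.
# $1 = maximum attempts, $2 = seconds to wait between attempts, rest = command.
retry() {
  local tries="$1" delay="$2" i
  shift 2
  for ((i = 1; i <= tries; i++)); do
    "$@" && return 0
    [ "$i" -lt "$tries" ] && sleep "$delay"
  done
  return 1
}

# Example usage (not executed here): wait until both databases accept clients.
# retry 30 1 docker exec postgres pg_isready -U start_data_engineer
# retry 30 1 docker exec mysql mysqladmin ping -uroot -pdebezium --silent
```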
Test data
Log in to MySQL as root/debezium.
Source table: bank.user
create database bank;
use bank;
create table bank.user
(
id bigint NOT NULL AUTO_INCREMENT,
user_id integer,
name varchar(8),
age integer,
birthday date,
datetime_created timestamp(3),
datetime_updated timestamp(3),
height decimal(11, 2) null,
PRIMARY KEY (`id`)
);
insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
update bank.user set user_id = 1003 where id = 1003;
Log in to PostgreSQL as start_data_engineer/password.
Source table: bank.user
CREATE SCHEMA bank;
SET search_path TO bank,public;
create table bank.user
(
id integer not null
constraint user_pkey
primary key,
user_id integer,
name varchar(8),
age integer,
birthday date,
datetime_created timestamp(3),
datetime_updated timestamp(3),
height numeric(11, 2)
);
insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
update bank.user set user_id = 1001 where id = 1001;
Target table (in MySQL, logged in as root/debezium): bank1.user
create database bank1;
create table bank1.user
(
id bigint auto_increment
primary key,
user_id int null,
name varchar(8) null,
age int null,
birthday date null,
datetime_created timestamp(3) null,
datetime_updated timestamp(3) null,
height decimal(11, 2) null
);
Start the connectors
Start the Debezium source connectors
MySQL source table: bank.user. Purpose: parse the MySQL binlog, wrap each change in a generic ConnectRecord object, and send it to a RocketMQ topic.
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
"max.task": "1",
"connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.skip.unparseable.ddl": true,
"database.history.name.srv.addr": "localhost:9876",
"database.history.rocketmq.topic": "db-history-debezium-topic1000",
"database.history.store.only.monitored.tables.ddl": true,
"include.schema.changes": false,
"database.server.name": "dbserver1",
"database.port": 3306,
"database.hostname": "database ip",
"database.connectionTimeZone": "UTC",
"database.user": "debezium",
"database.password": "dbz",
"table.include.list": "bank.user",
"max.batch.size": 50,
"database.include.list": "bank",
"snapshot.mode": "when_needed",
"database.server.id": "184054",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
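The request bodies contain the placeholder "database ip". Replace it with an address the Connect runtime can reach. When the runtime runs on the same machine as the Docker containers, 127.0.0.1 should work because the ports are published with -p. Below is a minimal sketch of a hypothetical render_config filter. It assumes each body has been saved to a template file; the file names shown are also hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical filter: replace every "database ip" placeholder in a connector
# JSON template (stdin) with a real host and write the result to stdout.
# $1 = database host; assumed to be a plain hostname or IPv4 address, since
# characters such as / or & would need escaping in the sed replacement.
render_config() {
  local host="$1"
  sed "s/database ip/${host}/g"
}

# Example usage (not executed here); the file names are hypothetical:
# render_config 127.0.0.1 < mysql-source.json.tmpl > mysql-source.json
```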
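Postgres source table: bank.user. Purpose: read changes from the Postgres write-ahead log (WAL) through logical decoding, wrap each change in a generic ConnectRecord object, and send it to a RocketMQ topic.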
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
"connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.skip.unparseable.ddl": true,
"database.server.name": "bankserver1",
"database.port": 5432,
"database.hostname": "database ip",
"database.connectionTimeZone": "UTC",
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
"table.whitelist": "bank.user",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
Start the JDBC sink connector
Purpose: consume the records in the topic and write them to the target table over JDBC.
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
"connect.topicnames": "debezium-source-topic1000",
"connection.url": "jdbc:mysql://database ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
"pk.fields": "id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
"db.timezone": "UTC",
"table.types": "TABLE",
"errors.deadletterqueue.topic.name": "dlq-topic",
"errors.log.enable": "true",
"errors.tolerance": "ALL",
"delete.enabled": "true",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
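Each connector is created with a POST to /connectors/<name> on the runtime's httpPort (8082 above). Keeping each body in its own JSON file avoids quoting the whole document inside -d '...'. Below is a minimal sketch with hypothetical connector_url and create_connector helpers, assuming curl is installed. The --data-binary option sends the file unchanged.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the POST /connectors/<name> calls shown above.
# $1 = host:port of the Connect runtime, $2 = connector name.
connector_url() {
  printf 'http://%s/connectors/%s\n' "$1" "$2"
}

# $3 = JSON file holding the connector config; --data-binary sends it as-is.
create_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data-binary @"$3" "$(connector_url "$1" "$2")"
}

# Example usage (not executed here); jdbc-sink.json is a hypothetical file
# containing the JSON body of the JDBC sink request:
# create_connector 127.0.0.1:8082 jdbcmysqlsinktest1000 jdbc-sink.json
```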
Once all three connectors have been created, log in to PostgreSQL as start_data_engineer/password or to MySQL as root/debezium.
Any insert, update, or delete on the source table bank.user is then synchronized to the target MySQL table bank1.user.
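To check the result without eyeballing it, export the same columns from the source and the target as tab-separated rows. Then list any source row that is missing from bank1.user. Below is a minimal sketch with a hypothetical missing_rows function. It assumes bash (for process substitution) and the container names used above. It checks inserts and updates only and does not detect rows that should have been deleted from the target.

```shell
#!/usr/bin/env bash
# Hypothetical check: print every row of the source export ($1) that does not
# appear in the target export ($2). Both files hold one row per line in the
# same format; no output means every source row reached the target.
# Extra target rows (for example, deletes that were not applied) are ignored.
missing_rows() {
  comm -23 <(sort "$1") <(sort "$2")
}

# Example usage (not executed here), comparing the Postgres source with the
# MySQL target; psql -At -F sets unaligned, tuples-only, tab-separated output,
# and mysql -N -B prints tab-separated rows without a header.
# docker exec postgres psql -U start_data_engineer -At -F $'\t' \
#   -c "SELECT id, user_id, name, age FROM bank.user" > src.tsv
# docker exec mysql mysql -uroot -pdebezium -N -B \
#   -e "SELECT id, user_id, name, age FROM bank1.user" > tgt.tsv
# missing_rows src.tsv tgt.tsv
```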