RocketMQ Connect 快速入門
快速入門
本教學課程將在獨立模式下啟動 RocketMQ Connector 範例專案「rocketmq-connect-sample」,以協助您了解連接器的運作原理。範例專案提供一個來源連接器,用於從來源檔案讀取資料並將其傳送至 RocketMQ 集群。它還提供一個接收器連接器,用於從 RocketMQ 集群讀取訊息並將其寫入目標檔案。
1. 準備:啟動 RocketMQ
- Linux/Unix/Mac
- 64 位元 JDK 1.8 以上;
- Maven 3.2.x 以上;
- 啟動 RocketMQ。可以使用 RocketMQ 4.x 或 RocketMQ 5.x 5.x 版本;
- 使用工具測試 RocketMQ 訊息傳送和接收。
在此,使用環境變數 NAMESRV_ADDR 來告知工具用戶端 RocketMQ 的 NameServer 地址為 localhost:9876。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意:RocketMQ 具備自動建立 Topic 和 Group 的功能。在發送或訂閱訊息時,如果對應的 Topic 或 Group 不存在,RocketMQ 將會自動建立。因此,無需事先建立 Topic 和 Group。
2. 建立 Connector Runtime
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
注意:專案預設已包含 rocketmq-connect-sample 的程式碼,因此無需另外建立 rocketmq-connect-sample 外掛。
3. 以獨立模式執行 Connector Worker
修改組態
修改 connect-standalone.conf
檔案,以設定 RocketMQ 連線位址和其他資訊。請參閱 9. 組態檔說明 以取得詳細資料。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
在獨立模式中,RocketMQ Connect 會將同步檢查點資訊持續儲存在本機檔案目錄 storePathRootDir。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
如果您要重設同步檢查點,則需要刪除持續儲存的檢查點檔案。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
以獨立模式啟動 Connector Worker
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
提示:您可以修改 docker/connect/bin/runconnect.sh
以根據需要調整 JVM 啟動參數。
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
檢視啟動記錄檔
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果執行時期啟動成功,您會在記錄檔中看到以下列印
The standalone worker boot success.
要退出 tail -f
指令的記錄追蹤模式,您可以按 Ctrl + C
鍵盤組合鍵。
4. 啟動來源 Connector
建立來源檔案並寫入測試資料
mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
注意:不應有空行(如果偵測到空行,示範程式會擲回錯誤)。來源 Connector 會持續讀取來源檔案,並將每一行資料轉換為訊息主體,傳送至 RocketMQ 供接收器 Connector 使用。
啟動來源連接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'
如果 curl 要求傳回狀態 200,表示已成功建立。範例回應
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}
檢視日誌檔
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日誌,表示檔案來源連接器已成功啟動
啟動連接器 fileSourceConnector 並設定目標狀態為 STARTED 成功!!
來源連接器組態說明
金鑰 | 可為空值 | 預設值 | 說明 |
---|---|---|---|
connector.class | 否 | 實作連接器介面的類別名稱(包括套件名稱) | |
filename | 否 | 來源檔案名稱(建議使用絕對路徑) | |
connect.topicname | 否 | 同步檔案資料所需的 Topic |
5. 啟動接收器連接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'
如果 curl 要求傳回狀態 200,表示已成功建立。範例回應
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}
檢視日誌檔
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日誌,表示檔案接收器連接器已成功啟動
啟動連接器 fileSinkConnector 並設定目標狀態為 STARTED 成功!!
檢查接收器連接器是否已將資料寫入至目標檔案
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
如果已產生 test-sink-file.txt 檔案,且其內容與 test-source-file.txt 相同,表示整個流程執行正確。
繼續將測試資料寫入至來源檔案 test-source-file.txt
cd /Users/YourUsername/rocketmqconnect/
echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt
# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
注意:檔案內容的順序可能會有所不同,因為 rocketmq-connect-sample
在傳送和接收 RocketMQ 主題的訊息時使用 一般訊息
。這與 已排序訊息
不同,且使用 一般訊息
無法保證順序。
接收器連接器組態說明
金鑰 | 可為空值 | 預設值 | 說明 |
---|---|---|---|
connector.class | 否 | 實作連接器介面的類別名稱(包括套件名稱) | |
filename | 否 | 接收器會擷取資料並儲存至檔案(建議使用絕對路徑) | |
connect.topicnames | 否 | 接收器需要處理的資料訊息主題 |
提示:範例 rocketmq-connect-sample 的組態檔說明僅供參考,不同的來源/接收器連接器有不同的組態,請參閱特定的來源/接收器連接器。
6. 停止連接器
停止連接器的 RESTful 指令格式為 http://(您的工作人員 IP):(port)/connectors/(connector name)/stop
若要停止示範中的兩個連接器,可以使用下列指令
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
如果 curl 要求傳回的狀態為 200,表示已成功停止連接器。範例回應
{"status":200,"body":"Connector [fileSinkConnector] 已成功刪除"}
如果您看到下列日誌訊息,表示檔案接收器連接器已成功關閉
tail -100f ~/logs/rocketmqconnect/connect_default.log
已完成連接器名稱的關閉:fileSinkConnector
7. 停止工作程序
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh
8. 日誌目錄
可以使用下列指令查看日誌目錄
ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect
9. 設定檔說明
根據您的使用情況,修改 RESTful 埠、storeRoot 路徑、名稱伺服器位址和其他資訊。
以下是設定檔範例
#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1
# Http prot for user to access REST API
httpPort=8082
# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=
storePathRootDir 設定的說明
在獨立模式中,RocketMQ Connect 會將同步檢查點資訊持續寫入 storePathRootDir 指定的本機檔案目錄。持續寫入的檔案包括
金鑰 | 說明 |
---|---|
connectorConfig.json | 連接器設定持續寫入檔案 |
position.json | 來源連線資料處理進度持續寫入檔案 |
taskConfig.json | 工作設定持續寫入檔案 |
offset.json | 接收器連線資料使用進度持續寫入檔案 |
connectorStatus.json | 連接器狀態持續寫入檔案 |
taskStatus.json | 工作狀態持續寫入檔案 |