跳至主要內容
版本:5.0

RocketMQ Connect 快速入門

快速入門

本教學課程將在獨立模式下啟動 RocketMQ Connector 範例專案「rocketmq-connect-sample」,以協助您了解連接器的運作原理。範例專案提供一個來源連接器,用於從來源檔案讀取資料並將其傳送至 RocketMQ 集群。它還提供一個接收器連接器,用於從 RocketMQ 集群讀取訊息並將其寫入目標檔案。

1. 準備:啟動 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位元 JDK 1.8 以上;
  3. Maven 3.2.x 以上;
  4. 啟動 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本;
  5. 使用工具測試 RocketMQ 訊息傳送和接收。

在此,使用環境變數 NAMESRV_ADDR 來告知工具用戶端 RocketMQ 的 NameServer 地址為 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ 具備自動建立 Topic 和 Group 的功能。在發送或訂閱訊息時,如果對應的 Topic 或 Group 不存在,RocketMQ 將會自動建立。因此,無需事先建立 Topic 和 Group。

2. 建立 Connector Runtime

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

注意:專案預設已包含 rocketmq-connect-sample 的程式碼,因此無需另外建立 rocketmq-connect-sample 外掛。

3. 以獨立模式執行 Connector Worker

修改組態

修改 connect-standalone.conf 檔案,以設定 RocketMQ 連線位址和其他資訊。請參閱 9. 組態檔說明 以取得詳細資料。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

在獨立模式中,RocketMQ Connect 會將同步檢查點資訊持續儲存在本機檔案目錄 storePathRootDir。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果您要重設同步檢查點,則需要刪除持續儲存的檢查點檔案。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

以獨立模式啟動 Connector Worker

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

提示:您可以修改 docker/connect/bin/runconnect.sh 以根據需要調整 JVM 啟動參數。

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

檢視啟動記錄檔

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果執行時期啟動成功,您會在記錄檔中看到以下列印

The standalone worker boot success.

要退出 tail -f 指令的記錄追蹤模式,您可以按 Ctrl + C 鍵盤組合鍵。

4. 啟動來源 Connector

建立來源檔案並寫入測試資料

mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

注意:不應有空行(如果偵測到空行,示範程式會擲回錯誤)。來源 Connector 會持續讀取來源檔案,並將每一行資料轉換為訊息主體,傳送至 RocketMQ 供接收器 Connector 使用。

啟動來源連接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'

如果 curl 要求傳回狀態 200,表示已成功建立。範例回應

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}

檢視日誌檔

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日誌,表示檔案來源連接器已成功啟動

啟動連接器 fileSourceConnector 並設定目標狀態為 STARTED 成功!!

來源連接器組態說明

金鑰可為空值預設值說明
connector.class實作連接器介面的類別名稱(包括套件名稱)
filename來源檔案名稱(建議使用絕對路徑)
connect.topicname同步檔案資料所需的 Topic

5. 啟動接收器連接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'

如果 curl 要求傳回狀態 200,表示已成功建立。範例回應

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}

檢視日誌檔

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日誌,表示檔案接收器連接器已成功啟動

啟動連接器 fileSinkConnector 並設定目標狀態為 STARTED 成功!!

檢查接收器連接器是否已將資料寫入至目標檔案

cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

如果已產生 test-sink-file.txt 檔案,且其內容與 test-source-file.txt 相同,表示整個流程執行正確。

繼續將測試資料寫入至來源檔案 test-source-file.txt

cd /Users/YourUsername/rocketmqconnect/

echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt

# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

注意:檔案內容的順序可能會有所不同,因為 rocketmq-connect-sample 在傳送和接收 RocketMQ 主題的訊息時使用 一般訊息。這與 已排序訊息 不同,且使用 一般訊息 無法保證順序。

接收器連接器組態說明

金鑰可為空值預設值說明
connector.class實作連接器介面的類別名稱(包括套件名稱)
filename接收器會擷取資料並儲存至檔案(建議使用絕對路徑)
connect.topicnames接收器需要處理的資料訊息主題

提示:範例 rocketmq-connect-sample 的組態檔說明僅供參考,不同的來源/接收器連接器有不同的組態,請參閱特定的來源/接收器連接器。

6. 停止連接器

停止連接器的 RESTful 指令格式為 http://(您的工作人員 IP):(port)/connectors/(connector name)/stop

若要停止示範中的兩個連接器,可以使用下列指令

curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop

如果 curl 要求傳回的狀態為 200,表示已成功停止連接器。範例回應

{"status":200,"body":"Connector [fileSinkConnector] 已成功刪除"}

如果您看到下列日誌訊息,表示檔案接收器連接器已成功關閉

tail -100f ~/logs/rocketmqconnect/connect_default.log

已完成連接器名稱的關閉:fileSinkConnector

7. 停止工作程序

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh

8. 日誌目錄

可以使用下列指令查看日誌目錄

ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect

9. 設定檔說明

根據您的使用情況,修改 RESTful 埠、storeRoot 路徑、名稱伺服器位址和其他資訊。

以下是設定檔範例

#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http prot for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=

storePathRootDir 設定的說明

在獨立模式中,RocketMQ Connect 會將同步檢查點資訊持續寫入 storePathRootDir 指定的本機檔案目錄。持續寫入的檔案包括

金鑰說明
connectorConfig.json連接器設定持續寫入檔案
position.json來源連線資料處理進度持續寫入檔案
taskConfig.json工作設定持續寫入檔案
offset.json接收器連線資料使用進度持續寫入檔案
connectorStatus.json連接器狀態持續寫入檔案
taskStatus.json工作狀態持續寫入檔案