RocketMQ Connect -> SFTP Server (File)">RocketMQ Connect -> SFTP Server (File)">
跳到主要內容
版本:5.0

RocketMQ Connect 實務 4

SFTP 伺服器(檔案資料)-> RocketMQ Connect -> SFTP 伺服器(檔案)

準備

啟動 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位元 JDK 1.8+;
  3. Maven 3.2.x+;
  4. 啟動 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本;
  5. 使用工具測試 RocketMQ 訊息傳送和接收。

在此,使用環境變數 NAMESRV_ADDR 通知工具用戶端 RocketMQ 的 NameServer 位址為 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ 具備自動建立主題和群組的功能。傳送或訂閱訊息時,如果對應的主題或群組不存在,RocketMQ 會自動建立它們。因此,無需事先建立主題和群組。

建立連接器執行時期

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

建立 SFTP 連接器外掛

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/

mvn clean package -Dmaven.test.skip=true

將 SFTP RocketMQ 連接器的編譯 jar 放入外掛目錄中以進行執行時間載入。

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

在獨立模式下執行連接器工作

修改 connect-standalone.conf 檔案以設定 RocketMQ 連線地址和其他資訊。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

範例設定資訊如下

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

在獨立模式下,RocketMQ Connect 會將同步檢查點資訊持續儲存在 storePathRootDir 指定的本機檔案目錄中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果您想要重設同步檢查點,您需要刪除已持續化的檢查點資訊檔案。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

在獨立模式下啟動連接器工作

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

設定 SFTP 伺服器

SFTP (SSH 檔案傳輸通訊協定) 是一種用於電腦之間安全檔案傳輸的檔案傳輸通訊協定。SFTP 建立在 SSH (安全外殼) 通訊協定之上,並使用加密和驗證。

我們將使用 macOS 中內建的 SFTP 服務 (透過啟用「遠端登入」存取)。有關詳細說明,請參閱 允許遠端電腦存取您的 Mac 文件。

建立來源測試檔案

建立一個名為 source.txt 的測試檔案,並寫入一些測試資料

mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/

cd /Users/YourUsername/rocketmqconnect/sftp-test/

touch source.txt

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

登入 SFTP 服務以驗證您是否可以正常存取。輸入以下指令,然後輸入您的密碼

# sftp -P port YourUsername@hostname
sftp -P 22 YourUsername@127.0.0.1

注意:由於這是由您的本機 MAC OS 提供的 SFTP 服務,因此地址為 127.0.0.1,而埠為預設的 22。

sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye

啟動連接器

啟動 SFTP 來源連接器

執行以下指令以啟動 SFTP 來源連接器。此連接器將連線到 SFTP 服務以從 source.txt 檔案讀取。對於檔案中的每一行文字,連接器將解析並封裝內容至通用 ConnectRecord 物件,然後將其傳送至 RocketMQ 主題以供接收連接器使用。

curl -X POST --location "https://127.0.0.1:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 要求傳回狀態:200,表示已成功建立連接器。範例回應如下所示

{"status":200,"body":{"connector.class":"...

若要確認檔案來源連接器已成功啟動,請執行以下指令

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

啟動連接器 SftpSourceConnector 並設定目標狀態 STARTED 已成功!!

啟動 SFTP 接收連接器

執行以下指令以啟動 SFTP 接收連接器。此連接器將訂閱 RocketMQ 主題以使用訊息並將每個訊息轉換為單一行文字,然後使用 SFTP 協定將其寫入目的地檔案 sink.txt

curl -X POST --location "https://127.0.0.1:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 要求傳回狀態:200,表示已成功建立連接器。範例回應如下所示

{"status":200,"body":{"connector.class":"...

查看記錄檔以確認 SFTP 接收連接器已成功啟動

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

啟動連接器 SftpSinkConnector 並設定目標狀態 STARTED 已成功!!

執行以下指令以確認資料已寫入目的地檔案

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

如果已產生 sink.txt 檔案且其內容與 source.txt 檔案的內容相符,表示整個處理程序運作正常。

將更多測試資料寫入 source.txt 檔案以繼續測試

cd /Users/YourUsername/rocketmqconnect/sftp-test/

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

注意:檔案內容的順序可能會有所不同,因為 rocketmq-connect-sftp 在傳送和接收訊息至/從 RocketMQ 主題時使用 一般訊息。這與 已排序訊息 不同,且使用 一般訊息 無法保證順序。