RocketMQ Connect -> Elasticsearch Sink">RocketMQ Connect -> Elasticsearch Sink">
跳至主要內容
版本:5.0

RocketMQ Connect 實戰 5

Elasticsearch 來源 -> RocketMQ Connect -> Elasticsearch 儲存槽

準備工作

啟動 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位元 JDK 1.8 以上;
  3. Maven 3.2.x 以上;
  4. 啟動 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本;
  5. 使用工具測試 RocketMQ 訊息傳送和接收。

在此,使用環境變數 NAMESRV_ADDR 來告知工具用戶端 RocketMQ 的 NameServer 位址為 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ 具備自動建立 Topic 和 Group 的功能,在發送或訂閱訊息時,若對應的 Topic 或 Group 不存在,RocketMQ 會自動建立,因此無需事先建立 Topic 和 Group。

以下為該內容的英文翻譯

建立連接器執行時間

複製儲存庫並建立 RocketMQ Connect 專案

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

建立 Elasticsearch 連接器外掛程式

建立 Elasticsearch RocketMQ 連接器外掛程式

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/

mvn clean package -Dmaven.test.skip=true

將編譯好的 Elasticsearch RocketMQ 連接器外掛程式 JAR 檔案複製到執行時間使用的外掛程式目錄中

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins

cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

在獨立模式下執行連接器工作

修改 connect-standalone.conf 檔案以設定 RocketMQ 連線位址和其他資訊。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

範例設定資訊如下

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

在獨立模式下,RocketMQ Connect 會將同步檢查點資訊持續儲存在 storePathRootDir 指定的本機檔案目錄中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果您想重設同步檢查點,請刪除持續性檔案

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

在獨立模式下啟動連接器工作

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

設定 Elasticsearch 服務

Elasticsearch 是一個開源搜尋和分析引擎。

我們將使用兩個獨立的 Elasticsearch Docker 執行個體作為我們的來源和目的地資料庫

docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1

docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1

docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1

Docker 指令說明

  • --name es2:指定容器名稱,例如 es2
  • -p 9201:9200 -p 9301:9300:將 Elasticsearch 容器上的埠 9200 和 9300 對應至主機埠 9201 和 9301,以便可透過主機存取 Elasticsearch 服務。
  • -e discovery.type=single-node:設定 Elasticsearch 在單一節點上執行,而不會發現叢集中的其他節點,適合單一伺服器部署。
  • -v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data:將主機上的目錄掛載到容器內的 /usr/share/elasticsearch/data,以持續儲存 Elasticsearch 資料。

這會執行 Elasticsearch 的自訂設定執行個體,並將持續資料儲存在容器中,可透過主機電腦上的 9200 埠存取,這對於本機電腦上的開發或測試環境非常實用。

檢視 Elasticsearch 記錄

docker logs -f es1

docker logs -f es2

驗證 Elasticsearch 是否已成功啟動

# Check Elasticsearch instance 1
curl -XGET https://127.0.0.1:9200

# Check Elasticsearch instance 2
curl -XGET https://127.0.0.1:9201

如果連線成功且運作正確,將會產生包含 Elasticsearch 及其版本號碼資訊的 JSON 回應。

設定 Kibana 服務

Kibana 是一個開放原始碼的資料視覺化工具,讓使用者可以互動式地探索和了解儲存在 Elasticsearch 群集中的資料。它提供豐富的功能,例如圖表、圖形和儀表板。

為了方便起見,我們將在 Docker 中設定兩個獨立的 Kibana 執行個體,並使用下列指令將它們連結到我們先前建立的 Elasticsearch 容器

docker pull docker.elastic.co/kibana/kibana:7.15.1

docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1

docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1

Docker 指令說明

  • --name kibana2:指定新容器的名稱,例如 kibana2
  • --link es2:elasticsearch:將容器連結到另一個名為 Elasticsearch 的執行個體(在本例中為「es2」)。這會讓 Kibana 和 Elasticsearch 能夠互相通訊。
  • -p 5602:5601:將 Kibana 的預設埠 (5601) 對應到主機電腦上的同一個埠,以便透過瀏覽器存取。
  • -d:在分離模式下執行 Docker 容器。

容器啟動後,您可以監控其記錄輸出

docker logs -f kibana1

docker logs -f kibana2

若要存取 Kibana 主控台頁面,只需在瀏覽器中輸入下列網址

  • kibana1:https://127.0.0.1:5601
  • kibana2:https://127.0.0.1:5602

如果載入正確,表示對應的 Kibana 執行個體已成功啟動。

將測試資料寫入來源 Elasticsearch

Kibana 的開發工具可以協助您在 Kibana 中直接與 Elasticsearch 互動和操作。您可以執行各種查詢和操作,分析和了解回傳的資料。請參閱文件 console-kibana

大量寫入測試資料

透過瀏覽器存取 Kibana1 主控台,從左側選單中找到開發工具,並在頁面上輸入下列指令以寫入測試資料

POST /_bulk
{ "index" : { "_index" : "connect_es" } }
{ "id": "1", "field1": "value1", "field2": "value2" }
{ "index" : { "_index" : "connect_es" } }
{ "id": "2", "field1": "value3", "field2": "value4" }

注意:

  • connect_es:資料的索引名稱。
  • id/field1/field2:這些是欄位名稱,而 1、value1、value2 則代表欄位的數值。

注意rocketmq-connect-elasticsearch 有個限制,它需要資料中有一個欄位可用於 >= 比較運算(字串或數字)。這個欄位將用於記錄同步檢查點。在上面的範例中,id 欄位是一個全球唯一、遞增的數字欄位。

查詢資料

若要查詢索引中的資料,請使用下列指令

GET /connect_es/_search
{
"size": 100
}

如果沒有可用的資料,回應將會是

{
"error" : {
...
"type" : "index_not_found_exception",
"reason" : "no such index [connect_es]",
"resource.type" : "index_or_alias",
"resource.id" : "connect_es",
"index_uuid" : "_na_",
"index" : "connect_es"
},
"status" : 404
}

如果有的可用的資料,回應將會是

{
...
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_dx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "1",
"field1" : "value1",
"field2" : "value2"
}
},
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_tx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "2",
"field1" : "value3",
"field2" : "value4"
}
}
]
}
}

刪除資料

如果您需要刪除索引中的資料,由於重複測試或其他原因,您可以使用下列指令

DELETE /connect_es

啟動連接器

啟動 Elasticsearch 來源連接器

執行下列指令以啟動 ES 來源連接器。連接器將連接到 Elasticsearch 並從 connect_es 索引中讀取文件資料。它將解析 Elasticsearch 文件資料並將其封裝到一個通用 ConnectRecord 物件中,該物件將被傳送至 RocketMQ 主題供 Sink 連接器使用。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d  '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9200,
"index":{
"connect_es": {
"primaryShards":1,
"id":1
}
},
"max.tasks":2,
"connect.topicname":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

注意:啟動指令指定來源 ES 應同步 connect_es 索引,而索引中的遞增欄位是 id。資料將從 id=1 開始擷取。

如果 curl 要求傳回狀態:200,表示建立成功,範例回應將會是

{"status":200,"body":{"connector.class":"...

如果您看到下列記錄,表示檔案來源連接器已成功啟動。

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

啟動連接器 elasticsearchSourceConnector 並設定目標狀態為已成功啟動!!

啟動 Elasticsearch Sink 連接器

執行下列指令以啟動 ES sink 連接器。連接器將訂閱 RocketMQ 主題的資料並使用它。它將轉換每則訊息為文件資料並將其寫入目的地 ES。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9201,
"max.tasks":2,
"connect.topicnames":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

注意:啟動指令指定目的地 ES 的地址和埠,它對應到先前在 Docker 中啟動的 ES2。

如果 curl 要求傳回狀態:200,表示建立成功,範例回應將會是

{"status":200,"body":{"connector.class":"...

如果您看到下列記錄,表示檔案來源連接器已成功啟動

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

啟動連接器 elasticsearchSinkConnector 並設定目標狀態為已成功啟動!!

若要檢查 sink 連接器是否已將資料寫入目的地 ES 索引

  1. 在瀏覽器中存取 Kibana2 主控台地址:https://127.0.0.1:5602
  2. 在 Kibana2 Dev Tools 頁面中,查詢索引中的資料。如果它與來源 ES1 中的資料相符,表示連接器正常執行。
GET /connect_es/_search
{
"size": 100
}