# rocketmq-connect
[](https://www.apache.org/licenses/LICENSE-2.0.html)
文档中心:[RocketMQ Connect](https://rocketmq-1.gitbook.io/rocketmq-connector/)
架构简介:[RocketMQ Connect 平台搭建与实践](https://developer.aliyun.com/search?q=rocketmq+connect&bizCategory=ucc-file)
# 快速开始
单机模式下[rocketmq-connect-sample](https://rocketmq-1.gitbook.io/rocketmq-connector/quick-start/file-connector)作为 demo
rocketmq-connect-sample的主要作用是从源文件中读取数据发送到RocketMQ集群 然后从Topic中读取消息,写入到目标文件
## 1.准备
1. Linux/Unix/Mac
2. 64bit JDK 1.8+;
3. Maven 3.2.x或以上版本;
4. 启动 [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
5. 创建测试Topic
> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8
**tips** : ${ROCKETMQ_HOME} 位置说明
>bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release
>
>source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution
## 2.构建Connect
```
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U
```
## 3.运行Worker
```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
```
**tips**: 可修改 /bin/runconnect.sh 适当调整 JVM Parameters Configuration
>JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
runtime启动成功:
>The standalone worker boot success.
查看启动日志文件:
>tail -100f ~/logs/rocketmqconnect/connect_runtime.log
ctrl + c 退出日志
## 4.启动source connector
当前目录创建测试文件 test-source-file.txt
```
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
```
看到以下日志说明 file source connector 启动成功了
>tail -100f ~/logs/rocketmqconnect/connect_runtime.log
>
>2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-...
#### source connector配置说明
| key | nullable | default | description |
|-------------------| -------- | ---------------------|--------------------------|
| connector.class | false | | 实现 Connector接口的类名称(包含包名) |
| filename | false | | 数据源文件名称 |
| connect.topicname | false | | 同步文件数据所需topic |
## 5.启动sink connector
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
cat test-sink-file.txt
```
> tail -100f ~/logs/rocketmqconnect/connect_runtime.log
看到以下日志说明file sink connector 启动成功了
> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-...
如果 test-sink-file.txt 生成并且与 source-file.txt 内容一样,说明整个流程正常运行。
文件内容可能顺序不一样,这主要是因为RocketMQ发到不同queue时,接收不同queue消息顺序可能也不一致导致的,是正常的。
#### sink connector配置说明
| key | nullable | default | description |
|--------------------| -------- | ------- | -------------------------------------------------------------------------------------- |
| connector.class | false | | 实现Connector接口的类名称(包含包名) |
| filename | false | | sink拉去的数据保存到文件 |
| connect.topicnames | false | | sink需要处理数据消息topics |
```
注:source/sink配置文件说明是以rocketmq-connect-sample为demo,不同source/sink connector配置有差异,请以具体sourc/sink connector 为准
```
## 6.停止connector
```shell
GET请求
http://(your worker ip):(port)/connectors/(connector name)/stop
停止demo中的两个connector
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
```
看到以下日志说明connector停止成功了
>**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}}
## 7.停止Worker进程
```
sh bin/connectshutdown.sh
```
## 8.日志目录
>${user.home}/logs/rocketmqconnect
## 9.配置文件
持久化配置文件默认目录 /tmp/storeRoot
| key | description |
|----------------------|---------------------------|
| connectorConfig.json | connector配置持久化文件 |
| position.json | source connect数据处理进度持久化文件 |
| taskConfig.json | task配置持久化文件 |
| offset.json | sink connect数据消费进度持久化文件 |
| connectorStatus.json | connector 状态持久化文件 |
| taskStatus.json | task 状态持久化文件 |
## 10.配置说明
可根据使用情况修改 [RESTful](https://restfulapi.cn/) 端口,storeRoot 路径,Nameserver 地址等信息
文件位置:work 启动目录下 conf/connect-standalone.conf
```shell
#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1
# Http prot for user to access REST API
httpPort=8082
# Local file dir for config store
storePathRootDir=/home/connect/storeRoot
#需要修改为自己的rocketmq nameserver 接入点
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
#用于加载Connector插件,类似于jvm启动加载jar包或者class类,这里目录目录用于放Connector相关的实现插件,
支持文件和目录
# Source or sink connector jar file dir
pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar
# 补充:将 Connector 相关实现插件保存到指定文件夹
# pluginPaths=/usr/local/connector-plugins/*
```
## 11.其它restful接口
### 集群信息
+ 查看集群节点信息:
```
curl -X GET http://(your worker ip):(port)/getClusterInfo
```
+ 重新加载Connector插件目录下的Connector包:
```
curl -X GET http://(your worker ip):(port)/plugin/reload
```
### Connector/Task管理
+ 创建或更新connector(存在且配置不同会更新,不存在创建)
```
curl -X GET http://(your worker ip):(port)/connectors/{connectorName}
```
+ Pause(暂停)指定的connector
```
curl -X GET http://(your worker ip):(port)/connectors/{connectorName}/pause
```
+ Resume(重启)指定的connector
```
curl -X GET http://(your worker ip):(port)/connectors/{connectorName}/resume
```
+ Pause(暂停)所有的connector
```
curl -X GET http://(your worker ip):(port)/connectors/pause/all
```
+ Resume(重启)所有的connector
```
curl -X GET http://(your worker ip):(port)/connectors/resume/all
```
+ 停止并删除指定的connector(谨慎使用)
```
curl -X GET http://(your
没有合适的资源?快使用搜索试试~ 我知道了~
RocketMQ-Connect 二次开发源码

共1107个文件
java:903个
xml:60个
md:33个

1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉

温馨提示
RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开
资源推荐
资源详情
资源评论













收起资源包目录





































































































共 1107 条
- 1
- 2
- 3
- 4
- 5
- 6
- 12
资源评论

- weixin_447581412023-12-29感谢资源主的分享,这个资源对我来说很有用,内容描述详尽,值得借鉴。

地表最强菜鸡
- 粉丝: 14w+
- 资源: 80
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
- Java实训报告大全 JavaScript
- apache-jmeter-5.6.3.zip
- yiwa-机器人开发资源
- CoCache-Kotlin资源
- boe谷歌浏览器收藏标签
- 重庆市dem 无偏移tif
- 影刀RPA中级证书-网页进阶-批量自动发货
- AIOT人工智能物联网数据平台前端-AI人工智能资源
- MODIS数据综合处理软件 V1.0
- Rust通用代码生成器:莲花-Rust资源
- “数学建模必会十大经典算法详解”是一份深入解析数学建模中核心算法的资源 该资源详细阐述了包括蒙特卡罗算法、数据拟合与参数估计、线性规划与整数规划、图论算法、动态规划、以及最优化理论的非经典算法(如模拟
- CH592 Peripheral设备 PWM+DMA 驱动WS2812灯带
- 云手机操作视频效果展示
- MegEngine -硬件开发资源
- form-create-移动应用开发资源
- go-view-Typescript资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈



安全验证
文档复制为VIP权益,开通VIP直接复制
