使用Debezium Binlog同步es

使用debezium binlog同步es

  • 步骤
    • 安装zookeeper
    • 安装kafka
    • 安装插件
      • kafka mysql binlog收集插件
      • kafka推送es插件

zookeeper安装、运行

  • 下载
1
2
3
4
5
# 下载,(需要前置jdk运行环境)
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar zxf apache-zookeeper-3.7.0-bin.tar.gz
cd apache-zookeeper-3.7.0-bin

  • 运行
    1
    2
    3
    4
    # 配置
    cp conf/zoo_sample.cfg conf/zoo.cfg
    # 单点不配置
    ./bin/zkServer.sh start

kafka安装、运行

  • 安装、运行
    1
    2
    3
    4
    5
    # 下载,(需要前置jdk运行环境)
    wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
    tar -zxf kafka_2.13-2.8.0.tgz
    cd kafka_2.13-2.8.0
    ./bin/kafka-server-start.sh -daemon /opt/debezium/kafka_2.13-2.8.0/config/server.properties

连接器插件配置

  • mysql连接器下载

  • es链接器下载

  • 配置插件路径

  • vi kafka_2.13-2.8.0/config/connect-distributed.properties 添加配置

    1
    2
    # 多个路径,隔开指向插件解压目录即可
    plugin.path=/opt/debezium/kafka_2.13-2.8.0/lib_conn/debezium-debezium-connector-mysql-1.5.0,/opt/debezium/kafka_2.13-2.8.0/lib_conn/confluentinc-kafka-connect-elasticsearch-11.0.4
  • 启动连接器

    1
    bin/connect-distributed.sh config/connect-distributed.properties
  • 使用Api添加链接器

    • mysql连接器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"name": "devops",
"config":
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "172.16.19.108",
"database.port": "3306",
"database.user": "binlogToEs",
"database.password": "qwertyuiopQ123!@#",
"database.server.id": "1",
"database.server.name": "devops",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "devops-t2-schema-changes-inventory",
"include.schema.changes": "true",
"database.whitelist": "devops",
"table.whitlelist": "app"
}
}

- es连接器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"name": "devops-app-es",
"config":{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://172.16.19.79:9200",
"connection.username": "devops",
"connection.password": "qwertyuiopQ123",
"key.ignore": "false",
"topics": "devops.devops.app",
"type.name": "_doc",
"tasks.max": "1",
"transforms": "extractKey",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"id"
}
}