【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 开发杂货

  • 线上排查

  • 技巧备忘

  • 部署指南

    • Nginx安装
    • Nginx Https证书安装
    • ElasticSearch安装
    • Kibana安装
    • SkyWalking链路追踪
    • Zookeeper安装
    • RabbitMQ安装
    • Kafka集群搭建
    • Kafka Manager安装
    • MySQL安装
    • Canal数据同步
      • 1 MySQL开启binlog
      • 2 安装部署Canal
      • 3 启动Zookeeper和Kafka
      • 4 Canal配置文件修改
      • 5 启动Canal
      • 6 测试数据同步
      • 7 安装包下载
    • Redis高可用集群搭建
    • XXL-JOB本地部署
    • ELk+Filebeat部署
    • Nacos源码本地运行
  • 技术应用
  • 部署指南
luoxiaofeng
2022-05-19
目录

Canal数据同步

提示

Canal + Kafka + Mysql 数据同步方案示例。

# 1 MySQL开启binlog

查看MySQLs是否开启了binlog及binlog-format是否ROW模式。

-- 查看数据库版本
select version(); 

-- 显示OFF未开启 ON开启
show variables like ‘log_bin’

-- binlog_format 有三种:ROW,STATEMENT,MIXID
show variables like 'binlog_format';
1
2
3
4
5
6
7
8

MySQL安装目录下修改my.ini。

-- 在mysqld下面添加
log_bin=mysql-bin
binlog-format=ROW
server-id=1

-- 进入命令行重启mysql
停止 net stop mysql57
启动 net start mysql57
1
2
3
4
5
6
7
8

赋予canal用户复制权限。

-- 创建用户
create user 'canal'@'%' identified by '123456';

-- REPLICATION CLIENT
-- REPLICATION SLAVE
-- 复制相关。一般复制账号需要这两个权限。
grant select,replication slave, replication client on *.* to 'canal'@'%';

-- 刷新权限
FLUSH PRIVILEGES;
1
2
3
4
5
6
7
8
9
10

# 2 安装部署Canal

提示

canal.deployer-1.1.5.tar.gz 安装包已先上传到 / 根目录上。

可通过wget方式下载:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# 创建目录
mkdir /louis/canal-1.1.5
# 解压
tar -zvxf canal.deployer-1.1.5.tar.gz -C /louis/canal-1.1.5
1
2
3
4

解压后目录如下

- bin    # 运维脚本文件
- conf   # 配置文件目录
  canal_local.properties  # canal本地配置,一般不需要改动
  canal.properties        # canal服务配置
  logback.xml             # logback日志配置
  metrics     # 度量统计配置
  spring      # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
  example     # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
    instance.properties   # 实例配置,一般指单个数据库的配置
- lib    # 服务依赖包
- logs   # 日志文件输出目录
- plugin # 支持的插件目录
  connector.kafka-1.1.5-jar-with-dependencies.jar     #kafka依赖包
  connector.rabbitmq-1.1.5-jar-with-dependencies.jar  #rabbitmq依赖包
  connector.rocketmq-1.1.5-jar-with-dependencies.jar  #rocketmq依赖包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 3 启动Zookeeper和Kafka

/louis/zookeeper-3.5.9/bin/zkServer.sh start

/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-1.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-2.properties &
1
2
3
4
5

启动Zookeeper客户端查看Kafka启动情况

/louis/zookeeper-3.5.9/bin/zkCli.sh 

# 进入客户端
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]
1
2
3
4
5

Kafka新建同步用的topic

louis-topic
1

# 4 Canal配置文件修改

conf/canal.properties

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# 配置要同步的kafka信息
kafka.bootstrap.servers = 172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094
1
2
3
4

conf/example/instance.properties

# 值改成安装mysql服务器的ip及端口号
canal.instance.master.address=127.0.0.1:3306
# 前面新建的数据库备份账号canal及其密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

# 配置要同步的kafka信息
canal.mq.topic=louis-topic
canal.mq.partition=0
1
2
3
4
5
6
7
8
9

# 5 启动Canal

sh bin/startup.sh
1

# 6 测试数据同步

编写Kafka消费服务

package com.kafka.kafkaDemo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MsgConsumer {
  private final static String TOPIC_NAME = "louis-topic";
  private final static String CONSUMER_GROUP_NAME = "testGroup";

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094");
    // 消费分组名
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		/*
		consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
		rebalance方案下发给consumer,这个时间可以稍微短一点
		*/
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /*
        服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
        对应的Partition也会被重新分配给其他consumer,默认是10秒
        */
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

    //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        /*
        如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
        会将其踢出消费组,将分区分配给别的consumer消费
        */
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));

    while (true) {
      /*
       * poll() API 是拉取消息的长轮询
       */
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                record.offset(), record.key(), record.value());
      }
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

Mysql插入测试数据

INSERT INTO `user`
(id, name)
VALUES(8, '888');
1
2
3

Kafka消费端收到消息

收到消息:partition = 0,offset = 11, key = null, value = 
{
    "data":[
        {
            "id":"8",
            "name":"888"
        }
    ],
    "database":"datatest",
    "es":1653296022000,
    "id":84,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "name":"varchar(50)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "name":12
    },
    "table":"user",
    "ts":1652899613492,
    "type":"INSERT"
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

Mysql新增列

ALTER TABLE datatest.`user` ADD sex varchar(2) NULL;
1

Kafka消费端收到消息

收到消息:partition = 0,offset = 12, key = null, value = 
{
    "data":null,
    "database":"datatest",
    "es":1653296552000,
    "id":85,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"/* ApplicationName=DBeaver Ultimate 21.3.0 - SQLEditor &lt;Script-9.sql> */ ALTER TABLE datatest.`user` ADD sex varchar(2) NULL",
    "sqlType":null,
    "table":"user",
    "ts":1652900187875,
    "type":"ALTER"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 7 安装包下载

百度网盘

https://pan.baidu.com/s/1pvGbnkMpdqs3ICMpMsrVow (opens new window)

提取码 : cr1x

#安装部署#Canal
MySQL安装
Redis高可用集群搭建

← MySQL安装 Redis高可用集群搭建→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式