博客
关于我
Flink CDC的使用
阅读量:479 次
发布时间:2019-03-06

本文共 4602 字,大约阅读时间需要 15 分钟。

MySQL数据准备与Flink CDC实时数据捕获

MySQL数据准备

在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:

  • 创建并使用数据库
  • create database if not exists test;
    use test;
    1. 创建学生表
    2. drop table if exists stu;
      create table stu (
      id int primary key auto_increment,
      name varchar(100),
      age int
      );
      1. 插入初始数据
      2. insert into stu(name, age) values("张三", 18);
        insert into stu(name, age) values("李四", 20);
        insert into stu(name, age) values("王五", 21);

        注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。

        开启MySQL binlog

        为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。

      3. 修改MySQL配置文件
      4. sudo vim /etc/my.cnf
        1. 在配置文件中添加以下内容:
        2. server-id = 1
          log-bin=mysql-bin
          binlog_format=row
          binlog-do-db=test

          注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。

          1. 重启MySQL服务
          2. sudo systemctl restart mysqld

            Flink代码开发

            本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。

            依赖管理

          3. 添加Flink CDC依赖
          4. com.ververica
            flink-connector-mysql-cdc
            2.4.0
            1. 其他Flink依赖(如 flink-table-api-java-bridge 等)
            2. org.apache.flink
              flink-table-api-java-bridge
              ${flink.version}
              import com.ververica.cdc.connectors.mysql.source.MySqlSource;
              import com.ververica.cdc.connectors.mysql.table.StartupOptions;
              import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
              import org.apache.flink.api.common.eventtime.WatermarkStrategy;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              public class FlinkCDCDemo {
              public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(4);
              MySqlSource
              mySqlSource = MySqlSource.builder()
              .hostname("node4")
              .port(3306)
              .username("root")
              .password("000000")
              .databaseList("test")
              .tableList("test.stu")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .startupOptions(StartupOptions.initial())
              .build();
              DataStreamSource
              dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source")
              .setParallelism(1);
              dataStreamSource.print();
              env.execute();
              }
              }

              注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。

              测试与验证

              添加新数据

              执行以下SQL语句:

              mysql> insert into stu(name, age) values("赵六", 23);

              输出示例

              {
              "before": null,
              "after": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831654000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2300,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "c",
              "ts_ms": 1719831654692,
              "transaction": null
              }

              修改数据

              执行以下SQL语句:

              mysql> update stu set name="zl", age=19 where name="赵六";

              输出示例

              {
              "before": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "after": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831987000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2604,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "u",
              "ts_ms": 1719831987238,
              "transaction": null
              }

              删除数据

              执行以下SQL语句:

              mysql> delete from stu where id=4;

              输出示例

              {
              "before": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "after": null,
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719832151000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2913,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "d",
              "ts_ms": 1719832151198,
              "transaction": null
              }

              注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。

    转载地址:http://iaqbz.baihongyu.com/

    你可能感兴趣的文章
    OpenCV与AI深度学习 | 实战 | 基于YoloV5和Mask RCNN实现汽车表面划痕检测(步骤 + 代码)
    查看>>
    OpenCV与AI深度学习 | 实战 | 基于YOLOv9+SAM实现动态目标检测和分割(步骤 + 代码)
    查看>>
    OpenCV与AI深度学习 | 实战 | 基于YOLOv9和OpenCV实现车辆跟踪计数(步骤 + 源码)
    查看>>
    OpenCV与AI深度学习 | 实战 | 文本图片去水印--同时保持文本原始色彩(附源码)
    查看>>
    OpenCV与AI深度学习 | 实战 | 通过微调SegFormer改进车道检测效果(数据集 + 源码)
    查看>>
    OpenCV与AI深度学习 | 实战—使用YOLOv8图像分割实现路面坑洞检测(步骤 + 代码)
    查看>>
    OpenCV与AI深度学习 | 实战篇——基于YOLOv8和OpenCV实现车速检测(详细步骤 + 代码)
    查看>>
    OpenCV与AI深度学习 | 实战|OpenCV实时弯道检测(详细步骤+源码)
    查看>>
    OpenCV与AI深度学习 | 实用技巧 | 使用OpenCV进行模糊检测
    查看>>
    OpenCV与AI深度学习 | 实践教程|旋转目标检测模型-TensorRT 部署(C++)
    查看>>
    OpenCV与AI深度学习 | 工业缺陷检测中数据标注需要注意的几个事项
    查看>>
    OpenCV与AI深度学习 | 干货 | 深度学习模型训练和部署的基本步骤
    查看>>
    OpenCV与AI深度学习 | 手把手教你用Python和OpenCV搭建一个半自动标注工具(详细步骤 + 源码)
    查看>>
    OpenCV与AI深度学习 | 水下检测+扩散模型:或成明年CVPR最大惊喜!
    查看>>
    OpenCV与AI深度学习 | 深入浅出了解OCR识别票据原理
    查看>>
    OpenCV与AI深度学习 | 深度学习检测小目标常用方法
    查看>>
    OpenCV与AI深度学习 | 超越YOLOv10/11、RT-DETRv2/3!中科大D-FINE重新定义边界框回归任务
    查看>>
    OpenCV与AI深度学习 | 高效开源的OCR工具:Surya-OCR介绍与使用
    查看>>
    OpenCV与AI深度学习|16个含源码和数据集的计算机视觉实战项目(建议收藏!)
    查看>>
    Opencv中KNN背景分割器
    查看>>