Giter Site home page Giter Site logo

dts-server's Introduction

基于阿里云的DTS封装

最近公司需要应用订阅阿里云RDS相关的binlog,基于阿里云提供的案例subscribe_exampleale演化而来,重构成了SpringBoot、并且升级了相应的jar包,避免了很多版本上面带来的问题,还新增集成了新的客户端kafka、后续会考虑redis等等。

功能点:

  • binlog格式化
  • 数据过滤
  • 集成客户端kafka消息发送
  • 监控规则
  • 位点回滚: 回到指定时间点重新开始

后续还会加强的功能点:

  1. 消费位点的更新(目前位点是基于kafka同步,可以做到容灾切换)
  2. 消息流转的监控
    1. 消费情况
      • 延迟
      • 丢失
      • 重复消费
    2. 业务数据处理情况
      1. 操作类型

由于DTS集成的kafka是基于单分区的,所以同一时刻只能有一个消费者消费(也就是自身自带的消费者),在某些情况可能会产生消息堆积,导致消费延迟(如果业务直接基于dts单个消费者开发的话),这里只是将消费过来的消息进行格式化转换之后,直接放入一个全新的kafka,交给业务方自己去消费。避免binlog消费能力下降。

项目结构

  • config : 配置类
  • listener: 所有binlog消费到的消息都会被监听处理并且消费
  • sender: 消息发送
  • DtsServerApplication: 启动类

其他的基本和案例保持一致

另外可能需要看一下application.yml配置,将你的应用的一些环境配置上去. 如果涉及到环境切分的话

KafkaConfiguration代码中有一个环境变量的配置: dev 如果了解环境切分的可以新增一个application-dev.yml的环境里面配置ssl相关的参数

如果启动时候,发现数据没进来,请查看配置spring.dts.include-data-info是否关注了该数据,又或者在logback.xml文件中将日志级别调整至DEBUG,但是可能会产生大量刷屏日志,这个在调试的时候注意一下。

代码流程介绍

DtsServerApplication : 启动整个应用

StartCallback: Spring的容器构建完成之后回调该方法,启动dts的运行环境。(dts-kafka的构建,以及一些位点线程、找对应消费者的流程初始化

以上环境基本初始化完成。

RecordConsumerListener : binlog回调类。

AbstractEventProcess: 事件处理类,包括关注事件、消息格式化、业务处理、消息发送等等

  • DDLEventProcess : 针对表的DDL操作进行回调处理
  • DataEventProcess: 针对表数据的增删改查回调处理

binlog格式化模版

增删改数据

参考类: com.elab.data.dts.model.DMLData

{
"changeFieldList":[                                // 修改的字段名
        "updated"
    ],
    "databaseName":"databaseName",                  // 数据库名称
    "fieldDataMap":{
        "id":{
            "dataType":"java.lang.Integer",        // 数据类型
            "field":"id",                          // 字段名称
            "oldValue":"502941",                   // 老的字段
            "value":"502941"                       // 当前字段
        },
        "updated":{
            "dataType":"java.util.Date",
            "field":"updated",
            "oldValue":"2020-10-16 11:30:06",
            "value":"2020-10-16 11:35:06"
        }
    },
    "operation":"UPDATE",                         // 操作类型 UPDATE、INSERT、DELETE等等
    "sourceTimestamp":1602819306,                 // 触发时间戳
    "tableName":"table_info"                      // 表名
}

表结构变化语句

参考类: com.elab.data.dts.model.DDLData

{
    "databaseName":"库名",                                               // 数据库名称
    "operation":"DDL",                                                  // 表示操作类型
    "sourceTimestamp":1605582383,                                       // 数据产生时间戳
    "sql":"alter table 表名 modify 字段名 varchar(100) COMMENT '描述'",	// 具体的执行SQL
    "tableName":"表名"                                                   // 表名
}

如果有需要可以在这个基础上进行二次开发,节省更多时间。

注意的点

use-config-checkpoint-name: 为true的情况,非第一次才会参考initial-checkpoint-name的时间戳为主,达到位点偏移到具体的时间戳

spring:
  dts:
    initial-checkpoint-name: 1596440543           # 注意这里是秒的时间戳,为空的话默认是当前时间戳。第一次启动的时候会参考这个时间戳,后续配合use-config-checkpoint-name属性以文件或者kafka的存储位点为主,
    use-config-checkpoint-name: false             # 强制使用当前位点,这里使用的时候要特别注意,不是非得回滚到指定位点,不要用true,否则重启的时候会重复消费,通常用来回到特定时间点的数据进行消费
    subscribe-mode-name: subscribe                # subscribe表示多机主备,如果有多台,只有其中一台会消费,其他只是等待这个消费挂掉,后续补上,起到容灾作用
    max-poll-records: 100
    include-data-info:                            # 数据过滤  包含数据信息
      marketing_db_prod: [ all,abc ]              # marketing_db_prod : 对应的库名    [ all,abc ] 对应的表名 : all 代表所有表, abc 代表具体的表名
      # 如果有多个的话可以继续添加例如(marketing_db_prod2: [ all,abc ])
    exclude-data-info:                            # 排除具体的表 : A , B ,C 代表3个表名, marketing_db : 代表库名
      marketing_db: [ A , B , C]
    exclude-table-change-field:                   # 过滤掉binlog中发生改变的字段
      all: [updated,updator]                      # 过滤掉所有表的发生改变的字段
      c_user_info: [ updated ]                    # 过滤掉指定表的发生改变的字段
    table-partition-map:                          # 表和分区进行绑定,特定的表直接绑定固定的kafka分区
      content_materials_label_rlat_info: 6

临时通过API接口新增要过滤的表

仅仅是应对特殊情况,由于是单分区消费也可以理解为单机消费,所以同一时刻也只可能有一个消费者在消费,重启太麻烦了。

该数据存在内存中,重启则无效。

# GET 请求  获取当前要过滤的表名列表
http://localhost:8686/debug/register/tableName/list?token=自定义的token

# GET 请求  新增要过滤的表名
http://localhost:8686/debug/register/tableName/list?token=自定义的token&&tb=具体的表名

# GET 请求  删除要过滤的表名
http://localhost:8686/debug/register/tableName/clear?token=自定义的token&&tb=具体的表名

# GET 请求 清空所有过滤的表名
http://localhost:8686/debug/register/tableName/clear?token=自定义的token

后续还会将一些dts使用心得分享处理,会持续更新.

欢迎大家一起交流

dts-server's People

Contributors

liukaixiong avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.