ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

FlinkCDC读取MySQL并写入Kafka案例(com.ververica)

2022-01-20 01:32:32  阅读:350  来源: 互联网

标签:val flink ververica Kafka FlinkCDC import apache org com


该方法使用的是com.ververica版本的flink-connector-mysql-cdc,可以解决alibaba版本的以下两个问题:

1)可以有效避免锁表

2)当设置StartupOptions.latest()时做checkpoints可能出现的异常错误

因此不推荐使用alibaba的版本。

 

需要注意点,依赖的POM文件如下,标记为粗体的部分是需要注意的地方:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall-flink-2021</artifactId>
        <groupId>com.king</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-flink-cdc-ververica</artifactId>
    <version>1.0.0</version>


    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.7</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!--如果保存检查点到 hdfs 上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <!--Flink 默认使用的是 slf4j 记录日志,相当于一个日志的接口,我们这里使用 log4j 作为
        具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.17.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <!---时机为准备打包->
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        -->
                        <configuration>
                            <!--输出路径-->
                            <outputDirectory>${project.build.directory}/bin/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <!--新文件才会覆盖-->
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!--添加外部jar包到classpath-->
                            <addClasspath>true</addClasspath>
                            <!--classpath路径前缀-->
                            <classpathPrefix>lib/</classpathPrefix>
                            <!--主类的全类名-->
                            <mainClass>com.Application</mainClass>
                        </manifest>
                    </archive>
                    <!--jar包输出路径为项目构建路径target下的bin目录-->
                    <outputDirectory>
                        ${project.build.directory}/bin
                    </outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

 

这里直接上主程序:FlinkCdcWithVerverica

package com.king.app

import com.king.config.{DBServerConstant, StateBackendConfig}
import com.king.function.CustomerDeseriallization
import com.king.util.MyKafkaUtil
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * @Author: KingWang
 * @Date: 2022/1/15  
 * @Desc:
 **/
object FlinkCdcWithVerverica {
  def main(args: Array[String]): Unit = {

    //1. 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //1.1 开启ck并指定状态后端fs

    env.setStateBackend(new FsStateBackend(StateBackendConfig.getFileCheckPointDir("cdc_ververica")))
      env.enableCheckpointing(30000L) //头尾间隔:每5秒触发一次ck
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)  //
    env.getCheckpointConfig.setCheckpointTimeout(10000L)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000l)  //尾和头间隔时间3秒

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));



    //2. 通过flinkCDC构建SourceFunction并读取数据
    val dbServer = DBServerConstant.mysql_gmall_flink()
    val sourceFunction = MySqlSource.builder[String]()
      .hostname(dbServer.hostname)
      .port(dbServer.port)
      .username(dbServer.username)
      .password(dbServer.password)
      .databaseList("gmall-210325-flink")


      //如果不添加该参数,则消费指定数据库中所有表的数据
      //如果添加,则需要按照 数据库名.表名 的格式指定,多个表使用逗号隔开
      .tableList("gmall-210325-flink.base_trademark")
//      .deserializer(new StringDebeziumDeserializationSchema())
      .deserializer(new CustomerDeseriallization())

      //监控的方式:
      // 1. initial 初始化全表拷贝,然后再比较
      // 2. earliest 最早的
      // 3. latest  指定最新的
      // 4. specificOffset 指定offset
      // 3. timestamp 比指定的时间大的

      .startupOptions(StartupOptions.latest())
      .build()

    val  dataStream = env.addSource(sourceFunction)

    //3. sink
    dataStream.print()
    dataStream.addSink(MyKafkaUtil.getKafkaProducer("test"))

    //4. 启动任务
    env.execute("flink-cdc")

  }
}

 

自定义的输出格式:CustomerDeseriallization

package com.king.function

import com.alibaba.fastjson.JSONObject
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{Schema, Struct}
import org.apache.kafka.connect.source.SourceRecord


/**
 * @Author: KingWang
 * @Date: 2021/12/29  
 * @Desc:
 **/
class CustomerDeseriallization extends DebeziumDeserializationSchema[String]{

  /**
   * 封装的数据:
   * {
   *   "database":"",
   *   "tableName":"",
   *   "type":"c r u d",
   *   "before":"",
   *   "after":"",
   *   "ts": ""
   *
   * }
   *
   * @param sourceRecord
   * @param collector
   */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
    //1. 创建json对象用于保存最终数据
    val result = new JSONObject()


    val value:Struct = sourceRecord.value().asInstanceOf[Struct]
    //2. 获取库名&表名
    val source:Struct = value.getStruct("source")
    val database = source.getString("db")
    val table = source.getString("table")

    //3. 获取before
    val before = value.getStruct("before")
    val beforeObj = if(before != null)  getJSONObjectBySchema(before.schema(),before) else new JSONObject()


    //4. 获取after
    val after = value.getStruct("after")
    val afterObj = if(after != null) getJSONObjectBySchema(after.schema(),after) else new JSONObject()

    //5. 获取操作类型
    val op:String = value.getString("op")

    //6. 获取操作时间
    val ts = source.getInt64("ts_ms")
//    val ts = value.getInt64("ts_ms")


    //7. 拼接结果
    result.put("database", database)
    result.put("table", table)
    result.put("type", op)
    result.put("before", beforeObj)
    result.put("after", afterObj)
    result.put("ts", ts)

    collector.collect(result.toJSONString)

  }

  override def getProducedType: TypeInformation[String] = {
    BasicTypeInfo.STRING_TYPE_INFO
  }


  def getJSONObjectBySchema(schema:Schema,struct:Struct):JSONObject = {
    val fields = schema.fields()
    var jsonBean = new JSONObject()
    val iter = fields.iterator()
    while(iter.hasNext){
      val field = iter.next()
      val key = field.name()
      val value = struct.get(field)
      jsonBean.put(key,value)
    }
    jsonBean
  }

}

 

这里以设置每次从最新的开始读取,StartupOptions.latest() ,然后运行:

 

 新增一条数据:

 

 修改:

 

 

删除

 

 

到此,圆满结束。

通常情况下,执行步骤依次是:

1. 第一次初始化时,使用StartupOptions.initial(),将所有数据同步,

2. 再使用latest,取最新的记录,同时设置checkpoint检查点,以便于失败时,可以从检查点恢复。

 

标签:val,flink,ververica,Kafka,FlinkCDC,import,apache,org,com
来源: https://www.cnblogs.com/30go/p/15824833.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有