ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink 题目

2022-07-23 23:34:27  阅读:147  来源: 互联网

标签:stat flink 题目 String val Flink result mysql


Flink 题目

从MySql中读取数据,通过Flink处理之后在存储到MySql中

package com.wt.flink.homework
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object Text1 {
  /**
   *
   * 1、从数据库mysql中读取学生表的数据,
     2、统计班级的人数
     3、将统计好的结果保存到数据库中,一个班级只保存一条数据
   */
  def main(args: Array[String]): Unit = {
       //创建flink的环境
       val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

       //使用自定义的source读取mysql中的数据
       val mysqlDS: DataStream[String] = env.addSource(new MySqlSource())

       val countDS: DataStream[(String, Int)] = mysqlDS
         .map(stu => (stu.split("\t")(4), 1))
         .keyBy(_._1)
         .sum(1)

       //将统计号的结果保存到MySlq中
       countDS.addSink(new MySlqSink)

       //触发任务执行
       env.execute()
  }

     /**
      * 自定义source ,实现SourceFunction接口
      *
      */

     class MySqlSource extends SourceFunction[String] {
          /**
           * run: 用于读取外部数据的方法,只执行一次
           *
           * @param ctx : 上下文对象,用于将读取到的数据发送到下游
           */
          override def run(ctx: SourceFunction.SourceContext[String]):Unit = {
               /**
                *  使用jdbc读取mysql的数据,将读取到的数据发送到下游
                *
                */
               //创建连接
               Class.forName("com.mysql.jdbc.Driver")
               val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")

               //编写查询的数据sql
               val stat: PreparedStatement = conn.prepareStatement("select * from students")

               //执行查询
               val result: ResultSet = stat.executeQuery()

               //解析数据
               while(result.next()){
                    val id: Long = result.getLong("id")
                    val name: String = result.getString("name")
                    val age: Long = result.getLong("age")
                    val gender: String = result.getString("gender")
                    val clazz: String = result.getString("clazz")


                    //将每一条数据发送到下游
                    ctx.collect(s"$id\t$name\t$age\t$gender\t$clazz")
               }

               //关闭连接
               stat.close()
               conn.close()

          }

          //任务被取消的时候执行,一般用于回收资源
          override def cancel(): Unit = {}

     }

     //写到mysql中
     class MySlqSink extends RichSinkFunction[(String,Int)]{

          var con: Connection = _
          var stat: PreparedStatement = _


          /**
           * open:在invoke之前执行,每个task中只执行一次
           * 一般用于初始化数据库的连接
           *
           */
          override def open(parameters:Configuration):Unit ={
               //1、加载驱动
               Class.forName("com.mysql.jdbc.Driver")
               //创建链接
               con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
               //编写插入数据的sql
               //replace :如果不存在插入,如果存在就替换,需要在表中设置主键
               stat = con.prepareStatement("replace into clazz_num(clazz,num) values(?,?)")
          }

          /**
           * 任务关闭时后执行,一般用于回收资源
           */
          override def close():Unit={
               //关闭连接
               stat.close()
               con.close()
          }

          /**
           * 每一条数据会执行一次
           * 使用jdbc将数据保存到mysql中
           *
           * @param kv      : 一行数据
           * @param context : 上下文对象
           */
          override def invoke(kv: (String, Int), context: SinkFunction.Context): Unit = {
               //设置参数
               stat.setString(1, kv._1)
               stat.setInt(2, kv._2)
               //执行插入
               stat.execute()
          }
     }
}

标签:stat,flink,题目,String,val,Flink,result,mysql
来源: https://www.cnblogs.com/atao-BigData/p/16513590.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有