ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Flink写入数据到MySQL案例

2022-02-22 16:04:19  阅读:262  来源: 互联网

标签:insertStmt temp Flink 写入 value MySQL sensor id updateStmt


案例准备:

1、启动MySQL,在mysql中创建数据库flinkdb,并创建表sensor_temp

CREATE TABLE sensor_temp  (
  id varchar(32),
  temp double
) 

代码实现:

def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
    dataStream.addSink(new MyJdbcSinkFunction())

    env.execute()
}

class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{
  var connection: Connection =_
  var insertStmt: PreparedStatement=_
  var updateStmt: PreparedStatement=_
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1,value.temperature)
    updateStmt.setString(2,value.id)
    updateStmt.execute()

    if(updateStmt.getUpdateCount == 0){
      insertStmt.setString(1,value.id)
      insertStmt.setDouble(2,value.temperature)
      insertStmt.execute()
    }

  }

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
    insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)")
    updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")

  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    connection.close()
  }

运行结果:

查询数据select * from sensor_temp;
在这里插入图片描述

标签:insertStmt,temp,Flink,写入,value,MySQL,sensor,id,updateStmt
来源: https://blog.csdn.net/weixin_44911081/article/details/123070256

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有