Flink connectors: writing to ES6

2021-04-01


1. pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
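
The code below also relies on the Kafka 0.11 connector (for FlinkKafkaConsumer011) and on Gson, which the dependency above does not cover. A minimal sketch of the extra entries, assuming the same ${flink.version} property and Scala 2.11 (adjust versions to your build):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>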

2. Full code

import java.net.SocketTimeoutException
import java.util
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkBase, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import org.elasticsearch.client.{Requests, RestClientBuilder}
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

object FLink_Kafka_ES {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Very important: be sure to enable checkpointing!!
    env.enableCheckpointing(1000)

    // Kafka topic to consume from
    val topic: String = "test"
    // Kafka consumer properties
    val props: Properties = new Properties
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.setProperty("group.id", "test01")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    
    // import the Kafka connector and the Java/Scala collection converters
    import org.apache.flink.streaming.connectors.kafka._
    import scala.collection.JavaConverters._

    val consumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), props)
    // start consuming from the latest offsets
    consumer.setStartFromLatest()
    // build the source stream
    val kafkaSource: DataStream[String] = env.addSource(consumer)
    // parse each JSON record into a Scala Map
    val mapDS: DataStream[Map[String, AnyRef]] = kafkaSource.map(x => {
      // use Gson to parse the JSON string into a java.util.Map, then convert it to a Scala Map
      (new Gson).fromJson(x, classOf[util.Map[String, AnyRef]]).asScala.toMap
    })
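    // Note: by default Gson parses JSON numbers into java.lang.Double when the
    // target is a plain Map, so {"id":1,...} arrives as Map("id" -> 1.0, ...);
    // deserialize into a typed case class instead if exact numeric types matter.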

    // configure the ES nodes
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("10.11.159.106", 9204, "http"))
    // build the ES sink
    val esSinkBuilder = new ElasticsearchSink.Builder[Map[String, AnyRef]](
      httpHosts,
      new ElasticsearchSinkFunction[Map[String, AnyRef]] {
        override def process(t: Map[String, AnyRef], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val map: util.Map[String, AnyRef] = t.asJava
          val indexRequest: IndexRequest = Requests
            .indexRequest()
            .index("flink_kafka")
            .`type`("kafka_data") // required in ES 6.x; only ES 7.x drops the document type
            //.id(user_id) // set the document id from a field of the record
            //.create(false) // auto-create the index; not recommended - define the mapping in ES ahead of time, unless your fields (e.g. dates) can be auto-detected
            // Because of ES field-naming restrictions, some names cannot be used directly.
            // If you need an x.x naming format, consider nested maps or JSON.
            // When nesting maps, every map must be a java.util.Map, otherwise the
            // client throws a type exception (see the sketch after this listing).
            .source(map)
          // hand the request to the indexer, which sends it as part of a bulk request
          requestIndexer.add(indexRequest)
          // note: this prints when the request is buffered, not when ES confirms the write
          println("data saved successfully")
        }
      })
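    // Note: the ElasticsearchSinkFunction above is serialized and shipped to the
    // task managers, so anything it captures must be serializable.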
    
    // configure the REST client used by the sink (here: basic authentication)
    esSinkBuilder.setRestClientFactory(
      new RestClientFactory {
        override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
          restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback {
            override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
              val provider: BasicCredentialsProvider = new BasicCredentialsProvider()
              // set the username and password
              val credentials: UsernamePasswordCredentials = new UsernamePasswordCredentials("elastic", "123456") // change these to your actual credentials; if auth is not needed, empty strings work
              provider.setCredentials(AuthScope.ANY, credentials)
              httpClientBuilder.setDefaultCredentialsProvider(provider)
            }
          })
        }
      })
    // flush after every single element (bulk size 1): this instructs the sink to
    // send a request per record instead of buffering them; raise it for throughput
    esSinkBuilder.setBulkFlushMaxActions(1)
    // delay between retries; for the exponential type this is the initial base
    //esSinkBuilder.setBulkFlushBackoffDelay(1)
    // number of retries on failure
    esSinkBuilder.setBulkFlushBackoffRetries(3)
    // backoff type, one of two kinds:
    // a. exponential: the interval between retries grows exponentially, e.g. 2 -> 4 -> 8 ...
    // b. constant: the interval between retries is a fixed constant, e.g. 2 -> 2 -> 2 ...
    esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
    // interval between bulk flushes, in milliseconds
    //esSinkBuilder.setBulkFlushInterval(100)
    // maximum size of a bulk request, in MB
    //esSinkBuilder.setBulkFlushMaxSizeMb(16)

    // ES failure handling
    esSinkBuilder.setFailureHandler(
      new ActionRequestFailureHandler {
        override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {
          if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent) {
            // the ES bulk queue is full: re-queue the request and retry
            requestIndexer.add(actionRequest)
          } else if (ExceptionUtils.findThrowable(throwable, classOf[SocketTimeoutException]).isPresent) {
            // socket timeout talking to ES: re-queue the request and retry
            requestIndexer.add(actionRequest)
          } else {
            // any other failure: log it and rethrow, which fails the job
            println("Sink to es exception, exceptionData: " + actionRequest.toString + " exceptionStackTrace: " + org.apache.commons.lang.exception.ExceptionUtils.getFullStackTrace(throwable))
            throw throwable
          }
        }
      }
    )
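    // Note: Flink also ships a built-in handler that retries requests rejected by
    // a full ES queue (the EsRejectedExecutionException branch above):
    // esSinkBuilder.setFailureHandler(new org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler)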
  
    // set the maximum parallelism
    mapDS.setMaxParallelism(1)
    // sink the data to ES
    mapDS.addSink(esSinkBuilder.build())

    env.execute("Kafka_Flink")

    // Command to produce test data:
    // $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test
    // Sample record to type into the Kafka console producer:
    // {"id":1,"completed":false,"title":"delectus aut autem","userId":1}

    // List the indices:
    // GET _cat/indices
    // Query the contents of the index:
    // GET flink_kafka/_search
  }
}
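
As the comments in process() note, every map nested inside the document source must be a java.util.Map, or the ES client throws a type exception when serializing the document. A minimal sketch of a recursive conversion, using a hypothetical helper named toJavaDeep (not part of the original post):

import scala.collection.JavaConverters._

// Recursively convert Scala Maps and Seqs into java.util collections so the
// whole document, however deeply nested, can be passed to IndexRequest.source(...).
def toJavaDeep(value: Any): AnyRef = value match {
  case m: Map[_, _] => m.map { case (k, v) => k.toString -> toJavaDeep(v) }.asJava
  case s: Seq[_]    => s.map(toJavaDeep).asJava
  case other        => other.asInstanceOf[AnyRef]
}

In process() you would then call indexRequest.source(toJavaDeep(t).asInstanceOf[java.util.Map[String, AnyRef]]) instead of passing t.asJava directly.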
Original reference: https://blog.csdn.net/hongchenshijie/article/details/109704636

Source: https://www.cnblogs.com/mn-lily/p/14607871.html
