
Flink in Practice (80): flink-sql Usage (7) — Flink SQL Client Reads Kafka Data and Streams It into Hive (Using Hive to Manage the Kafka Metadata)

2021-06-10 20:02:36



Note: this blog series is compiled from SGG's video tutorials and is a good starting point for beginners.


Versions used:

  1. Flink 1.11.2
  2. Kafka 2.4.0
  3. Hive 3.1.2
  4. Hadoop 3.1.3

1 Hive

Install Hive, using MySQL as the metastore storage backend.
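The MySQL side can be prepared up front. A minimal sketch, assuming MySQL runs on hadoop102 and you connect as root with the credentials used in hive-site.xml below (the JDBC URL there sets createDatabaseIfNotExist=true, so this step is optional):

-- Run in the MySQL client on hadoop102
CREATE DATABASE IF NOT EXISTS metastore;
-- Confirm the account from hive-site.xml (root / 123456 here) can see it
SHOW DATABASES;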

1.2 hive-site.xml configuration (Hive 3.1.2)


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
            <description>JDBC connect string for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.cj.jdbc.Driver</value>
            <description>Driver class name for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
            <description>username to use against metastore database</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>123456</value>
            <description>password to use against metastore database</description>
        </property>

    <property>
         <name>hive.metastore.warehouse.dir</name>
         <value>/user/hive/warehouse</value>
         <description>location of default database for the warehouse</description>
    </property>

    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.server2.thrift.bind.host</name>
         <value>192.168.1.122</value>
    </property>

    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>


    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>


    <property>
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:9083</value> <!-- IP of the host running the metastore -->
    </property>



</configuration>


2 Flink (version 1.10.2)

2.1 Configure conf/sql-client-hive.yaml


################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.


# See the Table API & SQL documentation for details about supported properties.


#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

#tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...


#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

#functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...


#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
# A typical catalog definition looks like:
  - name: myhive # the catalog name; any name works
    type: hive
    hive-conf-dir: /opt/module/hive/conf # directory containing hive-site.xml
#    default-database: ...

#==============================================================================
# Modules
#==============================================================================


# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: myhive
  # current database of the current catalog (default database of the catalog by default)
  current-database: hive
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0


2.2 Configure the JAR dependencies

 


/flink-1.10.2
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.2.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own Hadoop jars. Either way, make sure they are compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

       // Hive dependencies
       hive-exec-2.3.4.jar
       hive-metastore-3.1.2.jar
       libfb303-0.9.3.jar

       // Kafka dependencies
       flink-sql-connector-kafka_2.11-1.11.2.jar


The last three JARs ship with Hive and can be found under ${HIVE_HOME}/lib. The first two can be located by their GAV coordinates on the Aliyun Maven repository and downloaded manually (the groupId is org.apache.flink in both cases).

Note: distribute these JARs in lib/ to the other Flink machines in the cluster as well.

3 Startup

3.1 Start the Hadoop cluster

(Omitted.)

3.2 Start the Hive metastore

hive --service metastore &
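A quick sanity check that the metastore is reachable (a sketch, assuming the Hive CLI is available on the same node). Note that the SQL Client YAML above sets current-database: hive, so a database with that name should exist in the metastore:

-- Inside the Hive CLI
SHOW DATABASES;                       -- should return at least 'default'
CREATE DATABASE IF NOT EXISTS hive;   -- the database referenced by current-database in the YAML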

3.3 Start Flink

$FLINK_HOME/bin/start-cluster.sh

3.4 Start the Flink SQL Client

atguigu@hadoop102:/opt/module/flink$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml -l lib/
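Once the Flink SQL> prompt appears, the Hive catalog configured in the YAML can be verified directly, for example:

-- Inside the Flink SQL Client
SHOW CATALOGS;     -- should list default_catalog and myhive
USE CATALOG myhive;
SHOW DATABASES;    -- databases managed by the Hive metastore
SHOW TABLES;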

3.5 Create a table in the Flink SQL Client (metadata managed by Hive), with Kafka as the data source


CREATE TABLE student(
  id INT,
  name STRING,
  password STRING,
  age INT,
  ts BIGINT,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- event time
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- watermark
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal', -- Kafka connector version; must be 'universal' rather than '2.4.0', otherwise an error is thrown
  'connector.topic' = 'student', -- topic to consume
  'connector.startup-mode' = 'latest-offset', -- starting offset
  'connector.properties.zookeeper.connect' = 'hadoop000:2181',
  'connector.properties.bootstrap.servers' = 'hadoop000:9092',
  'connector.properties.group.id' = 'student_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true', -- derive the JSON parsing schema from the table schema
  'update-mode' = 'append'
);
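Because the table is created while the myhive catalog is current, its metadata is persisted in the Hive metastore rather than in the SQL Client session, so it survives client restarts and shows up in Hive as well (Hive itself cannot query it, since it is a generic Flink connector table). A quick check:

-- Inside the Flink SQL Client
SHOW TABLES;       -- lists 'student'
DESCRIBE student;  -- shows the schema, including the computed eventTime column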


3.6 Start Kafka and send data

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic student
{"id":12, "name":"kevin", "password":"wong", "age":22, "ts":1603769073}

3.7 Query the table from the Flink SQL Client

select * from student;
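Since the table declares an event-time attribute (eventTime) and a 10-second watermark, event-time windowed queries also work. A small example (not from the original post) counting records per 10-second tumbling window:

-- Event-time tumbling window aggregation over the Kafka-backed table
SELECT
  TUMBLE_START(eventTime, INTERVAL '10' SECOND) AS window_start,
  COUNT(*) AS cnt
FROM student
GROUP BY TUMBLE(eventTime, INTERVAL '10' SECOND);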

 

Reference: https://blog.csdn.net/hll19950830/article/details/109308055

Error reference:

java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

https://blog.csdn.net/qq_31866793/article/details/107487858

 

Source: https://blog.51cto.com/u_14222592/2892537
