练习-统计卡口通过的车辆数量(及提交到集群中的问题) 统计卡口查流量 1、统计最近15分分钟的车流量 2、每隔5分钟统计一次 3、使用事件时间 4、数据源使用kafka 5、将统计好的结果保存到mysql中 1、创建kafka生产者生产几条数据 #创建一个Topic kafka-topics.sh --create --zookeep
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 消息队列模式 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)。 布/订阅模式(一对多,消费者消费数据之后不会清除消息)。 名词: Producer :消息生产者,就是向 kafka broker 发消
canal搭建-简单使用,flume配合kafka使用 开启mysql binlog 默认没有开启 开启binlog之后mysql的性能会手动影响 1、修改mysql配置文件/etc/my.cnf # 如果配置文件不存在,复制一个过来 cp /usr/share/mysql/my-medium.cnf /etc/my.cnf vim /etc/my.cnf # 在配置文件中增加二配
Kafka-java 1. 在写代码前需要导入依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> <
一、搭建Kafka 1、上传解压修改环境变量 # 解压 tar -xvf kafka_2.11-1.0.0.tgz # 配置环境变量 vim /etc/profile export KAFKA_HOME=/usr/local/soft/kafka_2.11-1.0.0 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile 2、修改配置文件 vim config/server.prop
1. 参考 docker部署kafka集群 (多台服务器) 2. 安装 docker-compose 查看之前的博客进行部署(动动小手) 3. 安装 kafka-cluster 目录结构 docker-kafka-cluster ├── docker-compose .yml ├── .env └── start .sh version: '3' services: kafka-manage
一:Kafka特性 1.为什么要用消息队列 《解耦、异步、削峰》 1.1高吞吐量、低延迟: kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区,消费组对分区进行消费操作。 1.2可扩展性: kafka集群支持热扩展 1.3持久性、可靠性:消息被持
ActiveMQ、RabbitMQ、RocketMQ、Kafka四种消息中间件分析介绍 我们从四种消息中间件的介绍到基本使用,以及高可用,消息重复性,消息丢失,消息顺序性能方面进行分析介绍! 一、消息中间件的使用场景 消息中间件的使用场景总结就是六个字:解耦、异步、削峰 1.解耦 如果我方系统A要与三方B系
前言 Apache Kafka 最早是由 LinkedIn 开源出来的分布式消息系统,现在是Apache旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。尤其是做日志中间件。 Kafka是一个分布式系统, 背景 我们公司迁移ActiveMQ消息中间件,为了减少资源开支,引入Kafka这种高性能高吞吐
kafka系列文章之python-api的使用。 在使用kafka-python时候需要注意,一定要版本兼容,否则在使用生产者会报 无法更新元数据的错误。 在本片测试中java版本为如下,kafka版本为0.10.0,kafka-python版本为1.3.1,目前最新的版本为1.4.4 [root@test2 bin]# java -version java version "1.
1. kafka-python的安装 pip3 install kafka-python 2.kafka-python的基本使用 最简单使用实例 1.消费端 from kafka import KafkaConsumer consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'],value_seriali
python 发送kafka python 发送kafka大体有三种方式 1 发送并忘记(不关注是否正常到达,不对返回结果做处理) 1 import pickle 2 import time 3 from kafka import KafkaProducer 4 5 producer = KafkaProducer(bootstrap_servers=['ip:9092'], 6 key
一、前言 由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。 二、第三方库性能测试 1
无消息丢失配置 我们有时候听到开发说消息丢了。遇到这种情况你要想找这个消息通常是去生产者的日志里去看是否发送了这条消息以及去Kafka日志里去找是否有这条消息,不过这都是事后措施,通常来说我们如果要求不丢失消息,那么我们要怎么做呢? 但是从上图可以看出来这里面涉及三方,生产
# ----------------------系统相关---------------------- # broker的全局唯一编号,不能重复,和zookeeper的myid是一个意思 broker.id=0 # broker监听IP和端口也可以是域名 listeners=PLAINTEXT://172.16.48.163:9092 # 这个配置方式和listeners相同,主要用于Kafka有多个IP地址,比
消息分区策略 这里不是说Kafka的主题为什么要分区,而是说在分区的结构下,如何让具有某种特点的消息发送到特定分区。 这里有一个很明显的问题,就是主题分区,那么生产者发送的消息到底被发送到了哪个分区呢?一般我们都知道有轮询机制或者是随机机制,这两种机制都比较好理解。而且Kafka也
主题管理 创建主题 1 kafka-topics.sh --bootstrap-server 172.16.100.10:9092 --create --topic TestCCC --partitions 3 --replication-factor 3 列出所有主题 1 2 3 4 kafka-topics.sh --list --bootstrap-server 172.16.100.10:9092 # 对于启
日志 这里所说的日志是用来记录生产者向主题发送消息而产生的日志,但日志中记录的并不是消息而是record,因为Kafka并不是将原始消息直接写入日志的,而是把消息和其他元数据封装在一个record里写入日志,我们把这个record叫做消息集合。 这些日志信息放在配置文件的log.dirs指定的目录
安装前准备: 安装JDK 安装zookeeper(如果使用kafka内置的zookeeper,可以忽略) 第一步、安装JDK kafka依赖JAVA环境,所以首先要安装jdk 第二步、安装zookeeper 1)下载 官网地址:https://zookeeper.apache.org/releases.html 下载地址:http://archive.apache.org/dist/zookeeper/zookeeper-
1 Kafka提供的命令行脚本 Kafka默认提供了多个命令行脚本,用于实现各种各样的功能和运维管理。从2.2版本开始,提供了多达30+个Shell脚本。 今天我们来看一些其中比较实用的Shell脚本。 2 生产消费测试脚本 这恐怕是我们最常用到的工具脚本了,没有之一。 生产消息 生产消息使用kafka-
本地安装Kafka 一、安装方法 前提 : 本地安装zookeeper 1、下载 kafka_2.13-2.5.1.tgz 文件 (带src是源码 如:kafka-2.5.1-src.tgz ,不能安装) 2、解压文件,进入目录,创建 logs文件夹。进入config目录,打开 server.properties 文件:注释#log.dirs=/tmp/ka
一、报错日志 2022-07-21 09:12:42,274 WARN - [main-SendThread(hadoop01:2181):] ~ Client session timed out, have not heard from server in 34640ms for sessionid 0xff821bd396610617 (ClientCnxn$SendThread:1190) 2022-07-21 09:12:42,819 WARN - [main-SendThread(ha
1 关于流处理 流处理平台(Streaming Systems)是处理无限数据集(Unbounded Dataset)的数据处理引擎,而流处理是与批处理(Batch Processing)相对应的。所谓的无线数据,指的是数据永远没有尽头。而流处理平台就是专门处理这种数据集的系统或框架。下图生动形象地展示了流处理和批处理的区别:
先说NIO reactor 原始的方式 全部一个线程完成 networkclient acceptor dispathch hander networkclient 增加一个处理器负责处理业务 Processor 线程池专门处理 handler networkclient acceptor (多线程)Processor pool 【新增】 dispathch hander networkclient 主从Reacto
消息的两种模式 点对点模式,消费消息后删除消息 发布订阅模式,消息携带主题,消费消息后不删除消息 架构 消息分组,将消息分成多个Partition 消费者分组 Partition主从备份 命令 kafka-topics.sh 创建主题 # 创建test主题,分区数1,分区副本1 KAFKA_HOME/bin/kafka-topics.sh --cre