ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Node.js 操作kafka

2021-08-10 19:04:39  阅读:217  来源: 互联网

标签:Node function producer value js ..... MQ kafka


Node.js 操作kafka

基础知识可参考:douzixiansheng/MQ​github.com/douzixiansheng/MQ/blob/master/kafka_basic.md

1.准备好kafka环境,没有安装的伙伴可以参考 (讲解了linux如何安装kafka):

douzixiansheng/MQ​github.com/douzixiansheng/MQ/blob/master/kafka_linux_install.md

2. 安装依赖 kafka-node 模块,编写package.json

{
    "name": "kafka",
    "private": false,
    "dependencies": {
        "kafka-node": "4.1.3"
    },
    "devDependencies": {
        "mocha": ">=0.0.1"
    }
}

3. 编写生产者与消费者 具体api可以参考官网:

kafka-node​www.npmjs.com/package/kafka-node

生产者:

/**
 * 生产者
 */

const kafka = require('kafka-node');

let conn = {'kafkaHost':'127.0.0.1:9092'};

var MQ = function (){
    this.mq_producers = {};
}

MQ.prototype.AddProducer = function (conn, handler){
    console.log('增加生产者',conn, this);
    let client = new kafka.KafkaClient(conn);
    let producer = new kafka.Producer(client);

    producer.on('ready', function(){
        if(!!handler){
            handler(producer);
        }
    });

    producer.on('error', function(err){
        console.error('producer error ',err.stack);
    });

    this.mq_producers['common'] = producer;
    return producer;
}
console.log(MQ);
var mq = new MQ();

mq.AddProducer(conn, function (producer){
    producer.createTopics(['broadcast'], function (){
        setInterval(function(){
            mq.mq_producers['common'].send([{topic:['broadcast'], 
            messages:[JSON.stringify({"cmd":"testRpc","value":"Hello World"})]}], function (){
                console.log("..... ");
            })
        }, 2000);
    })
});

消费者:

/**
 * 消费者
 */

const kafka = require('kafka-node');

let conn = {'kafkaHost':'127.0.0.1:9092'};
let consumers = [
    {
        'type': 'consumer',
        'options': {'autoCommit': true},
        'name':'common',
        'topic':[
            {'topic': 'broadcast', 'partition': 0}
        ]
    }
];

let MQ = function(){

}

MQ.prototype.AddConsumer = function (conn, topics, options, handler){
    let client = new kafka.KafkaClient(conn);
    let consumer = new kafka.Consumer(client, topics, options);

    if(!!handler){
        consumer.on('message', handler);
    }

    consumer.on('error', function(err){
        console.error('consumer error ',err.stack);
    });
}

var mq = new MQ();


mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
    console.log(message.value);
});

4.执行生产者脚本

root@FM:/home/MQ/Kafka# node producer.js 
[Function: MQ]
增加生产者 { kafkaHost: '127.0.0.1:9092' } MQ { mq_producers: {} }
..... 
..... 
..... 
..... 
..... 
..... 
..... 
..... 
..... 
..... 
..... 

执行消费者脚本

root@FM:/home/MQ/Kafka# node consumer.js 
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}

可以看到每隔两秒生产者往主题topic(broadcast) 上生产消息,消费者从主题上拉取消息

标签:Node,function,producer,value,js,.....,MQ,kafka
来源: https://www.cnblogs.com/aishuishuiADgai/p/15125357.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有