标签:Console string producer 生产者 代码 server new kafka consumer
Producer
static void Main(string[] args) { Console.WriteLine("请输入消息内容"); using (var producer = new KafkaProducer()) { while (true) { string message = Console.ReadLine(); try { //topic名称是test var result = producer.ProduceAsync("test", new Confluent.Kafka.Message<string, string>() { Key = Guid.NewGuid().ToString(), Value = message }) .GetAwaiter().GetResult(); Console.WriteLine($"offset:{result.Offset.Value},partition:{result.Partition.Value}"); } catch (ProduceException<string, string> e) { Console.WriteLine($"失败的消息: {e.Message} [{e.Error.Code}]"); continue; } } } } class KafkaProducer : IDisposable { private ProducerConfig _config = new ProducerConfig(); private IProducer<string, string> _producer; public KafkaProducer(string server = null) { if (string.IsNullOrEmpty(server)) { server = "127.0.0.0.1:9001,127.0.0.0.1:9002,127.0.0.0.1:9003"; } _config.BootstrapServers = server; _producer = new ProducerBuilder<string, string>(_config).Build(); } public async Task<DeliveryResult<string, string>> ProduceAsync(string topic, Message<string, string> message) { return await _producer.ProduceAsync(topic, message); } public void Dispose() { _producer?.Dispose(); } }
Consumer
static void Main(string[] args) { Console.WriteLine("默认只关注test主题的消息)"); using (var consumer = new KafkaConsumer()) { while (true) { consumer.Consume(a => { if (a == null) { Console.WriteLine("暂无消息"); } else { Console.WriteLine($"Key:{a.Message.Key},Value:{a.Message.Value}"); } }); } } } class KafkaConsumer : IDisposable { private IConsumer<string, string> _consumer; public KafkaConsumer(string server = null) { if (string.IsNullOrEmpty(server)) { server = "127.0.0.0.1:9001,127.0.0.0.1:9002,127.0.0.0.1:9003"; } var config = new ConsumerConfig { GroupId = "TestGroupone", BootstrapServers = server, AutoOffsetReset = AutoOffsetReset.Earliest }; _consumer = new ConsumerBuilder<string, string>(config).Build(); //topic名称默认是test _consumer.Subscribe("test"); } public void Consume(Action<ConsumeResult<string, string>> action = null) { var consumerResult = _consumer.Consume(TimeSpan.FromSeconds(2)); action?.Invoke(consumerResult); } public void Dispose() { _consumer?.Dispose(); } }
标签:Console,string,producer,生产者,代码,server,new,kafka,consumer 来源: https://www.cnblogs.com/Ginease/p/16305225.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。