标签:异步 String ResponseFuture map msgId kafka 响应 future public
我们知道单体架构中的HTTP是同步请求响应,微服务架构中的消息时异步请求,无响应。
但如果实际需求中,我们需要获得这个消息的请求结果怎么办?
理论上也是可以实现的!
首先,需要对请求的消息体进行升级,增加一个msgID,用于在接收返回消息时进行识别。
第二,如果发送和接收消息的双方未约定请求通道和响应通道,发送消息时,消息体还需要携带响应通道信息。
为了简化需求,我们假定请求通道和响应通道双方已经约定好。
因为是异步请求,所以发送完消息后,需要返回一个future对象。而且这个future还是可以set的。
我们想到了guava中的SettableFuture对象。
好,下面进入实战环节,重新定义kafka发送消息的方法。新增一个异步send,跟普通send方法不同,发送完消息之后,我们有构造了一个future对象。
//发送异步响应消息,需要写的唯一msgID,初步考虑使用UUID实现 public ResponseFuture sendAsync(String msgId,String msg) { send(TASK_TOPIC, msg); SettableFuture<String> future = SettableFuture.create(); ResponseFuture responseFuture = new ResponseFuture(future); ResponseFuture.put(msgId, responseFuture); return responseFuture; }
核心的实现在ResponseFuture中。它相当于一个缓冲池,用于存放请求和未返回的响应。
@Data public class ResponseFuture { public static final Map<String, ResponseFuture> map = new ConcurrentHashMap<>(256); public static void remove(String msgId) { map.remove(msgId); } public static void put(String msgId, ResponseFuture future) { map.put(msgId, future); } public static void setResponse(String msgId, ResponseBody result) { ResponseFuture response = map.get(msgId); response.setCurrentTime(System.currentTimeMillis()); response.getFuture().set(result); map.put(msgId, response); } private Long currentTime; private SettableFuture<ResponseBody> future; public ResponseFuture( SettableFuture<ResponseBody> future) { this.currentTime = System.currentTimeMillis(); this.future = future; } //todo:周期性调度删除过期数据 public static void clear() { for (Map.Entry<String, ResponseFuture> entry : map.entrySet()) { ResponseFuture value = entry.getValue(); Long currentTime = value.getCurrentTime(); if (System.currentTimeMillis() - currentTime > 5000) { map.remove(entry.getKey()); } } } }
kafka接收消息
/** * 监听回复消息 * @param record */ public void listenResponse(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info(message.toString()); String msgId = message.toString().substring(1); ResponseBody responseBody = JSON.parseObject(message.toString(), ResponseBody.class); ResponseFuture.setResponse(msgId,responseBody); } }
发送和接收的主函数
public String testAsync() throws ExecutionException, InterruptedException { UUID uuid = UUID.randomUUID(); String msgId = uuid.toString(); String msg = ""; ResponseFuture responseFuture = kafkaSender.sendAsync(msgId, msg); SettableFuture<ResponseBody> future = responseFuture.getFuture(); try { ResponseBody responseBody = future.get(1, TimeUnit.SECONDS); return responseBody.getMsg(); } catch (TimeoutException e) { e.printStackTrace(); } ResponseFuture.remove(msgId); return null; }
标签:异步,String,ResponseFuture,map,msgId,kafka,响应,future,public 来源: https://www.cnblogs.com/wangbin2188/p/16541074.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。