ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Nacos源码之客户端服务订阅机制核心流程

2022-05-21 01:03:28  阅读:154  来源: 互联网

标签:订阅 serviceKey String Nacos groupName 源码 clusters serviceName 客户端


1.Nacos订阅概述

Nacos的订阅机制如果用一句话来描述就是:Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表,当发现实例发生变化时发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)

订阅方法整体流程:

image

2.定时任务开启

其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

NacosNamingService中暴露了许多重载的subscribe方法,重载的目的是让大家少写一些参数,这些参数Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
 throws NacosException {
 if (null == listener) {
     return;
 }
 String clusterString = StringUtils.join(clusters, ",");
 changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
 clientProxy.subscribe(serviceName, groupName, clusterString);
}

这里的subscribe方法实际上就是NamingClientProxyDelegate.subscribe(),和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法:

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
 String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
 String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
 // 定时调度UpdateTask
 serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
 // 获取缓存中的ServiceInfo
 ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
 if (null == result) {
     // 如果为null,则进行订阅逻辑处理,基于gRPC协议
     result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
 }
 // ServiceInfo本地缓存处理
 serviceInfoHolder.processServiceInfo(result);
 return result;
}

这里我们要关注任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
 String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
 if (futureMap.get(serviceKey) != null) {
     return;
 }
 synchronized (futureMap) {
     if (futureMap.get(serviceKey) != null) {
         return;
     }
		//构建UpdateTask
     ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
     futureMap.put(serviceKey, future);
 }
}

定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
 return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

所以在这里我们得出结论:核心为调用订阅方法和发起定时任务。

3.定时任务执行内容

UpdateTask封装了订阅机制的核心业务逻辑,流程图:

image

对应的源码:

@Override
public void run() {
 long delayTime = DEFAULT_DELAY;

 try {
     // 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行
     if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
         NAMING_LOGGER
             .info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
         return;
     }

     // 获取缓存的service信息
     ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
     // 如果为空
     if (serviceObj == null) {
         // 根据serviceName从注册中心服务端获取Service信息
         serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
         // 处理本地缓存
         serviceInfoHolder.processServiceInfo(serviceObj);
         lastRefTime = serviceObj.getLastRefTime();
         return;
     }

     // 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询
     if (serviceObj.getLastRefTime() <= lastRefTime) {
         serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
         // 处理本地缓存
         serviceInfoHolder.processServiceInfo(serviceObj);
     }
     //刷新更新时间
     lastRefTime = serviceObj.getLastRefTime();
     if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
         incFailCount();
         return;
     }
     // 下次更新缓存时间设置,默认6秒
     // TODO multiple time can be configured.
     delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
     // 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)
     resetFailCount();
 } catch (Throwable e) {
     incFailCount();
     NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
 } finally {
     // 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟
     // 即当无异常情况下缓存实例的刷新时间是6秒
     executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
 }
}

业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

4.总结

  1. 调用订阅方法,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

标签:订阅,serviceKey,String,Nacos,groupName,源码,clusters,serviceName,客户端
来源: https://www.cnblogs.com/ZT-666/p/16294226.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有