ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

天呐,更新小小的注册表居然这么复杂?【手撕eureka源码NO.2】

2021-11-17 13:02:00  阅读:242  来源: 互联网

标签:缓存 天呐 eureka 源码 client 注册表 server 数据


通过前面的学习,我们已经知道eureka-clienteureka-server主动上报了自己的通信地址,这一过程是通过调用服务注册的接口来完成的。

现在eureka-server能够获取到所有eureka-client的通信地址了,可eureka-client是怎么获取其他eureka-client的通信地址的呢?本篇揭晓。

eureka-client是怎样初始化的

先来看看eureka-client初始化时做了哪些操作。不知道大家还记不记得,上一篇讲过在eureka-server的初始化过程中,有一个步骤是创建eurekaClient对象。

image-20210822112300020

为什么eureka-server初始化过程,需要创建eurekaClient对象呢?

如果我们的eureka-server只部署在一台服务器上,并且只部署一个eureka-server,那么当然不需要创建eurekaClient。可一旦需要将eureka-server部署成一个集群,情况就变得复杂起来了。

因为在集群eureka-server之间需要相互注册,所以每一个eureka服务端,同时也是eureka的客户端

这里只是简单的提一下,具体的内容,后面讲解eureka-server集群相关的知识时再做补充。此处我们只是借助eureka-server的启动,来查看eurekaClient的初始化逻辑。

先贴一小段代码看看,大家脑海里有个大概的印象,后面画一张流程图讲解。

//创建eurekaClient对象
if (eurekaClient == null) {
    //读取eureka-client.properties中的配置
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        : new MyDataCenterInstanceConfig();

    //通过instanceConfig和InstanceInfo构造Manager对象
    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    //创建eurekaClient
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}

流程图:

eureka-client初始化

InstanceInfo上一章里提到过,里面包含了服务的ip、端口号、服务名称、实例id等信息。服务注册时,eureka-client会把InstanceInfo中的信息发送给eureka-server。

现在大家知道InstanceInfo是在哪里创建的了:基于我们自己的配置文件+一些默认配置构成。

eureka-client是怎样获取注册表的

上文创建了一个DiscoveryClient对象,在该对象创建的过程中,eureka-client发起http请求向eureka-server请求到了所有的注册表信息。

老规矩,先上一张DiscoveryClient创建的源码简图。

全量拉取注册表(eureka-client)

拉取注册表的主要逻辑在getAndStoreFullRegistry()方法中,贴出来看一下(部分代码省略)

private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
    	//构建http请求,发起请求
        EurekaHttpResponse<Applications> httpResponse = 略...;
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //从请求结果中,获取服务列表
            apps = httpResponse.getEntity();
        }
        if (apps == null) {
            logger.error("略...");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //保存服务列表
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("略...");
        }
    }

服务列表被保存在AtomicReference<Applications> localRegionApps中,稍后我们会通过断点来看看Applications中的内容。

在此之前,我们先来看看,eureka-server在接收到客户端http请求后做了哪些逻辑处理?你可能会想:这有什么难的?直接把注册表数据返回不就可以了?憋着急,我们来看看源码验证一下你的猜想正不正确。

eureka-server是怎样返回注册表的

eureka-server接收http请求的代码,在eureka-core工程下的ApplicationsResource.java中,目录结构如下:

image-20210904145628768

返回服务列表的方法是getContainers(参数略),下面我们通过一张代码简图,来看看方法内部的主要逻辑。

全量拉取注册表(eureka-server)

在eureka-server的处理逻辑中,用到了一套多级缓存机制,返回服务列表时,不是直接返回注册表,而是先从只读缓存中读取,如果没有缓存数据,再从读写缓存中取,如果读写缓存也没有,则从注册表读取。

只读缓存:ConcurrentMap<Key, Value> readOnlyCacheMap

读写缓存:LoadingCache<Key, Value> readWriteCacheMap,基于com.google.common.cache.LoadingCache实现。

缓存读取流程如图:

多级缓存流程图

既然这里使用了缓存,那么一个新的问题就被引入了,什么时候过期缓存数据呢?

1.创建readWriteCacheMap时,指定了180s自动过期。(定时过期)

public class ResponseCacheImpl implements ResponseCache {
	private final LoadingCache<Key, Value> readWriteCacheMap;
    //读写缓存创建
    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
        //从配置中读取缓存过期时间,指定时间单位为-秒
        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
}

时间配置在DefaultEurekaServerConfig.java中:

public class DefaultEurekaServerConfig implements EurekaServerConfig {
	@Override
    public long getResponseCacheAutoExpirationInSeconds() {
        //默认配置180
        return configInstance.getIntProperty(
                namespace + "responseCacheAutoExpirationInSeconds", 180).get();
    }
}

2.有新服务注册时,缓存会过期。(主动过期)

前面的内容里,我们讲解服务注册时,实际上是调用了eureka-serverAbstractInstanceRegistry.javaregister()方法,当时我们忽略了一些细节,其中就包括过期缓存的这一小段。现在我们来看一看。

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
	public void register(略...) {
        //调用内部的invalidateCache()方法
		invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), 
                        registrant.getSecureVipAddress());
	}
    
    private void invalidateCache(略...) {
        // 调用ResponseCacheImpl.java中过期缓存的方法
        responseCache.invalidate(appName, vipAddress, secureVipAddress);
    }
}

invalidateCache()可以看出,实际上还是使用的ResponseCacheImpl.java内部定义的invalidate()方法。

public void invalidate(Key... keys) {
    for (Key key : keys) {
        //使用缓存框架自带的清除方法,清除缓存
        readWriteCacheMap.invalidate(key);
        //略...
    }
}

3.ResponseCacheImpl.java创建时启动了一个定时任务,每隔30秒就会去过期缓存。(被动过期)

public class ResponseCacheImpl implements ResponseCache {
	//构造方法
    ResponseCacheImpl(略...){
        //是否启动只读缓存,默认true
        if (shouldUseReadOnlyResponseCache) {
            //具体要执行的任务:getCacheUpdateTask()
            timer.schedule(getCacheUpdateTask(),
                           //间隔多少毫秒后,首次执行此任务
                           new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
                           //每隔多少秒后,重复执行此任务
                           //config中的配置:"responseCacheUpdateIntervalMs", (30 * 1000)
                        responseCacheUpdateIntervalMs);
            }
    }
    
    //具体的缓存处理逻辑
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                //遍历readOnlyCacheMap中所有缓存
                for (Key key : readOnlyCacheMap.keySet()) {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    //如果'只读缓存'中的数据和'读写缓存'的数据不一致,则用'读写缓存'数据覆盖掉'只读缓存'
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                }
            }
        };
    }
}

一张图总结一下:

缓存过期全图

最后,我们来看一下eureka-server返回的注册表数据长什么样。

返回的注册表数据1

Applications大概长什么样。

Applications清晰图

至此eureka-client获取注册表的机制我们就学习了二分之一了。

为什么说是二分之一?憋着急,继续往后看。

现在eureka-client已经能够获取到注册表了,看上去似乎没什么问题,可只要我们多想一步,就会发现问题所在。比如,一旦有新的服务注册到eureka-server上去,之前已经获取到注册表的eureka-client怎么同步新的数据呢?

eureka-client是怎么更新注册表的

为了获取到eureka-server端发生变动的注册表,在eureka-client初始化时启动了一个定时任务,每隔30秒就向eureka-server请求一次变化的注册表数据。

客户端定时抓取增量注册表

本文开始的代码简图中,我们梳理了discoverClient创建过程,其中有一段是initScheduledTasks(),也就是初始化定时任务的地方,当时让大家暂时忽略,现在我们来看一下。

initScheduledTasks()中包含多个定时任务,暂时我们只关注了其中刷新注册表的定时任务,余下部分依然还是后面用到时再来看。

主要逻辑代码如下:

@Singleton
public class DiscoveryClient implements EurekaClient {
	private final ScheduledExecutorService scheduler;
    private void initScheduledTasks() {
        //略...
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                //间隔多久执行一次定时任务,默认30(从默认配置获取)
                registryFetchIntervalSeconds,
                //间隔时间的单位
                TimeUnit.SECONDS,
                expBackOffBound,
                //具体任务逻辑
                new CacheRefreshThread()
            ),registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}

这里我们先来看一下eureka-client默认配置是定义在什么地方的,定义在DefaultEurekaClientConfig.java中:

public class DefaultEurekaClientConfig implements EurekaClientConfig {
    @Override
    public int getRegistryFetchIntervalSeconds() {
        //默认配置:定时器间隔多久执行
        return configInstance.getIntProperty(
                namespace + REGISTRY_REFRESH_INTERVAL_KEY, 30).get();
        //配置的key定义:PropertyBasedClientConfigConstants.java
        //String REGISTRY_REFRESH_INTERVAL_KEY = "client.refresh.interval";
    }
}

了解了一下默认配置,接下来我们把思路拉回来,继续看刷新注册表的定时任务。

由于代码内部的方法调用过于复杂,此处就不贴源码了,采用代码简图代替。

客户端,定时抓取注册表,代码简图

最主要的方法是getAndUpdateDelta(),贴出源码看一下:

private void getAndUpdateDelta(Applications applications)
    Applications delta = null;
	//发送获取增量注册表的请求
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.
        getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
	//如果返回数据为空,则重新拉取所有的注册表
	if (delta == null) {
        getAndStoreFullRegistry();
    }else if(略...){
        //将获取到的注册表数据,与本地合并
        updateDelta(delta);
        //将合并后的结果,生成一个HashCode (此处稍后分析)
        String reconcileHashCode = getReconcileHashCode(applications);
        //对比http请求返回的HashCode和新生成的HashCode
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()){
            //对比结果不一致,重新拉取全量注册表
            reconcileAndLogDifference(delta, reconcileHashCode);  
        }
    }else{
        //log.err(..);
    }

)

从源代码中可以看到,eureka-client获取到数据后,本地进行了一个合并校验数据的过程,我们来看看这一块。

注册表数据合并:eureka-client获取到注册表数据后,会根据一个ActionType来判断服务实例的变动类型,也就是判断服务实例到底是需要新增到本地的注册表中,还是要从本地注册表删除,还是需要将本地的某个服务实例信息进行更新。

先看一眼源码,稍后通过流程图加以理解:

private void updateDelta(Applications delta) {
    //遍历获取到的注册表信息
	for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (ActionType.ADDED.equals(instance.getActionType())) {
                  //新增
                 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                }else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                  //修改
                 applications.***.addInstance(instance);
                }else if (ActionType.DELETED.equals(instance.getActionType())) {
                  //删除
                 applications.***.removeInstance(instance);
                }
            }
    }
}

**数据校验:**在eureka-server更新完本地注册表后,会将本地的注册表信息做一个hash计算得到一个哈希值,同时eureka-server在返回数据时,也携带了一个哈希值。

从理论上讲经过一轮更新后,eureka-server和eureka-client中的注册表数据是完全一致的,所以得出的hash计算结果也应该是一样的。

如果不一样,说明eureka-client和eureka-server之间的数据同步出现了问题,那么此时eureka-client会重新向eureka-server请求一次全部的注册表数据。然后将新获取到的数据覆盖掉本地的注册表数据,以保证自己和eureka-server的数据一致。

结合流程图,理解合并和数据校验过程:

20-client-增量注册表合并

以上,整个eureka-client端是怎样定时发送http请求的,获取到数据是怎么进行合并和校验的,就已经梳理清楚了。

接下来看看eureka-server的接口是怎样处理的呢?还是通过多级缓存机制返回数据吗?

eureka-server增量注册表数据是怎么维护的

对于增量注册表数据,eureka-server依然是通过多级缓存机制来返回,但是由于注册表信息在不断发生变化,所以eureka-server是不会重复的返回所有的注册表信息的,在这里eureka-server借助了Queue(队列)来记录变化的那一部分注册表信息。

每当发生服务注册或者服务主动下线时,就将变化的注册表信息发送到一个recentlyChangedQueue中,同时在需要这部分数据时,直接取queue中的数据即可。

配合流程图理解:

21-server-最近变化队列

recentlyChangedQueue定义在AbstractInstanceRegistry.java中,如果大家还有印象,就能记得前面代表注册表ConcurrentHashMap也是定义在这里。

我们简单来看看recentlyChangedQueue长什么样:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    //变量定义
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue 
        = new ConcurrentLinkedQueue<RecentlyChangedItem>();
    //队列中存放的数据类型(一个内部类)
    private static final class RecentlyChangedItem {
        private long lastUpdateTime;
        private Lease<InstanceInfo> leaseInfo;
    }
}

其中的LeaseInstanceInfo分别代表租约信息和服务实例信息,前面已经讲过了。

接下来我们看一下readWriteCacheMapQueue中数据的逻辑。

public class ResponseCacheImpl implements ResponseCache {
     private final AbstractInstanceRegistry registry;
     //readWriteCacheMap根据指定key未获取到数据,则执行此方法
     private Value generatePayload(Key key) {
         //获取全部注册表数据(全量数据)
         if (ALL_APPS.equals(key.getName())) {略..}
         //获取变化的注册表数据(增量数据)
         else if(ALL_APPS_DELTA.equals(key.getName())){
             payload = getPayLoad(key,
             //调用AbstractInstanceRegistry.java的
             //getApplicationDeltasFromMultipleRegions()方法获取队列数据
             registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
         } 
     }
}

大家想一想,这里使用队列存储数据会产生一个什么新的问题?随着时间推移,recentlyChangedQueue中的数据会不断增加,这就导致eureka-client多次定时获取数据时,会获取到重复的数据。

明明之前已经获取过的数据,再反复的重复的获取,完全没有必要。因为eureka-client获取到增量注册表数据后,还需要在本地做一些合并和校验工作,那么随着数据的增多,从网络传输、合并数据、校验数据整条工作路径上的效率都会降低。

所以eureka-server开启了一个定时任务,每隔30秒就清理一下recentlyChangedQueue中的数据,确保Queue中的数据是在180秒内发生变化的服务实例。

定时任务清除队列数据流程图:

22-server-最近队列过期

贴一小段简化源码:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    //构造方法
    protected AbstractInstanceRegistry(略..){
        //开启定时任务
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                //默认配置30s
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }
    
    //定时任务执行逻辑
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
            @Override
            public void run() {
                //遍历队列中的数据
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    //将180s内未发生变动的服务,从队列中删除
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - 
                        //默认180s
                        serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }
        };
    }
}

到此,整个服务注册和服务发现的核心流程我们就全部明白了,下一篇我们来看看eureka的心跳机制是如何实现的。

标签:缓存,天呐,eureka,源码,client,注册表,server,数据
来源: https://blog.csdn.net/VA_AV/article/details/121316590

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有