Nacos注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析

2021/10/31 11:39:33

本文主要是介绍Nacos注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

心跳机制与服务健康检查设计原理及源码剖析

   客户端会调用/nacos/v1/ns/instance/beat接口进行心跳,主要逻辑有:

   1.如果在Nacos Server没有找到相对应的Instance,那么就构造一个Instance,源码如下:

    // com.alibaba.nacos.naming.controllers.InstanceController#beat
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
    // 省略其它代码。。。。。。。

    // 获取Instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 如果Instance为null,那么就构造一个Instance进行注册
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                             + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
		// 把Client注册到Nacos
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
	// 省略其它代码。。。。。。。
	// 处理Client端的心跳
    service.processClientBeat(clientBeat);

   // 省略其它代码。。。。。。。
    return result;
}

2.如果在Nacos Server中有当前实例,那么就更新一下lastBeat、healthy等,源码如下:

// com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor#run
@Override
public void run() {
    // 省略其它代码。。。。。。。

    for (Instance instance : instances) {
        // 从Service中找到需要更新的Instance
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // 把当前时间设置到Insance的lastBeat中
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    // 更新一下健康状态
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                        .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                              cluster.getService().getName(), ip, port, cluster.getName(),
                              UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

    服务端也会启动线程去检测客户端心跳信息,来判断客户端是否存活,Nacos是怎么启动心跳检测的,怎么心跳检测的?

    请看源码:

// com.alibaba.nacos.naming.core.Service#init
// 在注册Instance的时候会调用这个方法
public void init() {
    // 
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
// com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck
public static void scheduleCheck(ClientBeatCheckTask task) {
    // 这里会启动一个定时任务来检测心跳,按照的是固定频率5s
    // 其实这里最后采用的JDK的scheduleWithFixedDelay
    futureMap.computeIfAbsent(task.taskKey(),
                              k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

// com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run
@Override
public void run() {
    try {
        // 判断是否是当前Nacos实例来启动心跳检测
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        // 检测实例的健康状态
        for (Instance instance : instances) {
            // 判断心跳是否超时,默认超时时间为15秒
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // 如果心跳超时,首先把此服务的健康状态设置为false
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                            .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                  instance.getIp(), instance.getPort(), instance.getClusterName(),
                                  service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                  instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());

                        // 发布一个服务变更事件
                        getPushService().serviceChanged(service);
                        // 发布一个心跳超时事件
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        // 删除过时的实例(默认30s)
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                     JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

    默认的心跳超时时间为15s,如果发现Instance中的lastBeat的时间与当前时间对比,小于当前时间15s以上那么就判断Instance为不健康的状态,首先会设置Instance中的healthy为false,然后发布一个服务变更事件,再发布一个心跳超时事件。



这篇关于Nacos注册中心原理及源码系列(五)- 心跳机制与服务健康检查设计原理及源码剖析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程