当前位置 : 主页 > 编程语言 > java >

源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理

来源:互联网 收集:自由互联 发布时间:2022-10-15
源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理 我们了解到Skywalking的agent在进行启动初始化的时候会对服务进行注册,对应的逻辑是ServiceAndEndpointRegisterClient的run()方

源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理

我们了解到Skywalking的agent在进行启动初始化的时候会对服务进行注册,对应的逻辑是ServiceAndEndpointRegisterClient的run()方法

服务注册

ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceRegister()方法进行注册

RegisterServiceHandler的doServiceRegister()方法:

@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver) { ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder(); request.getServicesList().forEach(service -> { String serviceName = service.getServiceName(); if (logger.isDebugEnabled()) { logger.debug("Register service, service code: {}", serviceName); } int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null); if (serviceId != Const.NONE) { KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build(); builder.addServices(value); } }); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); }
  • 从请求中获取服务名
  • 调用serviceInventoryRegister的getOrCreate()方法根据服务名获取服务的id
  • 将服务id返回给Agent
  • 获取服务ID

    serviceInventoryRegister的getOrCreate()

    @Override public int getOrCreate(String serviceName, JsonObject properties) { int serviceId = getServiceInventoryCache().getServiceId(serviceName); if (serviceId == Const.NONE) { ServiceInventory serviceInventory = new ServiceInventory(); serviceInventory.setName(serviceName); serviceInventory.setAddressId(Const.NONE); serviceInventory.setIsAddress(BooleanUtils.FALSE); long now = System.currentTimeMillis(); serviceInventory.setRegisterTime(now); serviceInventory.setHeartbeatTime(now); serviceInventory.setMappingServiceId(Const.NONE); serviceInventory.setLastUpdateTime(now); serviceInventory.setProperties(properties); InventoryStreamProcessor.getInstance().in(serviceInventory); } return serviceId; }
  • 获取ServiceInventoryCache缓存实例,调用ServiceInventoryCache对象的getServiceId()方法获取serviceId,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInventoryCacheEsDAO的get()方法,也就是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
  • 如果serviceId为空,创建ServiceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成serviceId中,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建serviceId。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
  • 从ServiceAndEndpointRegisterClient的run()方法的逻辑我们知道如果serviceId返回0的话,会创建线程不断重试,直到服务Id都不会空

    服务实例注册

    ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceInstanceRegister()方法进行服务实例注册

    RegisterServiceHandler的doServiceInstanceRegister()方法:

  • 遍历请求的服务实例的集合,根据请求中的服务ID从缓存中获取ServiceInventory对象,这个对象是服务的相关信息
  • 整合request中的host_name、os_name、ip地址、语言、进程ID等参数,放入JsonObject对象中
  • 根据服务名、进程ID和主机名构建服务实例名称
  • 根据请求参数调用ServiceInstanceInventoryRegister的getOrCreate()方法获取服务实例的id
  • 将服务实例Id返回给Agent
  • 获取服务实例的ID

    ServiceInstanceInventoryRegister的getOrCreate()方法:

    @Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime, JsonObject properties) { if (logger.isDebugEnabled()) { logger.debug("Get or create service instance by service instance name, service id: {}, service instance name: {},uuid: {}, registerTime: {}", serviceId, serviceInstanceName, uuid, registerTime); } int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, uuid); if (serviceInstanceId == Const.NONE) { ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory(); serviceInstanceInventory.setServiceId(serviceId); serviceInstanceInventory.setName(serviceInstanceName); serviceInstanceInventory.setInstanceUUID(uuid); serviceInstanceInventory.setIsAddress(BooleanUtils.FALSE); serviceInstanceInventory.setAddressId(Const.NONE); serviceInstanceInventory.setMappingServiceInstanceId(Const.NONE); serviceInstanceInventory.setRegisterTime(registerTime); serviceInstanceInventory.setHeartbeatTime(registerTime); serviceInstanceInventory.setProperties(properties); InventoryStreamProcessor.getInstance().in(serviceInstanceInventory); } return serviceInstanceId; }
  • 获取ServiceInstanceInventoryCache缓存实例,调用ServiceInstanceInventoryCache对象的getServiceInstanceId()方法获取服务实例ID,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInstanceInventoryCacheDAO的get()方法,通过是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
  • 如果serviceId为空,创建ServiceInstanceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成服务实例ID,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建服务实例ID。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
  • 同样的,我们看ServiceAndEndpointRegisterClient的run()方法中的逻辑,如果服务实例ID为空的话会一直进行重试,直到服务实例ID不为空

    总结

    这篇文章主要介绍了服务端对服务注册和服务实例注册的处理逻辑,这两个方法的整体逻辑是非相似,获取服务Id或者服务实例ID都是先从缓存中获取,缓存中没有就从es数据库中查询,查询到放入缓存,如果es数据库中也没有就会创建异步操作的InventoryStreamProcessor创建服务ID或服务实例ID

    ❤️ 感谢大家

    如果你觉得这篇内容对你挺有有帮助的话:

  • 欢迎关注我❤️,点赞
  • 上一篇:Algorithm-Day02
    下一篇:没有了
    网友评论