当前位置 : 主页 > 编程语言 > java >

源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理

来源:互联网 收集:自由互联 发布时间:2022-10-15
源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理 在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstancePingStub的

源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理

在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好,同时进行网络地址的同步和端点同步。

我们看一下服务端对心跳请求的处理

服务端对心跳请求的处理

skywalking-register-receiver-plugin模块的RegisterModuleProvider的start()方法中会在GRPCServer中添加处理类ServiceInstancePingServiceHandler,

ServiceInstancePingServiceHandler的doPing()方法:

@Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); long heartBeatTime = request.getTime(); serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime); ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); if (Objects.nonNull(serviceInstanceInventory)) { serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); responseObserver.onNext(Commands.getDefaultInstance()); } else { logger.warn("Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side", serviceInstanceId); final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID()); final Command command = resetCommand.serialize().build(); final Commands nextCommands = Commands.newBuilder().addCommands(command).build(); responseObserver.onNext(nextCommands); } responseObserver.onCompleted(); }
  • 从请求中获取服务实例ID和心跳发送时间
  • 调用ServiceInstanceInventoryRegister的heartbeat()更新服务实例的心跳时间,也就是对应ES中的service_instance_inventory索引的相应Document的heartbeat_time字段
  • 根据服务实例ID从缓存中获取服务实例清单,从而获取服务的ID
  • 调用ServiceInventoryRegister的heartbeat()更新服务的心跳时间,也就是对应ES中的service_inventory索引的相应Document的heartbeat_time字段
  • 整体的逻辑还是比较简单的,服务端做的事情就是根据Agent请求的数据修改ES数据库中的服务的心跳时间的数据和服务实例心跳时间的数据

    服务端对同步网络地址的映射关系的处理

    对应的是RegisterServiceHandler的doNetworkAddressRegister()方法:

    @Override public void doNetworkAddressRegister(NetAddresses request, StreamObserver<NetAddressMapping> responseObserver) { NetAddressMapping.Builder builder = NetAddressMapping.newBuilder(); request.getAddressesList().forEach(networkAddress -> { int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null); if (addressId != Const.NONE) { builder.addAddressIds(KeyIntValuePair.newBuilder().setKey(networkAddress).setValue(addressId)); } }); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); }

    这个方法比较简单就是对请求的地址集合进行遍历,调用NetworkAddressInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是network_address_inventory索引,getOrCreate()方法会填充addressId字段,还会建立网络地址与服务的关联和网络地址与服务实例的关联。

    服务端同步Endpoint的映射关系的处理

    服务端对应的逻辑是RegisterServiceHandler的doEndpointRegister()方法

    @Override public void doEndpointRegister(Endpoints request, StreamObserver<EndpointMapping> responseObserver) { EndpointMapping.Builder builder = EndpointMapping.newBuilder(); request.getEndpointsList().forEach(endpoint -> { int serviceId = endpoint.getServiceId(); String endpointName = endpoint.getEndpointName(); DetectPoint detectPoint = DetectPoint.fromNetworkProtocolDetectPoint(endpoint.getFrom()); if (DetectPoint.SERVER.equals(detectPoint)) { int endpointId = inventoryService.getOrCreate(serviceId, endpointName, detectPoint); if (endpointId != Const.NONE) { builder.addElements(EndpointMappingElement.newBuilder() .setServiceId(serviceId) .setEndpointName(endpointName) .setEndpointId(endpointId) .setFrom(endpoint.getFrom())); } } else { logger.warn("Unexpected endpoint register, endpoint isn't detected from server side. {}", request); } }); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); }

    这个方法和同步网络地址的方法逻辑差不多,也是对请求的地址集合进行遍历,获取请求中的端点的服务ID和端点名,根据网络端口探测是客户端还是服务端,如果是服务端调用EndpointInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是endpoint_inventory索引

    总结

    这篇文章主要讲了Agent初始化的过程中调用服务注册服务实例注册后,服务进行心跳请求和网络地址同步和端点同步,服务端是如何处理心跳检测、同步网络地址和同步端点的。这三个方法的逻辑都差不多,都是通过InventoryStreamProcessor来操作ES数据库,ServiceInstancePingServiceHandler处理心跳请求更新服务和服务实例的心跳时间,RegisterServiceHandler在网络地址同步方法中通过NetworkAddressInventoryRegister填充 network_address_inventory的addressId字段并创建网络地址与服务的关联和网络地址与服务实例的关联,RegisterServiceHandler在端点同步中通过EndpointInventoryRegister构建endpoint_inventory索引的数据。

    ❤️ 感谢大家

    如果你觉得这篇内容对你挺有有帮助的话:

  • 欢迎关注我❤️,点赞
  • 网友评论