源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理
在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好,同时进行网络地址的同步和端点同步。
我们看一下服务端对心跳请求的处理
服务端对心跳请求的处理
skywalking-register-receiver-plugin模块的RegisterModuleProvider的start()方法中会在GRPCServer中添加处理类ServiceInstancePingServiceHandler,
ServiceInstancePingServiceHandler的doPing()方法:
@Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); long heartBeatTime = request.getTime(); serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime); ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); if (Objects.nonNull(serviceInstanceInventory)) { serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); responseObserver.onNext(Commands.getDefaultInstance()); } else { logger.warn("Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side", serviceInstanceId); final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID()); final Command command = resetCommand.serialize().build(); final Commands nextCommands = Commands.newBuilder().addCommands(command).build(); responseObserver.onNext(nextCommands); } responseObserver.onCompleted(); }整体的逻辑还是比较简单的,服务端做的事情就是根据Agent请求的数据修改ES数据库中的服务的心跳时间的数据和服务实例心跳时间的数据
服务端对同步网络地址的映射关系的处理
对应的是RegisterServiceHandler的doNetworkAddressRegister()方法:
@Override public void doNetworkAddressRegister(NetAddresses request, StreamObserver<NetAddressMapping> responseObserver) { NetAddressMapping.Builder builder = NetAddressMapping.newBuilder(); request.getAddressesList().forEach(networkAddress -> { int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null); if (addressId != Const.NONE) { builder.addAddressIds(KeyIntValuePair.newBuilder().setKey(networkAddress).setValue(addressId)); } }); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); }这个方法比较简单就是对请求的地址集合进行遍历,调用NetworkAddressInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是network_address_inventory索引,getOrCreate()方法会填充addressId字段,还会建立网络地址与服务的关联和网络地址与服务实例的关联。
服务端同步Endpoint的映射关系的处理
服务端对应的逻辑是RegisterServiceHandler的doEndpointRegister()方法
@Override public void doEndpointRegister(Endpoints request, StreamObserver<EndpointMapping> responseObserver) { EndpointMapping.Builder builder = EndpointMapping.newBuilder(); request.getEndpointsList().forEach(endpoint -> { int serviceId = endpoint.getServiceId(); String endpointName = endpoint.getEndpointName(); DetectPoint detectPoint = DetectPoint.fromNetworkProtocolDetectPoint(endpoint.getFrom()); if (DetectPoint.SERVER.equals(detectPoint)) { int endpointId = inventoryService.getOrCreate(serviceId, endpointName, detectPoint); if (endpointId != Const.NONE) { builder.addElements(EndpointMappingElement.newBuilder() .setServiceId(serviceId) .setEndpointName(endpointName) .setEndpointId(endpointId) .setFrom(endpoint.getFrom())); } } else { logger.warn("Unexpected endpoint register, endpoint isn't detected from server side. {}", request); } }); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); }这个方法和同步网络地址的方法逻辑差不多,也是对请求的地址集合进行遍历,获取请求中的端点的服务ID和端点名,根据网络端口探测是客户端还是服务端,如果是服务端调用EndpointInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是endpoint_inventory索引
总结
这篇文章主要讲了Agent初始化的过程中调用服务注册服务实例注册后,服务进行心跳请求和网络地址同步和端点同步,服务端是如何处理心跳检测、同步网络地址和同步端点的。这三个方法的逻辑都差不多,都是通过InventoryStreamProcessor来操作ES数据库,ServiceInstancePingServiceHandler处理心跳请求更新服务和服务实例的心跳时间,RegisterServiceHandler在网络地址同步方法中通过NetworkAddressInventoryRegister填充 network_address_inventory的addressId字段并创建网络地址与服务的关联和网络地址与服务实例的关联,RegisterServiceHandler在端点同步中通过EndpointInventoryRegister构建endpoint_inventory索引的数据。
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话: