源码角度了解Skywalking之服务端OAP对JVM监控的处理
Skywalking 的Agent对JVM信息收集发送给OAP后,由OAP的JVMMetricReportServiceHandler的collect()方法进行处理
客户端对应的逻辑在apm-agent-core模块的jvm包下的 JVMService类中
服务端的逻辑在skywalking-jvm-receiver-plugin模块的JVMMetricReportServiceHandler类中,接收Agent发送的JVMMetric信息。
接收JVMMetric信息
JVMMetricReportServiceHandler的collect()方法
JVMMetricReportServiceHandler的collect()方法:
@Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); if (logger.isDebugEnabled()) { logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId); } request.getMetricsList().forEach(metrics -> { long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime()); jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics); }); responseObserver.onNext(Commands.newBuilder().build()); responseObserver.onCompleted(); }JVMSourceDispatcher的sendMetric()方法
JVMSourceDispatcher的sendMetric()方法:
void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) { ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId); int serviceId; if (Objects.nonNull(serviceInstanceInventory)) { serviceId = serviceInstanceInventory.getServiceId(); } else { logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId); return; } this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu()); this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList()); this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList()); this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList()); }消费处理
MetricsStreamProcessor作为生产者,把JVMMetric数据放入缓存队列中,ConsumerThread是消费者线程,run()中执行了MetricsAggregateWorker的consume()方法来消费缓存队列中的JVMMetric信息
涉及到的相关Worker是由MetricsStreamProcessor的create()方法中定义的
MetricsAggregateWorker会对数据进行聚合,写入缓存中,然后发送给下一个Worker进行处理。
MetricsRemoteWorker会发送Metrics数据到远端的节点
MetricsTransWorker的in()方法中按照小时、天和月调整时间窗口的粒度来调用下一个Worker
MetricsPersistentWorker对Metrics数据进行持久化
总结
这篇文章主要讲了Skywalking的Agent发送JVM数据请求后服务端的处理逻辑。服务端通过JVMMetricReportServiceHandler来接收Agent的请求数据,按照CPU、内存、内存池和GC数据调用方法进行数据,最后通过MetricsStreamProcessor定义的一个个Workder处理数据,持久化到ES中。
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话:
欢迎关注我❤️,点赞