当前位置 : 主页 > 编程语言 > java >

源码角度了解Skywalking之服务端OAP对JVM监控的处理

来源:互联网 收集:自由互联 发布时间:2022-10-15
源码角度了解Skywalking之服务端OAP对JVM监控的处理 Skywalking 的Agent对JVM信息收集发送给OAP后,由OAP的JVMMetricReportServiceHandler的collect()方法进行处理 客户端对应的逻辑在apm-agent-core模块的j

源码角度了解Skywalking之服务端OAP对JVM监控的处理

Skywalking 的Agent对JVM信息收集发送给OAP后,由OAP的JVMMetricReportServiceHandler的collect()方法进行处理

客户端对应的逻辑在apm-agent-core模块的jvm包下的 JVMService类中

服务端的逻辑在skywalking-jvm-receiver-plugin模块的JVMMetricReportServiceHandler类中,接收Agent发送的JVMMetric信息。

接收JVMMetric信息

JVMMetricReportServiceHandler的collect()方法

JVMMetricReportServiceHandler的collect()方法:

@Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); if (logger.isDebugEnabled()) { logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId); } request.getMetricsList().forEach(metrics -> { long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime()); jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics); }); responseObserver.onNext(Commands.newBuilder().build()); responseObserver.onCompleted(); }
  • 获取请求的服务实例ID
  • 遍历JVMMetric数据,调用JVMSourceDispatcher的sendMetric()方法发送数据
  • JVMSourceDispatcher的sendMetric()方法

    JVMSourceDispatcher的sendMetric()方法:

    void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) { ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId); int serviceId; if (Objects.nonNull(serviceInstanceInventory)) { serviceId = serviceInstanceInventory.getServiceId(); } else { logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId); return; } this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu()); this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList()); this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList()); this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList()); }
  • 根据服务实例Id获取ServiceInstanceInventory信息,
  • 如果为空就返回
  • 如果不为空发送CPU、内存、内存池、GC信息,这四个方法都会调用SourceReceiverImpl的receive()方法,具体由DispatcherManager来选择对应的转发器进行转发,也就是SourceDispatcher接口的实现类,默认实现类是ServiceDispatcher,这个类是可观察性分析语言(OAL)自动生成的,这个类中最终交由MetricsStreamProcessor来处理数据放入缓存中
  • 消费处理

    MetricsStreamProcessor作为生产者,把JVMMetric数据放入缓存队列中,ConsumerThread是消费者线程,run()中执行了MetricsAggregateWorker的consume()方法来消费缓存队列中的JVMMetric信息

    涉及到的相关Worker是由MetricsStreamProcessor的create()方法中定义的

    MetricsAggregateWorker会对数据进行聚合,写入缓存中,然后发送给下一个Worker进行处理。

    MetricsRemoteWorker会发送Metrics数据到远端的节点

    MetricsTransWorker的in()方法中按照小时、天和月调整时间窗口的粒度来调用下一个Worker

    MetricsPersistentWorker对Metrics数据进行持久化

    总结

    这篇文章主要讲了Skywalking的Agent发送JVM数据请求后服务端的处理逻辑。服务端通过JVMMetricReportServiceHandler来接收Agent的请求数据,按照CPU、内存、内存池和GC数据调用方法进行数据,最后通过MetricsStreamProcessor定义的一个个Workder处理数据,持久化到ES中。

    ❤️ 感谢大家

    如果你觉得这篇内容对你挺有有帮助的话:

  • 欢迎关注我❤️,点赞

  • 上一篇:utools 好用的桌面工具软件
    下一篇:没有了
    网友评论