源码角度了解Skywalking之服务端OAP对Trace的处理
从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywalking-trace-receiver-plugin模块
TraceModuleProvider向GRPCHandlerRegister中添加处理器TraceSegmentReportServiceHandler
接收Agent数据
TraceSegmentReportServiceHandler的collect()方法接收Agent的数据,调用SegmentParseV2.Producer的send()方法发送
SegmentParseV2.Producer的send()方法:
public void send(UpstreamSegment segment, SegmentSource source) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); segmentParse.parse(new BufferData<>(segment), source); }SegmentParseV2的parse()方法:
预构建
SegmentParseV2的preBuild()方法:
通知监听器者构建
这个方法遍历所有的span调用SegmentSpanListener的build()方法,设置Segment信息的端点id和端点名后调用SourceReceiverImpl的receive()方法,最终调用SegmentDispatcher的dispatch()方法
SegmentDispatcher的dispatch()方法:
@Override public void dispatch(Segment source) { SegmentRecord segment = new SegmentRecord(); segment.setSegmentId(source.getSegmentId()); segment.setTraceId(source.getTraceId()); segment.setServiceId(source.getServiceId()); segment.setServiceInstanceId(source.getServiceInstanceId()); segment.setEndpointName(source.getEndpointName()); segment.setEndpointId(source.getEndpointId()); segment.setStartTime(source.getStartTime()); segment.setEndTime(source.getEndTime()); segment.setLatency(source.getLatency()); segment.setIsError(source.getIsError()); segment.setDataBinary(source.getDataBinary()); segment.setTimeBucket(source.getTimeBucket()); segment.setVersion(source.getVersion()); RecordStreamProcessor.getInstance().in(segment); }组装SegmentRecord对象,通过RecordStreamProcessor创建实例,in()方法中调用RecordPersistentWorker来批量异步插入ES数据库中。
总结
这篇文章主要讲解了Skywalking的OAP接收到Agent发来的Trace信息的处理逻辑,入口是TraceSegmentReportServiceHandler的collect()方法,会对Agent封装的UpstreamSegment对象进行反序列化,构建Segment、Span等信息,最终由RecordStreamProcessor来批量异步把SegmentRecord写入ES数据库中
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话: