当前位置 : 主页 > 编程语言 > java >

源码角度了解Skywalking之服务端OAP对Trace的处理

来源:互联网 收集:自由互联 发布时间:2022-10-15
源码角度了解Skywalking之服务端OAP对Trace的处理 从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处

源码角度了解Skywalking之服务端OAP对Trace的处理

从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywalking-trace-receiver-plugin模块

TraceModuleProvider向GRPCHandlerRegister中添加处理器TraceSegmentReportServiceHandler

接收Agent数据

TraceSegmentReportServiceHandler的collect()方法接收Agent的数据,调用SegmentParseV2.Producer的send()方法发送

SegmentParseV2.Producer的send()方法:

public void send(UpstreamSegment segment, SegmentSource source) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); segmentParse.parse(new BufferData<>(segment), source); }
  • 创建SegmentParseV2解析器
  • 调用SegmentParseV2的parse()方法
  • SegmentParseV2的parse()方法:

  • 创建SpanListener集合
  • 获取UpstreamSegment对象
  • 获取UpstreamSegment对象中关联的所有的TraceID
  • 如果UpstreamSegment中的SegmentObject实例为空,就解析UpstreamSegment实例得到SegmentObject对象进行填充
  • 重新检查段信息是否来自文件缓冲区,如果缓存中不存这个段信息对应的服务实例Id,然后返回true
  • 把SegmentObject对象封装成SegmentDecorator对象,这里是装饰者模式的体现
  • 调用preBuild()方法进行预构建操作,预构建不成功写入缓存文件中,构建成功会通知具体的监听器来进行构建
  • 预构建

    SegmentParseV2的preBuild()方法:

  • 构建SegmentCoreInfo对象中的segmentId
  • 调用notifyGlobalsListener()方法通知这个TraceSegment所关联的TraceId对应的监听器进行解析TraceId,需要采样的进行采样
  • 将Segment信息填充到SegmentCoreInfo对象中
  • 遍历TraceSegment的所有span,如果是TraceSegment的第一个span,调用notifyFirstListener()方法解析第一个span,将SegmentCoreInfo对象的属性添加到Segment对象中,记录firstEndpointId和firstEndpointName,其实就是对应的请求URL,根据Span类型通知不同的监听类
  • 通知监听器者构建

    这个方法遍历所有的span调用SegmentSpanListener的build()方法,设置Segment信息的端点id和端点名后调用SourceReceiverImpl的receive()方法,最终调用SegmentDispatcher的dispatch()方法

    SegmentDispatcher的dispatch()方法:

    @Override public void dispatch(Segment source) { SegmentRecord segment = new SegmentRecord(); segment.setSegmentId(source.getSegmentId()); segment.setTraceId(source.getTraceId()); segment.setServiceId(source.getServiceId()); segment.setServiceInstanceId(source.getServiceInstanceId()); segment.setEndpointName(source.getEndpointName()); segment.setEndpointId(source.getEndpointId()); segment.setStartTime(source.getStartTime()); segment.setEndTime(source.getEndTime()); segment.setLatency(source.getLatency()); segment.setIsError(source.getIsError()); segment.setDataBinary(source.getDataBinary()); segment.setTimeBucket(source.getTimeBucket()); segment.setVersion(source.getVersion()); RecordStreamProcessor.getInstance().in(segment); }

    组装SegmentRecord对象,通过RecordStreamProcessor创建实例,in()方法中调用RecordPersistentWorker来批量异步插入ES数据库中。

    总结

    这篇文章主要讲解了Skywalking的OAP接收到Agent发来的Trace信息的处理逻辑,入口是TraceSegmentReportServiceHandler的collect()方法,会对Agent封装的UpstreamSegment对象进行反序列化,构建Segment、Span等信息,最终由RecordStreamProcessor来批量异步把SegmentRecord写入ES数据库中

    ❤️ 感谢大家

    如果你觉得这篇内容对你挺有有帮助的话:

  • 欢迎关注我❤️,点赞
  • 上一篇:清新陶冶——可爱猫咪绘
    下一篇:没有了
    网友评论