SkyWalking Series: Trace Profiling Source Code Analysis
2022-08-09 19:29:00 【snail-jie】
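Preface
The article "Online Code-Level Performance Profiling: Filling the Last Gap in Distributed Tracing" points out that however complex the business logic is, it is always executed by threads. I was curious how SkyWalking uses method-stack snapshots to do code-level performance profiling, so let's step through it in a debugger and see how it works.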
Demo
- Open the SkyWalking UI and click to create a new Trace Profiling task
- Configure the Trace Profiling task
- View the collected stack information
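Source code analysis
Creating a task in the UI
The OAP server receives the request from the page, and ProfileTaskMutationService#createTask stores the task in Elasticsearch in an index named profile_task-* (for example profile_task-20220807).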
public ProfileTaskCreationResult createTask(final String serviceId,
                                            final String endpointName,
                                            final long monitorStartTime,
                                            final int monitorDuration,
                                            final int minDurationThreshold,
                                            final int dumpPeriod,
                                            final int maxSamplingCount) throws IOException {
    // ........ (computation of taskStartTime / taskEndTime omitted in this excerpt)

    // check data
    final String errorMessage = checkDataSuccess(
        serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,
        maxSamplingCount
    );
    if (errorMessage != null) {
        return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
    }

    // create task
    final long createTime = System.currentTimeMillis();
    final ProfileTaskRecord task = new ProfileTaskRecord();
    task.setServiceId(serviceId);
    task.setEndpointName(endpointName.trim());
    task.setStartTime(taskStartTime);
    task.setDuration(monitorDuration);
    task.setMinDurationThreshold(minDurationThreshold);
    task.setDumpPeriod(dumpPeriod);
    task.setCreateTime(createTime);
    task.setMaxSamplingCount(maxSamplingCount);
    task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));
    // hand the record to the storage pipeline (ends up in profile_task-*)
    NoneStreamProcessor.getInstance().in(task);

    return ProfileTaskCreationResult.builder().id(task.id()).build();
}
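The task record carries a minute-level time bucket (TimeBucket.getMinuteTimeBucket), and the index name above has a day suffix. The sketch below shows, roughly, how such values can be derived from an epoch timestamp. It assumes a yyyyMMddHHmm layout for the minute bucket and yyyyMMdd for the index suffix. TimeBucketSketch, minuteTimeBucket and dailyIndexName are hypothetical names, not SkyWalking APIs, and the real implementation may differ (for example in time-zone handling).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimeBucketSketch {
    // Assumed minute-bucket layout, e.g. 202208070000
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    // Assumed day-suffix layout for index names, e.g. profile_task-20220807
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Formats an epoch-millisecond timestamp as a minute bucket in the given zone
    public static long minuteTimeBucket(long epochMillis, ZoneId zone) {
        return Long.parseLong(MINUTE.format(Instant.ofEpochMilli(epochMillis).atZone(zone)));
    }

    // Builds a daily index name such as profile_task-20220807
    public static String dailyIndexName(String prefix, long epochMillis, ZoneId zone) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        long start = 1659830400000L; // 2022-08-07T00:00:00Z
        ZoneId utc = ZoneId.of("UTC");
        System.out.println(minuteTimeBucket(start, utc));               // 202208070000
        System.out.println(dailyIndexName("profile_task", start, utc)); // profile_task-20220807
    }
}
```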
CacheUpdateTimer#updateProfileTask then periodically refreshes the profile task cache, ProfileTaskCache$profileTaskDownstreamCache.
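A cache like this lets the gRPC handler answer agent polls without querying storage on every request. Below is a minimal sketch of such a cache. It assumes that tasks are grouped by service id and that the whole map is rebuilt on each timer tick. ProfileTaskCacheSketch and its Task class are hypothetical, and the real ProfileTaskCache may be organized differently.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ProfileTaskCacheSketch {
    // Hypothetical, trimmed-down task: just enough fields for the lookup
    public static final class Task {
        public final String id;
        public final String serviceId;
        public final long createTime;

        public Task(String id, String serviceId, long createTime) {
            this.id = id;
            this.serviceId = serviceId;
            this.createTime = createTime;
        }
    }

    // serviceId -> tasks; read by request handlers, rewritten by the refresh timer
    private final Map<String, List<Task>> byService = new ConcurrentHashMap<>();

    // Called on every timer tick with the tasks currently loaded from storage
    public void refresh(Collection<Task> loaded) {
        Map<String, List<Task>> grouped = loaded.stream()
            .collect(Collectors.groupingBy(t -> t.serviceId));
        // Drop services that no longer have any task, then publish the new lists
        byService.keySet().retainAll(grouped.keySet());
        grouped.forEach((serviceId, tasks) ->
            byService.put(serviceId, Collections.unmodifiableList(new ArrayList<>(tasks))));
    }

    // Never returns null, so callers can iterate directly
    public List<Task> getProfileTaskList(String serviceId) {
        return byService.getOrDefault(serviceId, Collections.emptyList());
    }

    public static void main(String[] args) {
        ProfileTaskCacheSketch cache = new ProfileTaskCacheSketch();
        cache.refresh(List.of(new Task("t1", "svcA", 1L), new Task("t2", "svcB", 2L)));
        System.out.println(cache.getProfileTaskList("svcA").size()); // 1
    }
}
```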
The agent sends a ProfileTaskCommandQuery request
- The agent sends the ProfileTaskCommandQuery request through ProfileTaskChannelService
public void run() {
    if (status == GRPCChannelStatus.CONNECTED) {
        try {
            ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();

            // sniffer info
            builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);

            // last command create time
            builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                              .getLastCommandCreateTime());

            // send the ProfileTaskCommandQuery request
            Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                       .getProfileTaskCommands(builder.build());
            // handle the response
            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
        } catch (Throwable t) {
            // ........ (error handling omitted in this excerpt)
        }
    }
}
- On the server side, ProfileTaskServiceHandler#getProfileTaskCommands receives the ProfileTaskCommandQuery request
public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
    // query profile task list by service id
    final String serviceId = IDManager.ServiceID.buildId(request.getService(), true);
    final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());

    // fetch this service's tasks from the cache
    final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);
    if (CollectionUtils.isEmpty(profileTaskList)) {
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
        return;
    }

    // build command list
    final Commands.Builder commandsBuilder = Commands.newBuilder();
    final long lastCommandTime = request.getLastCommandTime();

    for (ProfileTask profileTask : profileTaskList) {
        // if the task was created no later than the last command time, the sniffer already has it
        if (profileTask.getCreateTime() <= lastCommandTime) {
            continue;
        }

        // record profile task log --> index name: sw_profile_task_log-20220808
        recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);

        // add command --> convert the ProfileTask into a ProfileTaskCommand and return it
        commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
    }

    responseObserver.onNext(commandsBuilder.build());
    responseObserver.onCompleted();
}
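The lastCommandTime check keeps the same task from being delivered twice. The agent reports the newest createTime it has seen, and the server returns only tasks created after that. Below is a hypothetical sketch of both sides of this handshake. LastCommandTimeSketch, tasksToSend and updateLastCommandTime are illustrative names, not SkyWalking's.

```java
import java.util.ArrayList;
import java.util.List;

public class LastCommandTimeSketch {
    // Hypothetical task: an id plus its creation time
    public static final class Task {
        public final String id;
        public final long createTime;

        public Task(String id, long createTime) {
            this.id = id;
            this.createTime = createTime;
        }
    }

    // Server side: skip tasks created no later than the agent's lastCommandTime
    public static List<Task> tasksToSend(List<Task> cached, long lastCommandTime) {
        List<Task> result = new ArrayList<>();
        for (Task task : cached) {
            if (task.createTime <= lastCommandTime) {
                continue; // the agent already has this task
            }
            result.add(task);
        }
        return result;
    }

    // Agent side: remember the newest createTime received (as addProfileTask does)
    public static long updateLastCommandTime(long lastCommandTime, List<Task> received) {
        long latest = lastCommandTime;
        for (Task task : received) {
            latest = Math.max(latest, task.createTime);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Task> cached = new ArrayList<>(List.of(new Task("t1", 100L), new Task("t2", 200L)));
        long last = 0L;
        List<Task> firstPoll = tasksToSend(cached, last);      // t1 and t2
        last = updateLastCommandTime(last, firstPoll);         // 200
        System.out.println(tasksToSend(cached, last).size()); // 0: nothing new
    }
}
```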
- The agent handles the returned ProfileTaskCommand in CommandService#receiveCommand, which puts it into the blocking queue commands
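public void receiveCommand(Commands commands) {
    for (Command command : commands.getCommandsList()) {
        try {
            BaseCommand baseCommand = CommandDeserializer.deserialize(command);
            // offer() does not block: it returns false if the queue has no free capacity
            boolean success = this.commands.offer(baseCommand);
            // ........ (handling of success == false omitted in this excerpt)
        } catch (UnsupportedCommandException e) {
            // ........ (logging omitted in this excerpt)
        }
    }
}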
The agent processes ProfileTaskCommand asynchronously
- The CommandService thread loops over the commands queue and hands each command to the executor that matches its type
public void run() {
    final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

    while (isRunning) {
        try {
            // take the next command from the commands queue (blocks while it is empty)
            BaseCommand command = commands.take();

            // skip commands that were already executed (tracked by serial number)
            if (isCommandExecuted(command)) {
                continue;
            }

            commandExecutorService.execute(command);
            serialNumberCache.add(command.getSerialNumber());
        } catch (CommandExecutionException e) {
            LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
        } catch (Throwable e) {
            LOGGER.error(e, "There is unexpected exception");
        }
    }
}
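Together, receiveCommand and run form a producer/consumer pair around a blocking queue, with a set of serial numbers used to skip duplicates. Below is a simplified, hypothetical sketch of that pattern. Commands are reduced to their serial numbers, and "executing" a command just appends it to a log. CommandQueueSketch is not a SkyWalking class.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class CommandQueueSketch {
    // Commands reduced to their serial numbers (hypothetical simplification)
    private final BlockingQueue<String> commands;
    private final Set<String> executedSerialNumbers = ConcurrentHashMap.newKeySet();
    private final List<String> executionLog = new CopyOnWriteArrayList<>();

    public CommandQueueSketch(int capacity) {
        this.commands = new LinkedBlockingQueue<>(capacity);
    }

    // Producer (like receiveCommand): never blocks, returns false if the queue is full
    public boolean receive(String serialNumber) {
        return commands.offer(serialNumber);
    }

    // One iteration of the consumer loop (like CommandService#run)
    public void processOne() throws InterruptedException {
        String serialNumber = commands.take(); // blocks while the queue is empty
        if (executedSerialNumbers.contains(serialNumber)) {
            return; // already executed: skip the duplicate
        }
        executionLog.add(serialNumber); // stands in for commandExecutorService.execute(...)
        executedSerialNumbers.add(serialNumber);
    }

    public List<String> executionLog() {
        return executionLog;
    }

    public static void main(String[] args) throws InterruptedException {
        CommandQueueSketch service = new CommandQueueSketch(8);
        service.receive("cmd-1");
        service.receive("cmd-1"); // the same command delivered twice
        service.receive("cmd-2");
        for (int i = 0; i < 3; i++) {
            service.processOne();
        }
        System.out.println(service.executionLog()); // [cmd-1, cmd-2]
    }
}
```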
- ProfileTaskCommandExecutor#execute converts the ProfileTaskCommand into a ProfileTask
- ProfileTaskExecutionService#addProfileTask schedules a delayed job that processes the ProfileTask when its start time arrives
public void addProfileTask(ProfileTask task) {
    // update last command create time
    if (task.getCreateTime() > lastCommandCreateTime) {
        lastCommandCreateTime = task.getCreateTime();
    }

    // check profile task limit
    final CheckResult dataError = checkProfileTaskSuccess(task);
    if (!dataError.isSuccess()) {
        LOGGER.warn(
            "check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());
        return;
    }

    // add task to list
    profileTaskList.add(task);

    // schedule to start task
    long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
    PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
}
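The delay is computed as startTime - now, so a task whose start time has already passed gets a negative delay. ScheduledExecutorService treats zero and negative delays as requests for immediate execution, so such a task should start right away. The sketch below assumes PROFILE_TASK_SCHEDULE is a ScheduledExecutorService, and scheduleAt is a hypothetical helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedStartSketch {
    // Runs `task` at `startTimeMillis`; a start time in the past yields a negative
    // delay, which ScheduledExecutorService treats as "run immediately"
    public static ScheduledFuture<?> scheduleAt(ScheduledExecutorService scheduler,
                                                long startTimeMillis, Runnable task) {
        long delayMillis = startTimeMillis - System.currentTimeMillis();
        return scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // start time one minute in the past: the task runs right away
        scheduleAt(scheduler, System.currentTimeMillis() - 60_000L, started::countDown);
        System.out.println(started.await(1, TimeUnit.SECONDS)); // true
        scheduler.shutdown();
    }
}
```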
- ProfileTaskExecutionService#processProfileTask creates a ProfileThread, submits it to a thread pool, and keeps the returned profilingFuture so that the thread can be stopped later
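The future is useful because Future#cancel(true) interrupts the thread running the task. The profiling loop stops once Thread.currentThread().isInterrupted() returns true or its Thread.sleep throws InterruptedException. The minimal sketch below assumes the task is stopped with cancel(true). The loop is a hypothetical stand-in for ProfileThread, not SkyWalking code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CancellableLoopSketch {
    // Hypothetical stand-in for ProfileThread: loop until interrupted, sleeping between rounds
    public static Future<?> start(ExecutorService pool, AtomicInteger rounds, long periodMillis) {
        return pool.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    rounds.incrementAndGet();
                    Thread.sleep(periodMillis);
                }
            } catch (InterruptedException e) {
                // cancel(true) interrupted the sleep: restore the flag and leave the loop
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger rounds = new AtomicInteger();
        Future<?> profilingFuture = start(pool, rounds, 10);
        Thread.sleep(50);
        profilingFuture.cancel(true); // interrupts the worker thread
        pool.shutdown();
        System.out.println(pool.awaitTermination(1, TimeUnit.SECONDS)); // true
    }
}
```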
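ProfileThread starts profiling
- The ProfileThread loops over the profilingSegmentSlots of ProfileTaskExecutionContext, which threadProfilerSlots() returns in the code below. (When do entries get inserted into profilingSegmentSlots? The answer is given further down.)
- For each slot in the PROFILING state, it captures the profiled thread's stack via Thread#getStackTrace and converts it into a thread snapshot, TracingThreadSnapshot
- The TracingThreadSnapshot is put into the snapshot queue snapshotQueue (via profileTaskChannelService.addProfilingSnapshot)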
private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {

    // dump interval (threadDumpPeriod), e.g. 10 ms
    int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();

    // run loop when current thread still running
    long currentLoopStartTime = -1;
    // loop until this thread is interrupted
    while (!Thread.currentThread().isInterrupted()) {
        currentLoopStartTime = System.currentTimeMillis();

        // iterate over every profiling slot
        AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
        int profilerCount = profilers.length();
        for (int slot = 0; slot < profilerCount; slot++) {
            ThreadProfiler currentProfiler = profilers.get(slot);
            if (currentProfiler == null) {
                continue;
            }

            switch (currentProfiler.profilingStatus().get()) {

                case PENDING:
                    // check tracing context running time
                    currentProfiler.startProfilingIfNeed();
                    break;

                case PROFILING:
                    // dump stack
                    TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
                    if (snapshot != null) {
                        profileTaskChannelService.addProfilingSnapshot(snapshot);
                    } else {
                        // tell execution context current tracing thread dump failed, stop it
                        executionContext.stopTracingProfile(currentProfiler.tracingContext());
                    }
                    break;
            }
        }

        // sleep to next period
        // if out of period, sleep one period
        long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
        needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
        Thread.sleep(needToSleep);
    }
}
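Two parts of this loop are easy to isolate. One is the sleep calculation that keeps rounds roughly threadDumpPeriod apart. The other is reading a thread's stack with Thread#getStackTrace. The hypothetical sketch below shows both. StackSamplerSketch is not a SkyWalking class, and the real ThreadProfiler builds a TracingThreadSnapshot with more information than the "class.method" strings used here.

```java
import java.util.Arrays;

public class StackSamplerSketch {
    // Same arithmetic as the loop above: sleep for what is left of the period,
    // or a full period if this round already overran it
    public static long nextSleep(long loopStartMillis, long periodMillis, long nowMillis) {
        long needToSleep = (loopStartMillis + periodMillis) - nowMillis;
        return needToSleep > 0 ? needToSleep : periodMillis;
    }

    // Reads a thread's current stack as "class.method" strings, top frame first
    public static String[] sample(Thread target) {
        return Arrays.stream(target.getStackTrace())
            .map(frame -> frame.getClassName() + "." + frame.getMethodName())
            .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(nextSleep(1_000, 10, 1_003)); // 7: 3 ms used, 7 ms left
        System.out.println(nextSleep(1_000, 10, 1_025)); // 10: overran, sleep one period
        System.out.println(Arrays.toString(sample(Thread.currentThread())));
    }
}
```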
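When are entries inserted into profilingSegmentSlots?
- When the agent intercepts an entry method (for example in Tomcat), the TracingContext is initialized before the method runs. attemptProfiling then inserts a ThreadProfiler into a free slot of profilingSegmentSlots. The ThreadProfiler keeps a reference to Thread.currentThread(), which is how the ProfileThread can later read this thread's stack.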
public ProfileStatusReference attemptProfiling(TracingContext tracingContext,
                                               String traceSegmentId,
                                               String firstSpanOPName) {
    ........

    final ThreadProfiler threadProfiler = new ThreadProfiler(
        tracingContext, traceSegmentId, Thread.currentThread(), this);
    int slotLength = profilingSegmentSlots.length();
    for (int slot = 0; slot < slotLength; slot++) {
        // atomically claim the first empty slot
        if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
            return threadProfiler.profilingStatus();
        }
    }
    ........
}
- After the intercepted entry method returns (for example in Tomcat), the slot inserted earlier is reset to null
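Slots are claimed with AtomicReferenceArray#compareAndSet. Many request threads can therefore compete for a fixed number of slots without locks. A thread that finds no free slot is presumably just not profiled. The hypothetical sketch below claims and releases slots the same way. SlotArraySketch is not a SkyWalking class.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class SlotArraySketch<T> {
    private final AtomicReferenceArray<T> slots;

    public SlotArraySketch(int size) {
        this.slots = new AtomicReferenceArray<>(size);
    }

    // Claims the first empty slot with a CAS; returns its index, or -1 if all are taken
    public int tryAcquire(T profiler) {
        for (int slot = 0; slot < slots.length(); slot++) {
            if (slots.compareAndSet(slot, null, profiler)) {
                return slot;
            }
        }
        return -1;
    }

    // Resets the slot holding this exact profiler (identity comparison) back to null
    public boolean release(T profiler) {
        for (int slot = 0; slot < slots.length(); slot++) {
            if (slots.compareAndSet(slot, profiler, null)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SlotArraySketch<Object> slots = new SlotArraySketch<>(2);
        Object a = new Object(), b = new Object(), c = new Object();
        System.out.println(slots.tryAcquire(a)); // 0
        System.out.println(slots.tryAcquire(b)); // 1
        System.out.println(slots.tryAcquire(c)); // -1: no free slot
        slots.release(a);
        System.out.println(slots.tryAcquire(c)); // 0: freed slot reused
    }
}
```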
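The agent sends thread snapshots to the server asynchronously
- During boot, ProfileTaskChannelService starts a scheduled job with a fixed delay of 500 ms. Each run drains the snapshots from snapshotQueue into a buffer and sends them to the server as one batch.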
public void boot() {
    .......
    sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
        new DefaultNamedThreadFactory("ProfileSendSnapshotService")
    ).scheduleWithFixedDelay(
        new RunnableWithExceptionProtection(
            () -> {
                List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
                // drain the snapshots from snapshotQueue
                snapshotQueue.drainTo(buffer);
                if (!buffer.isEmpty()) {
                    sender.send(buffer);
                }
            },
            t -> LOGGER.error("Profile segment snapshot upload failure.", t)
        ), 0, 500, TimeUnit.MILLISECONDS
    );
    .......
}
- ProfileSnapshotSender#send converts each TracingThreadSnapshot into a ThreadSnapshot and sends it to the server
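The upload path follows a simple batching pattern. The profiling thread only enqueues snapshots, and a separate scheduled thread periodically drains everything queued and sends it as one batch. The hypothetical sketch below shows this pattern. SnapshotBatcherSketch and its Consumer-based sender are illustrative assumptions, not SkyWalking APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SnapshotBatcherSketch<T> {
    private final BlockingQueue<T> queue;
    private final Consumer<List<T>> sender; // hypothetical stand-in for sender.send(...)

    public SnapshotBatcherSketch(int capacity, Consumer<List<T>> sender) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.sender = sender;
    }

    // Called by the profiling thread; in this sketch the snapshot is dropped if the queue is full
    public boolean add(T snapshot) {
        return queue.offer(snapshot);
    }

    // One tick of the upload job: move everything queued into a buffer, send if non-empty
    public void flushOnce() {
        List<T> buffer = new ArrayList<>();
        queue.drainTo(buffer);
        if (!buffer.isEmpty()) {
            sender.accept(buffer);
        }
    }

    // Runs flushOnce with a fixed delay between runs, like the 500 ms job in boot()
    public ScheduledFuture<?> start(ScheduledExecutorService scheduler, long delayMillis) {
        return scheduler.scheduleWithFixedDelay(this::flushOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        SnapshotBatcherSketch<String> batcher = new SnapshotBatcherSketch<>(16, sent::add);
        batcher.add("snapshot-1");
        batcher.add("snapshot-2");
        batcher.flushOnce(); // sends one batch of two snapshots
        batcher.flushOnce(); // queue empty: nothing sent
        System.out.println(sent); // [[snapshot-1, snapshot-2]]
    }
}
```

Note that the initial capacity passed to new ArrayList<>(...) does not limit drainTo. To cap how many elements move per batch, use the BlockingQueue#drainTo(Collection, int) overload.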
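Conclusion
This article traced the whole flow: the UI creates a task --> the agent fetches the task --> the agent reports thread snapshots. Along the way, SkyWalking makes heavy use of asynchronous programming techniques, which I will keep digging into in later posts.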