Source code interpretation of Flink metrics (records read, records sent, bytes sent, bytes received, etc.)
2022-04-23 03:02:00 【Yang Linwei】
01 Introduction
Attachment: Flink source code download address
On the Flink Web UI, if you look carefully at the monitoring page, you can see the task details together with detailed monitoring metrics, as shown in the figure below (records sent, records received, bytes sent, bytes received, etc.):

Very often we need to "take out" this data and use it for our own purposes. So how can these values be extracted? This article analyzes the source code to find out.
02 Source code analysis
2.1 Entry point
In the Flink Web UI, press F12 to open the browser's developer tools, and you can see:
- Interface address: http://<domain>/jobs/ad75bbaaa624e41a249825a9820a65cc

- Response content:
{
  "jid": "ad75bbaaa624e41a249825a9820a65cc",
  "name": "insert-into_default_catalog.default_database.t_student_copy",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1650352652357,
  "end-time": -1,
  "duration": 64629227,
  "maxParallelism": -1,
  "now": 1650417281584,
  "timestamps": {
    "INITIALIZING": 1650352652357,
    "FAILED": 0,
    "CREATED": 1650352652449,
    "RESTARTING": 0,
    "FAILING": 0,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "RECONCILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RUNNING": 1650352653087
  },
  "vertices": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
      "maxParallelism": 128,
      "parallelism": 1,
      "status": "RUNNING",
      "start-time": 1650352658363,
      "end-time": -1,
      "duration": 64623221,
      "tasks": {
        "CREATED": 0,
        "CANCELING": 0,
        "INITIALIZING": 0,
        "RECONCILING": 0,
        "CANCELED": 0,
        "RUNNING": 1,
        "DEPLOYING": 0,
        "FINISHED": 0,
        "FAILED": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "CANCELING": 0,
    "INITIALIZING": 0,
    "RECONCILING": 0,
    "CANCELED": 0,
    "RUNNING": 1,
    "DEPLOYING": 0,
    "FINISHED": 0,
    "FAILED": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "ad75bbaaa624e41a249825a9820a65cc",
    "name": "insert-into_default_catalog.default_database.t_student_copy",
    "nodes": [
      {
        "id": "cbc357ccb763df2852fee8c4fc7d55f2",
        "parallelism": 1,
        "operator": "",
        "operator_strategy": "",
        "description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
        "optimizer_properties": {}
      }
    ]
  }
}
You can see that the content under vertices[0].metrics is what this article sets out to read, as shown in the figure below:

Press Ctrl+H to search the whole Flink source code. We might first think of looking for the interface "/jobs/{jobId}", but that is actually inefficient. A better way is to use a distinctive feature of the response: for example, start from the returned field read-bytes, and you will find it defined in the class org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo:

OK, we can use the IOMetricsInfo class as the entry point of our source code analysis.
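If all you need is the numbers, you can also read them straight from the REST response shown above. The following is a minimal sketch that assumes only the JDK: the record name VertexIoMetrics, its fields and the regex-based parsing are hypothetical illustrations that mirror the eight keys under vertices[0].metrics, not Flink's own IOMetricsInfo class. The base URL in the comment assumes the default REST port 8081; a real client should use a proper JSON library instead of regular expressions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical client-side mirror of one "metrics" object returned by GET /jobs/{jobId}. */
record VertexIoMetrics(
        long readBytes, boolean readBytesComplete,
        long writeBytes, boolean writeBytesComplete,
        long readRecords, boolean readRecordsComplete,
        long writeRecords, boolean writeRecordsComplete) {

    /** Fetches the raw job details JSON, e.g. fetchJobDetails("http://localhost:8081", jobId). */
    static String fetchJobDetails(String baseUrl, String jobId) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId)).GET().build();
        return HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    /** Parses the first "metrics" object; relies on it being flat, as in the response above. */
    static VertexIoMetrics parseFirst(String json) {
        int start = json.indexOf("\"metrics\"");
        int end = json.indexOf('}', start);
        if (start < 0 || end < 0) {
            throw new IllegalArgumentException("no \"metrics\" object found");
        }
        String block = json.substring(start, end + 1);
        return new VertexIoMetrics(
                number(block, "read-bytes"), flag(block, "read-bytes-complete"),
                number(block, "write-bytes"), flag(block, "write-bytes-complete"),
                number(block, "read-records"), flag(block, "read-records-complete"),
                number(block, "write-records"), flag(block, "write-records-complete"));
    }

    private static long number(String block, String key) {
        return Long.parseLong(find(block, key, "(-?\\d+)"));
    }

    private static boolean flag(String block, String key) {
        return Boolean.parseBoolean(find(block, key, "(true|false)"));
    }

    // The closing quote after the key keeps "read-bytes" from matching "read-bytes-complete".
    private static String find(String block, String key, String valuePattern) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*" + valuePattern).matcher(block);
        if (!m.find()) {
            throw new IllegalArgumentException("missing field: " + key);
        }
        return m.group(1);
    }
}
```

The "-complete" flags are worth keeping next to each value: they tell you whether the number covers every subtask or only those that have reported so far.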
2.2 IOMetricsInfo
In IOMetricsInfo, press Ctrl+G to find its usages. The class is referenced in several places, but the real caller is JobDetailsHandler; the other callers all have the suffix Test and are test classes, so they are not the next step of the analysis. Let's look at JobDetailsHandler.

In JobDetailsHandler, you can see that the metric values are all taken from counts. Let's continue with counts:

counts is populated here:

Next, let's look at the MutableIOMetrics class.
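Before diving in, the handler-side flow described above can be sketched roughly like this: per-subtask values are summed into a counts object, and the totals are then exposed under the REST field names. This is a simplified, hypothetical model (the class IoCounts and the plain Map inputs are made up). The internal names numBytesIn, numBytesOut, numRecordsIn and numRecordsOut and their mapping to read-bytes, write-bytes, read-records and write-records are my assumption about how Flink's metric names line up with IOMetricsInfo; check them against your Flink version. A missing value clears the matching "-complete" flag so that 0 can be told apart from "not reported".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified counterpart of the "counts" object used by JobDetailsHandler. */
final class IoCounts {
    // Internal metric name -> REST field name (assumed mapping, see the lead-in).
    private static final Map<String, String> REST_NAMES = new LinkedHashMap<>();
    static {
        REST_NAMES.put("numBytesIn", "read-bytes");
        REST_NAMES.put("numBytesOut", "write-bytes");
        REST_NAMES.put("numRecordsIn", "read-records");
        REST_NAMES.put("numRecordsOut", "write-records");
    }

    private final Map<String, Long> totals = new LinkedHashMap<>();
    private final Map<String, Boolean> complete = new LinkedHashMap<>();

    IoCounts() {
        for (String name : REST_NAMES.keySet()) {
            totals.put(name, 0L);
            complete.put(name, true);
        }
    }

    /** Adds one subtask's values; a missing entry marks that metric as incomplete. */
    void add(Map<String, Long> subtaskMetrics) {
        for (String name : REST_NAMES.keySet()) {
            Long value = subtaskMetrics.get(name);
            if (value == null) {
                complete.put(name, false);
            } else {
                totals.merge(name, value, Long::sum);
            }
        }
    }

    /** Builds a flat "metrics" object in the shape of the REST response. */
    Map<String, Object> toRestMetrics() {
        Map<String, Object> out = new LinkedHashMap<>();
        REST_NAMES.forEach((internal, rest) -> {
            out.put(rest, totals.get(internal));
            out.put(rest + "-complete", complete.get(internal));
        });
        return out;
    }

    static Map<String, Object> aggregate(List<Map<String, Long>> subtasks) {
        IoCounts counts = new IoCounts();
        subtasks.forEach(counts::add);
        return counts.toRestMetrics();
    }
}
```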
2.3 MutableIOMetrics
Go into the addIOMetrics method of the MutableIOMetrics class found in the previous step. You can see that, depending on the state of the execution, the code obtains the metric values from different places:
- Terminated state: the metric values are obtained from AccessExecution
- Running state: the metric values are obtained from MetricFetcher

Because our program is still running, we need to study how the MetricFetcher class obtains these values.
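The state-dependent branch can be sketched like this, reduced to a single metric to keep it short. It is a hypothetical simplification: Attempt, finalRecordsIn and the liveStore map are made-up stand-ins for AccessExecution, the IOMetrics stored with it, and the MetricStore filled by MetricFetcher, and the key format "subtaskKey.numRecordsIn" is invented for the example. Finished attempts contribute their stored final value; running attempts contribute whatever the fetcher last pulled, and a running attempt without a fetched value clears the complete flag.

```java
import java.util.Map;

/** Hypothetical stand-in for an execution attempt: finished attempts keep their final value. */
record Attempt(String subtaskKey, boolean terminal, Long finalRecordsIn) {}

final class RecordsInCounter {
    private long recordsIn;
    private boolean complete = true;

    /**
     * Mirrors the branch in addIOMetrics: terminal attempts read the value stored with the
     * execution, running attempts read the live value fetched via the metric fetcher.
     */
    void add(Attempt attempt, Map<String, String> liveStore) {
        if (attempt.terminal()) {
            if (attempt.finalRecordsIn() != null) {
                recordsIn += attempt.finalRecordsIn();
            }
        } else {
            String live = liveStore.get(attempt.subtaskKey() + ".numRecordsIn");
            if (live == null) {
                complete = false; // "not reported yet" is different from 0
            } else {
                recordsIn += Long.parseLong(live);
            }
        }
    }

    long recordsIn() {
        return recordsIn;
    }

    boolean isComplete() {
        return complete;
    }
}
```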
2.3 MetricFetcher
(A "fetcher" is something that grabs things; here it can be understood as the component that pulls the data.)
I translated the comments of the MetricFetcher class; they read as follows:
package org.apache.flink.runtime.rest.handler.legacy.metrics;

/**
 * MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
 *
 * <p>Metrics are only fetched when {@link MetricFetcher#update()} is called, provided that enough
 * time has passed since the last call.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/20 10:30 AM
 * @version: 1.0.0
 */
public interface MetricFetcher {
    /** Gets the {@link MetricStore} that contains all metrics fetched so far. */
    MetricStore getMetricStore();

    /** Triggers fetching of metrics. */
    void update();

    /** @return timestamp of the last update. */
    long getLastUpdateTime();
}
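The contract in that comment ("only fetch if enough time has passed") is a simple throttle. Here is a minimal sketch of it with hypothetical names (ThrottledFetcher, a Supplier standing in for the remote query, an injectable clock); it is not Flink's MetricFetcherImpl. If I recall correctly, Flink makes the interval configurable (metrics.fetcher.update-interval in recent versions), but verify that key against your version.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical fetcher that honours the MetricFetcher contract described above. */
final class ThrottledFetcher {
    private final Supplier<Map<String, String>> source; // stands in for the remote metric query
    private final LongSupplier clockMillis;             // injectable clock, in milliseconds
    private final long minIntervalMillis;
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private boolean fetchedOnce = false;
    private long lastUpdateTime = 0L;
    private int fetchCount = 0;

    ThrottledFetcher(Supplier<Map<String, String>> source, LongSupplier clockMillis, long minIntervalMillis) {
        this.source = source;
        this.clockMillis = clockMillis;
        this.minIntervalMillis = minIntervalMillis;
    }

    /** Analogue of getMetricStore(): everything fetched so far. */
    Map<String, String> getMetricStore() {
        return store;
    }

    /** Analogue of update(): fetch only if the interval has elapsed since the last fetch. */
    synchronized void update() {
        long now = clockMillis.getAsLong();
        if (!fetchedOnce || now - lastUpdateTime >= minIntervalMillis) {
            store.putAll(source.get());
            lastUpdateTime = now;
            fetchedOnce = true;
            fetchCount++;
        }
    }

    long getLastUpdateTime() {
        return lastUpdateTime;
    }

    int fetchCount() {
        return fetchCount;
    }
}
```

This is also why the Web UI numbers lag slightly behind reality: every REST request calls update(), but most calls just reuse the last snapshot.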
Next, press Ctrl+T to view its implementations:

You can see that there are several implementation classes. Clearly, MetricFetcherImpl is the real implementation, so let's look at its code.
2.3.1 MetricFetcherImpl
MetricFetcherImpl has several methods. What we need to know is where these metrics come from. There is not much code in the class, and most of it is not what we need; after reading it, you can see that the metrics are obtained in the queryMetrics method. Here is the code of this method:
/**
 * Query the metrics from the given QueryServiceGateway.
 *
 * @param queryServiceGateway to query for metrics
 */
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());

    queryServiceGateway
            .queryMetrics(timeout)
            .whenCompleteAsync(
                    (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                        if (t != null) {
                            LOG.debug("Fetching metrics failed.", t);
                        } else {
                            metrics.addAll(deserializer.deserialize(result));
                        }
                    },
                    executor);
}
So, after tracing the code this far, we find that the metrics are obtained by calling an interface on the gateway (MetricQueryServiceGateway). Next, we need to look at how the gateway's queryMetrics method is implemented.
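The pattern in queryMetrics (ask a gateway asynchronously, log and skip on failure, merge the result on success) can be reproduced with plain CompletableFuture. In this sketch, QueryGateway and AsyncMetricCollector are hypothetical stand-ins for MetricQueryServiceGateway and MetricFetcherImpl, and the metric dump is simplified to a Map. As far as I can tell, MetricFetcherImpl issues such a query for the JobManager's and each TaskManager's query service, so a failing TaskManager only leaves its own metrics stale.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/** Hypothetical stand-in for MetricQueryServiceGateway: returns a metric dump asynchronously. */
interface QueryGateway {
    String address();

    CompletableFuture<Map<String, String>> queryMetrics();
}

final class AsyncMetricCollector {
    private final Map<String, String> metrics = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    AsyncMetricCollector(ExecutorService executor) {
        this.executor = executor;
    }

    /** Same shape as queryMetrics(): on failure log and keep old values, on success merge. */
    CompletableFuture<Void> query(QueryGateway gateway) {
        return gateway.queryMetrics()
                .whenCompleteAsync((result, t) -> {
                    if (t != null) {
                        System.err.println("Fetching metrics failed for " + gateway.address() + ": " + t);
                    } else {
                        metrics.putAll(result);
                    }
                }, executor)
                .handle((r, t) -> null); // swallow the failure so callers can wait on all queries

    }

    Map<String, String> metrics() {
        return metrics;
    }
}
```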
2.4 MetricQueryServiceGateway
From the previous step we know that the queryMetrics interface of MetricQueryServiceGateway is implemented by the queryMetrics method of the MetricQueryService class. The code is as follows:
@Override
public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(
        Time timeout) {
    return callAsync(
            () -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)),
            timeout);
}
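Note that queryMetrics does not return the raw dump: the serialized counters, gauges, histograms and meters first pass through enforceSizeLimit, presumably so that the reply stays within the RPC message size limit. I have not reproduced Flink's exact rules here. The following is a hypothetical greedy variant (DumpSizeLimiter is a made-up name), under the assumption that categories are checked one after another and a category that no longer fits is replaced by an empty payload instead of failing the whole query.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DumpSizeLimiter {
    /**
     * Greedy size limit: walk the categories in insertion order and keep each one only if it
     * still fits into the remaining budget; a category that does not fit is replaced by an
     * empty payload, so the other categories are still reported.
     */
    static Map<String, byte[]> enforceSizeLimit(Map<String, byte[]> categories, long maxBytes) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        long used = 0;
        for (Map.Entry<String, byte[]> e : categories.entrySet()) {
            byte[] payload = e.getValue();
            if (used + payload.length > maxBytes) {
                System.err.println(e.getKey() + " will not be reported: dump would exceed " + maxBytes + " bytes");
                result.put(e.getKey(), new byte[0]);
            } else {
                used += payload.length;
                result.put(e.getKey(), payload);
            }
        }
        return result;
    }
}
```

One practical consequence: if a job registers a huge number of metrics, some categories may silently disappear from the Web UI while others still show up.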
Let's look at the callAsync method:

From this we can see that callAsync simply hands the work to rpcServer, which runs the callable in the endpoint's main thread; queryMetrics itself is invoked remotely (via RPC) through the gateway. (Where exactly is it called from?)
Let's look at the RpcEndpoint class.
2.5 RpcEndpoint
Let's look at the method structure of the RpcEndpoint class:

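From these method names we can see that it behaves like a server: an endpoint that is started, receives remote calls and is stopped. Note, however, that it is an RPC server, not the HTTP server behind the Web UI; the Web UI talks to REST handlers such as JobDetailsHandler, which in turn use MetricFetcher to query the metric query service over RPC. Let's look at the constructor: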

Let's see how the service is started inside it:

You can see that the service is started by calling the startServer method of AkkaRpcService.
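One property of RpcEndpoint explains why MetricQueryService can read its counter, gauge, histogram and meter maps without extra locking: calls into an endpoint are processed one at a time on its main thread, and callAsync schedules work on that thread. The sketch below imitates this with a single-thread executor. MiniEndpoint and increment are hypothetical names, and this is not how AkkaRpcService is implemented internally.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical endpoint: every call runs on one "main thread", so its state needs no locks. */
class MiniEndpoint implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private long counter; // only touched from mainThread

    /** Rough analogue of callAsync: run on the main thread, fail the future if it takes too long. */
    <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration timeout) {
        CompletableFuture<V> future = new CompletableFuture<>();
        mainThread.execute(() -> {
            try {
                future.complete(callable.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    CompletableFuture<Long> increment(Duration timeout) {
        return callAsync(() -> ++counter, timeout);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Because the calls are serialized, 100 concurrent increment requests always end with the counter at exactly 100.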
OK, it's time to stop here, because this deviates from the focus of this article. What we need to know is where these metrics come from, so how do we proceed?
Let's go back to the MetricQueryService class from section 2.4 and see how it is constructed (the steps here are closely connected).
2.6 MetricQueryService
You can see that the MetricQueryService class has a createMetricQueryService method, which creates the metric query service:

Let's see where this method is called:

You can see that it is called in the startQueryService method of the metric registry (MetricRegistryImpl). Let's see where startQueryService is called:

You can see that the metric query service is started in 3 places, namely:
- ClusterEntrypoint: the base class of the Flink cluster entry points
- MiniCluster: executes Flink jobs locally
- TaskManagerRunner: in YARN or standalone mode, this class is the executable entry point of the TaskManager. It builds the related components (network, I/O manager, memory manager, RPC service, HA service) and starts them.
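The relationship between the registry and its query service can be summarized in a small model: components register metrics in a registry, and once one of the entry points above calls startQueryService, every query returns a snapshot of the current values. SimpleMetricRegistry and its methods are hypothetical; the guard against starting the service twice is my own choice for the sketch, not a claim about MetricRegistryImpl.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical registry: tasks register counters; the query service is started once per process. */
final class SimpleMetricRegistry {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    private Supplier<Map<String, Long>> queryService; // null until startQueryService is called

    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, n -> new AtomicLong());
    }

    /** Analogue of startQueryService: start once, then expose a snapshot function. */
    synchronized void startQueryService() {
        if (queryService != null) {
            throw new IllegalStateException("query service already started");
        }
        queryService = () -> {
            Map<String, Long> snapshot = new TreeMap<>();
            counters.forEach((k, v) -> snapshot.put(k, v.get()));
            return snapshot;
        };
    }

    /** What a remote caller would receive: the values at the time of the query. */
    synchronized Map<String, Long> query() {
        if (queryService == null) {
            throw new IllegalStateException("query service not started");
        }
        return queryService.get();
    }
}
```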
For ease of understanding, we will follow the mode that executes Flink jobs locally, i.e. continue with MiniCluster.
2.7 MiniCluster
In the start() method of MiniCluster, you can see the call to startQueryService:

Looking further at the argument metricQueryServiceRpcService, you can see that metricQueryServiceRpcService (the RPC service of the metric query service) is initialized from the configuration.

Let's continue with configuration:

From this we can see that the configuration is obtained from miniClusterConfiguration. Going one step further:

We find that the configuration is passed in through the constructor. Let's see where the MiniCluster constructor is called:

Many classes call this constructor. Judging by their names, the one we want is the LocalExecutor class.
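The chain traced so far (user configuration, then the MiniCluster configuration, then the MiniCluster constructor, then start(), then the metric query RPC service) can be modelled with a few plain classes. Everything here is hypothetical (ClusterConfig, SketchMiniCluster, SketchLocalExecutor), and the key metrics.internal.query-service.port with default "0" (any free port) is my recollection of Flink's MetricOptions, so check it against your version before relying on it.

```java
import java.util.Map;

/** Hypothetical, heavily simplified stand-in for the MiniCluster configuration. */
record ClusterConfig(Map<String, String> options) {
    /** Assumed key; "0" would mean "pick any free port". */
    static final String QUERY_SERVICE_PORT = "metrics.internal.query-service.port";

    String queryServicePort() {
        return options.getOrDefault(QUERY_SERVICE_PORT, "0");
    }
}

final class SketchMiniCluster {
    private final ClusterConfig config;        // handed in through the constructor, as in MiniCluster
    private String metricQueryServiceEndpoint; // set by start()

    SketchMiniCluster(ClusterConfig config) {
        this.config = config;
    }

    /** start() creates the metric query RPC service from the configuration; here we only record its port. */
    void start() {
        metricQueryServiceEndpoint = "localhost:" + config.queryServicePort();
    }

    String metricQueryServiceEndpoint() {
        return metricQueryServiceEndpoint;
    }
}

final class SketchLocalExecutor {
    /** Analogue of LocalExecutor: the user's configuration ends up in the cluster constructor. */
    static SketchMiniCluster createCluster(Map<String, String> userConfig) {
        SketchMiniCluster cluster = new SketchMiniCluster(new ClusterConfig(Map.copyOf(userConfig)));
        cluster.start();
        return cluster;
    }
}
```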
2.8 LocalExecutor
My understanding of LocalExecutor: an executor for running pipelines locally (for example, multiple Flink SQL statements).
Let's see where it calls the MiniCluster constructor:

Next, let's see where the create method is called: it is called in the getExecutor method of LocalExecutorFactory:

Press Ctrl+T and you can see that it is called from ExecutionEnvironment and StreamExecutionEnvironment:

Aha, aren't these the two classes we use every day in Flink development? Let's open the StreamExecutionEnvironment class.
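How do the environments end up with LocalExecutorFactory rather than some other executor? As far as I know, Flink discovers its executor factories through a service loader, asks each one whether it is compatible with the configuration (the local factory matches the target "local" of the option execution.target), and expects exactly one match. The sketch below mimics that selection with hypothetical types (ExecutorFactorySketch, ExecutorSelection) and a plain list instead of java.util.ServiceLoader; treat the option name as an assumption to verify.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical mirror of the executor-factory idea: each factory says whether it fits the config. */
interface ExecutorFactorySketch {
    String name();

    boolean isCompatibleWith(Map<String, String> config);

    String getExecutor(Map<String, String> config); // returns a description instead of a real executor
}

final class ExecutorSelection {
    static final String TARGET = "execution.target"; // assumed option name

    /** Pick exactly one compatible factory, as a loader would after scanning the classpath. */
    static ExecutorFactorySketch select(List<ExecutorFactorySketch> factories, Map<String, String> config) {
        List<ExecutorFactorySketch> compatible = factories.stream()
                .filter(f -> f.isCompatibleWith(config))
                .collect(Collectors.toList());
        if (compatible.size() != 1) {
            throw new IllegalStateException("expected one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    /** Builds a toy factory that matches a single target name, case-insensitively. */
    static ExecutorFactorySketch byTarget(String target) {
        return new ExecutorFactorySketch() {
            public String name() {
                return target;
            }

            public boolean isCompatibleWith(Map<String, String> config) {
                return target.equalsIgnoreCase(config.get(TARGET));
            }

            public String getExecutor(Map<String, String> config) {
                return target + "-executor";
            }
        };
    }
}
```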
2.9 StreamExecutionEnvironment
You can see that it is called in the executeAsync method:

At this point we know that the configuration is the one passed in by the user when initializing StreamExecutionEnvironment.
03 Summary
We now have a clear way of tracing where the values of specific metrics come from: write your own Flink program that uses StreamExecutionEnvironment, set breakpoints at the source locations covered in this article, and the whole chain becomes clear.
Because of the length of this article, the analysis continues in the next blog post.
Copyright notice
This article was written by [Yang Linwei]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204220632523438.html