Reading the Flink source code: where the job metrics come from (records received, records sent, bytes sent, bytes received, etc.)
2022-04-23 03:02:00 【Yang Linwei】
01 Introduction
Appendix: Flink source code download address
On Flink's Web page, if you look carefully at the monitoring view, you will find the details of each task together with detailed monitoring metrics (number of records sent, number of records received, number of bytes sent, number of bytes received, and so on).
Very often we need to "take this data out" and use it for our own requirements. So how do we extract it? This article answers that question by analysing the source code.
02 Source code analysis
2.1 Entry point into the source
On Flink's Web page, press F12 to open the browser's developer tools, and you can see:
- Interface address: http://<domain>/jobs/ad75bbaaa624e41a249825a9820a65cc
- Response content:
{
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1650352652357,
"end-time": -1,
"duration": 64629227,
"maxParallelism": -1,
"now": 1650417281584,
"timestamps": {
"INITIALIZING": 1650352652357,
"FAILED": 0,
"CREATED": 1650352652449,
"RESTARTING": 0,
"FAILING": 0,
"FINISHED": 0,
"SUSPENDED": 0,
"RECONCILING": 0,
"CANCELLING": 0,
"CANCELED": 0,
"RUNNING": 1650352653087
},
"vertices": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1650352658363,
"end-time": -1,
"duration": 64623221,
"tasks": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"plan": {
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"nodes": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"optimizer_properties": {
}
}
]
}
}
As you can see, the content under vertices[0].metrics is what this article sets out to explain.
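To make "taking the data out" concrete, here is a minimal sketch that requests the job details shown above and reads the four counters. The class JobMetricsClient and its methods are hypothetical helpers, not part of Flink. The base URL is an assumption you must adapt. The regular expression is a shortcut that only returns the first vertex's value, so a real program would use a proper JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink): reads I/O counters from a /jobs/{jobId} response. */
final class JobMetricsClient {

    /** Returns the first numeric value stored under the given key, e.g. "read-bytes". */
    static OptionalLong firstMetric(String json, String key) {
        // The closing quote in the pattern keeps "read-bytes" from matching "read-bytes-complete".
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*(-?\\d+)");
        Matcher matcher = pattern.matcher(json);
        return matcher.find()
                ? OptionalLong.of(Long.parseLong(matcher.group(1)))
                : OptionalLong.empty();
    }

    /** Fetches the job details; baseUrl is an assumption such as "http://localhost:8081". */
    static String fetchJobDetails(String baseUrl, String jobId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request =
                HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: JobMetricsClient <baseUrl> <jobId>");
            return;
        }
        String json = fetchJobDetails(args[0], args[1]);
        for (String key : new String[] {"read-bytes", "write-bytes", "read-records", "write-records"}) {
            System.out.println(key + " = " + firstMetric(json, key));
        }
    }
}
```

This only consumes the interface from the outside. The rest of the article follows the same fields inward to find out where the server gets them.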
Now search the Flink source code globally (Ctrl+H). The first idea might be to look for the interface "/jobs/{jobId}", but that is actually inefficient. A better way is to search for something distinctive. For example, starting from the returned field read-bytes, we find that it is defined in the class org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo.
So we can take the IOMetricsInfo class as the entry point of our source code analysis.
2.2 IOMetricsInfo
In IOMetricsInfo, press Ctrl+G to see where the class is used. It is referenced in several places, but the only real caller is JobDetailsHandler. The other classes have the suffix Test and are test classes, so they are not part of the next step. Let's look at JobDetailsHandler.
In JobDetailsHandler you can see that the metric values are read from an aggregate object named counts. Following counts to the place where it is assigned, we find that it is a MutableIOMetrics.
Next, let's look at the MutableIOMetrics class.
2.3 MutableIOMetrics
Go into the addIOMetrics method of the MutableIOMetrics class found in the previous step. You can see that the code obtains the metric values from different places depending on the running state of the job:
- Terminal state: the metric values are obtained from AccessExecution
- Running state: the metric values are obtained from MetricFetcher
Since our job is running, what we need to study is how MetricFetcher obtains the values.
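The branching described above can be pictured with a small, simplified model. This is a sketch and not Flink's code: the class IoMetricsAggregator, its fields, and the plain Map standing in for the fetched metrics are all hypothetical, and the key "numBytesIn" is used here only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the state-dependent branching in addIOMetrics. */
final class IoMetricsAggregator {
    long numBytesIn;
    boolean numBytesInComplete = true;

    /**
     * @param terminal     whether the execution attempt has reached a terminal state
     * @param finalBytesIn final value kept with the finished attempt (null if none)
     * @param liveMetrics  metrics fetched for the running subtask (null if nothing was fetched)
     */
    void add(boolean terminal, Long finalBytesIn, Map<String, String> liveMetrics) {
        if (terminal) {
            // Finished attempt: use the final value stored with the attempt.
            if (finalBytesIn != null) {
                numBytesIn += finalBytesIn;
            }
        } else if (liveMetrics != null) {
            // Running attempt: use whatever the fetcher has collected so far.
            String value = liveMetrics.get("numBytesIn");
            if (value == null) {
                numBytesInComplete = false; // the sum is missing this subtask
            } else {
                numBytesIn += Long.parseLong(value);
            }
        }
    }

    public static void main(String[] args) {
        IoMetricsAggregator counts = new IoMetricsAggregator();
        counts.add(true, 100L, null);

        Map<String, String> live = new HashMap<>();
        live.put("numBytesIn", "50");
        counts.add(false, null, live);

        System.out.println(counts.numBytesIn + " complete=" + counts.numBytesInComplete);
    }
}
```

A "complete" flag like the one in this sketch is one plausible reading of the read-bytes-complete style fields in the response: the total is only trustworthy when every running subtask reported a value.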
2.3 MetricFetcher
fetcher: one who fetches; here it can be understood as the component that goes and gets the data.
Here is the MetricFetcher class together with its comments:
package org.apache.flink.runtime.rest.handler.legacy.metrics;

/**
 * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
 *
 * <p>Metrics are only fetched when {@link MetricFetcher#update()} is called, provided that
 * sufficient time has passed since the last call.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/20 10:30 AM
 * @version: 1.0.0
 */
public interface MetricFetcher {

    /**
     * Gets the {@link MetricStore}, which contains all metrics fetched so far.
     *
     * @return {@link MetricStore} containing all fetched metrics
     */
    MetricStore getMetricStore();

    /** Triggers the fetching of metrics. */
    void update();

    /** @return the timestamp of the last update. */
    long getLastUpdateTime();
}
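The comment on update() says that metrics are only fetched when enough time has passed since the last call. A minimal sketch of that throttling idea follows. ThrottledFetcher is a hypothetical class, the injected clock exists only to make the behaviour easy to try out, and the counter stands in for the real metric queries.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of "fetch only if enough time has passed since the last update". */
final class ThrottledFetcher {
    private final long updateIntervalMillis;
    private final LongSupplier clock;
    private long lastUpdateTime;
    private int fetchCount;

    ThrottledFetcher(long updateIntervalMillis, LongSupplier clock) {
        this.updateIntervalMillis = updateIntervalMillis;
        this.clock = clock;
    }

    /** Triggers a fetch unless the previous one happened too recently. */
    synchronized void update() {
        long now = clock.getAsLong();
        if (now - lastUpdateTime > updateIntervalMillis) {
            lastUpdateTime = now;
            fetchCount++; // stand-in for the real metric queries
        }
    }

    synchronized long getLastUpdateTime() {
        return lastUpdateTime;
    }

    synchronized int getFetchCount() {
        return fetchCount;
    }

    public static void main(String[] args) {
        ThrottledFetcher fetcher = new ThrottledFetcher(10_000L, System::currentTimeMillis);
        fetcher.update();
        fetcher.update(); // too soon after the first call, so nothing is fetched
        System.out.println("fetches: " + fetcher.getFetchCount());
    }
}
```

The practical consequence is that polling the Web interface faster than the update interval does not produce fresher numbers.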
Press Ctrl+T to view its implementations. There are several implementing classes. Without a doubt, MetricFetcherImpl is the real one, so let's look at the code inside it.
2.3.1 MetricFetcherImpl
MetricFetcherImpl has only a few methods. What we want to know is where the metrics come from. There is not much code in the class, and most of it is not what we are after. After reading it, you can tell that the metrics are obtained in the queryMetrics method. Here is the code of that method:
/**
 * Query the metrics from the given QueryServiceGateway.
 *
 * @param queryServiceGateway to query for metrics
 */
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());

    queryServiceGateway
            .queryMetrics(timeout)
            .whenCompleteAsync(
                    (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                        if (t != null) {
                            LOG.debug("Fetching metrics failed.", t);
                        } else {
                            metrics.addAll(deserializer.deserialize(result));
                        }
                    },
                    executor);
}
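The method above relies on the standard CompletableFuture#whenCompleteAsync callback: the query returns immediately, and the result is stored (or the failure noted) later on the given executor. The following sketch shows the same pattern with plain JDK types. AsyncMetricCollector and the list of strings standing in for deserialized metrics are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

/** Hypothetical sketch of the "store the result or note the failure" callback pattern. */
final class AsyncMetricCollector {
    final List<String> metrics = new CopyOnWriteArrayList<>();
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    /** Registers a callback that runs on the executor once the query has finished. */
    CompletableFuture<List<String>> collect(
            CompletableFuture<List<String>> query, Executor executor) {
        return query.whenCompleteAsync(
                (result, error) -> {
                    if (error != null) {
                        failures.add(error); // the original code only logs the failure
                    } else {
                        metrics.addAll(result);
                    }
                },
                executor);
    }

    public static void main(String[] args) {
        AsyncMetricCollector collector = new AsyncMetricCollector();
        Executor direct = Runnable::run; // runs the callback on the completing thread

        collector
                .collect(CompletableFuture.completedFuture(Arrays.asList("numBytesIn", "numBytesOut")), direct)
                .join();

        System.out.println(collector.metrics);
    }
}
```

Note that a failed query does not throw at the call site. It is simply skipped, which means a metric can be missing from the store without any visible error.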
So, after tracing the code this far, we find that the metrics are obtained by calling an interface on the gateway (MetricQueryServiceGateway). Next we need to look at how that gateway method (queryMetrics) is implemented.
2.4 MetricQueryServiceGateway
From the previous step we know that the queryMetrics method of the MetricQueryServiceGateway interface is implemented by the queryMetrics method of the MetricQueryService class. The code is as follows:
@Override
public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(
        Time timeout) {
    return callAsync(
            () -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)),
            timeout);
}
Next, let's look at the callAsync method. We learn that, in essence, it uses rpcServer to make the call that obtains the metrics (but where exactly does that call go?).
Let's look at the RpcEndpoint class.
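As an analogy for what a callAsync-style method does, the sketch below runs a callable on a single dedicated thread and hands back a future that fails if the result does not arrive in time. SingleThreadEndpoint is a hypothetical class built only from JDK types. It illustrates the idea and does not reproduce Flink's RPC implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an endpoint that runs all of its work on one thread. */
final class SingleThreadEndpoint implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Runs the callable on the endpoint's thread and exposes the result as a future. */
    <V> CompletableFuture<V> callAsync(Callable<V> callable, long timeoutMillis) {
        CompletableFuture<V> future =
                CompletableFuture.supplyAsync(
                        () -> {
                            try {
                                return callable.call();
                            } catch (Exception e) {
                                throw new CompletionException(e);
                            }
                        },
                        mainThread);
        // Fail the future with a TimeoutException if no result arrives in time (Java 9+).
        return future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }

    public static void main(String[] args) {
        try (SingleThreadEndpoint endpoint = new SingleThreadEndpoint()) {
            Integer serializedSize = endpoint.callAsync(() -> "metric dump".length(), 1_000L).join();
            System.out.println("result: " + serializedSize);
        }
    }
}
```

Funnelling every call through one thread means the state read inside the callable (the counters, gauges, histograms and meters in the method above) needs no extra locking.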
2.5 RpcEndpoint
Look at the method structure of the RpcEndpoint class. From the method names we can tell that it resembles an HTTP server: it is a service that gets started and stopped and answers calls from outside, so this is the server side that ultimately answers the queries behind Flink's Web page.
Now look at the constructor and at how the service is started inside it: it calls the startServer method of AkkaRpcService to start the service.
All right, it is time to stop here, because we are drifting away from the point of this article. What we want to know is where the metrics come from. So how do we proceed?
Let's go back to the MetricQueryService class from section 2.4 and see how it is constructed (this follows directly from the previous steps).
2.6 MetricQueryService
In the MetricQueryService class there is a createMetricQueryService method, which creates the metric query service.
Looking for where this method is called, we find that it is called in the startQueryService method of the metric registry (MetricRegistryImpl). Now let's see where startQueryService is called.
You can see that three places start this metric service:
- ClusterEntrypoint: the base class of the Flink cluster entry points
- MiniCluster: runs Flink jobs locally
- TaskManagerRunner: in yarn or standalone mode, this class is the executable entry point of the task manager. It constructs the related components (network, I/O manager, memory manager, RPC service, HA service) and starts them.
For ease of understanding, we follow the mode that runs Flink jobs locally, that is, we continue with MiniCluster.
2.7 MiniCluster
In the start() method of MiniCluster you can see the call to startQueryService.
Looking further at its argument metricQueryServiceRpcService, we learn that metricQueryServiceRpcService (the RPC service for metric queries) is initialized from the configuration.
Following configuration, we learn that it is obtained from miniClusterConfiguration, which in turn is passed in through the constructor. Next we look for the callers of the MiniCluster constructor.
Many classes call it. Judging by the name, LocalExecutor is the sensible one to follow.
2.8 LocalExecutor
My understanding of LocalExecutor: it is an executor that runs Pipelines (for example, several Flink SQL statements) locally.
Look for where it calls the MiniCluster constructor, then continue to where the create method is called: it is called in the getExecutor method of LocalExecutorFactory.
With Ctrl+T you can see that this is called from ExecutionEnvironment and StreamExecutionEnvironment.
Oh! Aren't these the two classes we use every day when developing with Flink? Open StreamExecutionEnvironment and take a look.
2.9 StreamExecutionEnvironment
You can see that the call is made in its executeAsync method.
At this point we know that the configuration is passed in by the user when StreamExecutionEnvironment is initialized.
03 Summary
We now have a clear line of analysis for finding where the concrete metric values come from. You can write your own Flink program that uses StreamExecutionEnvironment, set breakpoints along the call chain described in this article, and step through it to see the whole picture.
Because this article is already long, the analysis continues in the next post.
Copyright notice
This article was written by [Yang Linwei]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220632523438.html