Streamexecutionenvironment of Flink source code
2022-04-23 05:04:00 【Great Wall ol】
StreamExecutionEnvironment
ExecutionConfig
Parallelism
- Has a default value equal to the number of CPU cores
- getter and setter methods
- Maximum parallelism: 0 < maxParallelism <= 2^15 - 1
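As a minimal sketch (the values are arbitrary), these settings can be configured on the environment, assuming the 1.x-era DataStream API this article describes:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Override the default (number of CPU cores) for the whole job
        env.setParallelism(4);
        // Must satisfy 0 < maxParallelism <= 2^15 - 1 (32767)
        env.setMaxParallelism(128);
    }
}
```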
Restart strategy (RestartStrategies)
- setRestartStrategy()
- getRestartStrategy()
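A sketch of setting a restart strategy; the fixed-delay strategy and its parameters here are just one example:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart at most 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
```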
Operator chaining (isChainingEnabled)
- isChainingEnabled
- disableOperatorChaining
Retry count (numberOfExecutionRetries)
- setNumberOfExecutionRetries
- getNumberOfExecutionRetries
TimeCharacteristic
The default is processing time.
- setStreamTimeCharacteristic(time characteristic)
- Processing time: the auto watermark interval (AutoWatermarkInterval) is 0 ms
- Event time: the auto watermark interval (AutoWatermarkInterval) is 200 ms
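A sketch of switching the time characteristic; the explicit watermark interval below is an arbitrary illustration of overriding the 200 ms default:

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeCharacteristicExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time sets the auto watermark interval to 200 ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // The interval can still be overridden explicitly afterwards
        env.getConfig().setAutoWatermarkInterval(100L);
    }
}
```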
CheckpointConfig
- checkpointingMode: EXACTLY_ONCE or AT_LEAST_ONCE
  - getCheckpointingMode()
- checkpointTimeout
- checkpointInterval
  - enableCheckpointing
- forceCheckpointing
  - isForceCheckpointing
  - setForceCheckpointing
- pause between checkpoints
  - minPauseBetweenCheckpoints
  - maxPauseBetweenCheckpoints
- maxConcurrentCheckpoints
- failOnCheckpointingErrors
  - isFailOnCheckpointingErrors
- ExternalizedCheckpointCleanup
  Allows checkpoints to be persisted to external storage.
  Enumeration values:
  - DELETE_ON_CANCELLATION: the persisted checkpoints and state are deleted when the job is cancelled; after cancellation, the job cannot be recovered from an external checkpoint.
  - RETAIN_ON_CANCELLATION: the persisted checkpoints and state are retained when the job is cancelled; after cancellation, the checkpoint metadata and state must be deleted manually.
  Methods:
  - enableExternalizedCheckpoints(cleanupMode): sets the cleanup mode for externally persisted checkpoints
  - isExternalizedCheckpointsEnabled(): whether persistence to external storage is enabled
  - getExternalizedCheckpointCleanup(): returns the cleanup mode for externally persisted checkpoints
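The checkpoint settings above can be sketched as one configuration block; all intervals and timeouts here are illustrative values, not recommendations:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 s with exactly-once semantics
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externally persisted checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```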
StateBackend
- Describes how operator state and checkpoints are stored
- Defines the data structure in which checkpoint data and state are persisted during execution (it can be a hashtable, RocksDB, or another data store)
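A sketch of setting a state backend from the same Flink generation as this article (FsStateBackend was later superseded); the HDFS path is a placeholder:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep working state on the heap, persist checkpoints to a (hypothetical) HDFS path
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    }
}
```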
Main methods
Creating an environment
- StreamExecutionEnvironment.getExecutionEnvironment
- StreamExecutionEnvironment.createLocalEnvironment
- StreamExecutionEnvironment.createRemoteEnvironment
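The three factory methods can be sketched side by side; host, port, and jar path in the remote case are placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentCreationExample {
    public static void main(String[] args) {
        // Context-dependent: a local environment in the IDE, a cluster environment when submitted
        StreamExecutionEnvironment auto = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicit local environment with parallelism 2
        StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment(2);

        // Explicit remote environment (placeholders for host, port, and the job jar)
        StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/job.jar");
    }
}
```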
Data sources
Collection-based data sources
generateSequence
env.generateSequence(long from, long to) generates a sequence of numbers between from and to and uses it as the input data source. If the parallelism is set to 1, the generated element sequence is ordered.
fromElements
Creates a data stream from the given sequence of objects; all objects must be of the same type. The parallelism is 1.
For generic classes whose type cannot be inferred, you may need to specify the TypeInformation explicitly.
fromCollection
Creates a data stream from the given non-empty collection or iterator. The parallelism is 1.
fromParallelCollection
Creates a data stream containing the elements of a splittable iterator, which allows the framework to create a parallel data stream that returns the elements of the iterator.
The iterator is not consumed until the job actually executes, so the data type returned by the iterator must be given explicitly as a Class (because the Java compiler erases generic type information).
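A sketch combining the collection-based sources above; the element values are arbitrary:

```java
import java.util.Arrays;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourcesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ordered only when the source parallelism is 1
        env.generateSequence(1L, 100L).print();

        // All elements must share one type; parallelism is 1
        env.fromElements("a", "b", "c").print();

        // Non-empty collection; parallelism is 1
        env.fromCollection(Arrays.asList(1, 2, 3)).print();

        env.execute("collection-sources");
    }
}
```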
File-based data sources
readTextFile
- Reads the given file line by line and creates a data stream in which each element is a string containing the contents of one line. The file is read with the system's default character set or with the specified character set.
- readTextFile calls the readFile(TextInputFormat(path, charset, delimiter), file processing mode (process once or at a time interval), read the file as text) method.
- readFile() calls the createFileInput(TextInputFormat(path, charset, delimiter), typeInformation, file processing mode, time interval) method.
- createFileInput calls the addSource() method.
readFile
Reads a file according to the specified file format, such as TextInputFormat.
createFileInput
createFileInput(TextInputFormat(path, charset, delimiter), typeInformation, file processing mode, time interval)
// Create a file-monitoring function that periodically scans the path with the given interval
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// Create the reader operator
ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);
// Create the input source; the reader uses the parallelism configured in the environment
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
        .transform("Split Reader: " + sourceName, typeInfo, reader);
FileProcessingMode has two values:
- PROCESS_ONCE: process the current contents of the path once, then exit.
- PROCESS_CONTINUOUSLY: periodically scan the path for new content.
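A sketch of the user-facing readFile call that ends up in createFileInput; the directory is a placeholder and the one-second interval is arbitrary:

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String dir = "/tmp/input";  // hypothetical directory

        // Rescan the directory for new content every second
        env.readFile(new TextInputFormat(new Path(dir)), dir,
                     FileProcessingMode.PROCESS_CONTINUOUSLY, 1_000L)
           .print();

        env.execute("read-file");
    }
}
```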
createInput
A generic way to create an input stream from an input format such as TextInputFormat.
Socket-based data sources
socketTextStream
Creates a data stream from a socket containing the received, unbounded stream of strings; the received bytes are decoded with the system's default character set. The socket itself does not report termination; retries are started only after the socket terminates gracefully.
socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
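A minimal sketch of using this source; host and port are placeholders (e.g. paired with `nc -lk 9999` for local testing):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Lines are split on '\n' by default; host and port are placeholders
        env.socketTextStream("localhost", 9999)
           .print();
        env.execute("socket-source");
    }
}
```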
addSource: adding a data source
Adds a data source to the data flow topology.
The default parallelism is 1; to run the source in parallel, implement ParallelSourceFunction or extend RichParallelSourceFunction.
function – the user defined function
sourceName – the name of the data source
typeInfo – the user defined type information for the stream
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    // If no type information was supplied, try to extract it from the function
    if (typeInfo == null) {
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                        SourceFunction.class,
                        function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }

    // The source runs in parallel only if it implements ParallelSourceFunction
    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);

    // Wrap the function in a (stoppable) source operator
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function);
    }

    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
Running a task
execute
Triggers program execution. The environment executes everything upstream of the program's sink operators. Sink operators, for example, print the result set or send it to a message queue.
A jobName can be specified explicitly; the default is "Flink Streaming Job".
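A minimal end-to-end sketch showing execute with an explicit job name; the pipeline itself is arbitrary:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();  // the sink; everything upstream of it is executed
        // Without an argument the job name defaults to "Flink Streaming Job"
        env.execute("my-job");
    }
}
```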
getStreamGraph
Gets the streamGraph of the streaming job.
- Builds the stream graph from the environment's state backend, the operator-chaining flag, and the execution context
- Iterates over the transformList upstream of the sink operators, setting the streamGraph's bufferTimeout, transformationUID, transformationUserHash, and sources during the iteration
getExecutionPlan
- Creates the plan with which the program would be executed and returns the dataflow graph.
- This method must be called before the plan is executed.
- Returns a JSON string.
cast2StoppableSourceFunction
clean
Returns a "closure cleaned" version of the given function. Cleaning is performed only if closure cleaning has not been disabled in the ExecutionConfig.
addOperator
Adds an operator to the transformation list: List<StreamTransformation<?>> transformations.
Not meant to be called by users; API methods that create operators must call this method.
registerCacheFile
Registers a file in the distributed cache; at runtime the file is accessible from all user-defined functions as a local path.
The file can be a local file or a file in a distributed file system. If necessary, the runtime temporarily copies the file to the local cache.
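A sketch of registering a cached file (the Flink method is registerCachedFile) and reading it back in a rich function; the HDFS path and cache name are placeholders:

```java
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCacheExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Path and cache name are placeholders
        env.registerCachedFile("hdfs://namenode:8020/data/lookup.txt", "lookup");

        env.fromElements("a", "b")
           .map(new RichMapFunction<String, String>() {
               private transient File lookup;

               @Override
               public void open(Configuration parameters) {
                   // The runtime has copied the registered file to a local path
                   lookup = getRuntimeContext().getDistributedCache().getFile("lookup");
               }

               @Override
               public String map(String value) {
                   return value + ":" + lookup.getName();
               }
           })
           .print();

        env.execute("distributed-cache");
    }
}
```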
Copyright notice
This article was created by [Great Wall ol]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204220552270257.html