Flink source code: StreamExecutionEnvironment
2022-04-23 05:04:00 【Great Wall ol】
StreamExecutionEnvironment
ExecutionConfig
Parallelism
- Has a default value, equal to the number of CPU cores
- Getter and setter methods
- Maximum parallelism: 0 < maxParallelism <= 2^15 - 1
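The bound on the maximum parallelism can be checked with a few lines of plain Java. This is a minimal sketch that takes the range exactly as stated above. The class and method names are hypothetical and are not part of Flink.

```java
// Hypothetical helper, not Flink code: checks the range stated in the text.
final class MaxParallelismCheck {
    // Upper bound as given above: 2^15 - 1.
    static final int UPPER_BOUND = (1 << 15) - 1;

    // True when 0 < maxParallelism <= 2^15 - 1.
    static boolean isValid(int maxParallelism) {
        return maxParallelism > 0 && maxParallelism <= UPPER_BOUND;
    }

    // Returns the value unchanged, or rejects it when it is out of range.
    static int requireValid(int maxParallelism) {
        if (!isValid(maxParallelism)) {
            throw new IllegalArgumentException(
                "maxParallelism must be in (0, " + UPPER_BOUND + "], got " + maxParallelism);
        }
        return maxParallelism;
    }

    public static void main(String[] args) {
        System.out.println(isValid(128));
        System.out.println(isValid(0));
    }
}
```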
Restart strategy (RestartStrategies)
- setRestartStrategy()
- getRestartStrategy()
Operator chaining (isChainingEnabled)
- isChainingEnabled
- disableOperatorChaining
Retry count (numberOfExecutionRetries)
- setNumberOfExecutionRetries
- getNumberOfExecutionRetries
TimeCharacteristic
The default is processing time.
- setStreamTimeCharacteristic(time characteristic)
  - Processing time: AutoWatermarkInterval is set to 0 ms
  - Event time: AutoWatermarkInterval is set to 200 ms
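The relationship between the time characteristic and the watermark interval can be sketched as a small lookup. This is a simplified model in plain Java. The enum and method names are stand-ins chosen for illustration, not Flink's classes, and only the two cases listed above are covered.

```java
// Simplified model, not Flink code: maps a time characteristic to the
// automatic watermark interval described in the text.
final class WatermarkIntervalSketch {
    // Stand-in for the two time characteristics discussed above.
    enum TimeType { PROCESSING_TIME, EVENT_TIME }

    // Processing time needs no watermarks (0 ms); event time uses 200 ms.
    static long autoWatermarkIntervalMillis(TimeType type) {
        return type == TimeType.PROCESSING_TIME ? 0L : 200L;
    }

    public static void main(String[] args) {
        for (TimeType type : TimeType.values()) {
            System.out.println(type + " -> " + autoWatermarkIntervalMillis(type) + " ms");
        }
    }
}
```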
CheckpointConfig
- checkpointingMode: EXACTLY_ONCE, AT_LEAST_ONCE
  - getCheckpointingMode()
- checkpointTimeout
- CheckpointInterval
  - enableCheckpointing
- forceCheckpointing
  - isForceCheckpointing
  - setForceCheckpointing
- PauseBetweenCheckpoints
  - minPauseBetweenCheckpoints
  - maxPauseBetweenCheckpoints
- maxConcurrentCheckpoints
- failOnCheckpointingErrors
  - isFailOnCheckpointingErrors
- ExternalizedCheckpointCleanup
Allows checkpoints to be persisted externally.
Enumeration values:
- DELETE_ON_CANCELLATION: the persisted checkpoints and state are deleted when the job is canceled. After cancellation, the job cannot be restored from an externalized checkpoint.
- RETAIN_ON_CANCELLATION: the persisted checkpoints and state are kept when the job is canceled. After cancellation, the checkpoint metadata and state must be deleted manually.
Methods:
- enableExternalizedCheckpoints(cleanupMode): sets the cleanup mode for externally persisted checkpoints.
- isExternalizedCheckpointsEnabled(): returns whether checkpoints are persisted to external storage.
- getExternalizedCheckpointCleanup(): returns the cleanup mode for externally persisted checkpoints.
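How the three methods fit together can be shown with a toy configuration holder. This is a sketch in plain Java that reuses the names from the text for readability. It is not Flink's CheckpointConfig, and treating "no cleanup mode set" as "not enabled" is an assumption of this model.

```java
import java.util.Objects;

// Toy model, not Flink's CheckpointConfig.
final class ExternalizedCheckpointSketch {
    enum Cleanup {
        DELETE_ON_CANCELLATION(true),
        RETAIN_ON_CANCELLATION(false);

        private final boolean deleteOnCancellation;

        Cleanup(boolean deleteOnCancellation) {
            this.deleteOnCancellation = deleteOnCancellation;
        }

        // True when persisted checkpoints are removed on job cancellation.
        boolean deleteOnCancellation() {
            return deleteOnCancellation;
        }
    }

    // Assumption of this sketch: null means externalized checkpoints are off.
    private Cleanup cleanup;

    void enableExternalizedCheckpoints(Cleanup cleanupMode) {
        this.cleanup = Objects.requireNonNull(cleanupMode, "cleanupMode");
    }

    boolean isExternalizedCheckpointsEnabled() {
        return cleanup != null;
    }

    Cleanup getExternalizedCheckpointCleanup() {
        return cleanup;
    }
}
```

With RETAIN_ON_CANCELLATION, `deleteOnCancellation()` is false, which matches the note above that the retained metadata and state have to be removed by hand.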
StateBackend
- Describes how operator state and checkpoints are stored.
- Defines the data structures in which state and checkpoint data are persisted during execution (for example a hash table, RocksDB, or another data store).
Main methods
Creating the environment
- StreamExecutionEnvironment.getExecutionEnvironment
- StreamExecutionEnvironment.createLocalEnvironment
- StreamExecutionEnvironment.createRemoteEnvironment
Data sources
Collection-based data sources
generateSequence
env.generateSequence(long from, long to) generates the sequence of numbers from `from` to `to` and uses it as the input data source. If the parallelism is set to 1, the generated elements are emitted in order.
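Why the order is only guaranteed at parallelism 1 can be seen from a small model of how a numeric range might be divided among parallel subtasks. The round-robin split below is an assumption made for illustration and is not taken from Flink's implementation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model, not Flink code: divides [from, to] among subtasks.
final class SequenceSplitSketch {
    // Elements handled by one subtask when the range is split round-robin.
    static List<Long> elementsFor(long from, long to, int parallelism, int subtaskIndex) {
        List<Long> out = new ArrayList<>();
        for (long v = from + subtaskIndex; v <= to; v += parallelism) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        // One subtask sees the whole range in order.
        System.out.println(elementsFor(1, 5, 1, 0));
        // Two subtasks each see an ordered slice, but the merged stream
        // depends on how their outputs interleave at runtime.
        System.out.println(elementsFor(1, 5, 2, 0));
        System.out.println(elementsFor(1, 5, 2, 1));
    }
}
```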
fromElements
Creates a data stream from a given sequence of objects. All objects must be of the same type. The parallelism is 1.
For generic classes whose type cannot be determined, you may need to specify the TypeInformation explicitly.
fromCollection
Creates a data stream from a given non-empty collection or iterator. The parallelism is 1.
fromParallelCollection
Creates a data stream that contains the elements of a splittable iterator, which allows the framework to create a parallel data stream that returns the iterator's elements.
The iterator is not modified until it is actually executed, so the type of data it returns must be given explicitly in the form of the type class (because the Java compiler erases generic type information).
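The type-erasure point can be demonstrated in plain Java, independent of Flink. The sketch below shows that two differently parameterized lists share one runtime class, and that passing a `Class<T>` token lets a method know the element type without consuming the iterator. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of type erasure, not Flink code.
final class TypeErasureSketch {
    // Generic parameters are erased, so both lists have the same runtime class.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // The element type is taken from the explicit class token; the iterator
    // is deliberately left untouched.
    static <T> String describe(Iterator<T> iterator, Class<T> type) {
        return "stream of " + type.getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        List<Long> values = new ArrayList<>();
        values.add(1L);
        System.out.println(describe(values.iterator(), Long.class));
    }
}
```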
File-based data sources
readTextFile
- Reads the given file line by line and creates a data stream that contains one string per line. The file is read using the system's default character set or the specified character set.
- readTextFile calls readFile(TextInputFormat(path, charset, delimiter), file processing mode (process once), interval, type information for reading the file as text).
- readFile() calls createFileInput(TextInputFormat(path, charset, delimiter), typeInformation, file processing mode, interval).
- createFileInput calls addSource().
readFile
Reads the file according to the specified input format (here, TextInputFormat).
createFileInput
createFileInput(TextInputFormat(path, charset, delimiter), typeInformation, file processing mode, interval)
// Create a periodic file-scanning monitor with the given interval
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// Create the reader operator
ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);
// Create the input source; its parallelism is read from the environment's parallelism configuration.
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, reader);
FileProcessingMode has two values:
- PROCESS_ONCE: processes the current contents of the path, then exits.
- PROCESS_CONTINUOUSLY: periodically scans the path for new content.
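The difference between the two modes can be modeled without a file system. In the sketch below, each scan receives a directory listing as a map from path to modification time, and the monitor reports only files newer than anything it has already seen. Tracking the newest modification time is an assumption of this model. The class, enum, and method names are hypothetical and this is not Flink's monitoring function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of periodic directory monitoring, not Flink code.
final class FileMonitorSketch {
    enum Mode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

    // Newest modification time reported so far.
    private long lastSeenModificationTime = Long.MIN_VALUE;

    // Returns the paths modified after everything reported by earlier scans.
    List<String> scan(Map<String, Long> listing) {
        List<String> fresh = new ArrayList<>();
        long newest = lastSeenModificationTime;
        // TreeMap gives a stable, sorted iteration order.
        for (Map.Entry<String, Long> entry : new TreeMap<>(listing).entrySet()) {
            if (entry.getValue() > lastSeenModificationTime) {
                fresh.add(entry.getKey());
                newest = Math.max(newest, entry.getValue());
            }
        }
        lastSeenModificationTime = newest;
        return fresh;
    }

    // Feeds one listing per scan interval; PROCESS_ONCE stops after the first.
    static List<List<String>> run(Mode mode, List<Map<String, Long>> listingsPerScan) {
        FileMonitorSketch monitor = new FileMonitorSketch();
        List<List<String>> result = new ArrayList<>();
        for (Map<String, Long> listing : listingsPerScan) {
            result.add(monitor.scan(listing));
            if (mode == Mode.PROCESS_ONCE) {
                break;
            }
        }
        return result;
    }
}
```

In this model a file that appears after the first scan is picked up only in PROCESS_CONTINUOUSLY mode, which is the behavior the two bullet points describe.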
createInput
A general-purpose method that creates an input stream from an input format such as TextInputFormat.
Socket-based data sources
socketTextStream
Creates a data stream that contains the strings received indefinitely from a socket. Received strings are decoded with the system's default character set. The socket itself does not report termination, so retries are only initiated when the socket was terminated gracefully.
socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
addSource (adding a data source)
Adds a data source to the streaming topology.
The default parallelism is 1. To run in parallel, implement ParallelSourceFunction or extend RichParallelSourceFunction.
function – the user defined function
sourceName – Name of the data source
typeInfo – the user defined type information for the stream
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    if (typeInfo == null) {
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                    SourceFunction.class,
                    function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }
    boolean isParallel = function instanceof ParallelSourceFunction;
    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function);
    }
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
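The `instanceof ParallelSourceFunction` check, combined with the default parallelism of 1 noted earlier, can be modeled with two marker interfaces. The interfaces below are local stand-ins that only borrow the names from the listing. They are not the Flink types, and `effectiveParallelism` is a hypothetical helper.

```java
// Stand-alone model of the isParallel decision, not Flink code.
final class SourceParallelismSketch {
    // Local stand-ins for the two source interfaces named in the text.
    interface SourceFunction<T> { }
    interface ParallelSourceFunction<T> extends SourceFunction<T> { }

    // Mirrors the instanceof check from the listing.
    static boolean isParallel(SourceFunction<?> function) {
        return function instanceof ParallelSourceFunction;
    }

    // Non-parallel sources run with parallelism 1; parallel sources are
    // assumed here to use the environment's parallelism.
    static int effectiveParallelism(SourceFunction<?> function, int environmentParallelism) {
        return isParallel(function) ? environmentParallelism : 1;
    }

    public static void main(String[] args) {
        SourceFunction<String> plain = new SourceFunction<String>() { };
        SourceFunction<String> parallel = new ParallelSourceFunction<String>() { };
        System.out.println(effectiveParallelism(plain, 4));
        System.out.println(effectiveParallelism(parallel, 4));
    }
}
```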
Running the job
execute
Triggers program execution. The environment executes all parts of the program that lead to a sink operator. Sink operators, for example, print the result set or send it to a message queue.
You can specify the jobName explicitly. The default is "Flink Streaming Job".
getStreamGraph
Gets the StreamGraph of the streaming job.
- Builds the stream graph using the environment's state backend, the chaining setting, and the environment context as parameters.
- Iterates over the list of transformations upstream of the sinks and, during the iteration, sets the StreamGraph's bufferTimeout, transformationUID, transformationUserHash, and source.
getExecutionPlan
- Creates the plan with which the program will be executed and returns it as the execution data flow graph.
- This method must be called before the plan is executed.
- Returns a JSON string.
cast2StoppableSourceFunction
clean
Returns a "closure cleaned" version of the given function. The function is cleaned only if closure cleaning is not disabled in the ExecutionConfig.
addOperator
Adds an operator to the list of transformations: List<StreamTransformation<?>> transformations
It is not meant to be called by users. The API methods that create operators must call this method.
registerCacheFile
Registers a file in the distributed cache. At runtime, the file is accessible from any user-defined function as a local path.
The file can be a local file or a file in a distributed file system. If necessary, the runtime temporarily copies the file to a local cache.
Copyright notice
This article was written by [Great Wall ol]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220552270257.html