Apache IoTDB UDF Source Code Analysis (1)
2022-04-22 14:15:00 【哈哈鹹魚zjx】
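Contents
Registering a UDF from the command line (Create Function xxx as "fully qualified class name")
Preface
Since I started contributing to the Apache IoTDB source code last month, I have been reading the code of modules that interest me whenever I have spare time. This time I want to share some of my analysis and understanding of the UDF-related source code in Apache IoTDB. As for why I chose to read it, or why it interests me: I think it is a good illustration of how software can be made extensible. Enough preamble, let's dig in.
Note: one more thing. If you are not yet familiar with how UDFs are used in Apache IoTDB, I recommend reading the official UDF documentation first. I found it a bit hard to follow at first; if you feel the same, writing a small demo yourself will make things clear. The GitHub repository also contains examples.
Registering a UDF from the command line (Create Function xxx as "fully qualified class name")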
IoTDB> create function example1 as "org.study.demo.UDTFExample"
Msg: The statement is executed successfully.
What actually happens behind this statement?
Parsing
In server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java, the visitCreateFunction method converts the statement into an internal CreateFunctionOperator.
@Override
public Operator visitCreateFunction(IoTDBSqlParser.CreateFunctionContext ctx) {
  CreateFunctionOperator createFunctionOperator =
      new CreateFunctionOperator(SQLConstant.TOK_FUNCTION_CREATE);
  createFunctionOperator.setUdfName(parseIdentifier(ctx.udfName.getText()));
  createFunctionOperator.setClassName(parseStringLiteral(ctx.className.getText()));
  return createFunctionOperator;
}
Generating the physical plan
server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateFunctionOperator.java
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
    throws QueryProcessException {
  return new CreateFunctionPlan(udfName, className);
}
Executing the physical plan to register the function
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@Override
public boolean processNonQuery(PhysicalPlan plan)
    throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
  // ...
    case CREATE_FUNCTION:
      return operateCreateFunction((CreateFunctionPlan) plan);
  // ...
}

private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
  UDFRegistrationService.getInstance().register(plan.getUdfName(), plan.getClassName(), true);
  return true;
}
So the key lies in the register method of UDFRegistrationService.
server/src/main/java/org/apache/iotdb/db/query/udf/service/UDFRegistrationService.java
public void register(String functionName, String className, boolean writeToTemporaryLogFile)
    throws UDFRegistrationException {
  functionName = functionName.toUpperCase();
  // Check that the function name is legal, mainly that it does not conflict with built-in functions [1]
  validateFunctionName(functionName, className);
  // Check whether the function has already been registered
  checkIfRegistered(functionName, className);
  // Register: construct a Method object via reflection and put it into a ConcurrentHashMap
  doRegister(functionName, className);
  tryAppendRegistrationLog(functionName, className, writeToTemporaryLogFile);
}
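At this point there is no mystery left in UDF registration: the fully qualified class name passed in by the user is loaded through reflection, and the resulting Method reference is saved so that later UDF queries can call it. (See the code comments for details.)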
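The steps in register can be sketched in a few lines of plain Java. This is a minimal sketch under stated assumptions, not the IoTDB implementation: the class UdfRegistrySketch, its BUILT_IN name list, and the newInstance helper are all hypothetical, and for simplicity the sketch stores the loaded Class rather than a Method reference.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified registry; it only mirrors the steps described above. */
final class UdfRegistrySketch {
  // Assumed stand-in for the built-in function names a UDF must not shadow.
  private static final Set<String> BUILT_IN = Set.of("COUNT", "SUM", "AVG");

  private final Map<String, Class<?>> registered = new ConcurrentHashMap<>();

  void register(String functionName, String className) {
    String name = functionName.toUpperCase(Locale.ROOT);
    if (BUILT_IN.contains(name)) {
      throw new IllegalArgumentException(name + " conflicts with a built-in function");
    }
    Class<?> clazz;
    try {
      clazz = Class.forName(className);
      // Fail fast if the class has no usable no-argument constructor.
      clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("cannot load " + className, e);
    }
    // putIfAbsent makes the "already registered" check atomic.
    if (registered.putIfAbsent(name, clazz) != null) {
      throw new IllegalStateException(name + " is already registered");
    }
  }

  /** Creates a fresh instance of a registered function class for a query. */
  Object newInstance(String functionName) {
    Class<?> clazz = registered.get(functionName.toUpperCase(Locale.ROOT));
    if (clazz == null) {
      throw new IllegalStateException(functionName + " is not registered");
    }
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    UdfRegistrySketch registry = new UdfRegistrySketch();
    // java.lang.StringBuilder stands in for a user-supplied UDF class.
    registry.register("example1", "java.lang.StringBuilder");
    System.out.println(registry.newInstance("EXAMPLE1").getClass().getName());
  }
}
```

Because names are upper-cased on both registration and lookup, example1 and EXAMPLE1 refer to the same entry, which matches the toUpperCase call in the original method.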
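There is one more question: how does Apache IoTDB know where to load the user's classes from?
This is more of a convention, hard-coded as the default inside the code: users must put the JAR that contains their UDF classes into the designated ext/udf directory. See the code snippets below.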
server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
/** External lib directory for UDF, stores user-uploaded JAR files */
private String udfDir =
    IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_FOLDER_NAME;
node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
public static final String EXT_FOLDER_NAME = "ext";
public static final String UDF_FOLDER_NAME = "udf";
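To make the directory convention concrete, here is a minimal sketch of how JARs under such a directory could be exposed to a class loader. It is an illustration only: UdfJarLoaderSketch, listJarUrls, and newClassLoader are hypothetical names, and the sketch uses the standard java.net.URLClassLoader rather than whatever loader IoTDB uses internally.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that turns a UDF directory into a class loader. */
final class UdfJarLoaderSketch {
  // Same value as EXT_FOLDER_NAME + File.separator + UDF_FOLDER_NAME above.
  static final String UDF_DIR = "ext" + File.separator + "udf";

  /** Lists the *.jar files directly inside dir; empty if dir does not exist. */
  static List<URL> listJarUrls(File dir) {
    List<URL> urls = new ArrayList<>();
    File[] jars = dir.listFiles((parent, name) -> name.endsWith(".jar"));
    if (jars == null) {
      return urls; // directory is missing or unreadable
    }
    Arrays.sort(jars); // deterministic order
    for (File jar : jars) {
      try {
        urls.add(jar.toURI().toURL());
      } catch (MalformedURLException e) {
        throw new IllegalStateException(e);
      }
    }
    return urls;
  }

  /** Builds a class loader that can see every JAR found in dir. */
  static URLClassLoader newClassLoader(File dir) {
    URL[] urls = listJarUrls(dir).toArray(new URL[0]);
    return new URLClassLoader(urls, UdfJarLoaderSketch.class.getClassLoader());
  }

  public static void main(String[] args) {
    File dir = new File(UDF_DIR);
    System.out.println("JARs visible under " + dir + ": " + listJarUrls(dir));
  }
}
```

A class name passed to create function could then be resolved with Class.forName(className, true, loader) against such a loader.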
SELECT queries that use a UDF
SELECT s1, example1(s1), s2, example1(s2) FROM root.sg1.d1;
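Next comes the execution mechanism behind a SQL query that uses a UDF. Honestly, I only barely managed to understand it myself, and explaining it clearly will take me more time and sorting out, so here is just a brief overview.
The basic idea is that, at the appropriate point, the Method reference registered earlier is used to transform the queried data. In the concrete implementation, the author divides the process into three layers:
1. InputLayer 2. TransformerLayer 3. OutputLayer
InputLayer: the scan input of a raw time series. Even if several columns use the same time series, only one input is needed, which avoids duplicated resources and wasted work.
TransformerLayer: applies the transformation when a UDF is present; otherwise the data is passed through unchanged.
OutputLayer: the final output form of the queried columns.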
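The three layers can be illustrated with a toy model of the query above. This is only a sketch under simplifying assumptions: LayerSketch, InputLayer, Column, and select are hypothetical names, a series is modeled as a double[] without timestamps, and a UDF is modeled as a DoubleUnaryOperator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.DoubleUnaryOperator;

/** Toy model of the input, transformer, and output layers. */
final class LayerSketch {

  /** Input layer: each series is scanned at most once, however many columns use it. */
  static final class InputLayer {
    private final Map<String, double[]> storage;
    private final Map<String, double[]> scanned = new HashMap<>();
    int scanCount = 0;

    InputLayer(Map<String, double[]> storage) {
      this.storage = storage;
    }

    double[] read(String series) {
      return scanned.computeIfAbsent(series, s -> {
        scanCount++; // counts real scans only, not cache hits
        return storage.get(s);
      });
    }
  }

  /** One output column: a source series plus the transformer applied to it. */
  static final class Column {
    final String series;
    final DoubleUnaryOperator transformer;

    Column(String series, DoubleUnaryOperator transformer) {
      this.series = series;
      this.transformer = transformer;
    }
  }

  /** Transformer and output layers: transform each column and collect the results. */
  static List<double[]> select(InputLayer input, List<Column> columns) {
    List<double[]> output = new ArrayList<>();
    for (Column column : columns) {
      double[] raw = input.read(column.series);
      output.add(Arrays.stream(raw).map(column.transformer).toArray());
    }
    return output;
  }

  public static void main(String[] args) {
    Map<String, double[]> storage = new HashMap<>();
    storage.put("s1", new double[] {1, 2});
    storage.put("s2", new double[] {10});
    InputLayer input = new InputLayer(storage);
    DoubleUnaryOperator example1 = v -> v * 2; // hypothetical UDF
    DoubleUnaryOperator passThrough = DoubleUnaryOperator.identity();
    // Mirrors: SELECT s1, example1(s1), s2, example1(s2)
    List<Column> columns = Arrays.asList(
        new Column("s1", passThrough), new Column("s1", example1),
        new Column("s2", passThrough), new Column("s2", example1));
    for (double[] column : select(input, columns)) {
      System.out.println(Arrays.toString(column));
    }
    System.out.println("scans: " + input.scanCount);
  }
}
```

Four output columns are produced from only two scans, which is the point of sharing one input per time series; the identity transformer plays the role of the pass-through case.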
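The statement above is eventually converted into a UDTFQueryOperator and then processed further. Here is some of the key code as a preview; a follow-up article will go deeper. If you can't wait, read the code yourself, and if you have any insights, feel free to discuss them with me so we can improve together.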
@Override
public IntermediateLayer constructIntermediateLayer( // building the intermediate layer is where the UDF gets invoked
    long queryId,
    UDTFPlan udtfPlan,
    RawQueryInputLayer rawTimeSeriesInputLayer,
    Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
    Map<Expression, TSDataType> expressionDataTypeMap,
    LayerMemoryAssigner memoryAssigner)
    throws QueryProcessException, IOException {
  if (!expressionIntermediateLayerMap.containsKey(this)) {
    float memoryBudgetInMB = memoryAssigner.assign();
    Transformer transformer;
    if (isPlainAggregationFunctionExpression) {
      transformer =
          new TransparentTransformer( // no transformation; effectively a pass-through
              rawTimeSeriesInputLayer.constructPointReader(
                  udtfPlan.getReaderIndexByExpressionName(toString())));
    } else {
      IntermediateLayer udfInputIntermediateLayer =
          constructUdfInputIntermediateLayer(
              queryId,
              udtfPlan,
              rawTimeSeriesInputLayer,
              expressionIntermediateLayerMap,
              expressionDataTypeMap,
              memoryAssigner);
      transformer =
          constructUdfTransformer( // runs the validate and beforeStart steps, then returns the Transformer matching the AccessStrategy
              queryId,
              udtfPlan,
              expressionDataTypeMap,
              memoryAssigner,
              udfInputIntermediateLayer);
    }
    expressionDataTypeMap.put(this, transformer.getDataType());
    expressionIntermediateLayerMap.put(
        this,
        memoryAssigner.getReference(this) == 1
            ? new SingleInputColumnSingleReferenceIntermediateLayer(
                this, queryId, memoryBudgetInMB, transformer)
            : new SingleInputColumnMultiReferenceIntermediateLayer(
                this, queryId, memoryBudgetInMB, transformer)); // its LayerPointReader is the transformer
  }
  return expressionIntermediateLayerMap.get(this);
}
private UDFQueryTransformer constructUdfTransformer(
    long queryId,
    UDTFPlan udtfPlan,
    Map<Expression, TSDataType> expressionDataTypeMap,
    LayerMemoryAssigner memoryAssigner,
    IntermediateLayer udfInputIntermediateLayer)
    throws QueryProcessException, IOException {
  UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
  executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
  AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
  switch (accessStrategy.getAccessStrategyType()) {
    case ROW_BY_ROW:
      return new UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), executor);
    case SLIDING_SIZE_WINDOW:
    case SLIDING_TIME_WINDOW:
      return new UDFQueryRowWindowTransformer(
          udfInputIntermediateLayer.constructRowWindowReader(
              accessStrategy, memoryAssigner.assign()),
          executor);
    default:
      throw new UnsupportedOperationException("Unsupported transformer access strategy");
  }
}
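The switch on the access strategy decides how rows are handed to the UDF: one at a time, or in windows. The following minimal sketch shows that difference on a plain list of integers. AccessStrategySketch and batches are hypothetical names, only two of the three strategies are modeled, and the window is assumed to advance by its own size, which is a simplification of this sketch rather than a statement about IoTDB's sliding step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy illustration of row-by-row versus size-window access. */
final class AccessStrategySketch {
  enum Strategy { ROW_BY_ROW, SLIDING_SIZE_WINDOW }

  /** Splits rows into the batches a UDF would receive under the given strategy. */
  static List<List<Integer>> batches(List<Integer> rows, Strategy strategy, int windowSize) {
    int size;
    switch (strategy) {
      case ROW_BY_ROW:
        size = 1; // every call sees exactly one row
        break;
      case SLIDING_SIZE_WINDOW:
        if (windowSize <= 0) {
          throw new IllegalArgumentException("windowSize must be positive");
        }
        size = windowSize; // assumed: the window advances by its own size
        break;
      default:
        throw new UnsupportedOperationException("Unsupported access strategy");
    }
    List<List<Integer>> result = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += size) {
      int end = Math.min(start + size, rows.size());
      result.add(new ArrayList<>(rows.subList(start, end)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println(batches(rows, Strategy.ROW_BY_ROW, 0));
    System.out.println(batches(rows, Strategy.SLIDING_SIZE_WINDOW, 2));
  }
}
```

In the original code the same decision selects between UDFQueryRowTransformer and UDFQueryRowWindowTransformer, so the UDF author only declares a strategy and the engine picks the matching reader.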
Copyright notice
This article was written by [哈哈鹹魚zjx]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/04/202204221414557436.html