当前位置:网站首页>(8) FlinkSQL custom UDF
(8) FlinkSQL custom UDF
2022-08-08 13:03:00 【NBI big data】
Flink provides the basic ability to customize functions. When it needs to meet the needs of special business scenarios, it can customize its own UDF according to its own needs. The following will briefly demonstrate the definition of a UDF and the use process of UDF:
(1) Define a UDF
package com.udf;import org.apache.flink.table.functions.ScalarFunction;/*** Created by lj on 2022-07-25.*/public class TestUDF extends ScalarFunction {public String eval(String value) {return value + "_udf";}}
(2) Using UDF
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);/*// 1. Call the custom udf function directly// table.select(call(myFunction.class,$("id"))).execute().print();// 2. Register first before usingtableEnv.createTemporarySystemFunction("MyLength",myFunction.class);//2.1 Using the registered custom function named MyLength// table.select(call("MyLength",$("id"))).execute().print();// 2.2 Use sql to use custom functionstableEnv.sqlQuery("select id, MyLength(id) from "+table).execute().print();* */tableEnv.createTemporarySystemFunction("MyLength",TestUDF.class);Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +"MyLength(cast(COUNT(ts) as string)) as testudf " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");tableEnv.toRetractStream(result, Row.class).print("toRetractStream"); //indentation modeenv.execute();}
(3) Application effect
边栏推荐
- 使用shardingjdbc实现读写分离配置
- The maximum validity period of an SSL certificate is 13 months. Is it necessary to apply for multiple years at a time?
- 【C语言】动态内存管理
- 三个点语法和DOM观察者
- 一些常见的web小功能
- The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
- Study: Toxic PFAS chemicals make rainwater unsafe to drink around the world
- Doris学习笔记之优化
- 建材业深陷数字化困局,B2B协同系统标准化交易流程,解决企业交易网络化难题
- 十年架构五年生活-08 第一次背锅
猜你喜欢
随机推荐
Program Environment and Preprocessing
C语言小项目 -- 通讯录(静态版+动态版+文件版)
五心红娘6月成功案列
Redis的那些事:一文入门Redis的基础操作
curl获取harbor镜像仓库项目下的镜像列表
SSL证书最长有效期13个月,还有必要一次申请多年吗?
The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
建材业深陷数字化困局,B2B协同系统标准化交易流程,解决企业交易网络化难题
[界面开发]DevExpress WinForms流程图控件——XtraDiagrams组件入门指南
Jenkins-安装(2)
C# 反射 操作列表类型属性
学习与尝试 --> 事件风暴
The programmer essential VS debugging technique
odps sql被删除了,能找回来吗
2022-08-04
深度剖析-class的几个对象(utlis,component)-瀑布流-懒加载(概念,作用,原理,实现步骤)
为你的网站加上live2d的动态小挂件,博君一晒
STM32的内存管理相关(内存架构,内存管理,map文件分析)
ctfshow 七夕杯(复现)
Doris学习笔记之优化