(8) FlinkSQL custom UDF
2022-08-08 13:03:00 【NBI big data】
Flink supports user-defined functions (UDFs). When the built-in functions cannot cover a particular business scenario, you can write a UDF of your own. The following briefly demonstrates how to define a UDF and how to use it:
(1) Define a UDF
package com.udf;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Created by lj on 2022-07-25.
 */
public class TestUDF extends ScalarFunction {
    // Flink calls eval once per input row; here it appends "_udf" to the value.
    public String eval(String value) {
        return value + "_udf";
    }
}
(2) Using UDF
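// Imports needed by the method below
import com.udf.TestUDF;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Read comma-separated lines ("id,ts,vc") from a local socket
    DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999, "\n");
    SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
        @Override
        public WaterSensor map(String s) throws Exception {
            String[] split = s.split(",");
            return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
        }
    });

    // Convert the stream to a table and add a processing-time attribute "pt"
    Table table = tableEnv.fromDataStream(waterDS, $("id"), $("ts"), $("vc"), $("pt").proctime());
    tableEnv.createTemporaryView("EventTable", table);

    /*
    // 1. Call the UDF class directly, without registering it
    // table.select(call(myFunction.class, $("id"))).execute().print();

    // 2. Register the function first, then use it
    tableEnv.createTemporarySystemFunction("MyLength", myFunction.class);
    // 2.1 Call the registered function MyLength from the Table API
    // table.select(call("MyLength", $("id"))).execute().print();
    // 2.2 Call the registered function from SQL
    tableEnv.sqlQuery("select id, MyLength(id) from " + table).execute().print();
    */

    // Register TestUDF under the name MyLength so that SQL can call it
    tableEnv.createTemporarySystemFunction("MyLength", TestUDF.class);

    // Aggregate per id over 10-second tumbling windows and apply the UDF to the count
    Table result = tableEnv.sqlQuery(
            "SELECT " +
            "id as componentname, " + // window_start, window_end,
            "COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +
            "MyLength(cast(COUNT(ts) as string)) as testudf " +
            "FROM TABLE( " +
            "TUMBLE( TABLE EventTable , " +
            "DESCRIPTOR(pt), " +
            "INTERVAL '10' SECOND)) " +
            "GROUP BY id , window_start, window_end");

    tableEnv.toRetractStream(result, Row.class).print("toRetractStream"); // retract mode
    env.execute();
}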
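The listing above relies on a WaterSensor class that the article does not show. The following is a minimal sketch of what it might look like, not the author's actual class: the field names and types are inferred from the column expressions $("id"), $("ts"), $("vc") and from the constructor call in the map step. The static parse helper and the field-count check are additions for illustration only.

```java
/**
 * Hypothetical WaterSensor POJO. Field names and types are inferred from the
 * article's listing (id, ts, vc); this is an assumption, not the author's code.
 */
public class WaterSensor {
    // Public fields plus a public no-argument constructor let Flink treat
    // the class as a POJO and map its fields to table columns by name.
    public String id; // first field of the input line
    public long ts;   // second field, parsed with Long.parseLong
    public int vc;    // third field, parsed with Integer.parseInt

    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    /**
     * Parses one "id,ts,vc" line the same way the map step does,
     * with an extra field-count check (illustrative helper).
     */
    public static WaterSensor parse(String line) {
        String[] split = line.split(",");
        if (split.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields, got " + split.length);
        }
        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
    }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }

    public static void main(String[] args) {
        // Example input line, as it could be typed into the socket on port 9999
        System.out.println(parse("sensor_1,1000,5"));
    }
}
```

With a class shaped like this, a line such as sensor_1,1000,5 sent to the socket becomes one row of EventTable with columns id, ts and vc.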
(3) Application effect