当前位置:网站首页>(8)FlinkSQL自定义UDF
(8)FlinkSQL自定义UDF
2022-08-08 12:13:00 【NBI大数据】
Flink提供了自定义函数的基础能力,在需要满足特殊业务场景需求时,根据自身需要按需定制自己的UDF 下面将简单演示一个UDF的定义和UDF的使用过程:
(1)定义一个UDF
package com.udf;
import org.apache.flink.table.functions.ScalarFunction;
/**
* Created by lj on 2022-07-25.
*/
public class TestUDF extends ScalarFunction {
public String eval(String value) {
return value + "_udf";
}
}
(2)使用UDF
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
// 将流转化为表
Table table = tableEnv.fromDataStream(waterDS,
$("id"),
$("ts"),
$("vc"),
$("pt").proctime());
tableEnv.createTemporaryView("EventTable", table);
/*
// 1. 直接调用自定义udf 函数
// table.select(call(myFunction.class,$("id"))).execute().print();
// 2. 先注册在使用
tableEnv.createTemporarySystemFunction("MyLength",myFunction.class);
//2.1 在使用注册的自定义函数 名称为MyLength
// table.select(call("MyLength",$("id"))).execute().print();
// 2.2 采用sql 的方式进行使用自定义函数
tableEnv.sqlQuery("select id, MyLength(id) from "+table).execute().print();
* */
tableEnv.createTemporarySystemFunction("MyLength",TestUDF.class);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +
"MyLength(cast(COUNT(ts) as string)) as testudf " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
tableEnv.toRetractStream(result, Row.class).print("toRetractStream"); //缩进模式
env.execute();
}
(3)应用效果
边栏推荐
猜你喜欢
.NET Community Toolkit 8.0.0 版本发布
RT-Thread记录(三、RT-Thread 线程操作函数及线程管理与FreeRTOS的比较)
Study: Toxic PFAS chemicals make rainwater unsafe to drink around the world
部署spark2.2集群(standalone模式)
写个 shell 玩 数字炸弹
neural network classification
刷题《剑指Offer》day12
Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan
字节跳动资深架构师整理2022年秋招最新面试题汇总:208页核心体系
一条SQL在MySQL中是如何执行的
随机推荐
尝试开发微信公众号消息推送功能并且和小程序关联
Jenkins - 持续集成介绍(1)
史上最全JVM性能调优:线程+子系统+类加载+内存分配+垃圾回收
这个选项是不是当数据库主键或唯一键发生冲突时替换数据
在半小时内从无到有开发并调试一款Chrome扩展(Chrome插件/谷歌浏览器插件)
#yyds Dry Goods Inventory#【Yugong Series】August 2022 Go Teaching Course 005-Variable
Replication监控及自动故障切换
shell之常用小工具
关于微信小程序体验版获取不到openId的问题
office安装出现了“office对安装源的访问被拒绝30068-4(5)”错误
leetcode-636:函数的独占时间
Alibaba微服务组件Nacos注册中心
学习与尝试 --&gt; 事件风暴
《show your work》 从现在开始!
phpstyle安装管理mysql
day01 -Web API介绍—DOM 介绍—获取元素—事件基础—操作元素—排他操作—自定义属性操作—节点操作—案例:动态生成表格—创建元素的三种方式(经典面试题)
探究!一个数据包在网络中的心路历程
RT-Thread记录(三、RT-Thread 线程操作函数及线程管理与FreeRTOS的比较)
Mysql的分布式事务原理理解
(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化