当前位置:网站首页>(8)FlinkSQL自定义UDF
(8)FlinkSQL自定义UDF
2022-08-08 12:13:00 【NBI大数据】
Flink提供了自定义函数的基础能力,在需要满足特殊业务场景需求时,根据自身需要按需定制自己的UDF 下面将简单演示一个UDF的定义和UDF的使用过程:
(1)定义一个UDF
package com.udf;
import org.apache.flink.table.functions.ScalarFunction;
/**
* Created by lj on 2022-07-25.
*/
public class TestUDF extends ScalarFunction {
public String eval(String value) {
return value + "_udf";
}
}(2)使用UDF
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
// 将流转化为表
Table table = tableEnv.fromDataStream(waterDS,
$("id"),
$("ts"),
$("vc"),
$("pt").proctime());
tableEnv.createTemporaryView("EventTable", table);
/*
// 1. 直接调用自定义udf 函数
// table.select(call(myFunction.class,$("id"))).execute().print();
// 2. 先注册在使用
tableEnv.createTemporarySystemFunction("MyLength",myFunction.class);
//2.1 在使用注册的自定义函数 名称为MyLength
// table.select(call("MyLength",$("id"))).execute().print();
// 2.2 采用sql 的方式进行使用自定义函数
tableEnv.sqlQuery("select id, MyLength(id) from "+table).execute().print();
* */
tableEnv.createTemporarySystemFunction("MyLength",TestUDF.class);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +
"MyLength(cast(COUNT(ts) as string)) as testudf " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
tableEnv.toRetractStream(result, Row.class).print("toRetractStream"); //缩进模式
env.execute();
}(3)应用效果
边栏推荐
- IJCAI 2022 | 基于随机游走聚合的图神经网络
- STM32入门开发 制作红外线遥控器(智能居家-万能遥控器)
- 一起学习集合框架之 TreeSet
- Jenkins - 持续集成介绍(1)
- 【C语言】[编程题]倒置字符串
- 尝试开发微信公众号消息推送功能并且和小程序关联
- 八月粉丝福利来了!大疆手机云台你爱了吗?
- 深度剖析-class的几个对象(utlis,component)-瀑布流-懒加载(概念,作用,原理,实现步骤)
- Combining "xPlus" to discuss the innovation and change of software architecture
- 各位,我想知道,既然数据全部读取过来存放内存,我flink sql窗口关闭之后再次查询这个cdc映射
猜你喜欢

SQL实例 - 胜平负

Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan

模式识别 学习笔记:第七章 特征选择

Alibaba微服务组件Nacos注册中心

5S软件就是将软件应用全维度简单化的软件系统

模式识别 学习笔记:第八章 特征提取

day02 -DOM - advanced events (register events, event listeners, delete events, DOM event flow, event objects, prevent default behavior, prevent event bubbling, event delegation) - commonly used mouse

dedecms支持Word图文自动粘贴

如何在go重打印函数调用者信息Caller

关于那些我们都听过的营销工具—优惠券
随机推荐
MeterSphere - open source test platform
#yyds Dry Goods Inventory#【Yugong Series】August 2022 Go Teaching Course 005-Variable
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
Acwing3452. 进制转换
LeetCode_1004_最大连续1的个数Ⅲ
LeetCode_487_最大连续1的个数Ⅱ
五心红娘6月成功案列
消防安全知识培训讲座
LeetCode_14_最长公共前缀
The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
ets声明式ui开发,怎么获取当前系统时间
转转商品系统高并发实战(数据篇)
简短截说阐述redis中事务的使用
请问如何实现两个不同环境的MySQL数据库实时同步
哪来的TB级推荐模型
C语言详解系列——指针与结构体
你的 golang 程序正在悄悄内存泄漏
day02 -DOM—高级事件(注册事件、事件监听、删除事件、DOM事件流、事件对象、阻止默认行为、阻止事件冒泡、事件委托)—常用鼠标事件—常用的键盘事件
Combining "xPlus" to discuss the innovation and change of software architecture
Pattern Recognition Study Notes: Chapter 6 Other Classification Methods (Continuously updated...)