Greenplum [Code Sharing 01]: implementing replace insert / insert on conflict (similar to on duplicate key update) for batch loading (merge insert: insert when absent, update when present)
2022-04-22 15:52:00 【シ風箏】
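The Alibaba Cloud Developer Community article 《PostgreSQL upsert 功能 insert on conflict do 的用法》 already explains the PostgreSQL upsert syntax, so it is not repeated here.
1. Problem description
The project database is being migrated from MySQL to Greenplum, and most of the problems come from SQL syntax differences: MySQL has on duplicate key update for update-on-conflict, but Greenplum does not.
PostgreSQL 9.5 introduced the UPSERT feature. Its insert on conflict do syntax is very powerful: it supports merge writes (when a unique constraint would be violated the row is updated, otherwise it is inserted), and it also suits stream-computing scenarios.
The Greenplum version currently in use is 6.13.0, which is based on PostgreSQL 9.4.24, so the UPSERT syntax is unavailable: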
# select version();
PostgreSQL 9.4.24
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source)
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16
Unfortunately, the latest release, 6.20.3 (published 2022-04-08), is still based on 9.4:
PostgreSQL 9.4.26
(Greenplum Database 6.20.3 build commit:24b949d2585cdbe8a157062fb756013e7c2874ab Open Source)
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Apr 6 2022 19:59:57
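The check done by eye on the version() output above can also be automated. The following is a minimal sketch, not part of the original project: the class name UpsertSupport and the method supportsOnConflict are hypothetical, and the rule "base version 9.5 or later" is only a heuristic, since a fork does not necessarily expose every upstream feature.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decides from a version() string whether the
// PostgreSQL base version is new enough for INSERT ... ON CONFLICT.
class UpsertSupport {
    private static final Pattern PG_VERSION = Pattern.compile("PostgreSQL (\\d+)\\.(\\d+)");

    /** Returns true when the reported PostgreSQL base version is 9.5 or later. */
    static boolean supportsOnConflict(String versionString) {
        Matcher m = PG_VERSION.matcher(versionString);
        if (!m.find()) {
            throw new IllegalArgumentException("No PostgreSQL version found in: " + versionString);
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        return major > 9 || (major == 9 && minor >= 5);
    }

    public static void main(String[] args) {
        // prints false: the 9.4 base predates ON CONFLICT
        System.out.println(supportsOnConflict("PostgreSQL 9.4.24 (Greenplum Database 6.13.0)"));
        // prints true
        System.out.println(supportsOnConflict("PostgreSQL 9.5.0"));
    }
}
```

With a check like this, an application could choose between the native syntax and the temporary-table workaround described in this post at startup.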
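So how can a merge insert (insert when absent, update when present) be implemented on Greenplum? The Alibaba Cloud Developer Community article 《Greenplum merge insert 用法与性能 insert on conflict》 explains the idea; the detailed implementation follows.
2. Implementation
2.1 Approach
- Save the data to be written into a temporary table that has no primary key constraint.
- Merge the rows of that temporary table so that no two rows share a primary key (the merge rule depends on the actual business case) and store the result in temporary table 1.
- Join temporary table 1 with the main table and store the rows that conflict with main-table rows in temporary table 2.
- Delete the conflicting rows from the main table according to the records in temporary table 2.
- Insert the rows of temporary table 2 and the new rows of temporary table 1 into the main table.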
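The five steps above can be modelled in memory before looking at the SQL. This is only an illustrative sketch under stated assumptions: the class MergeInsertModel, the Row type, and the "newest updatedAt wins" merge rule are hypothetical and stand in for the real tables and for whatever merge rule the business case requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the five steps; maps keyed by primary key stand in for tables.
class MergeInsertModel {
    /** Hypothetical row: primary key, payload, and a timestamp used to pick the newest duplicate. */
    static final class Row {
        final int id;
        final String value;
        final long updatedAt;

        Row(int id, String value, long updatedAt) {
            this.id = id;
            this.value = value;
            this.updatedAt = updatedAt;
        }
    }

    /** Applies the batch to mainTable following the five steps. */
    static void mergeInsert(Map<Integer, Row> mainTable, List<Row> batch) {
        // Step 1: stage the batch without any key constraint (duplicates allowed).
        List<Row> tmp = new ArrayList<>(batch);

        // Step 2: collapse duplicates so each key appears once (assumed rule: newest row wins).
        Map<Integer, Row> tmp1 = new LinkedHashMap<>();
        for (Row r : tmp) {
            tmp1.merge(r.id, r, (old, cur) -> cur.updatedAt >= old.updatedAt ? cur : old);
        }

        // Step 3: collect the rows of tmp1 whose key already exists in the main table.
        Map<Integer, Row> tmp2 = new LinkedHashMap<>();
        for (Row r : tmp1.values()) {
            if (mainTable.containsKey(r.id)) {
                tmp2.put(r.id, r);
            }
        }

        // Step 4: delete the conflicting rows from the main table.
        mainTable.keySet().removeAll(tmp2.keySet());

        // Step 5: insert the rows of tmp2 plus the rows of tmp1 that are new.
        mainTable.putAll(tmp2);
        for (Row r : tmp1.values()) {
            if (!tmp2.containsKey(r.id)) {
                mainTable.put(r.id, r);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Row> table = new HashMap<>();
        table.put(1, new Row(1, "old", 1L));
        table.put(2, new Row(2, "keep", 1L));
        mergeInsert(table, Arrays.asList(
                new Row(1, "stale", 2L), new Row(1, "fresh", 3L), new Row(3, "new", 1L)));
        System.out.println(table.get(1).value); // prints fresh
    }
}
```

Deleting and re-inserting the conflicting rows, instead of updating them in place, is what lets the same insert statement serve both new and existing keys.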
2.2 Code
- Calling code
// To keep the change non-invasive, dealDuplicateKeyBeforeInsert and dealDuplicateKeyAfterInsert are added before and after the original insert call
baseUtils.dealDuplicateKeyBeforeInsert(params);
baseService.insert("basicQuery.insertBatch", params);
baseUtils.dealDuplicateKeyAfterInsert(params);
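- Source code
/**
 * Handles duplicate primary key values (runs before the insert).
 * 1. Creates the temporary table _tmp without a primary key and redirects the insert to it by changing the table name
 *
 * @param params operation parameters
 */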
public void dealDuplicateKeyBeforeInsert(Map<String, Object> params) {
String tableName = MapUtils.getString(params, "table_name");
baseService.update("basicQuery.createTmpTable", params);
params.put("table_name", tableName + "_tmp");
}
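/**
 * Handles duplicate primary key values (runs after the insert).
 * 1. Merge the rows of temporary table _tmp so that keys are unique and store them in temporary table _tmp1 (distinctTmpTableData)
 * 2. Join _tmp1 with the main table to find the rows with duplicate primary keys and store them in temporary table _tmp2 (queryDuplicateKeyData)
 * 3. Delete the rows with duplicate primary keys from the main table (deleteDuplicateKeyData)
 * 4. Insert the non-duplicate rows of _tmp1 and all rows of _tmp2 into the main table (insertDistinctData)
 * 5. Drop all temporary tables that were used (dropTmpTable)
 *
 * @param params operation parameters
 */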
public void dealDuplicateKeyAfterInsert(Map<String, Object> params) {
// Fetch the table metadata
String tableName = MapUtils.getString(params, "table_name").replace("_tmp", "");
params.put("table_name", tableName);
params.put("tableName", tableName);
List<String> timeFieldNames = baseService.select("basicQuery.queryDateTimeField", tableName);
if(timeFieldNames.size()>0){
String timeFieldName = timeFieldNames.get(0);
params.put("timeFieldName",timeFieldName);
}
// Primary key of the table
Map primaryKey = (Map) baseService.selectOne("basicQuery.queryPrimaryKeyByTableName", params);
String primaryKeyColumnName = MapUtils.getString(primaryKey, "columnName", "");
params.put("primaryKeyColumnName", primaryKeyColumnName);
// Columns of the table
List<Map> columnList = baseService.select("basicQuery.queryFieldsByTableName", params);
List<String> columnNameList = columnList.stream().map(mapObject -> MapUtils.getObject(mapObject, "column_name").toString()).collect(Collectors.toList());
columnNameList.remove(primaryKeyColumnName);
params.put("columnNameList", columnNameList);
try {
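// 1. Merge the rows of temporary table _tmp so that keys are unique and store them in _tmp1
baseService.update("basicQuery.distinctTmpTableData", params);
// 2. Join _tmp1 with the main table to find rows with duplicate primary keys and store them in _tmp2
baseService.update("basicQuery.queryDuplicateKeyData", params);
// 3. Delete the rows with duplicate primary keys from the main table
baseService.delete("basicQuery.deleteDuplicateKeyData", params);
// 4. Insert into the main table
// Build the column lists used by the insert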
String insertFields = String.join(",", columnNameList);
String insertTmpFields = "t_tmp." + String.join(",t_tmp.", columnNameList);
params.put("insertFieldStr", insertFields + "," + primaryKeyColumnName);
params.put("insertTmpFieldStr", insertTmpFields + ", t_tmp." + primaryKeyColumnName);
baseService.insert("basicQuery.insertDistinctData", params);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 5. Drop the temporary tables
baseService.delete("basicQuery.dropTmpTable", params);
}
}
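One detail of dealDuplicateKeyAfterInsert is worth a closer look: the original table name is recovered with replace("_tmp", ""), and String.replace removes every occurrence of the substring, not just the trailing suffix. A minimal sketch of a suffix-only alternative follows; the class TableNames, the method stripTmpSuffix, and the table name order_tmp_log are hypothetical and used only for illustration.

```java
// Hypothetical helper that removes only a trailing "_tmp" suffix.
class TableNames {
    static final String TMP_SUFFIX = "_tmp";

    /** Returns the name without a trailing "_tmp"; names without the suffix are returned unchanged. */
    static String stripTmpSuffix(String name) {
        return name.endsWith(TMP_SUFFIX)
                ? name.substring(0, name.length() - TMP_SUFFIX.length())
                : name;
    }

    public static void main(String[] args) {
        String staged = "order_tmp_log" + TMP_SUFFIX; // "order_tmp_log_tmp"
        System.out.println(staged.replace("_tmp", "")); // prints order_log (inner "_tmp" lost too)
        System.out.println(stripTmpSuffix(staged));     // prints order_tmp_log
    }
}
```

For table names that never contain "_tmp" the two approaches give the same result, so this only matters if such names can occur.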
- Mapper
<!-- Table metadata queries -->
<select id="queryDateTimeField" parameterType="java.lang.String" resultType="java.lang.String">
select lower(column_name) as fieldName from information_schema.columns where table_name=#{tableName} and table_schema='public' and udt_name= 'timestamp'
</select>
<select id="queryFieldsByTableName" parameterType="java.util.Map" resultType="java.util.Map">
select column_name,udt_name from information_schema.columns where table_name= #{tableName} and table_schema='public'
</select>
<select id="queryPrimaryKeyByTableName" parameterType="java.util.Map" resultType="java.util.Map">
SELECT A.attname AS "columnName",( i.keys ).n AS "keySeq",ci.relname AS "pkName"
FROM
pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute A ON ( ct.oid = A.attrelid )
JOIN pg_catalog.pg_namespace n ON ( ct.relnamespace = n.oid )
JOIN (
SELECT
i.indexrelid,
i.indrelid,
i.indisprimary,
information_schema._pg_expandarray ( i.indkey ) AS keys
FROM
pg_catalog.pg_index i
) i ON ( A.attnum = ( i.keys ).x AND A.attrelid = i.indrelid )
JOIN pg_catalog.pg_class ci ON ( ci.oid = i.indexrelid )
WHERE
TRUE
AND n.nspname = 'public'
AND ct.relname = #{tableName}
AND i.indisprimary
</select>
Business SQL:
<!-- 1. Create the temporary table -->
<update id="createTmpTable" parameterType="java.util.Map">
create table ${schemaName}${table_name}_tmp ( like ${schemaName}${table_name} )
</update>
<!-- 2. Merge the temporary table rows so that no key is duplicated -->
<!--first_value(#{item}) over (partition by ${primaryKeyColumnName} order by ${primaryKeyColumnName} desc) as #{item}-->
<update id="distinctTmpTableData" parameterType="java.util.Map">
create table ${schemaName}${table_name}_tmp1 as
select ${primaryKeyColumnName},
<foreach collection="columnNameList" index="index" item="item" open="" separator=" , " close="">
<choose>
<when test="timeFieldName != null and timeFieldName != ''">
( ARRAY_AGG ( ${item} order by ${timeFieldName} desc ) ) [ 1 ] AS ${item}
</when>
<otherwise>
( ARRAY_AGG ( ${item} ) ) [ 1 ] AS ${item}
</otherwise>
</choose>
</foreach>
from ${schemaName}${table_name}_tmp group by ${primaryKeyColumnName}
</update>
<!-- 3. Join with the main table to find rows with duplicate primary keys and store them in a temporary table -->
<update id="queryDuplicateKeyData" parameterType="java.util.Map">
create table ${schemaName}${table_name}_tmp2 as
select ${primaryKeyColumnName},
<foreach collection="columnNameList" index="index" item="item" open="" separator=" , " close="">
t_tmp.${item}
</foreach>
from
${schemaName}${table_name}_tmp1 as t_tmp
inner join ${schemaName}${table_name} using ( ${primaryKeyColumnName} )
</update>
<!-- 4. Delete the rows with duplicate primary keys from the main table -->
<delete id="deleteDuplicateKeyData" parameterType="java.util.Map">
delete from ${schemaName}${table_name} t
using ${schemaName}${table_name}_tmp2 t_tmp
where t.${primaryKeyColumnName}=t_tmp.${primaryKeyColumnName}
</delete>
<!-- 5. Insert into the main table -->
<update id="insertDistinctData" parameterType="java.util.Map">
insert into ${schemaName}${table_name} (${insertFieldStr})
select ${insertTmpFieldStr} from ${schemaName}${table_name}_tmp1 t_tmp
left join ${schemaName}${table_name}_tmp2 using ( ${primaryKeyColumnName} )
where ${schemaName}${table_name}_tmp2.* is null
union all select ${insertFieldStr} from ${schemaName}${table_name}_tmp2
</update>
<!-- 6. Drop the temporary tables -->
<delete id="dropTmpTable" parameterType="java.util.Map">
drop table ${schemaName}${table_name}_tmp, ${schemaName}${table_name}_tmp1, ${schemaName}${table_name}_tmp2
</delete>
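To make the templated statements easier to read, the sketch below builds the SQL that the distinctTmpTableData and insertDistinctData mappers would roughly expand to for one concrete table. It is an illustration under stated assumptions, not the project's code: the class RenderedSql, the table public.user_info, and the columns id, name and updated_at are all hypothetical, and whitespace differs from what MyBatis would emit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical renderer that mirrors two of the mapper templates with plain string building.
class RenderedSql {
    /** Mirrors distinctTmpTableData: one row per key, newest first when a timestamp column exists. */
    static String dedupeSql(String table, String pk, List<String> cols, String timeCol) {
        String order = (timeCol == null || timeCol.isEmpty()) ? "" : " order by " + timeCol + " desc";
        String picks = cols.stream()
                .map(c -> "(array_agg(" + c + order + "))[1] as " + c)
                .collect(Collectors.joining(", "));
        return "create table " + table + "_tmp1 as select " + pk + ", " + picks
                + " from " + table + "_tmp group by " + pk;
    }

    /** Mirrors insertDistinctData: new rows from _tmp1 plus every row of _tmp2. */
    static String insertSql(String table, String pk, List<String> cols) {
        // Same construction as insertFieldStr and insertTmpFieldStr in the Java source.
        String fields = String.join(",", cols) + "," + pk;
        String tmpFields = "t_tmp." + String.join(",t_tmp.", cols) + ", t_tmp." + pk;
        return "insert into " + table + " (" + fields + ") select " + tmpFields
                + " from " + table + "_tmp1 t_tmp left join " + table + "_tmp2 using (" + pk + ")"
                + " where " + table + "_tmp2.* is null"
                + " union all select " + fields + " from " + table + "_tmp2";
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("name", "updated_at"); // non-key columns
        System.out.println(dedupeSql("public.user_info", "id", cols, "updated_at"));
        System.out.println(insertSql("public.user_info", "id", cols));
    }
}
```

Because the mappers use ${...} text substitution rather than #{...} bind parameters for identifiers, the table and column names must come from trusted metadata and never from user input.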
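3. Summary
The code above implements merge insert (insert when absent, update when present), but it is not very robust: for example, exceptions are only printed, and the code assumes a single-column primary key.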
Copyright notice
This article was written by [シ風箏]. Please include the link to the original when reposting. Thanks.
https://blog.csdn.net/weixin_39168541/article/details/124319912