Read the data in Presto through sparksql and save it to Clickhouse
2022-04-23 12:57:00 【Look at the data at the top of the mountain】
The overall structure
Config
package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser

case class Config(
                   env: String = "",
                   username: String = "",
                   password: String = "",
                   url: String = "",
                   cluster: String = "",
                   startDate: String = "",
                   endDate: String = "",
                   proxyUser: String = "",
                   topK: Int = 25
                 )

object Config {

  private val logger = LoggerFactory.getLogger("Config")

  /**
   * Wrap the command-line args into a Config object
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. Derive the program name from the class name
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    // 2. Build a parser; the parser turns the arguments into a Config
    val parser = new OptionParser[Config]("spark sql " + programName) {
      // 2.1 Usage header
      head(programName, "v1.0")
      // 2.2 Each option can be passed as -e or --env; text() is its description
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      // -x for the proxy user ('n' is taken by --username below; the submit script passes -x)
      opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "LabelGenerator" =>
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        case _ =>
      }
    }
    // Parse the arguments; Some(conf) means every required option was supplied
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error("can not parse args")
        System.exit(-1)
        null
    }
  }
}
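To see what the parser produces, here is a minimal sketch (not part of the original project) that feeds it the same flags the spark-submit script at the end of this post uses; all argument values are purely illustrative.

import com.fuwei.bigdata.profile.LabelGenerator
import com.fuwei.bigdata.profile.conf.Config

object ConfigDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative values; in production they come from the spark-submit command line
    val demoArgs = Array(
      "-e", "dev",
      "-x", "root",
      "-n", "fw-insert",
      "-p", "fw-001",
      "-u", "jdbc:clickhouse://10.206.0.4:8321",
      "-c", "1"
    )
    val conf = Config.parseConfig(LabelGenerator, demoArgs)
    // Prints roughly: Config(dev,fw-insert,fw-001,jdbc:clickhouse://10.206.0.4:8321,1,,,root,25)
    println(conf)
  }
}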
LabelGenerator
package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Generates the basic user-profile labels
 */
object LabelGenerator {

  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    // 1. Parse the command-line arguments
    val params: Config = Config.parseConfig(LabelGenerator, args)

    // 2. Obtain a SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
    import spark.implicits._

    // 3. Read the phone-prefix reference data and register it as a temporary view
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt")
      .toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info")

    // 4.1 userSql: join the user table with the device table
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")

    // 4.2 baseFeatureSql: enrich the users with region information from phone_info
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin

    // 4.3 Create the target Hive table if it does not exist
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)

    // 4.4 Run the query
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache() // keep the result in memory; unpersist it once it is no longer needed
    // Write the result into the Hive table (i.e. onto HDFS)
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

    // 5. Prepare the ClickHouse table; meta = (column names, placeholders)
    val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)

    // 6.1 Build the insert statement
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)

    // 6.2 Insert the data into ClickHouse partition by partition
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // release the cached data

    // 7. Release resources
    spark.stop()
    logger.info("job has succeeded")
  }
}
ClickHouseUtils
package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {

  /**
   * Create a ClickHouse data source
   *
   * @param username
   * @param password
   * @param url
   * @return
   */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /**
   * Convert a column list such as "age string,gender string"
   * into its ClickHouse form "age String,gender String"
   */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // convert the Spark type
      fName + " " + fType // e.g. "age String"
    }).mkString(",")
  }

  /**
   * Map a Spark SQL type to the corresponding ClickHouse type
   *
   * @param fieldType
   * @return
   */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
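A quick way to sanity-check the mapping on its own, outside the job; the column string below is a made-up example in Spark's "name type" form.

import com.fuwei.bigdata.profile.utils.ClickHouseUtils

object ClickHouseUtilsDemo {
  def main(args: Array[String]): Unit = {
    // Made-up column list in Spark's "name type" form
    val sparkCols = "uid string,age bigint,score double,event_time timestamp"
    // Expected output: uid String,age Int64,score Float64,event_time Datetime
    println(ClickHouseUtils.df2TypeName2CH(sparkCols))
  }
}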
SparkUtils (this utility can be reused by other jobs later)
package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {

  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" =>
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case "dev" =>
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case _ =>
        logger.error("not match env")
        System.exit(-1)
        null
    }
  }
}
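A minimal local usage sketch, assuming hive-site.xml (see the test notes at the end) is on the classpath so that the embedded Hive support can reach the metastore; the app name is arbitrary.

import com.fuwei.bigdata.profile.utils.SparkUtils

object SparkUtilsDemo {
  def main(args: Array[String]): Unit = {
    // "dev" starts a local[*] session with Hive support enabled
    val spark = SparkUtils.getSparkSession("dev", "spark-utils-demo")
    spark.sql("show databases").show()
    spark.stop()
  }
}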
TableUtils
package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * @author lifuwei
 * @time 2022-01-07
 * This class stores the data read from Hive into ClickHouse
 */
object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"       // target database
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base" // target table

  /**
   * Insert data into ClickHouse
   *
   * @param partition
   * @param insertCHSql
   * @param params
   */
  def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    // 1. Obtain the ClickHouse data source and connection
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql)
    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()
    // 2. Fill in the value of each placeholder
    partition.foreach(row => {
      var index = 1 // JDBC parameter indexes start at 1
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      // 3. Add the row to the current batch
      ps.addBatch()
      batchCount += 1
      // 4. Flush the batch when it is full or older than 3 seconds
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch()
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })
    // 5. Flush whatever is left in the last batch once the loop ends
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")
    // 6. Release resources
    ps.close()
    connection.close()
  }

  /**
   * (Re)create the ClickHouse table from the DataFrame schema
   *
   * @param baseFeaturedDF the DataFrame to be written
   * @param params         the parsed command-line parameters
   * @return the column-name list and the placeholder list of the DataFrame
   */
  /*
   * Schema of baseFeaturedDF:
   *   fieldName: uid, gender, age, region, model, email_suffix
   *   fieldType: string, string, string, string, string, string
   *
   * The insert statement we need looks like:
   *   insert into user_profile_base(...) values(?,?,?,?,?,?)
   *
   * So three things are derived from the schema: the column names,
   * the column types, and the placeholders for the values.
   */
  def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    /*
     * baseFeaturedDF.schema                 : the schema of the DataFrame
     * baseFeaturedDF.schema.fields          : the schema as an array of fields
     * baseFeaturedDF.schema.fields.foldLeft : fold over that array
     * ("", "", "")                          : the initial value of the fold
     */
    val (fieldName, fieldType, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (z, f) => {
        // Accumulate ("uid,gender,...", "uid string,gender string,...", "?,?,...")
        if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) {
          // not the first field: prepend a comma
          (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )
    /*
     * 4. Convert the Spark types into ClickHouse types,
     *    e.g. "age string" in Spark becomes "age String" in ClickHouse.
     */
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)
    // 5. The ClickHouse cluster to connect to
    val cluster: String = params.cluster
    // 6. Create the database
    val createCHDataBaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
         |""".stripMargin
    // 7. Create the table
    // ENGINE = MergeTree(): every ClickHouse table needs an engine; here the MergeTree engine is used
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin
    // 8. Drop the old table
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin
    // 9. Connect to ClickHouse and execute the statements
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    logger.warn(createCHDataBaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDataBaseSql) // create the database
    ps.execute()
    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql) // drop the old table
    ps.execute()
    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql) // create the table
    ps.execute()
    ps.close()
    connection.close()
    logger.info("clickhouse table created successfully")
    (fieldName, pl)
  }
}
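To make the two strings returned above concrete, here is a small standalone sketch (not part of the project) that reproduces the fold for the schema used in this job; the printed values are exactly what getClickHouseUserProfileBaseTable feeds into the create-table and insert statements.

import com.fuwei.bigdata.profile.utils.ClickHouseUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MetaDemo {
  def main(args: Array[String]): Unit = {
    // The schema written to dws_news.user_profile_base in LabelGenerator
    val schema = StructType(Seq("uid", "gender", "age", "email_suffix", "model", "region")
      .map(StructField(_, StringType)))
    // Same fold as in getClickHouseUserProfileBaseTable (condition slightly simplified)
    val (fieldName, fieldType, pl) = schema.fields.foldLeft(("", "", "")) { (z, f) =>
      if (z._1.nonEmpty) (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
      else (f.name, f.name + " " + f.dataType.simpleString, "?")
    }
    println(fieldName) // uid,gender,age,email_suffix,model,region
    println(fieldType) // uid string,gender string,age string,email_suffix string,model string,region string
    println(pl)        // ?,?,?,?,?,?
    // ClickHouse column definition used in the create-table statement:
    println(ClickHouseUtils.df2TypeName2CH(fieldType))
    // uid String,gender String,age String,email_suffix String,model String,region String
  }
}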
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fuwei.bigdata</groupId>
    <artifactId>user-profile</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <play-json.version>2.3.9</play-json.version>
        <maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
        <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
        <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
        <spark.version>2.4.5</spark.version>
        <scope.type>compile</scope.type>
        <json.version>1.2.3</json.version>
        <!--compile provided-->
    </properties>

    <dependencies>
        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${json.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.6</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>4.0.0-RC2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark-bundle_2.11</artifactId>
            <version>0.5.2-incubating</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.aggregate</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>alimaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <updatePolicy>never</updatePolicy>
            </releases>
            <snapshots>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven-assembly-plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-archetype-plugin</artifactId>
                <version>2.2</version>
            </plugin>
        </plugins>
    </build>
</project>
Test
##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. Run mvn clean package
##3. The hive metastore service must be running
##4. yarn/hdfs must be running
##5. clickhouse/chproxy must be running as well
##6. Prepare the spark-submit script for the packaged jar
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
##7. Verify the result with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
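If clickhouse-client is not available on the machine, a similar check can be run from Scala through the clickhouse-jdbc driver that is already a project dependency; the host, port, user and password below are simply the values from the submit script above and should be replaced with your own.

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object VerifyClickHouse {
  def main(args: Array[String]): Unit = {
    // Connection settings taken from the submit script above; adjust as needed
    val props = new ClickHouseProperties()
    props.setUser("fw-insert")
    props.setPassword("fw-001")
    val ds = new ClickHouseDataSource("jdbc:clickhouse://10.206.0.4:8321", props)
    val conn = ds.getConnection
    try {
      val rs = conn.createStatement().executeQuery(
        "select count(*) from app_news.user_profile_base")
      while (rs.next()) println(s"rows in user_profile_base: ${rs.getLong(1)}")
    } finally {
      conn.close()
    }
  }
}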
Copyright notice
This article was written by [Look at the data at the top of the mountain]. Please keep the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204230615025003.html