Read the data in Presto through Spark SQL and save it to ClickHouse
2022-04-23 12:57:00 【Look at the data at the top of the mountain】
The overall structure

Config
package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser

case class Config(
                   env: String = "",
                   username: String = "",
                   password: String = "",
                   url: String = "",
                   cluster: String = "",
                   startDate: String = "",
                   endDate: String = "",
                   proxyUser: String = "",
                   topK: Int = 25
                 )

object Config {

  private val logger = LoggerFactory.getLogger("Config")

  /**
   * Wrap the command-line args into a Config object
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. Derive the program name from the class name
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    // 2. Build a parser; the parser turns the raw arguments into a Config
    val parser = new OptionParser[Config]("spark sql " + programName) {
      // 2.1 Usage header
      head(programName, "v1.0")
      // 2.2 Assign the env attribute: the option is written as -e or --env,
      //     and text() is the description shown in the help output
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "LabelGenerator" => {
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        }
        case _ =>
      }
    }
    // Parse the arguments and check whether a Config could be built from them
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None => {
        logger.error("can not parse args")
        System.exit(-1)
        null
      }
    }
  }
}
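For a quick sanity check of the parser, here is a minimal, hypothetical sketch; ConfigSmokeTest and its argument values are made up for illustration. Since the caller is not LabelGenerator, only the env (-e) and proxyUser (-x) options are registered, so only those two are passed.

package com.fuwei.bigdata.profile.conf

// Hypothetical smoke test for Config.parseConfig; not part of the original job.
object ConfigSmokeTest {
  def main(args: Array[String]): Unit = {
    // Only env and proxyUser are registered for callers other than LabelGenerator,
    // so only those two options are supplied here.
    val demoArgs = Array("-e", "dev", "-x", "root")
    val conf = Config.parseConfig(ConfigSmokeTest, demoArgs)
    println(conf) // Config(dev,,,,,,,root,25)
  }
}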
LabelGenerator
package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Generates the basic profile (portrait) labels
 */
object LabelGenerator {

  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Parse the arguments
    val params: Config = Config.parseConfig(LabelGenerator, args)
    // 2. Obtain a SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
    import spark.implicits._
    // 3. Read the phone-number location data
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt")
      .toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info") // register a temporary view
    // 4. Base feature SQL
    // 4.1 user info
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")
    // 4.2 base feature SQL
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin
    // 4.3 Create the Hive table
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)
    // 4.4 Run the query
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache() // cache the query result in memory; unpersist when done
    // Write the query result into the Hive table (i.e. the DataFrame lands on HDFS)
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")
    // 5. Prepare the ClickHouse table; meta is (column names, placeholders)
    val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)
    // 6. Insert the data into ClickHouse
    // 6.1 The insert SQL
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)
    // 6.2 Insert the data into the ClickHouse table partition by partition
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // release the cache
    // 7. Release resources
    spark.stop()
    logger.info("job has success")
  }
}
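The two trickiest expressions in userSql are the mobile and email_suffix case expressions. The following local sketch (not part of the job; the demo object, view name and sample rows are invented) reproduces them against an in-memory view so their behaviour is easy to see.

import org.apache.spark.sql.SparkSession

// Hypothetical local check of the expressions used in userSql.
object UserSqlExprCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("expr_check").getOrCreate()
    import spark.implicits._

    Seq(
      ("u1", "0086-13812345678", "someone@example.com"),
      ("u2", "1381234", "bad-email")
    ).toDF("distinct_id", "mobile", "email").createOrReplaceTempView("user_ro_demo")

    spark.sql(
      """
        |select
        |distinct_id as uid,
        |case when length(mobile) >= 11 then substring(mobile,-11,length(mobile)) else '' end as mobile,
        |case when size(split(email,'@')) = 2 then split(email,'@')[1] else '' end as email_suffix
        |from user_ro_demo
        |""".stripMargin).show(false)
    // Expected: u1 keeps the last 11 digits "13812345678" with email_suffix "example.com";
    // u2 falls back to empty strings for both columns.
    spark.stop()
  }
}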
ClickHouseUtils
package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {

  /**
   * Connect to ClickHouse
   *
   * @param username
   * @param password
   * @param url
   * @return
   */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /**
   * Convert the column types and return a string such as "age String,gender String"
   */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // convert the type
      fName + " " + fType // e.g. "age String", "gender String"
    }).mkString(",")
  }

  /**
   * Map a DataFrame (Spark SQL) type to the corresponding ClickHouse type
   *
   * @param fieldType
   * @return
   */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
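As a quick illustration, the sketch below (the demo object and the input string are hypothetical, but the string has the same "name type" shape that TableUtils builds from the DataFrame schema) shows how df2TypeName2CH turns Spark's lower-case type names into ClickHouse types.

package com.fuwei.bigdata.profile.utils

// Hypothetical demo object, not part of the original project.
object ClickHouseUtilsDemo {
  def main(args: Array[String]): Unit = {
    // "name type" pairs in Spark SQL notation, comma separated
    val dfCols = "uid string,age integer,score double,event_time timestamp"
    println(ClickHouseUtils.df2TypeName2CH(dfCols))
    // uid String,age Int32,score Float64,event_time Datetime
  }
}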
SparkUtils (this helper can be reused by other jobs later)

package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {

  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" => {
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case "dev" => {
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case _ => {
        logger.error("not match env")
        System.exit(-1)
        null
      }
    }
  }
}
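A minimal sketch of how this helper is used, assuming the Hive configuration files mentioned in the test section are on the classpath; the demo object and app name are hypothetical, and "dev" keeps everything on local[*].

package com.fuwei.bigdata.profile.utils

// Hypothetical usage sketch, not part of the original project.
object SparkUtilsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.getSparkSession("dev", "spark_utils_demo")
    println(spark.sparkContext.appName)         // spark_utils_demo_dev
    println(spark.conf.get("spark.serializer")) // org.apache.spark.serializer.KryoSerializer
    spark.stop()
  }
}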
TableUtils
package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * @author lifuwei
 * @since 2022-01-07
 *        This class stores the data read from Hive into ClickHouse
 */
object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"        // database to create
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"  // table to create

  /**
   * Insert data into ClickHouse
   *
   * @param partition
   * @param insertCHSql
   * @param params
   */
  def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    // 1. Obtain the ClickHouse data source
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql) // the insert statement
    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()
    // 2. Fill in the parameter value for each placeholder
    partition.foreach(row => {
      var index = 1 // placeholder index (JDBC parameters are 1-based)
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      // 3. Add the row to the batch
      ps.addBatch()
      batchCount += 1
      // 4. Control the batch size: flush every 2000 rows or every 3 seconds
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch() // execute one batch
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })
    // 5. Flush whatever is still buffered in the PreparedStatement once the loop ends
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")
    // 6. Release resources
    ps.close()
    connection.close()
  }

  /**
   * Create the ClickHouse table that mirrors the DataFrame
   *
   * @param baseFeaturedDF the DataFrame whose schema is mirrored
   * @param params         the job parameters
   * @return the column-name list and the placeholder list of the DataFrame
   */
  /*
   * The schema of baseFeaturedDF is:
   * fieldName: uid,gender,age,region,model,email_suffix
   * fieldType: string,string,string,string,string,string
   *
   * The insert statement we need has the form
   * insert into user_profile_base(...) values(?,?,?,?,?,?)
   *
   * So three things are derived from the schema: the column names, the column types,
   * and the placeholders for the inserted values.
   */
  def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    // schema holds all the metadata of the table (including the three items above);
    // foldLeft folds over the fields
    /*
     * baseFeaturedDF.schema                 : the overall structure of the DataFrame
     * baseFeaturedDF.schema.fields          : that structure as an array of fields
     * baseFeaturedDF.schema.fields.foldLeft : fold over this array
     * ("", "", "")                          : the initial value of the accumulator
     */
    val (fieldName, fieldType, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (z, f) => {
        // The shape we want to end up with is: (uid,gender,..., uid string,gender string,..., ?,?,...)
        if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) {
          // not the first field, so prepend a separator
          (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )
    /*
     * 4. Convert the Spark type notation to the ClickHouse notation:
     *    Spark uses string, ClickHouse uses String, and so on.
     *    The final result looks like: age String,gender String ...
     */
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)
    // 5. The ClickHouse cluster to connect to
    val cluster: String = params.cluster
    // 6. Create the database
    val createCHDataBaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
         |""".stripMargin
    // 7. Create the table
    /*
     * ENGINE = MergeTree(): a ClickHouse table needs an engine; here we use the MergeTree engine
     */
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin
    // 8. Drop-table SQL
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin
    // 9. Connect to ClickHouse
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    logger.warn(createCHDataBaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDataBaseSql) // create the database
    ps.execute()
    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql) // drop the old table
    ps.execute()
    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql) // create the table
    ps.execute()
    ps.close()
    connection.close()
    logger.info("success!!!!!!!!!")
    (fieldName, pl)
  }
}
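To make the foldLeft above easier to follow, here is a standalone sketch (the demo object and its three-column schema are invented) that prints the intermediate triple it produces.

package com.fuwei.bigdata.profile.utils

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical demo of the foldLeft used in getClickHouseUserProfileBaseTable.
object FoldLeftDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("uid", StringType),
      StructField("gender", StringType),
      StructField("age", StringType)
    ))
    val (names, types, placeholders) = schema.fields.foldLeft(("", "", "")) { (z, f) =>
      if (z._1.nonEmpty) (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
      else (f.name, f.name + " " + f.dataType.simpleString, "?")
    }
    println(names)        // uid,gender,age
    println(types)        // uid string,gender string,age string
    println(placeholders) // ?,?,?
  }
}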
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fuwei.bigdata</groupId>
<artifactId>user-profile</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.12</scala.version>
<play-json.version>2.3.9</play-json.version>
<maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
<scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
<spark.version>2.4.5</spark.version>
<scope.type>compile</scope.type>
<json.version>1.2.3</json.version>
<!--compile provided-->
</properties>
<dependencies>
<!--json package -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.11</artifactId>
<version>4.0.0-RC2</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-bundle_2.11</artifactId>
<version>0.5.2-incubating</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
<scope>${scope.type}</scope>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>alimaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-archetype-plugin</artifactId>
<version>2.2</version>
</plugin>
</plugins>
</build>
</project>
test
##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. Run Maven clean and package
##3. The Hive metastore service must be running
##4. YARN and HDFS must be running
##5. ClickHouse and chproxy must also be running
##6. The spark-submit script for the packaged jar
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
##7. Verify the result through clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
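If clickhouse-client is not at hand, a rough equivalent check can be done from Scala through the same JDBC driver, reusing ClickHouseUtils. This is only a sketch: the VerifyLoad object is hypothetical, and the endpoint and credentials below are copied from the submit script above and should be replaced with your own.

package com.fuwei.bigdata.profile.utils

// Hypothetical verification sketch; not part of the original project.
object VerifyLoad {
  def main(args: Array[String]): Unit = {
    val dataSource = ClickHouseUtils.getDataSource("fw-insert", "fw-001", "jdbc:clickhouse://10.206.0.4:8321")
    val connection = dataSource.getConnection
    val rs = connection.createStatement().executeQuery("select count(*) from app_news.user_profile_base")
    while (rs.next()) {
      println(s"rows in app_news.user_profile_base: ${rs.getLong(1)}")
    }
    rs.close()
    connection.close()
  }
}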
Copyright notice
This article was written by [Look at the data at the top of the mountain]. Please include the original link when reposting:
https://yzsam.com/2022/04/202204230615025003.html