Read the data in Presto through sparksql and save it to Clickhouse
2022-04-23 12:57:00 【Look at the data at the top of the mountain】
The overall structure
The project consists of an argument-parsing Config class (com.fuwei.bigdata.profile.conf), the LabelGenerator entry point, and the utility classes SparkUtils, ClickHouseUtils and TableUtils (com.fuwei.bigdata.profile.utils).
Config
package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser

case class Config(
  env: String = "",
  username: String = "",
  password: String = "",
  url: String = "",
  cluster: String = "",
  startDate: String = "",
  endDate: String = "",
  proxyUser: String = "",
  topK: Int = 25
)

object Config {
  private val logger = LoggerFactory.getLogger("Config")

  /**
   * Wrap the args parameters into a Config object
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    //1. Derive the program name from the class name
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    //2. Build a parser; the parser turns the command-line arguments into a Config
    val parser = new OptionParser[Config]("spark sql " + programName) {
      //2.1 Usage header
      head(programName, "v1.0")
      //2.2 Assign the env attribute
      // An option can be passed as -e or --env; text() is its description
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      // proxyUser uses the short option -x so it does not clash with -n (username) below
      opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "LabelGenerator" => {
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        }
        case _ =>
      }
    }
    //3. Parse the arguments; Some(conf) means every required option was supplied
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None => {
        logger.error("can not parse args")
        System.exit(-1)
        null
      }
    }
  }
}
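A minimal sketch of how the parser is driven. The ConfigParseDemo object and all option values below are placeholders for illustration, not part of the original project:

import com.fuwei.bigdata.profile.LabelGenerator
import com.fuwei.bigdata.profile.conf.Config

object ConfigParseDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder arguments; in production they come from the spark-submit script shown later
    val demoArgs = Array(
      "-e", "dev",
      "-x", "root",
      "-n", "ck_user",
      "-p", "ck_password",
      "-u", "jdbc:clickhouse://localhost:8123",
      "-c", "1")
    // programName resolves to "LabelGenerator", so the ClickHouse options are required
    val conf: Config = Config.parseConfig(LabelGenerator, demoArgs)
    println(conf.env + " " + conf.url)
  }
}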
LabelGenerator
package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Generates the basic user-profile labels
 */
object LabelGenerator {
  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    //1. Parse the command-line arguments
    val params: Config = Config.parseConfig(LabelGenerator, args)
    //2. Get a SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
    import spark.implicits._
    //3. Read the phone-number attribution data
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt")
      .toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info") // Register a temporary view
    //4.1 userSql: join the user table with the device table
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")
    //4.2 baseFeatureSql: enrich the user data with the region from the phone attribution table
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin
    //4.3 Create the Hive target table
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)
    //4.4 Run the query
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache() // Cache the query result in memory; unpersist it after use
    // Write the query result into the Hive table (i.e. onto HDFS)
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")
    //5. Prepare the ClickHouse table; meta is (column list, placeholder list)
    val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)
    //6. Insert the data into ClickHouse
    //6.1 The insert SQL
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)
    //6.2 Insert the data into the ClickHouse table partition by partition
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // Release the cache
    //7. Release resources
    spark.stop()
    logger.info("job has success")
  }
}
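For reference, with the columns selected above, the insert statement logged by logger.warn should look roughly like the following (reconstructed from the code, not captured job output):

insert into app_news.user_profile_base(uid,gender,age,email_suffix,model,region) values(?,?,?,?,?,?)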
ClickHouseUtils
package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {
  /**
   * Connect to ClickHouse
   * @param username
   * @param password
   * @param url
   * @return
   */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /**
   * Convert the column types and return a string such as "age String,gender String"
   */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // Convert the type
      fName + " " + fType // e.g. "age String"
    }).mkString(",")
  }

  /**
   * Map a DataFrame column type to the corresponding ClickHouse type
   *
   * @param fieldType
   * @return
   */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
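A small sketch of how these helpers are used together. The ClickHouseUtilsDemo object, the JDBC URL and the credentials are placeholders, and the demo is assumed to live in the same com.fuwei.bigdata.profile.utils package:

object ClickHouseUtilsDemo {
  def main(args: Array[String]): Unit = {
    // Type mapping: "uid string,age string,score double" -> "uid String,age String,score Float64"
    val chCols = ClickHouseUtils.df2TypeName2CH("uid string,age string,score double")
    println(chCols)
    // Building the data source only registers the driver and settings
    val ds = ClickHouseUtils.getDataSource("ck_user", "ck_password", "jdbc:clickhouse://localhost:8123")
    val conn = ds.getConnection // opens a connection; requires a reachable ClickHouse server
    conn.close()
  }
}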
SparkUtils (a general-purpose helper that can be reused in other projects)
package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {
  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" => {
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case "dev" => {
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case _ => {
        logger.error("not match env")
        System.exit(-1)
        null
      }
    }
  }
}
TableUtils
package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * @author : lifuwei
 * @time : 2022-01-07
 * @params : This class stores the data read from Hive into ClickHouse
 */
object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news" // Target database
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base" // Target table

  /**
   * Insert data into ClickHouse
   * @param partition
   * @param insertCHSql
   * @param params
   */
  def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    //1. Get a ClickHouse data source
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql) // insert statement
    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()
    //2. Fill in the value for every placeholder
    partition.foreach(row => {
      var index = 1 // JDBC parameter index
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      //3. Add the row to the batch
      ps.addBatch()
      batchCount += 1
      //4. Flush the batch when it is large enough or old enough
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch() // Execute one batch
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })
    //5. Flush whatever is left in the prepared statement after the loop ends
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")
    //6. Release resources
    ps.close()
    connection.close()
  }

  /**
   * Create the ClickHouse table from the DataFrame schema
   * @param baseFeaturedDF : the DataFrame
   * @param params : the job parameters
   * @return the column list and the placeholder list of the DataFrame
   */
  /*
   * Schema of baseFeaturedDF:
   * fieldName: uid,gender,age,region,model,email_suffix
   * fieldType: string,string,string,string,string,string
   *
   * The insert statement we need looks like
   * insert into user_profile_base(...) values(?,?,?,?,?,?)
   *
   * So three things are derived from the schema: the column names, the column types, and the placeholders
   */
  def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    // schema holds all of the table's metadata (the three things above)
    // foldLeft is a fold over the schema fields
    /*
     * baseFeaturedDF.schema : the overall structure of the DF
     * baseFeaturedDF.schema.fields : that structure as an array of fields
     * baseFeaturedDF.schema.fields.foldLeft : fold over that array
     * ("", "", "") : the initial value of the accumulator
     */
    val (fieldName, fieldType, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (z, f) => {
        // The result we want looks like: ("uid,gender,...", "uid string,gender string,...", "?,?,...")
        if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) {
          // Not the first field, so prepend a comma
          (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )
    /*
     * 4. Convert the Spark type names to ClickHouse type names:
     * string in Spark becomes String in ClickHouse, and so on.
     * The result looks like "age String,gender String ..."
     */
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)
    //5. The ClickHouse cluster to connect to
    val cluster: String = params.cluster
    //6. Create the database
    val createCHDataBaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
         |""".stripMargin
    //7. Create the table
    /*
     * ENGINE = MergeTree(): every ClickHouse table needs an engine; here we use the MergeTree engine
     */
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin
    //8. Drop the old table
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin
    //9. Connect to ClickHouse and run the statements
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    logger.warn(createCHDataBaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDataBaseSql) // create database
    ps.execute()
    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql) // drop table
    ps.execute()
    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql) // create table
    ps.execute()
    ps.close()
    connection.close()
    logger.info("success!!!!!!!!!")
    (fieldName, pl)
  }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fuwei.bigdata</groupId>
<artifactId>user-profile</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.12</scala.version>
<play-json.version>2.3.9</play-json.version>
<maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
<scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
<spark.version>2.4.5</spark.version>
<scope.type>compile</scope.type>
<json.version>1.2.3</json.version>
<!--compile provided-->
</properties>
<dependencies>
<!--json package -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.11</artifactId>
<version>4.0.0-RC2</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-bundle_2.11</artifactId>
<version>0.5.2-incubating</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
<scope>${scope.type}</scope>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>alimaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-archetype-plugin</artifactId>
<version>2.2</version>
</plugin>
</plugins>
</build>
</project>
Test
##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. clean and package
##3. The Hive metastore service must be running
##4. YARN/HDFS must be running
##5. ClickHouse/chproxy must be running as well
##6. Prepare the spark-submit script for the jar
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
##7. Verify the result with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
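Once connected, a quick sanity check of the loaded data could look like this (assuming the app_news.user_profile_base table created by the job):

select count(*) from app_news.user_profile_base;
select uid, gender, age, region from app_news.user_profile_base limit 10;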
Copyright notice
This article was created by [Look at the data at the top of the mountain]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204230615025003.html