HanLP word segmentation with Spark
2022-04-23 12:57:00 【Look at the data at the top of the mountain】
This article covers tagging content data: each article's text is segmented into terms that can later be used as labels.
For segmentation we use HanLP, a Chinese word-segmentation toolkit that exposes a Java API, which makes it straightforward to call from Spark.
The rest of the article walks through running HanLP Chinese word segmentation inside a Spark job.
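HanLP is published on Maven Central under the coordinates com.hankcs:hanlp, so a build dependency along these lines is assumed throughout this article (the portable version number is only illustrative; use the release that matches the dictionary archive you download):

// build.sbt (sketch; the version shown is illustrative)
libraryDependencies += "com.hankcs" % "hanlp" % "portable-1.7.8"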
1、 Preparation
##1. Create a directory on HDFS to hold the HanLP data
[root@hadoop ~]# hdfs dfs -mkdir -p /common/nlp/data
##2. Upload the HanLP dictionary archive to the chosen location on the server
##3. Extract it into the current directory
[root@hadoop soft]# tar -zxvf hanlp.dictionary.tgz
##4. Upload the dictionary to the target location on HDFS
[root@hadoop soft]# hdfs dfs -put ./dictionary/ /common/nlp/data
##5. Copy hanlp.properties into the resources directory of the current project (a configuration sketch follows the adapter code below)
Because we use a custom dictionary stored on HDFS, HanLP must be given an implementation of IIOAdapter that can read it:
package com.fuwei.bigdata.profile.nlp.hanlp

import com.hankcs.hanlp.corpus.io.{IIOAdapter, IOUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.{FileInputStream, InputStream, OutputStream}
import java.net.URI

/**
 * Using HanLP with a custom dictionary requires an IIOAdapter implementation,
 * because the dictionary lives on HDFS rather than on the local filesystem.
 * Usage:
 * 1. Create the /common/nlp directory on HDFS
 * 2. Upload the contents of hanlp.dictionary.tgz to that HDFS directory
 * 3. Configure hanlp.properties in the current project
 * 4. If the dictionary's .bin cache files already exist, HanLP loads them directly;
 *    newly added words are not picked up until the .bin files are deleted,
 *    which forces the dictionary to be rebuilt from the text sources.
 */
class HadoopFileIoAdapter extends IIOAdapter {

  /**
   * Open a resource (dictionary or model file) that HanLP needs to read.
   * @param path resource path, either on HDFS, on the classpath, or on the local filesystem
   * @return an InputStream over the resource
   */
  override def open(path: String): InputStream = {
    // 1. Obtain the HDFS file system object
    val configuration = new Configuration()
    val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
    // 2. Check whether the path exists on HDFS
    if (fs.exists(new Path(path))) {
      // The resource is on HDFS
      fs.open(new Path(path))
    } else {
      if (IOUtil.isResource(path)) {
        // The path refers to a resource bundled on the classpath
        IOUtil.getResourceAsStream("/" + path)
      } else {
        // Fall back to the local filesystem
        new FileInputStream(path)
      }
    }
  }

  /**
   * Create a file, used by HanLP to write processed results such as the .bin dictionary cache.
   * @param path target path
   * @return an OutputStream to the target path
   */
  override def create(path: String): OutputStream = {
    val configuration = new Configuration()
    val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
    fs.create(new Path(path))
  }
}
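To make HanLP actually use this adapter, register it in the hanlp.properties copied into resources in step 5. Below is a minimal sketch assuming the HDFS layout created above; the namenode host and port are placeholders, and any keys other than root and IOAdapter should be kept from the hanlp.properties template that ships with your HanLP version:

# hanlp.properties (sketch)
# root is resolved so that root + data/dictionary/... points at the files uploaded to HDFS
root=hdfs://namenode:8020/common/nlp/
# register the custom adapter so dictionaries are read through the Hadoop FileSystem API
IOAdapter=com.fuwei.bigdata.profile.nlp.hanlp.HadoopFileIoAdapter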
With the adapter and configuration in place, the Spark job below reads the article data, segments each article with HanLP, keeps the noun terms, and writes the result back to Hive:

package com.fuwei.bigdata.profile

import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.util
import scala.collection.{JavaConversions, mutable}

/**
 * Tag the content logs: segment each article and keep its noun terms.
 */
object NewsContentSegment {

  private val logger = LoggerFactory.getLogger(NewsContentSegment.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    // 1. Parse the command-line arguments
    val params: Config = Config.parseConfig(NewsContentSegment, args)
    System.setProperty("HADOOP_USER_NAME", params.proxyUser)
    logger.warn("job is running please wait for a moment ......")

    // 2. Obtain the SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, NewsContentSegment.getClass.getSimpleName)
    import spark.implicits._

    // 3. For a local test there is no need to process everything; 10 rows are enough
    var limitData = ""
    if (params.env.equalsIgnoreCase("dev")) {
      limitData = "limit 10"
    }

    // 4. Read the source data
    val sourceArticleDataSQL =
      s"""
         |select
         |""".stripMargin
    val sourceDF: DataFrame = spark.sql(sourceArticleDataSQL)
    sourceDF.show()

    // 5. Word segmentation
    val termsDF: DataFrame = sourceDF.mapPartitions(partition => {
      // 5.1 Result collection
      var resTermList: List[(String, String)] = List[(String, String)]()
      // 5.2 Iterate over the rows of this partition
      partition.foreach(row => {
        // 5.3 Extract the fields
        val article_id: String = row.getAs("").toString
        val context: String = row.getAs("").toString
        // 5.4 Segment the text
        val terms: util.List[Term] = StandardTokenizer.segment(context)
        // 5.5 Remove the stop words contained in terms
        val stopTerms: util.List[Term] = CoreStopWordDictionary.apply(terms)
        // 5.6 Convert to a Scala buffer
        val stopTermsAsScalaBuffer: mutable.Buffer[Term] = JavaConversions.asScalaBuffer(stopTerms)
        // 5.7 Keep only nouns, drop single characters, and join the words with commas
        val convertTerms: String = stopTermsAsScalaBuffer.filter(term => {
          term.nature.startsWith("n") && term.word.length != 1
        }).map(term => term.word).mkString(",")
        // 5.8 Build one result record
        val res = (article_id, convertTerms)
        // 5.9 Skip empty results
        if (convertTerms.length != 0) {
          resTermList = res :: resTermList // prepend to the result list
        }
      })
      resTermList.iterator
    }).toDF("article_id", "context_terms")
    termsDF.show()

    // 6. Write the result to Hive
    termsDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwd_news.news_article_terms")

    // 7. Release resources
    spark.close()
    logger.info("job has success.....")
  }
}
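Before packaging the job, it can help to smoke-test the segmentation logic on its own, outside Spark. The sketch below uses the same HanLP calls as step 5; the object name and sample sentence are made up for illustration, and note that with the hanlp.properties above the dictionaries are still read from HDFS, so the cluster must be reachable (or temporarily point root at a local copy of the dictionary while testing):

import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer

import java.util
import scala.collection.JavaConversions

object SegmentSmokeTest {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample sentence; replace with any article text
    val text = "今天在北京参加了人工智能大会"
    // Segment, then drop stop words, mirroring step 5 of the Spark job
    val terms: util.List[Term] = StandardTokenizer.segment(text)
    CoreStopWordDictionary.apply(terms)
    // Keep only multi-character nouns and join them with commas
    val keywords = JavaConversions.asScalaBuffer(terms)
      .filter(term => term.nature.startsWith("n") && term.word.length != 1)
      .map(_.word)
      .mkString(",")
    println(keywords)
  }
}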
2、 Test the custom jar with spark-submit
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name user_profile_terms \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.NewsContentSegment \
/data/jar/user-profile.jar \
-e prod -x root
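Once the YARN application has finished, the output table can be spot-checked, for example from spark-shell (just a quick look at the table written by the job above):

spark.sql("select article_id, context_terms from dwd_news.news_article_terms limit 10").show(false)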
Copyright notice
This article was written by [Look at the data at the top of the mountain]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204230615024782.html