ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Hanlp分词器(通过spark)

2022-01-10 16:59:04  阅读:192  来源: 互联网

标签:String val -- hanlp 分词器 import spark Hanlp


这里主要是对内容数据进行标签处理

这里我们是用分词器是HanLP

HanLP是哈工大提供的一种中文分词的工具,因为他支持Java API

这里我们使用spark + hanlp进行中文分词

1、准备工作

##1. 在hdfs创建目录用于存放hanlp的数据
[root@hadoop ~]# hdfs dfs -mkdir -p /common/nlp/data

##2. 将hanlp的工具上传到服务器的指定位置
##3. 解压到当前目录
[root@hadoop soft]# tar -zxvf hanlp.dictionary.tgz

##4. 将语料库上传到hdfs的指定位置
[root@hadoop soft]# hdfs dfs -put ./dictionary/ /common/nlp/data

##5. 将这个hanlp.properties拷贝到当前工程下的resources目录下
使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
package com.fuwei.bigdata.profile.nlp.hanlp

import com.hankcs.hanlp.corpus.io.{IIOAdapter, IOUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.{FileInputStream, InputStream, OutputStream}
import java.net.URI


/**
 * 使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
 * 当用户自定义语料库在HDFS上的时候,配置此IIOAdapter
 * usage:
 * 1、在HDFS创建/commoon/nlp目录
 * 2、将hanlp.directory.tgz上传到hdfs的目录下
 * 3、在当前工程中配置hanlp.properties
 * 4、在语料库.bin的文件如果存在,加载词典的时候就会直接加载,如果有新词的时候,不会直接加载,
 * 如果有新词的时候,不会直接加载,需要将bin删除,才会
 */
class HadoopFileIoAdapter extends IIOAdapter{

    /**
     * 这个主要是我们需要分词的文件的路径
     * @param s
     * @return
     */
    override def open(path: String): InputStream = {
        //1、获取操作hdfs的文件系统对象
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
        //2、判断路径是否存在
        if (fs.exists(new Path(path))){//此时说明存在
            fs.open(new Path(path))
        }else{
            if (IOUtil.isResource(path)){
                //判断这个资源路径是否为hanlp需要的资源路径
                IOUtil.getResourceAsStream("/"+path)
            }else{
                new FileInputStream(path)
            }
        }
    }

    /**
     * 创建一个文件,用于输出处理后的结果
     * @param s
     * @return
     */
    override def create(path: String): OutputStream = {
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path),configuration)
        fs.create(new Path(path))
    }

}

package com.fuwei.bigdata.profile

import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import com.qf.bigdata.profile.conf.Config
import com.qf.bigdata.profile.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.util
import scala.collection.{JavaConversions, mutable}





/**
 * 对内容日志进行标签处理
 */
object NewsContentSegment {
    private val logger = LoggerFactory.getLogger(NewsContentSegment.getClass.getSimpleName)

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        //1、解析参数
        val params: Config = Config.parseConfig(NewsContentSegment, args)
        System.setProperty("HADOOP_USER_NAME",params.proxyUser)
        logger.warn("job is running please wait for a moment ......")

        //2、获取SparkSession
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, NewsContentSegment.getClass.getSimpleName)
        import spark.implicits._

        //3、如果是做本地测试,没有必要显示所有代码,测试10行数据即可
        var limitData = ""
        if (params.env.equalsIgnoreCase("dev")){
            limitData = "limit 10"
        }

        //4、读取源数据
        val sourceArticleDataSQL =
            s"""
               |select
               |""".stripMargin

        val sourceDF: DataFrame = spark.sql(sourceArticleDataSQL)
        sourceDF.show()

        //5、分词
        val termsDF: DataFrame = sourceDF.mapPartitions(partition => {
            //5.1存放结果的集合
            var resTermList: List[(String, String)] = List[(String, String)]()

            //5.2遍历分区数据
            partition.foreach(row => {
                //5.3获取到字段信息
                val article_id: String = row.getAs("").toString
                val context: String = row.getAs("").toString

                //5.4分词
                val terms: util.List[Term] = StandardTokenizer.segment(context)
                //5.5去除停用词
                val stopTerms: util.List[Term] = CoreStopWordDictionary.apply(terms) //去除terms中的停用词

                //5.6转换为scala的buffer
                val stopTermsAsScalaBuffer: mutable.Buffer[Term] = JavaConversions.asScalaBuffer(stopTerms)

                //5.7保留名词,去除单个汉字,单词之间使用逗号隔开
                val convertTerms: String = stopTermsAsScalaBuffer.filter(term => {
                    term.nature.startsWith("n") && term.word.length != 1
                }).map(term => term.word).mkString(",")

                //5.8构建单个结果
                var res = (article_id, convertTerms)

                //5.9去除空值
                if (convertTerms.length != 0) {
                    resTermList = res :: resTermList //向结果中追加
                }
            })
            resTermList.iterator
        }).toDF("article_id", "context_terms")

        termsDF.show()

        //6、写入到hive
        termsDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwd_news.news_article_terms")

        //7、释放资源
        spark.close()
        logger.info(" job has success.....")
    }

}

spark自定义jar包测试

${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name user_profile_terms \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.NewsContentSegment \
/data/jar/user-profile.jar \
-e prod -x root

标签:String,val,--,hanlp,分词器,import,spark,Hanlp
来源: https://blog.csdn.net/li1579026891/article/details/122414432

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有