ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

大数据Spark结合图数据库Neo4j设计架构

2019-02-24 15:47:56  阅读:652  来源: 互联网

标签:p2 架构 uuid val get msisdn record Neo4j Spark


  1. Introduce

   大数据分布式技术结合图库Neo4J项目,由于Neo4j采用单节点,性能存在以下问题:

  1. . 插入速率随着图库数据增加而减少,成反比相关。
  2. . 对前端页面查询点边关系,测试一条数据耗时10s以上。

 

    所以重新设计架构,采用分布式中间件来取代单节点式Neo4j部分功能。经测试,几套架构尚可满足Spark离线处理和实时计算需求。

 

  1. Coding Introduce
def getDriver(): Driver = {
    val url = Contants.NEO4j_URL
    val user = Contants.NEO4J_USER
    val password = Contants.NEO4j_PWD
    val driver = GraphDatabase.driver(url, AuthTokens.basic(user, password), Config.build()
      .withMaxIdleSessions(1000)
      .withConnectionLivenessCheckTimeout(10,TimeUnit.SECONDS)
      .toConfig)
    return driver
  }


  def getSession(driver: Driver): Session = {
    val session = driver.session()
    return session
  }

 

 

def relationShip(session: Session, msisdn: String, touser: String,capdate:String): String = {
      //查询个人和个人之间的文件关系
      val result = session.run("match (p1:person)-[r:fileTofile]-(p2:person) where p1.msisdn={msisdn} and p2.touser={touser} return r.uuid as uuid,p1.wxid as wxid1,p1.name as name1,p2.msisdn as msisdn2,p2.wxid as wxid2,p2.name as name2",
        parameters("msisdn", msisdn, "touser", touser))
    if (result.hasNext) {
      val record = result.next()
      val uuid = record.get("uuid").asString()
      val wxid1 = record.get("wxid1").asString()
      val name1 = record.get("name1").asString()
      val msisdn2 = record.get("msisdn2").asString()
      val wxid2 = record.get("wxid2").asString()
      val name2 = record.get("name2").asString()
      return uuid + "|" + wxid1 + "|" + name1 + "|" + msisdn2 + "|" + wxid2 + "|" + name2
    } else {
      val uuid = UUID.randomUUID().toString.replaceAll("-", "")
      val rel = session.run("match (p1:person),(p2:person) where p1.msisdn={msisdn} and p2.touser={touser} merge (p1)-[r:fileTofile]-(p2) on create set r.uuid={uuid},r.capdate={capdate} return p1.wxid as wxid1,p1.name as name1,p2.msisdn as msisdn2,p2.wxid as wxid2,p2.name as name2;",
        parameters("msisdn", msisdn, "touser", touser, "uuid", uuid,"capdate",capdate))
      if (rel.hasNext) {
        val record = rel.next()
        val wxid1 = record.get("wxid1").asString()
        val name1 = record.get("name1").asString()
        val msisdn2 = record.get("msisdn2").asString()
        val wxid2 = record.get("wxid2")
        val name2 = record.get("name2")
        return uuid + "|" + wxid1 + "|" + name1 + "|" + msisdn2 + "|" + wxid2 + "|" + name2
      }
    }

  1. 传入msisdn,touser查询该关系是否存在。

   若存在,则返回关系+节点属性

如果不存在,则新建关系,且关系属性上使用唯一UUID作为标识。同时返回关系+属性参数。

 

要注意遍历RDD时一定要在每次遍历查询之后关闭Neo4j的Driver,防止内存溢出。

try {
      resultMappRdd.foreachRDD(rdd => {
        rdd.saveToEs("wechat_neo4j/file")

      })
    } catch {
      case e: InterruptedException =>
        Thread.currentThread().interrupt()
    } finally {
      try {
        if (ssc != null) {
          ssc.start()
          ssc.awaitTermination()
        }
      } catch {
        case e:Exception =>
          e.printStackTrace()
      }

之后把接口方法体中返回的节点+关系数据遍历插入ES中。

注意在实时计算采用这种方法时,有时会在流量暴增情况下出现:上个队列批次尚未处理完成,下个批次队列就进入线程中。会出现OOM问题。所以可以使用Thread.currentThred.interrupt方法让OOM之后数据重新开始。

同时为了解决实时计算流量暴增情况:

  1. 可以使用Redis在一个批次最后做一次性查询或者建立关系。这样就要对节点属性和关系    属性做redis同步。
  2.  使用Kafka的反压策略,限制Kafka消费者读取速率等。

标签:p2,架构,uuid,val,get,msisdn,record,Neo4j,Spark
来源: https://blog.csdn.net/BlackArmand/article/details/87903973

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有