How to increase Dataflow read parallelism from Cassandra

2019-07-01 14:49:28  Views: 251  Source: Internet

Tags: java cassandra google-bigquery google-cloud-platform google-cloud-dataflow


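I'm trying to export a lot of data (2 TB, 30kkk rows) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster has 4 nodes (4 vCPUs, 26 GB memory, and a 2000 GB PD (HDD) each). There is one seed node in the cluster. I need to transform my data before writing it to BQ, so I'm using Dataflow. The worker type is n1-highmem-2. The workers and the Cassandra instances are in the same zone, europe-west1-c. My limits for Cassandra: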

Cassandra limits settings

The part of my pipeline code responsible for the read transform is shown later in this post.

Autoscaling

The problem is that when I don't set --numWorkers, autoscaling settles on a small number of workers (2 on average):

Worker number with autoscaling

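Load balancing

When I set --numWorkers=15, the read rate doesn't increase and only 2 workers communicate with Cassandra (I can tell from iftop, and only these workers have a CPU load of ~60%).

At the same time, the Cassandra nodes are not heavily loaded (CPU usage is 20-30%). Network and disk usage on the seed node is about 2 times higher than on the other nodes, but not too high, I think: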

Network usage of the seed

Disk usage of the seed node

And for a non-seed node:

Network usage of the not seed

Disk usage of the not seed node

Pipeline launch warnings

I get some warnings when the pipeline launches:

WARNING: Size estimation of the source failed: 
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

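My Cassandra cluster is on a GCE local network, and it seems that some queries are issued from my local machine and cannot reach the cluster (I'm launching the pipeline with the Dataflow Eclipse plugin). These queries are about size estimation of the tables. Can I specify the size estimate by hand, or launch the pipeline from a GCE instance? Or can I ignore these warnings? Do they affect the read rate?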

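I tried launching the pipeline from a GCE VM. The connectivity problem is gone. I have no varchar columns in my tables, but I get warnings like this one (no codec in the datastax driver for [varchar <-> java.lang.Long]):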

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Pipeline read code

// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().rowName, c.element()));
            }
        }));

Number of splits (Stackdriver logs)

W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
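The log lines above are consistent with a split count derived from the size estimate: if the estimate fails and comes back as 0, the integer division yields 0 splits and the code falls back to a single split, so each table is read as one bundle no matter how many workers are available. Below is a minimal sketch of that arithmetic, mirroring the `split` logic quoted later in this post. The class name `SplitCountDemo`, the `fallback` parameter and the byte values are made up for illustration and are not part of Beam.

```java
public class SplitCountDemo {

    /**
     * Mirrors the split-count arithmetic from the split() method quoted in this post:
     * estimated size divided by desired bundle size, with a fallback when the
     * result is not positive.
     */
    static long numSplits(long estimatedSizeBytes, long desiredBundleSizeBytes, long fallback) {
        long numSplits = 1;
        if (desiredBundleSizeBytes > 0) {
            numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
        }
        if (numSplits <= 0) {
            numSplits = fallback;
        }
        return numSplits;
    }

    public static void main(String[] args) {
        long bundle = 64L * 1024 * 1024; // hypothetical desired bundle size (64 MiB)

        // Size estimation failed and reported 0 bytes: we end up with the fallback.
        System.out.println(numSplits(0L, bundle, 1));   // prints 1
        System.out.println(numSplits(0L, bundle, 10));  // prints 10

        // A working estimate of 2 TiB would produce many splits on its own.
        long twoTiB = 2L * 1024 * 1024 * 1024 * 1024;
        System.out.println(numSplits(twoTiB, bundle, 1)); // prints 32768
    }
}
```

With a fallback of 1 the read cannot be spread across more workers than there are sources, which would match the observation that only a couple of workers talk to Cassandra.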

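What I've tried

No effect:

- Setting the read consistency level to ONE
- nodetool setstreamthroughput 1000, nodetool setinterdcstreamthroughput 1000
- Increasing Cassandra read concurrency (in cassandra.yaml): concurrent_reads: 32
- Setting different numbers of workers, from 1 to 40

Some effect:
I set numSplits = 10, as @jkff proposed. Now I can see in the logs: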

I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
W  Can't estimate the size 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359 
I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091 
I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527 

But I got another exception:

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

Maybe there is a mistake here: CassandraServiceImpl.java#L220

And this statement looks like a typo: CassandraServiceImpl.java#L207
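The message "mismatched character 'p' expecting '$'" suggests that the generated query contained a literal `$pk`, that is, a placeholder that was never replaced by the real partition key column (in CQL a `$` is expected to open a `$$`-quoted string, which would explain the wording). The patch shown later in this post replaces that placeholder with per-table column names. The following is a minimal sketch of building the token predicate from actual column names. The class and method names and the keyspace `my_keyspace` are hypothetical; only `table1` and `table1_pk` come from the author's patch.

```java
import java.util.Arrays;
import java.util.List;

public class TokenPredicateDemo {

    /** Builds "token(col1, col2)" from the table's partition key columns. */
    static String tokenExpr(List<String> partitionKeyColumns) {
        if (partitionKeyColumns.isEmpty()) {
            throw new IllegalArgumentException("at least one partition key column is required");
        }
        return "token(" + String.join(", ", partitionKeyColumns) + ")";
    }

    /** Builds a CQL query that selects the rows whose token falls in [start, end). */
    static String rangeQuery(String keyspace, String table,
                             List<String> partitionKeyColumns, long start, long end) {
        String token = tokenExpr(partitionKeyColumns);
        return "SELECT * FROM " + keyspace + "." + table
                + " WHERE " + token + " >= " + start
                + " AND " + token + " < " + end + ";";
    }

    public static void main(String[] args) {
        // Keyspace name is an assumption; table and column names follow the author's patch.
        System.out.println(
                rangeQuery("my_keyspace", "table1", Arrays.asList("table1_pk"), -100L, 100L));
    }
}
```

In a real fix the column names would more likely be read from the table metadata than hard-coded per table.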

Changes I've made to the CassandraIO code

As @jkff proposed, I changed CassandraIO in the way I needed:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                              long desiredBundleSizeBytes,
                                              long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }

  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  String pk = "$pk";
  switch (spec.table()) {
  case "table1":
          pk = "table1_pk";
          break;
  case "table2":
  case "table3":
          pk = "table23_pk";
          break;
  }

  endToken = startRange;
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // we have an unique split
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // we have more than one split
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource<T>(spec, builder.toString()));
    }
  }
  return sourceList;
}
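To see what the loop above produces without a Beam or Cassandra dependency, here is a standalone sketch of the same token-range arithmetic. It assumes `MIN_TOKEN` and `MAX_TOKEN` are the bounds of the Murmur3Partitioner token range (the minimum and maximum `long` values). The class and method names are hypothetical, and the predicates are built as plain strings rather than with `QueryBuilder`.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenRangeSplitDemo {
    // Assumption: Murmur3Partitioner tokens span the full signed 64-bit range.
    static final long MIN_TOKEN = Long.MIN_VALUE;
    static final long MAX_TOKEN = Long.MAX_VALUE;

    /**
     * Returns one WHERE predicate per split. The first split has no lower bound and
     * the last has no upper bound, as in the patched split() method; a single split
     * yields an empty predicate (full table scan).
     */
    static List<String> splitPredicates(String pk, int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be >= 1");
        }
        List<String> predicates = new ArrayList<>();
        // Dividing before subtracting avoids overflowing long for numSplits >= 2.
        long increment = MAX_TOKEN / numSplits - MIN_TOKEN / numSplits;
        long end = MIN_TOKEN;
        for (int i = 0; i < numSplits; i++) {
            long start = end;
            end = start + increment;
            List<String> parts = new ArrayList<>();
            if (i > 0) {
                parts.add("token(" + pk + ") >= " + start);
            }
            if (i < numSplits - 1) {
                parts.add("token(" + pk + ") < " + end);
            }
            // The upper bound of one split is the lower bound of the next,
            // so every token belongs to exactly one split.
            predicates.add(String.join(" AND ", parts));
        }
        return predicates;
    }

    public static void main(String[] args) {
        for (String predicate : splitPredicates("table1_pk", 4)) {
            System.out.println(predicate);
        }
    }
}
```

Each predicate corresponds to one source, so with 10 splits up to 10 readers can query disjoint token ranges of the same table in parallel.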

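Answer:

I think this should be classified as a bug in CassandraIO. I filed BEAM-3424. While that issue is being fixed, you can try building your own version of Beam with the default of 1 changed to 100 or something similar.

I also filed BEAM-3425 for the bug during size estimation.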

Source: https://codeday.me/bug/20190701/1348021.html
