A simple study of Dremio query processing using JProfiler

2022-07-10 20:00:28

Tags: dremio, java, jprofiler, query processing, sabot, fragment, run, null


A walkthrough of the simple call chain behind a Dremio query.

Reference commands

  • arthas watch
watch com.dremio.sabot.exec.fragment.FragmentExecutor$AsyncTaskImpl run '{params, target, returnObj, throwExp}' -x 2
  • JProfiler
    Simply attach it to the running Dremio process.
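
The `$` in the class name given to `watch` is the JVM binary name of a nested class: `AsyncTaskImpl` is declared inside `FragmentExecutor`. The following is a minimal sketch of how such a name arises. `OuterExecutor`, `InnerTask` and `BinaryNameDemo` are hypothetical stand-ins for illustration, not Dremio classes.

```java
// Hypothetical stand-ins for illustration; these are not Dremio classes.
class OuterExecutor {
    // A nested class, analogous to AsyncTaskImpl living inside FragmentExecutor.
    class InnerTask implements Runnable {
        @Override
        public void run() {
            // Intentionally empty: only the class name matters here.
        }
    }

    InnerTask newTask() {
        return new InnerTask();
    }
}

class BinaryNameDemo {
    // Returns the JVM binary name, which joins outer and nested class with '$'.
    static String binaryNameOf(Object o) {
        return o.getClass().getName();
    }

    public static void main(String[] args) {
        // The printed name ends with "OuterExecutor$InnerTask".
        System.out.println(binaryNameOf(new OuterExecutor().newTask()));
    }
}
```

This is the form that tools working on loaded classes, such as Arthas, expect when you point them at a nested class.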

Reference call graph

[Figure: call graph]

Code walkthrough

  • run handling
 
  private void run(){
    assert taskState != State.DONE : "Attempted to run a task with state of DONE.";
    assert eventProvider != null : "Attempted to run without an eventProvider";
 
    if (!activateResource.isActivated()) {
      // All tasks are expected to begin in a runnable state. So, switch to the BLOCKED state on the
      // first call.
      taskState = State.BLOCKED_ON_SHARED_RESOURCE;
      return;
    }
    stats.runStarted();
 
    // update thread name.
    final Thread currentThread = Thread.currentThread();
    final String originalName = currentThread.getName();
    currentThread.setName(originalName + " - " + name);
 
    try {
 
      // if we're already done, we're finishing clean up. No core method
      // execution is necessary, simply exit this block so we finishRun() below.
      // We do this because a failure state will put us in a situation.
      if(state == FragmentState.CANCELLED || state == FragmentState.FAILED || state == FragmentState.FINISHED) {
        return;
      }
 
      // if there are any deferred exceptions, exit.
      if(deferredException.hasException()) {
        transitionToFailed(null);
        return;
      }
 
      // if cancellation is requested, that is always the top priority.
      if (cancelled.isDone()) {
        Optional<Throwable> failedReason = eventProvider.getFailedReason();
        if (failedReason.isPresent() || foremanDead) {
          // check if it was failed due to an external reason (eg. by heap monitor).
          // foremanDead is true, foremanDeadException must be non null.
          assert(!foremanDead || (foremanDeadException != null));
          transitionToFailed(failedReason.isPresent() ? failedReason.get() : foremanDeadException);
          return;
        }
 
        transitionToCancelled();
        taskState = State.DONE;
        return;
      }
 
      // setup the execution if it isn't setup.
      if(!isSetup){
        stats.setupStarted();
        try {
          if (memoryArbiter != null) {
            memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
          }
          setupExecution();
        } finally {
          stats.setupEnded();
        }
        // exit since we just did setup which could be a non-trivial amount of work. Allow the scheduler to decide whether we should continue.
        return;
      }
 
      // workQueue might contain OOBMessages, which should be held and processed after the setup.
      // This piece should always execute after the setup is done.
      final Runnable work = workQueue.poll();
      if (work != null) {
        // we don't know how long it will take to process one work unit, we rely on the scheduler to execute
        // this fragment again if it didn't run long enough
        work.run();
        return;
      }
 
      // handle any previously sent fragment finished messages.
      FragmentHandle finishedFragment;
      while ((finishedFragment = eventProvider.pollFinishedReceiver()) != null) {
        pipeline.getTerminalOperator().receivingFragmentFinished(finishedFragment);
      }
 
      if (memoryArbiter != null) {
        memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
      }
      // pump the pipeline
      taskState = pumper.run();
 
      // if we've finished all work, let's wrap up.
      if(taskState == State.DONE){
        transitionToFinished();
      }
 
      injector.injectChecked(executionControls, INJECTOR_DO_WORK, OutOfMemoryError.class);
 
    } catch (OutOfMemoryError | OutOfMemoryException e) {
      // handle out of memory errors differently from other error types.
      if (e instanceof OutOfDirectMemoryError || e instanceof OutOfMemoryException || "Direct buffer memory".equals(e.getMessage()) || INJECTOR_DO_WORK.equals(e.getMessage())) {
        transitionToFailed(UserException.memoryError(e)
            .addContext(MemoryDebugInfo.getDetailsOnAllocationFailure(new OutOfMemoryException(e), allocator))
            .buildSilently());
      } else {
        // we have a heap out of memory error. The JVM is unstable, exit.
        ProcessExit.exitHeap(e);
      }
    } catch (Throwable e) {
      transitionToFailed(e);
    } finally {
 
      try {
        finishRun(originalName);
      } finally {
        stats.runEnded();
      }
    }
 
  }
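
The run method above performs at most one kind of work per invocation (setup, one queued work item, or one pump of the pipeline) and then returns, leaving it to the scheduler to call it again. A heavily simplified sketch of that cooperative pattern follows. All names here (`CooperativeTaskSketch`, `FragmentTask`, `batchesLeft`, `trace`) are hypothetical and chosen for illustration; the real method also handles cancellation, memory grants and failures, which this sketch leaves out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, hypothetical model of a cooperatively scheduled task; not Dremio code.
class CooperativeTaskSketch {
    enum State { RUNNABLE, DONE }

    static class FragmentTask {
        private final Queue<Runnable> workQueue = new ArrayDeque<>();
        private final List<String> trace = new ArrayList<>();
        private boolean isSetup = false;
        private int batchesLeft;
        private State state = State.RUNNABLE;

        FragmentTask(int batches) {
            this.batchesLeft = batches;
        }

        void enqueue(Runnable work) {
            workQueue.add(work);
        }

        // One scheduler slice: do one kind of work, then hand control back.
        void run() {
            if (state == State.DONE) {
                return;
            }
            if (!isSetup) {
                isSetup = true;
                trace.add("setup");
                return; // setup may be expensive, so yield right after it
            }
            Runnable work = workQueue.poll();
            if (work != null) {
                work.run();
                trace.add("work");
                return; // queued work is handled before pumping
            }
            if (batchesLeft > 0) {
                batchesLeft--;
                trace.add("pump");
            }
            if (batchesLeft == 0) {
                state = State.DONE;
                trace.add("finished");
            }
        }
    }

    // Plays the role of the scheduler: keeps calling run() until the task is done.
    static List<String> runToCompletion(int batches, int queuedMessages) {
        FragmentTask task = new FragmentTask(batches);
        for (int i = 0; i < queuedMessages; i++) {
            task.enqueue(() -> { });
        }
        while (task.state != State.DONE) {
            task.run();
        }
        return task.trace;
    }

    public static void main(String[] args) {
        // prints [setup, work, pump, pump, finished]
        System.out.println(runToCompletion(2, 1));
    }
}
```

Returning early after each step keeps any single slice short, so one long-running fragment cannot monopolize a scheduler thread.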
  • setupExecution handling
    Reference code
 
void setupExecution() throws Exception{
   // Drill's execution model and the official profiler are good ways to learn about major and minor fragments
    final PlanFragmentMajor major = fragment.getMajor();
    final PlanFragmentMinor minor = fragment.getMinor();
 
    logger.debug("Starting fragment {}:{} on {}:{}", major.getHandle().getMajorFragmentId(), getHandle().getMinorFragmentId(), minor.getAssignment().getAddress(), minor.getAssignment().getUserPort());
    outputAllocator = ticket.newChildAllocator("output-frag:" + QueryIdHelper.getFragmentId(getHandle()),
      fragmentOptions.getOption(ExecConstants.OUTPUT_ALLOCATOR_RESERVATION),
      Long.MAX_VALUE);
    contextCreator.setFragmentOutputAllocator(outputAllocator);
 
    final PhysicalOperator rootOperator = reader.readFragment(fragment);
    contextCreator.setMinorFragmentEndpointsFromRootSender(rootOperator);
    FunctionLookupContext functionLookupContextToUse = functionLookupContext;
    if (fragmentOptions.getOption(PlannerSettings.ENABLE_DECIMAL_V2)) {
      functionLookupContextToUse = decimalFunctionLookupContext;
    }
    pipeline = PipelineCreator.get(
        new FragmentExecutionContext(major.getForeman(), sources, cancelled, major.getContext()),
        buffers,
        opCreator,
        contextCreator,
        functionLookupContextToUse,
        rootOperator,
        tunnelProvider,
        new SharedResourcesContextImpl(sharedResources)
        );
 
    pipeline.setup();
 
    clusterCoordinator.getServiceSet(ClusterCoordinator.Role.COORDINATOR).addNodeStatusListener(crashListener);
 
    transitionToRunning();
    isSetup = true;
  }
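
setupExecution creates the output allocator with `newChildAllocator(name, reservation, Long.MAX_VALUE)`. Assuming the three arguments mean name, up-front reservation and maximum size (as they do in Apache Arrow's `BufferAllocator.newChildAllocator`), the child is guaranteed its reservation and is otherwise bounded only by its ancestors. The toy model below sketches that accounting. `ToyAllocator` is hypothetical and is not how Arrow or Dremio implement allocators.

```java
// Toy model of parent/child memory accounting. Hypothetical: this is not
// Arrow's or Dremio's allocator, only an illustration of reservation and limit.
class ToyAllocator {
    private final String name;
    private final ToyAllocator parent; // null for the root
    private final long reservation;    // bytes claimed from the parent up front
    private final long limit;          // most bytes this allocator may hold
    private long used;                 // bytes currently accounted to this allocator

    private ToyAllocator(String name, ToyAllocator parent, long reservation, long limit) {
        this.name = name;
        this.parent = parent;
        this.reservation = reservation;
        this.limit = limit;
    }

    static ToyAllocator root(long limit) {
        return new ToyAllocator("root", null, 0, limit);
    }

    // Same (name, reservation, limit) argument order as the call in setupExecution.
    ToyAllocator newChildAllocator(String childName, long childReservation, long childLimit) {
        if (!allocate(childReservation)) {
            throw new IllegalStateException(
                "cannot reserve " + childReservation + " bytes for " + childName);
        }
        return new ToyAllocator(childName, this, childReservation, childLimit);
    }

    // Accounts for bytes; only usage beyond the reservation is charged to the parent.
    boolean allocate(long bytes) {
        if (bytes < 0 || bytes > limit - used) {
            return false;
        }
        long excessBefore = Math.max(0, used - reservation);
        long excessAfter = Math.max(0, used + bytes - reservation);
        long fromParent = excessAfter - excessBefore;
        if (parent != null && fromParent > 0 && !parent.allocate(fromParent)) {
            return false;
        }
        used += bytes;
        return true;
    }

    long used() {
        return used;
    }

    String name() {
        return name;
    }

    public static void main(String[] args) {
        ToyAllocator root = ToyAllocator.root(100);
        ToyAllocator output = root.newChildAllocator("output-frag", 10, Long.MAX_VALUE);
        System.out.println(output.allocate(5));  // true, served from the reservation
        System.out.println(output.allocate(20)); // true, 15 extra bytes charged to root
        System.out.println(output.allocate(80)); // false, root has only 75 bytes left
    }
}
```

With `Long.MAX_VALUE` as the child's own limit, the effective ceiling comes entirely from the parent chain.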

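Notes

At runtime Dremio includes a community-edition task scheduling package, dremio-ce-sabot-scheduler. Actual execution depends on this package, which is wired in through dynamic class loading and configuration management.
With this call chain as a starting point, studying and analyzing Dremio becomes much easier. Dremio depends on quite a few components and is fairly complex internally, so I will cover the details step by step in later posts.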
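
The idea of choosing an implementation through configuration and loading it dynamically can be sketched in a few lines of plain Java. Everything named here (`TaskPoolSketch`, `SimpleTaskPool`, `TaskPoolLoader`, the key `sketch.task.pool.class`) is hypothetical. How Dremio actually resolves its scheduler is best read from the source files listed in the references.

```java
import java.util.Properties;

// Hypothetical sketch of config-driven class loading; names are illustrative, not Dremio's.
interface TaskPoolSketch {
    String describe();
}

// Default implementation, used when the configuration names nothing else.
class SimpleTaskPool implements TaskPoolSketch {
    public SimpleTaskPool() {
    }

    @Override
    public String describe() {
        return "simple";
    }
}

class TaskPoolLoader {
    // Hypothetical configuration key holding the implementation class name.
    static final String KEY = "sketch.task.pool.class";

    // Looks up the class name in the configuration and instantiates it reflectively.
    static TaskPoolSketch load(Properties config) {
        String className = config.getProperty(KEY, SimpleTaskPool.class.getName());
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.asSubclass(TaskPoolSketch.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load task pool " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(KEY, SimpleTaskPool.class.getName());
        System.out.println(load(config).describe()); // prints "simple"
    }
}
```

Because the concrete class is only named in configuration, a different scheduler package on the classpath can be swapped in without changing the calling code.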

References

sabot/kernel/src/main/java/com/dremio/sabot/task/TaskPools.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/TaskPoolInitializer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/fragment/FragmentExecutor.java
sabot/kernel/src/main/java/com/dremio/sabot/driver/PipelineCreator.java

Source: https://www.cnblogs.com/rongfengliang/p/16463884.html
