ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Castled 源码解析 - container 模块说明

2022-02-01 13:31:22  阅读:223  来源: 互联网

标签:pipeline container getId 源码 pipelineRun Castled warehouse new warehousePollConte


container 属于Castled api 后端服务,后端包含了任务调度,db 迁移,有几个服务是比较重要的
主要是pipelineservice,ExternalAppService,WarehouseService,而且官方还提供了一套基于events 的处理
主要包含PipelineEvent,CastledEvent,其他的主要是基于dropwizard 开发的rest api 了,整体代码并不难
pipelineservice 在其中比较核心,进行了app 与connector 的关联操作,pipelineservice 会使用到event ,task 处理
PipelineExecutor 对于数据的处理主要是在此task 执行的(时间数据拉取以及发送处理就是在这里边的)

pipelineservice核心方法

参考下图

 

 

PipelineExecutor 处理

 
 public String executeTask(Task task) {
        Long pipelineId = ((Number) task.getParams().get(CommonConstants.PIPELINE_ID)).longValue();
        Pipeline pipeline = this.pipelineService.getActivePipeline(pipelineId);
        if (pipeline == null) {
            return null;
        }
        // 挺重要的,进行状态统计的
        WarehouseSyncFailureListener warehouseSyncFailureListener = null;
        Warehouse warehouse = this.warehouseService.getWarehouse(pipeline.getWarehouseId());
        PipelineRun pipelineRun = getOrCreatePipelineRun(pipelineId);
        WarehousePollContext warehousePollContext = WarehousePollContext.builder()
                .primaryKeys(PipelineUtils.getWarehousePrimaryKeys(pipeline)).pipelineUUID(pipeline.getUuid())
                .pipelineRunId(pipelineRun.getId()).warehouseConfig(warehouse.getConfig())
                .dataEncryptionKey(encryptionManager.getEncryptionKey(warehouse.getTeamId()))
                .queryMode(pipeline.getQueryMode())
                .query(pipeline.getSourceQuery()).pipelineId(pipeline.getId()).build();
        try {
          // 调用warehouse connector,获取数据
            WarehouseExecutionContext warehouseExecutionContext = pollRecords(warehouse, pipelineRun, warehousePollContext);
 
            log.info("Poll records completed for pipeline {}", pipeline.getName());
            this.pipelineService.updatePipelineRunstage(pipelineRun.getId(), PipelineRunStage.RECORDS_POLLED);
 
            ExternalApp externalApp = externalAppService.getExternalApp(pipeline.getAppId());
            ExternalAppConnector externalAppConnector = this.externalAppConnectors.get(externalApp.getType());
            RecordSchema appSchema = externalAppConnector.getSchema(externalApp.getConfig(), pipeline.getAppSyncConfig())
                    .getAppSchema();
 
            log.info("App schema fetch completed for pipeline {}", pipeline.getName());
 
            warehousePollContext.setWarehouseSchema(warehouseExecutionContext.getWarehouseSchema());
            warehouseSyncFailureListener = warehouseConnectors.get(warehouse.getType())
                    .syncFailureListener(warehousePollContext);
 
            MysqlErrorTracker mysqlErrorTracker = new MysqlErrorTracker(warehousePollContext);
 
            ErrorOutputStream schemaMappingErrorOutputStream = new ErrorOutputStream(warehouseSyncFailureListener, mysqlErrorTracker);
 
            SchemaMappedMessageInputStream schemaMappedMessageInputStream = new SchemaMappedMessageInputStream(
                    appSchema, warehouseExecutionContext.getMessageInputStreamImpl(), pipeline.getDataMapping().appWarehouseMapping(),
                    pipeline.getDataMapping().warehouseAppMapping(), schemaMappingErrorOutputStream);
 
            SchemaMappedRecordOutputStream schemaMappedRecordOutputStream =
                    new SchemaMappedRecordOutputStream(SchemaUtils.filterSchema(warehousePollContext.getWarehouseSchema(),
                            PipelineUtils.getWarehousePrimaryKeys(pipeline)), warehouseSyncFailureListener,
                            pipeline.getDataMapping().warehouseAppMapping());
 
            ErrorOutputStream sinkErrorOutputStream = new ErrorOutputStream(schemaMappedRecordOutputStream,
                    new SchemaMappedErrorTracker(mysqlErrorTracker, warehouseExecutionContext.getWarehouseSchema(), pipeline.getDataMapping().warehouseAppMapping()));
 
            log.info("App Sync started for pipeline {}", pipeline.getName());
 
            List<String> mappedAppFields = pipeline.getDataMapping().getFieldMappings().stream().filter(mapping -> !mapping.isSkipped())
                    .map(FieldMapping::getAppField).collect(Collectors.toList());
 
            DataSinkRequest dataSinkRequest = DataSinkRequest.builder().externalApp(externalApp).errorOutputStream(sinkErrorOutputStream)
                    .appSyncConfig(pipeline.getAppSyncConfig()).mappedFields(mappedAppFields)
                    .objectSchema(appSchema).primaryKeys(pipeline.getDataMapping().getPrimaryKeys())
                    .messageInputStream(schemaMappedMessageInputStream)
                    .build();
 
           //  进行数据同步使用了MonitoredDataSink 对象,实现了一些统计信息
            PipelineSyncStats pipelineSyncStats = monitoredDataSink.syncRecords(externalAppConnector.getDataSink(),
                    pipelineRun.getPipelineSyncStats(), pipelineRun.getId(), dataSinkRequest);
 
            schemaMappedMessageInputStream.close();
 
            log.info("App Sync completed for pipeline {}", pipeline.getName());
            //flush output streams
            schemaMappingErrorOutputStream.flushFailedRecords();
            sinkErrorOutputStream.flushFailedRecords();
 
            warehouseConnectors.get(warehouse.getType()).getDataPoller().cleanupPipelineRunResources(warehousePollContext);
            // Also add the records that failed schema mapping phase to the final stats
            pipelineSyncStats.setRecordsFailed(schemaMappedMessageInputStream.getFailedRecords() + pipelineSyncStats.getRecordsFailed());
            this.pipelineService.markPipelineRunProcessed(pipelineRun.getId(), pipelineSyncStats);
 
        } catch (Exception e) {
            if (ObjectRegistry.getInstance(AppShutdownHandler.class).isShutdownTriggered()) {
                throw new PipelineInterruptedException();
            }
            this.pipelineService.markPipelineRunFailed(pipelineRun.getId(), Optional.ofNullable(e.getMessage()).orElse("Unknown Error"));
            log.error("Pipeline run failed for pipeline {} ", pipeline.getId(), e);
            this.warehouseConnectors.get(warehouse.getType()).getDataPoller().cleanupPipelineRunResources(warehousePollContext);
            Optional.ofNullable(warehouseSyncFailureListener).ifPresent(syncFailureListener ->
                    syncFailureListener.cleanupResources(pipeline.getUuid(), pipelineRun.getId(), warehouse.getConfig()));
 
            if (e instanceof PipelineExecutionException) {
                handlePipelineExecutionException(pipeline, (PipelineExecutionException) e);
            } else {
                log.error("Pipeline run failed for pipeline {} ", pipeline.getId(), e);
            }
        }
        return null;
    }

说明

目前从代码中可以看到每个创建的任务会发送消息到Castled的统计服务中,如果不需要的话,最好处理下,目前看配置定义,暂时没有开关可以禁用
尽管系统使用了kafka,但是感觉kafaka 的使用并不是很明显(更多是一个任务排队的处理),并不是基于kafka 的消息发送处理

参考资料

https://github.com/castledio/castled

标签:pipeline,container,getId,源码,pipelineRun,Castled,warehouse,new,warehousePollConte
来源: https://www.cnblogs.com/rongfengliang/p/15859023.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有