ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python – 流式传输不工作之前的BigQuery表截断

2019-08-29 07:59:17  阅读:210  来源: 互联网

标签:python google-bigquery


我们使用BigQuery Python API来运行一些分析.为此,我们创建了以下适配器:

def stream_data(self, table, data, schema, how=None):
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if table_exists:
        if how == 'WRITE_TRUNCATE':
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=(
                                           self._project_id),
                                           datasetId='lbanor',
                                           body=body).execute()
    else:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': self._project_id,
                'datasetId': 'lbanor'
            },
            'schema': schema
        }
        self.connector.tables().insert(projectId=(
                                       self._project_id),
                                       datasetId='lbanor',
                                       body=body).execute()

    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.connector.tabledata().insertAll(projectId=(
                                         self._project_id),
                                         datasetId='lbanor',
                                         tableId=table,
                                               body=body).execute(num_retries=5)

其中connector只是构建对象.

其主要目的是将数据流式传输到给定的表.如果表已经存在并且“how”输入作为“WRITE_TRUNCATE”传递,则首先删除该表并再次创建.
之后,继续执行数据流.

当表没有被反复删除时,一切正常.

例如,这是我们运行脚本时没有模拟写截断选项的结果(for循环保持调用stream_data,其中how = None):

[
  {
    "date": "2016-04-25",
    "unix_date": "1461606664981207",
    "init_cv_date": "2016-03-12",
    "end_cv_date": "2016-03-25",
    "days_trained": "56",
    "days_validated": "14",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.31729249914663893"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606599745107",
    "init_cv_date": "2016-03-06",
    "end_cv_date": "2016-03-25",
    "days_trained": "80",
    "days_validated": "20",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32677143128667446"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606688950415",
    "init_cv_date": "2016-03-14",
    "end_cv_date": "2016-03-25",
    "days_trained": "48",
    "days_validated": "12",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.3129267723358932"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606707195122",
    "init_cv_date": "2016-03-16",
    "end_cv_date": "2016-03-25",
    "days_trained": "40",
    "days_validated": "10",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.310620987663015"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606622432947",
    "init_cv_date": "2016-03-08",
    "end_cv_date": "2016-03-25",
    "days_trained": "72",
    "days_validated": "18",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32395802949369296"
  }
]

但是当我们使用输入how =“WRITE_TRUNCATE”的相同适配器时,其行为发生了变化并变得不可预测.

有时它工作,数据保存到表中.但有时,即使没有引发错误,也没有数据保存到表中.

尝试查询表时,不返回任何数据.它只返回“查询返回零结果”.

删除表格,再次创建表格并流式传输数据时出错.我们犯了一些错误吗?

如果您需要更多信息,请告诉我.提前致谢!

解决方法:

请参阅Jordan Tigani的回答和Sean Chen对https://stackoverflow.com/a/36417177/132438的评论(两位BigQuery工程师).

总结是:

>重新创建或截断表格时“您需要在流式传输前等待2分钟才能避免数据被丢弃.

这样就可以解释为什么你会得到这种非确定性的行为.

标签:python,google-bigquery
来源: https://codeday.me/bug/20190829/1758713.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有