Java – BigQuery – How do I create a connection using Pentaho Data Integration (Spoon)?


Tags: java, google-bigquery, jdbc, pentaho, pdi


I am trying to access BigQuery through Pentaho Data Integration, but without success.

> System: OS X El Capitan
> Google BigQuery authentication method: service account with a .p12 key

I followed this tutorial for OS X:
http://wiki.pentaho.com/display/EAI/Google+BigQuery

This is what I did:

> I downloaded and extracted the "kettle.zip dependencies" into PDI_FOLDER/libswt/osx64
> I downloaded and copied "bqjdbc-1.4-standalone.jar" into PDI_FOLDER/lib
> After that, I tried to create a new connection in Data Integration via New > Database Connection > Generic Database > Native (JDBC)

I configured the connection parameters following this tutorial: https://code.google.com/p/starschema-bigquery-jdbc/wiki/JDBCURL (a small standalone test sketch follows this list). The parameters are:

> Custom connection URL: jdbc:BQDriver:projectid(secretproject)?withServiceAccount=true
> Custom driver class name: net.starschema.clouddb.jdbc.BQDrive
> Username: pentaho-data-integration@secretproject.iam.gserviceaccount.com
> Password: /Users/luisfsns/Dropbox/Lendico/etl/marketing/lendico-pentaho-data-integration-googlebigquery.p12
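For reference, the same settings can be exercised outside of Spoon with a few lines of plain JDBC. This is only a minimal sketch: it assumes bqjdbc-1.4-standalone.jar is on the classpath and that the driver class exported by the jar is net.starschema.clouddb.jdbc.BQDriver (you can confirm the exact spelling with "jar tf bqjdbc-1.4-standalone.jar | grep Driver"); the key path is a placeholder, and the URL, user name and password follow the same convention as the dialog fields above.

import java.sql.Connection;
import java.sql.DriverManager;

public class BigQueryJdbcTest {
    public static void main(String[] args) throws Exception {
        // Load the driver explicitly: a ClassNotFoundException here means the jar
        // is not on the classpath or the class name is misspelled.
        Class.forName("net.starschema.clouddb.jdbc.BQDriver");

        // Same convention as the Generic Database dialog:
        // user = service account e-mail, password = path to the .p12 key file (placeholder).
        String url = "jdbc:BQDriver:projectid(secretproject)?withServiceAccount=true";
        String user = "pentaho-data-integration@secretproject.iam.gserviceaccount.com";
        String password = "/path/to/service-account-key.p12";

        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            System.out.println("Connected: " + !conn.isClosed());
        }
    }
}

If this small program fails with the same ClassNotFoundException that appears in the Spoon log below, the problem is the class name or the jar location rather than the connection URL or the credentials.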

Things I am not sure about:

> Is my custom connection URL correct? What should I provide as the project argument: the project name, or a URL/path? Can someone give an example?
> Should I use another authentication method (other than "Service account"), or another kind of private key, such as .json?
> Is my custom driver class name correct?

Can someone help me?

This is the log from when I try to test the connection I created:

Error connecting to database [Teste] :
org.pentaho.di.core.exception.KettleDatabaseException: Error occurred
while trying to connect to the database

Driver class ‘net.starschema.clouddb.jdbc.BQDrive’ could not be found,
make sure the ‘Generic database’ driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

org.pentaho.di.core.exception.KettleDatabaseException: Error occurred
while trying to connect to the database

Driver class ‘net.starschema.clouddb.jdbc.BQDrive’ could not be found,
make sure the ‘Generic database’ driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

at
org.pentaho.di.core.database.Database.normalConnect(Database.java:428)
at org.pentaho.di.core.database.Database.connect(Database.java:358)
at org.pentaho.di.core.database.Database.connect(Database.java:311)
at org.pentaho.di.core.database.Database.connect(Database.java:301)
at
org.pentaho.di.core.database.DatabaseFactory.getConnectionTestReport(DatabaseFactory.java:80)
at
org.pentaho.di.core.database.DatabaseMeta.testConnection(DatabaseMeta.java:2686)
at
org.pentaho.ui.database.event.DataHandler.testDatabaseConnection(DataHandler.java:546)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597) at
org.pentaho.ui.xul.impl.AbstractXulDomContainer.invoke(AbstractXulDomContainer.java:313)
at
org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:157)
at
org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:141)
at
org.pentaho.ui.xul.swt.tags.SwtButton.access$500(SwtButton.java:43)
at
org.pentaho.ui.xul.swt.tags.SwtButton$4.widgetSelected(SwtButton.java:138)
at org.eclipse.swt.widgets.TypedListener.handleEvent(Unknown Source)
at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Display.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.notifyListeners(Unknown Source) at
org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source) at
org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source) at
org.eclipse.jface.window.Window.runEventLoop(Window.java:820) at
org.eclipse.jface.window.Window.open(Window.java:796) at
org.pentaho.ui.xul.swt.tags.SwtDialog.show(SwtDialog.java:389) at
org.pentaho.ui.xul.swt.tags.SwtDialog.show(SwtDialog.java:318) at
org.pentaho.di.ui.core.database.dialog.XulDatabaseDialog.open(XulDatabaseDialog.java:116)
at
org.pentaho.di.ui.core.database.dialog.DatabaseDialog.open(DatabaseDialog.java:59)
at
org.pentaho.di.ui.spoon.delegates.SpoonDBDelegate.newConnection(SpoonDBDelegate.java:464)
at
org.pentaho.di.ui.spoon.delegates.SpoonDBDelegate.newConnection(SpoonDBDelegate.java:451)
at org.pentaho.di.ui.spoon.Spoon.newConnection(Spoon.java:8728) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597) at
org.pentaho.ui.xul.impl.AbstractXulDomContainer.invoke(AbstractXulDomContainer.java:313)
at
org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:157)
at
org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:141)
at
org.pentaho.ui.xul.jface.tags.JfaceMenuitem.access$100(JfaceMenuitem.java:43)
at
org.pentaho.ui.xul.jface.tags.JfaceMenuitem$1.run(JfaceMenuitem.java:106)
at org.eclipse.jface.action.Action.runWithEvent(Action.java:498) at
org.eclipse.jface.action.ActionContributionItem.handleWidgetSelection(ActionContributionItem.java:545)
at
org.eclipse.jface.action.ActionContributionItem.access$2(ActionContributionItem.java:490)
at
org.eclipse.jface.action.ActionContributionItem$5.handleEvent(ActionContributionItem.java:402)
at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Display.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source) at
org.eclipse.swt.widgets.Widget.notifyListeners(Unknown Source) at
org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source) at
org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source) at
org.pentaho.di.ui.spoon.Spoon.readAndDispatch(Spoon.java:1319) at
org.pentaho.di.ui.spoon.Spoon.waitForDispose(Spoon.java:7939) at
org.pentaho.di.ui.spoon.Spoon.start(Spoon.java:9190) at
org.pentaho.di.ui.spoon.Spoon.main(Spoon.java:654) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597) at
org.pentaho.commons.launcher.Launcher.main(Launcher.java:92) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597) at
apple.launcher.LaunchRunner.run(LaunchRunner.java:116) at
apple.launcher.LaunchRunner.callMain(LaunchRunner.java:51) at
apple.launcher.JavaApplicationLauncher.launch(JavaApplicationLauncher.java:52)
Caused by: org.pentaho.di.core.exception.KettleDatabaseException:
Driver class ‘net.starschema.clouddb.jdbc.BQDrive’ could not be found,
make sure the ‘Generic database’ driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

at
org.pentaho.di.core.database.Database.connectUsingClass(Database.java:522)
at
org.pentaho.di.core.database.Database.connectUsingClass(Database.java:4697)
at
org.pentaho.di.core.database.Database.normalConnect(Database.java:414)
… 70 more Caused by: java.lang.ClassNotFoundException:
net.starschema.clouddb.jdbc.BQDrive at
java.net.URLClassLoader$1.run(URLClassLoader.java:202) at
java.security.AccessController.doPrivileged(Native Method) at
java.net.URLClassLoader.findClass(URLClassLoader.java:190) at
java.lang.ClassLoader.loadClass(ClassLoader.java:306) at
java.lang.ClassLoader.loadClass(ClassLoader.java:247) at
org.pentaho.di.core.database.Database.connectUsingClass(Database.java:497)
… 72 more

Custom URL :
jdbc:BQDriver:projectid(secretproject)?withServiceAccount=true Custom
Driver Class:net.starschema.clouddb.jdbc.BQDrive

Solution:

The answer will probably not make you happy, but here we go.
It is possible to create this kind of connection, but extraction is problematic and the flow of rows is very slow (BigQuery itself handles anything quickly; it is this JDBC driver that makes fetching the data extremely slow).

What I do instead is a Python 2.7 script that extracts the query into a table, exports the table to a CSV file on Google Cloud Storage, and then downloads that file.

It is really fast and you will not run into many errors.

Here is the Python code you can use. (You need to install the Google Cloud Storage utilities (gsutil) to easily copy the files from the cloud to your machine.)

The shell code (use it in a Shell script entry in your job):

#!/bin/bash
export PATH=${PATH}

# BOTO is the login manager for GsUtil
export BOTO_DISPLAYENV="/home/mromano/.boto"
export BOTO_CONFIG="/home/mromano/.boto"

rm /tmp/bigquery_extraction_*

#Run Big Query extraction script on python
python "$caminho/google_bigquery_extract_foo_bar.py"

#Give it some seconds to sync data to Google Cloud Storage
sleep 10

#Copy from Google Cloud Storage to local file
/usr/local/bin/gsutil -q cp gs://pentaho_exports/google_bigquery_extract_foo_bar.csv.gz /tmp/google_bigquery_extract_foo_bar.csv.gz

The Python script (it creates a table with the query results, exports the table to CSV, and deletes the table):

import httplib2
import logging
logging.basicConfig()

from apiclient.discovery import build
from oauth2client.client import SignedJwtAssertionCredentials
from bigquery import get_client
from bigquery.errors import BigQueryTimeoutException  # needed for the except clauses below

# BigQuery project id as listed in the Google Developers Console.
project_id = 'ce______?_____8'

# Service account email address as listed in the Google Developers Console.
service_account = '5399951_____?_______73k@developer.gserviceaccount.com'

# Read the service account's .p12 private key.
f = file('../../../../keys/bigquery_key.p12', 'rb')
key = f.read()
f.close()

credentials = SignedJwtAssertionCredentials(
    service_account,
    key,
    scope='https://www.googleapis.com/auth/bigquery')

http = httplib2.Http()
http = credentials.authorize(http)

client = get_client(project_id, credentials=credentials, service_account=service_account)

# Write the query results to a (temporary) table.
job = client.write_to_table("""SELECT * FROM 001234.TEST""",
                            'pentaho_export',
                            table='table_foo_bar',
                            create_disposition='CREATE_IF_NEEDED',
                            write_disposition='WRITE_TRUNCATE')
try:
    job_resource = client.wait_for_job(job, timeout=6000)
    #print job_resource
except BigQueryTimeoutException:
    print "Timeout"

# Export the table to a gzipped CSV file on Google Cloud Storage.
job_export = client.export_data_to_uris(['gs://pentaho_exports/foo_bar.csv.gz'],
                                        'pentaho_export',
                                        'table_foo_bar',
                                        compression='GZIP',
                                        field_delimiter='    ')
try:
    job_resource = client.wait_for_job(job_export, timeout=6000)
    #print job_resource
except BigQueryTimeoutException:
    print "Timeout"

# Delete the now-exported temporary table.
deleted = client.delete_table('pentaho_export', 'table_foo_bar')

I hope it helps. =)
