Does anyone have an example of creating a new insert job for BigQuery using either of the following two approaches?
> The BigQuery Java client library
> Creating a load job from a POST request, as documented here: https://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest
I don't know what you have done so far, but you should at least have an authenticated API client, something like:

    bigquery = new Bigquery.Builder(HTTP_TRANSPORT, JSON_FACTORY, credentials)
        .setApplicationName("...")
        .build();

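Here is a simplified version of an insertRows method I wrote using the google-http-client library for java and bigquery-api (you should also check that the dataset exists, validate the IDs, and so on):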

    public Long insertRows(String projectId,
                           String datasetId,
                           String tableId,
                           InputStream schema,
                           AbstractInputStreamContent data) {
        try {
            // Define the table fields from a JSON list of TableFieldSchema
            ObjectMapper mapper = new ObjectMapper();
            List<TableFieldSchema> schemaFields =
                mapper.readValue(schema, new TypeReference<List<TableFieldSchema>>(){});
            TableSchema tableSchema = new TableSchema().setFields(schemaFields);

            // Table reference
            TableReference tableReference = new TableReference()
                .setProjectId(projectId)
                .setDatasetId(datasetId)
                .setTableId(tableId);

            // Load job configuration
            JobConfigurationLoad loadConfig = new JobConfigurationLoad()
                .setDestinationTable(tableReference)
                .setSchema(tableSchema)
                // Data in JSON format (could be CSV)
                .setSourceFormat("NEWLINE_DELIMITED_JSON")
                // The table is created if it does not exist
                .setCreateDisposition("CREATE_IF_NEEDED")
                // Append data (do not overwrite existing data)
                .setWriteDisposition("WRITE_APPEND");
            // If your data comes from Google Cloud Storage
            //.setSourceUris(...);

            // Load job
            Job loadJob = new Job()
                .setJobReference(
                    new JobReference()
                        .setJobId(Joiner.on("-").join("INSERT", projectId, datasetId,
                            tableId, DateTime.now().toString("dd-MM-yyyy_HH-mm-ss-SSS")))
                        .setProjectId(projectId))
                .setConfiguration(new JobConfiguration().setLoad(loadConfig));

            // Job execution (the data is uploaded together with the job)
            Job createTableJob = bigquery.jobs().insert(projectId, loadJob, data).execute();
            // If loading data from Google Cloud Storage
            //createTableJob = bigquery.jobs().insert(projectId, loadJob).execute();
            String jobId = createTableJob.getJobReference().getJobId();

            // Wait for job completion (waitForJob is a helper that is not shown here)
            createTableJob = waitForJob(projectId, createTableJob);

            Long rowCount = createTableJob != null
                ? createTableJob.getStatistics().getLoad().getOutputRows()
                : 0L;
            log.info("{} rows inserted in table '{}' (dataset: '{}', project: '{}')",
                rowCount, tableId, datasetId, projectId);
            return rowCount;
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

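The waitForJob helper called in insertRows isn't included in this answer. As a rough sketch of the polling idea only (not the original implementation), something like the following could work. The class name JobPoller, the method waitUntilDone and its parameters are made up for illustration, and the status lookup is abstracted behind a Supplier so the snippet runs without the BigQuery library. In real code the supplier would presumably wrap a call such as bigquery.jobs().get(projectId, jobId).execute().getStatus().getState() and handle the IOException it can throw.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical helper: polls a job state until it is "DONE".
// BigQuery reports job states as PENDING, RUNNING or DONE.
public final class JobPoller {

    private JobPoller() {}

    /**
     * Calls stateSupplier up to maxAttempts times, sleeping sleepMillis
     * between attempts. Returns true as soon as the state is "DONE",
     * false if the attempts run out or the thread is interrupted.
     */
    public static boolean waitUntilDone(Supplier<String> stateSupplier,
                                        int maxAttempts,
                                        long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if ("DONE".equals(stateSupplier.get())) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for the real status lookup: a fixed sequence of states
        Iterator<String> states =
            Arrays.asList("PENDING", "RUNNING", "DONE").iterator();
        boolean done = waitUntilDone(states::next, 5, 10L);
        System.out.println("done = " + done); // prints "done = true"
    }
}
```

Keep in mind that DONE only means the job has finished, not that it succeeded. Before reading the load statistics you would still want to check whether getStatus().getErrorResult() on the returned Job is non-null.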
I don't know what format your data is in, but if you are working with files, you can add an overload like this:

    public Long insertRows(String projectId,
                           String datasetId,
                           String tableId,
                           File schema,
                           File data) {
        try {
            return insertRows(projectId, datasetId, tableId,
                new FileInputStream(schema),
                new FileContent(MediaType.OCTET_STREAM.toString(), data));
        } catch (FileNotFoundException e) {
            throw Throwables.propagate(e);
        }
    }