
Are there any examples of loading data into BigQuery with a POST request or the Java client library?

Does anyone have an example of creating a new insert (load) job for BigQuery using either of these two approaches:

> the BigQuery Java client library
> a load job created with a POST request, as documented here: https://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest

You need to call the bigquery.jobs().insert(…) method.

I don't know how far you have already got, but at minimum you should have an authenticated API client, something like:

bigquery = new Bigquery.Builder(HTTP_TRANSPORT, JSON_FACTORY, credentials)
                .setApplicationName("...").build();
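
If it helps, here is a minimal sketch of how those three objects could be obtained, assuming the google-api-client library and Application Default Credentials (the helper name is just illustrative; adapt it to however you actually authenticate):

import java.io.IOException;
import java.security.GeneralSecurityException;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;

// Hypothetical helper: builds an authenticated BigQuery client
// using Application Default Credentials scoped to the BigQuery API.
static Bigquery buildBigqueryClient() throws IOException, GeneralSecurityException {
    HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
    JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
    GoogleCredential credentials = GoogleCredential.getApplicationDefault()
            .createScoped(BigqueryScopes.all());
    return new Bigquery.Builder(transport, jsonFactory, credentials)
            .setApplicationName("bigquery-load-example")
            .build();
}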

Here is a simplified version of an insertRows method I wrote using the google-http-client library for Java and the BigQuery API (you should also check that the dataset exists, validate the IDs, and so on):

public Long insertRows(String projectId, 
                       String datasetId, 
                       String tableId, 
                       InputStream schema,
                       AbstractInputStreamContent data) {
    try {

        // Defining table fields
        ObjectMapper mapper = new ObjectMapper();
        List<TableFieldSchema> schemaFields = mapper.readValue(schema, new TypeReference<List<TableFieldSchema>>(){});
        TableSchema tableSchema = new TableSchema().setFields(schemaFields);

        // Table reference
        TableReference tableReference = new TableReference()
                .setProjectId(projectId)
                .setDatasetId(datasetId)
                .setTableId(tableId);

        // Load job configuration
        JobConfigurationLoad loadConfig = new JobConfigurationLoad()
                .setDestinationTable(tableReference)
                .setSchema(tableSchema)
                // Data in JSON format (could also be CSV)
                .setSourceFormat("NEWLINE_DELIMITED_JSON")
                // The table is created if it does not exist
                .setCreateDisposition("CREATE_IF_NEEDED")
                // Append data (do not overwrite existing data)
                .setWriteDisposition("WRITE_APPEND");
        // If your data are coming from Google Cloud Storage
        //.setSourceUris(...);

        // Load job
        Job loadJob = new Job()
                .setJobReference(
                        new JobReference()
                                .setJobId(Joiner.on("-").join("INSERT", projectId, datasetId,
                                        tableId, DateTime.now().toString("dd-MM-yyyy_HH-mm-ss-SSS")))
                                .setProjectId(projectId))
                .setConfiguration(new JobConfiguration().setLoad(loadConfig));
        // Job execution
        Job createTableJob = bigquery.jobs().insert(projectId, loadJob, data).execute();
        // If loading data from Google Cloud Storage
        //createTableJob = bigquery.jobs().insert(projectId, loadJob).execute();

        String jobId = createTableJob.getJobReference().getJobId();
        // Wait for job completion
        createTableJob = waitForJob(projectId, createTableJob);
        Long rowCount = createTableJob != null ? createTableJob.getStatistics().getLoad().getOutputRows() : 0L;
        log.info("{} rows inserted in table '{}' (dataset: '{}', project: '{}')", rowCount, tableId, datasetId, projectId);
        return rowCount;
    }
    catch (IOException e) { throw Throwables.propagate(e); }
}
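
The waitForJob call above is not shown in this snippet; a minimal version could simply poll bigquery.jobs().get(…) until the job reaches the DONE state and then check the error result. Something along these lines (the one-second polling interval and returning null on failure are assumptions chosen to match the caller above):

private Job waitForJob(String projectId, Job job) throws IOException {
    String jobId = job.getJobReference().getJobId();
    while (true) {
        Job polled = bigquery.jobs().get(projectId, jobId).execute();
        if ("DONE".equals(polled.getStatus().getState())) {
            // A job in DONE state may still have failed: check the error result
            if (polled.getStatus().getErrorResult() != null) {
                log.error("Load job '{}' failed: {}", jobId,
                        polled.getStatus().getErrorResult().getMessage());
                return null;
            }
            return polled;
        }
        try {
            // Poll once per second until the job completes
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}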

I don't know what format your data is in, but if you are working with files, you can add a function like this:

public Long insertRows(String projectId, String datasetId, String tableId, File schema, File data) {
    try {
        return insertRows(projectId, datasetId, tableId, new FileInputStream(schema),
                new FileContent(MediaType.OCTET_STREAM.toString(), data));
    }
    catch (FileNotFoundException e) { throw Throwables.propagate(e); }
}
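
A call to the file-based overload might then look like this (the paths and identifiers are placeholders):

File schemaFile = new File("/path/to/schema.json"); // JSON array of TableFieldSchema definitions
File dataFile = new File("/path/to/data.json");     // newline-delimited JSON rows
Long insertedRows = insertRows("my-project", "my_dataset", "my_table", schemaFile, dataFile);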