BigQuery to MongoDB template
The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery table and writes them to MongoDB as documents. Currently, each row is stored as a single document.
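After the pipeline finishes, one way to confirm that each row arrived as its own document is to query the target collection. The following is a minimal spot-check sketch, assuming the mongosh shell is installed and using the example database and collection names from the template parameters below; substitute your own connection URI and names.

# Hypothetical spot-check, not part of the template: count the documents
# written by the job and inspect one of them.
mongosh "MONGO_DB_URI" --eval 'db.getSiblingDB("my-db").getCollection("my-collection").countDocuments()'
mongosh "MONGO_DB_URI" --eval 'db.getSiblingDB("my-db").getCollection("my-collection").findOne()'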
Pipeline requirements
- The source BigQuery table must exist.
- The target MongoDB instance should be accessible from the Dataflow worker machines.
Template parameters
Required parameters
- mongoDbUri: The MongoDB connection URI, in the format mongodb+srv://<username>:<password>@<host>.
- database: The database in MongoDB to store the collection. For example, my-db.
- collection: The name of the collection in the MongoDB database. For example, my-collection.
- inputTableSpec: The BigQuery table to read from. For example, bigquery-project:dataset.input_table.
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the BigQuery to MongoDB template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_MongoDB \
    --parameters \
    inputTableSpec=INPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- INPUT_TABLE_SPEC: your source BigQuery table name
- MONGO_DB_URI: your MongoDB URI
- DATABASE: your MongoDB database
- COLLECTION: your MongoDB collection
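For example, a filled-in invocation might look like the following sketch. Every value here (job name, project, region, table, and MongoDB connection details) is a hypothetical placeholder; replace each one with your own.

# Hypothetical example values; substitute your own project, region, table, and URI.
gcloud dataflow flex-template run bq-to-mongodb-example \
    --project=my-project \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_MongoDB \
    --parameters \
    inputTableSpec=my-project:my_dataset.input_table,\
    mongoDbUri=mongodb+srv://user:password@cluster0.example.mongodb.net,\
    database=my-db,\
    collection=my-collection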
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_MongoDB"
   }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- INPUT_TABLE_SPEC: your source BigQuery table name
- MONGO_DB_URI: your MongoDB URI
- DATABASE: your MongoDB database
- COLLECTION: your MongoDB collection
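As a concrete sketch, you could send the request with curl, obtaining an access token from the gcloud CLI. The project, location, and parameter values below are hypothetical; substitute your own.

# Hypothetical example request; replace the project, location, and parameter values.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "launch_parameter": {
            "jobName": "bq-to-mongodb-example",
            "parameters": {
              "inputTableSpec": "my-project:my_dataset.input_table",
              "mongoDbUri": "mongodb+srv://user:password@cluster0.example.mongodb.net",
              "database": "my-db",
              "collection": "my-collection"
            },
            "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_MongoDB"
          }
        }' \
    "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/flexTemplates:launch"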
Template source code
Java
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.mongodb.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.BigQueryReadOptions;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.templates.BigQueryToMongoDb.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;
/**
* The {@link BigQueryToMongoDb} pipeline is a batch pipeline which reads data from BigQuery and
* outputs the resulting records to MongoDB.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-mongodb/README_BigQuery_to_MongoDB.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
    name = "BigQuery_to_MongoDB",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to MongoDB",
    description =
        "The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery and writes them to MongoDB as documents. "
            + "Currently each row is stored as a document.",
    optionsClass = Options.class,
    flexContainerName = "bigquery-to-mongodb",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-mongodb",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The source BigQuery table must exist.",
      "The target MongoDB instance should be accessible from the Dataflow worker machines."
    })
public class BigQueryToMongoDb {

  /**
   * Options supported by {@link BigQueryToMongoDb}
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, MongoDbOptions, BigQueryReadOptions {}

  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  public static boolean run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Read the source rows from BigQuery.
        .apply(BigQueryIO.readTableRows().withoutValidation().from(options.getInputTableSpec()))
        .apply(
            "bigQueryDataset",
            ParDo.of(
                new DoFn<TableRow, Document>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Convert each TableRow into a BSON Document, skipping any
                    // _id field so that MongoDB assigns its own object ID.
                    Document doc = new Document();
                    TableRow row = c.element();
                    row.forEach(
                        (key, value) -> {
                          if (!key.equals("_id")) {
                            doc.append(key, value);
                          }
                        });
                    c.output(doc);
                  }
                }))
        // Write each document to the target MongoDB collection.
        .apply(
            MongoDbIO.write()
                .withUri(options.getMongoDbUri())
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection()));

    pipeline.run();
    return true;
  }
}
What's next