BigQuery to MongoDB template

The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery table and writes them to MongoDB as documents. Each row is stored as one document.

Pipeline requirements

  • The source BigQuery table must exist.
  • The target MongoDB instance should be accessible from the Dataflow worker machines.

Template parameters

Required parameters

  • mongoDbUri: The MongoDB connection URI in the format mongodb+srv://USERNAME:PASSWORD@HOST.
  • database: Database in MongoDB to store the collection. For example, my-db.
  • collection: The name of the collection in the MongoDB database. For example, my-collection.
  • inputTableSpec: The BigQuery table to read from. For example, bigquery-project:dataset.input_table.
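
The four required parameters can be sanity-checked before a job is submitted. The following is a minimal sketch, not part of the template: the RequiredParamsCheck class, its problems method, and the regular expression are illustrative assumptions, and the pattern only covers the project:dataset.table form shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical pre-flight check for the template's required parameters.
final class RequiredParamsCheck {
  private static final List<String> REQUIRED =
      List.of("mongoDbUri", "database", "collection", "inputTableSpec");

  // Assumed shape: project:dataset.table. The project part is matched greedily
  // so that project IDs containing ':' or '.' are not rejected.
  private static final Pattern TABLE_SPEC =
      Pattern.compile("(.+):([^:.\\s]+)\\.([^:.\\s]+)");

  // Returns a human-readable list of problems; an empty list means the check passed.
  static List<String> problems(Map<String, String> params) {
    List<String> problems = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = params.get(name);
      if (value == null || value.isBlank()) {
        problems.add("missing required parameter: " + name);
      }
    }
    String spec = params.get("inputTableSpec");
    if (spec != null && !spec.isBlank() && !TABLE_SPEC.matcher(spec).matches()) {
      problems.add("inputTableSpec is not in project:dataset.table form: " + spec);
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> params =
        Map.of(
            "mongoDbUri", "MONGO_DB_URI", // placeholder, replace with a real URI
            "database", "my-db",
            "collection", "my-collection",
            "inputTableSpec", "bigquery-project:dataset.input_table");
    System.out.println(problems(params));
  }
}
```

The check is deliberately loose: it only catches missing values and an obviously malformed table spec, and leaves real validation (table existence, MongoDB reachability) to the pipeline itself.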

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the BigQuery to MongoDB template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_MongoDB \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

  • INPUT_TABLE_SPEC: your source BigQuery table name.
  • MONGO_DB_URI: your MongoDB URI.
  • DATABASE: your MongoDB database.
  • COLLECTION: your MongoDB collection.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_MongoDB"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

  • INPUT_TABLE_SPEC: your source BigQuery table name.
  • MONGO_DB_URI: your MongoDB URI.
  • DATABASE: your MongoDB database.
  • COLLECTION: your MongoDB collection.
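
The request above can also be assembled programmatically. The following is a minimal sketch using the JDK's java.net.http API (Java 11 or later). The LaunchRequestSketch class and its method names are illustrative assumptions, and ACCESS_TOKEN stands for an OAuth 2.0 bearer token that you obtain separately, for example with gcloud auth print-access-token. The sketch only builds the request; it does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative sketch only: builds (but does not send) the flexTemplates:launch request.
final class LaunchRequestSketch {

  static String launchUrl(String projectId, String location) {
    return "https://dataflow.googleapis.com/v1b3/projects/"
        + projectId + "/locations/" + location + "/flexTemplates:launch";
  }

  // Minimal JSON string escaping for backslashes and quotes; a real client
  // would normally use a JSON library instead.
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  static String launchBody(
      String jobName,
      String location,
      String version,
      String inputTableSpec,
      String mongoDbUri,
      String database,
      String collection) {
    String containerSpec =
        "gs://dataflow-templates-" + location + "/" + version + "/flex/BigQuery_to_MongoDB";
    return "{\"launch_parameter\":{"
        + "\"jobName\":" + quote(jobName) + ","
        + "\"parameters\":{"
        + "\"inputTableSpec\":" + quote(inputTableSpec) + ","
        + "\"mongoDbUri\":" + quote(mongoDbUri) + ","
        + "\"database\":" + quote(database) + ","
        + "\"collection\":" + quote(collection)
        + "},"
        + "\"containerSpecGcsPath\":" + quote(containerSpec)
        + "}}";
  }

  static HttpRequest buildRequest(String accessToken, String url, String body) {
    return HttpRequest.newBuilder(URI.create(url))
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    // All upper-case values are placeholders, as in the request above.
    String url = launchUrl("PROJECT_ID", "us-central1");
    String body =
        launchBody(
            "JOB_NAME",
            "us-central1",
            "VERSION",
            "bigquery-project:dataset.input_table",
            "MONGO_DB_URI",
            "my-db",
            "my-collection");
    HttpRequest request = buildRequest("ACCESS_TOKEN", url, body);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
    // To launch the job, send the request with
    // java.net.http.HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).
  }
}
```

Keeping request construction separate from sending makes the URL and JSON body easy to inspect or log before any job is started.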

Template source code

Java

/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.mongodb.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.BigQueryReadOptions;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.templates.BigQueryToMongoDb.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;

/**
 * The {@link BigQueryToMongoDb} pipeline is a batch pipeline which reads data from BigQuery and
 * outputs the resulting records to MongoDB.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-mongodb/README_BigQuery_to_MongoDB.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_MongoDB",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to MongoDB",
    description =
        "The BigQuery to MongoDB template is a batch pipeline that reads rows from a BigQuery and writes them to MongoDB as documents. "
            + "Currently each row is stored as a document.",
    optionsClass = Options.class,
    flexContainerName = "bigquery-to-mongodb",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-mongodb",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The source BigQuery table must exist.",
      "The target MongoDB instance should be accessible from the Dataflow worker machines."
    })
public class BigQueryToMongoDb {
  /**
   * Options supported by {@link BigQueryToMongoDb}
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, MongoDbOptions, BigQueryReadOptions {}

  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  public static boolean run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(BigQueryIO.readTableRows().withoutValidation().from(options.getInputTableSpec()))
        .apply(
            "bigQueryDataset",
            ParDo.of(
                new DoFn<TableRow, Document>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    Document doc = new Document();
                    TableRow row = c.element();
                    row.forEach(
                        (key, value) -> {
                          if (!key.equals("_id")) {
                            doc.append(key, value);
                          }
                        });
                    c.output(doc);
                  }
                }))
        .apply(
            MongoDbIO.write()
                .withUri(options.getMongoDbUri())
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection()));
    pipeline.run();
    return true;
  }
}
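
The anonymous DoFn in the pipeline above copies every field of a row into the document except one named _id, so a BigQuery column called _id is not carried over to MongoDB. The following dependency-free sketch mirrors that mapping with plain java.util.Map values standing in for TableRow and Document (both are map-like types). The RowToDocumentSketch class is an illustrative assumption, not part of the template.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the template's row-to-document step, using plain maps
// instead of TableRow and org.bson.Document.
final class RowToDocumentSketch {

  // Copies all fields except "_id", preserving the row's iteration order.
  static Map<String, Object> toDocument(Map<String, Object> row) {
    Map<String, Object> doc = new LinkedHashMap<>();
    row.forEach(
        (key, value) -> {
          if (!key.equals("_id")) {
            doc.put(key, value);
          }
        });
    return doc;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("_id", "row-1"); // dropped by the mapping
    row.put("name", "Ada");
    row.put("age", 36);
    System.out.println(toDocument(row));
  }
}
```

If the source table has a column that should become the document key, one option is to expose it under a different name in BigQuery, since the template as written discards the _id field.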

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025-10-30 UTC.