Cloud Storage Avro to Spanner template
Stay organized with collections
Save and categorize content based on your preferences.
The Cloud Storage Avro files to Spanner template is a batch pipeline that reads Avro files exported from Spanner stored in Cloud Storage and imports them to a Spanner database.
Pipeline requirements
- The target Spanner database must exist and must be empty.
- You must have read permissions for the Cloud Storage bucket and write permissions for the target Spanner database.
- The input Cloud Storage path must exist, and it must include a
spanner-export.jsonfile that contains a JSON description of files to import. - If the source Avro file doesn't contain a primary key, you must create an empty Spanner table with a primary key before you run the template. This step isn't required if the Avro file defines the primary key.
Template parameters
Required parameters
- instanceId: The instance ID of the Spanner database.
- databaseId: The database ID of the Spanner database.
- inputDir: The Cloud Storage path where the Avro files are imported from.
Optional parameters
- spannerHost: The Cloud Spanner endpoint to call in the template. Only used for testing. For example,
https://batch-spanner.googleapis.com. Defaults to: https://batch-spanner.googleapis.com. - waitForIndexes: If
true, the pipeline waits for indexes to be created. Iffalse, the job might complete while indexes are still being created in the background. The default value isfalse. - waitForForeignKeys: If
true, the pipeline waits for foreign keys to be created. Iffalse, the job might complete while foreign keys are still being created in the background. The default value isfalse. - waitForChangeStreams: If
true, the pipeline waits for change streams to be created. Iffalse, the job might complete while change streams are still being created in the background. The default value istrue. - waitForSequences: By default, the import pipeline is blocked on sequence creation. If
false, the import pipeline might complete with sequences still being created in the background. - earlyIndexCreateFlag: Specifies whether early index creation is enabled. If the template runs a large number of DDL statements, it's more efficient to create indexes before loading data. Therefore, the default behavior is to create the indexes first when the number of DDL statements exceeds a threshold. To disable this feature, set
earlyIndexCreateFlagtofalse. The default value istrue. - spannerProjectId: The ID of the Google Cloud project that contains the Spanner database. If not set, the default Google Cloud project is used.
- ddlCreationTimeoutInMinutes: The timeout in minutes for DDL statements performed by the template. The default value is 30 minutes.
- spannerPriority: The request priority for Spanner calls. Possible values are
HIGH,MEDIUM, andLOW. The default value isMEDIUM. - earlyIndexCreateThreshold: The threshold for the number of indexes and foreign keys that determines whether to create indexes before data loading. If the total number of indexes and foreign keys is larger than this threshold, they will be created before data import for better performance. The default value is 40.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
For the job to show up in the Spanner Instances page of the Google Cloud console, the job name must match the following format:
cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME
Replace the following:
SPANNER_INSTANCE_ID: your Spanner instance's IDSPANNER_DATABASE_NAME: your Spanner database's name
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Avro Files on Cloud Storage to Cloud Spanner template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gclouddataflowjobsrunJOB_NAME\ --gcs-locationgs://dataflow-templates-REGION_NAME/VERSION/GCS_Avro_to_Cloud_Spanner\ --regionREGION_NAME\ --staging-locationGCS_STAGING_LOCATION\ --parameters\ instanceId=INSTANCE_ID,\ databaseId=DATABASE_ID,\ inputDir=GCS_DIRECTORY
Replace the following:
JOB_NAME: a unique job name of your choiceVERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023年09月12日-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME: the region where you want to deploy your Dataflow job—for example,us-central1INSTANCE_ID: the ID of the Spanner instance that contains the databaseDATABASE_ID: the ID of the Spanner database to import toGCS_DIRECTORY: the Cloud Storage path where the Avro files are imported from, for example,gs://mybucket/somefolder
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
POSThttps://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Avro_to_Cloud_Spanner { "jobName":"JOB_NAME", "parameters":{ "instanceId":"INSTANCE_ID", "databaseId":"DATABASE_ID", "inputDir":"gs://GCS_DIRECTORY" }, "environment":{ "machineType":"n1-standard-2" } }
Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME: a unique job name of your choiceVERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023年09月12日-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
LOCATION: the region where you want to deploy your Dataflow job—for example,us-central1INSTANCE_ID: the ID of the Spanner instance that contains the databaseDATABASE_ID: the ID of the Spanner database to import toGCS_DIRECTORY: the Cloud Storage path where the Avro files are imported from, for example,gs://mybucket/somefolder
Template source code
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
packagecom.google.cloud.teleport.spanner;
importcom.google.cloud.spanner.Options.RpcPriority ;
importcom.google.cloud.spanner.SpannerOptions ;
importcom.google.cloud.teleport.metadata.Template;
importcom.google.cloud.teleport.metadata.TemplateCategory;
importcom.google.cloud.teleport.metadata.TemplateCreationParameter;
importcom.google.cloud.teleport.metadata.TemplateParameter;
importcom.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
importcom.google.cloud.teleport.spanner.ImportPipeline.Options ;
importcom.google.cloud.teleport.spanner.spannerio.SpannerConfig;
importorg.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
importorg.apache.beam.sdk.Pipeline;
importorg.apache.beam.sdk.PipelineResult;
importorg.apache.beam.sdk.options.Default;
importorg.apache.beam.sdk.options.Description;
importorg.apache.beam.sdk.options.PipelineOptions;
importorg.apache.beam.sdk.options.PipelineOptionsFactory;
importorg.apache.beam.sdk.options.ValueProvider;
importorg.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
importorg.apache.beam.sdk.transforms.SerializableFunction;
/**
* Avro to Cloud Spanner Import pipeline.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Avro_to_Cloud_Spanner.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name="GCS_Avro_to_Cloud_Spanner",
category=TemplateCategory.BATCH,
displayName="Avro Files on Cloud Storage to Cloud Spanner",
description=
"The Cloud Storage Avro files to Cloud Spanner template is a batch pipeline that reads Avro files exported from "
+"Cloud Spanner stored in Cloud Storage and imports them to a Cloud Spanner database.",
optionsClass=Options .class,
documentation=
"https://cloud.google.com/dataflow/docs/guides/templates/provided/avro-to-cloud-spanner",
contactInformation="https://cloud.google.com/support",
requirements={
"The target Cloud Spanner database must exist and must be empty.",
"You must have read permissions for the Cloud Storage bucket and write permissions for the target Cloud Spanner database.",
"The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
})
publicclass ImportPipeline{
/** Options for {@link ImportPipeline}. */
publicinterface OptionsextendsPipelineOptions{
@TemplateParameter.Text(
order=1,
groupName="Target",
regexes={"^[a-z0-9\\-]+$"},
description="Cloud Spanner instance ID",
helpText="The instance ID of the Spanner database.")
ValueProvider<String>getInstanceId();
voidsetInstanceId(ValueProvider<String>value);
@TemplateParameter.Text(
order=2,
groupName="Target",
regexes={"^[a-z_0-9\\-]+$"},
description="Cloud Spanner database ID",
helpText="The database ID of the Spanner database.")
ValueProvider<String>getDatabaseId();
voidsetDatabaseId(ValueProvider<String>value);
@TemplateParameter.GcsReadFolder(
order=3,
groupName="Source",
description="Cloud storage input directory",
helpText="The Cloud Storage path where the Avro files are imported from.")
ValueProvider<String>getInputDir();
voidsetInputDir(ValueProvider<String>value);
@TemplateParameter.Text(
order=4,
groupName="Target",
optional=true,
description="Cloud Spanner Endpoint to call",
helpText="The Cloud Spanner endpoint to call in the template. Only used for testing.",
example="https://batch-spanner.googleapis.com")
@Default.String("https://batch-spanner.googleapis.com")
ValueProvider<String>getSpannerHost();
voidsetSpannerHost(ValueProvider<String>value);
@TemplateParameter.Boolean(
order=5,
optional=true,
description="Wait for Indexes",
helpText=
"If `true`, the pipeline waits for indexes to be created. If `false`, the job might complete while indexes are still being created in the background. The default value is `false`.")
@Default.Boolean(false)
ValueProvider<Boolean>getWaitForIndexes();
voidsetWaitForIndexes(ValueProvider<Boolean>value);
@TemplateParameter.Boolean(
order=6,
optional=true,
description="Wait for Foreign Keys",
helpText=
"If `true`, the pipeline waits for foreign keys to be created. If `false`, the job might complete while foreign keys are still being created in the background. The default value is `false`.")
@Default.Boolean(false)
ValueProvider<Boolean>getWaitForForeignKeys();
voidsetWaitForForeignKeys(ValueProvider<Boolean>value);
@TemplateParameter.Boolean(
order=7,
optional=true,
description="Wait for Change Streams",
helpText=
"If `true`, the pipeline waits for change streams to be created. If `false`, the job might complete while change streams are still being created in the background. The default value is `true`.")
@Default.Boolean(true)
ValueProvider<Boolean>getWaitForChangeStreams();
voidsetWaitForChangeStreams(ValueProvider<Boolean>value);
@TemplateParameter.Boolean(
order=7,
optional=true,
description="Wait for Sequences",
helpText=
"By default, the import pipeline is blocked on sequence creation. If `false`, the import pipeline might"
+" complete with sequences still being created in the background.")
@Default.Boolean(true)
ValueProvider<Boolean>getWaitForSequences();
voidsetWaitForSequences(ValueProvider<Boolean>value);
@TemplateParameter.Boolean(
order=8,
optional=true,
description="Create Indexes early",
helpText=
"Specifies whether early index creation is enabled. If the template runs a large number of DDL statements, it's more efficient to create indexes before loading data. Therefore, the default behavior is to create the indexes first when the number of DDL statements exceeds a threshold. To disable this feature, set `earlyIndexCreateFlag` to `false`. The default value is `true`.")
@Default.Boolean(true)
ValueProvider<Boolean>getEarlyIndexCreateFlag();
voidsetEarlyIndexCreateFlag(ValueProvider<Boolean>value);
@TemplateCreationParameter(value="false")
@Description("If true, wait for job finish")
@Default.Boolean(true)
booleangetWaitUntilFinish();
@TemplateParameter.ProjectId(
order=9,
groupName="Target",
optional=true,
description="Cloud Spanner Project Id",
helpText=
"The ID of the Google Cloud project that contains the Spanner database. If not set, the default Google Cloud project is used.")
ValueProvider<String>getSpannerProjectId();
voidsetSpannerProjectId(ValueProvider<String>value);
voidsetWaitUntilFinish(booleanvalue);
@TemplateParameter.Integer(
order=10,
optional=true,
description="DDL Creation timeout in minutes",
helpText=
"The timeout in minutes for DDL statements performed by the template. The default value is 30 minutes.")
@Default.Integer(30)
ValueProvider<Integer>getDdlCreationTimeoutInMinutes();
voidsetDdlCreationTimeoutInMinutes(ValueProvider<Integer>value);
@TemplateParameter.Enum(
order=11,
groupName="Target",
enumOptions={
@TemplateEnumOption("LOW"),
@TemplateEnumOption("MEDIUM"),
@TemplateEnumOption("HIGH")
},
optional=true,
description="Priority for Spanner RPC invocations",
helpText=
"The request priority for Spanner calls. Possible values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
ValueProvider<RpcPriority>getSpannerPriority();
voidsetSpannerPriority(ValueProvider<RpcPriority>value);
@TemplateParameter.Integer(
order=12,
optional=true,
description="Early Index Create Threshold",
helpText=
"The threshold for the number of indexes and foreign keys that determines whether to create indexes before data loading. If the total number of indexes and foreign keys is larger than this threshold, they will be created before data import for better performance. The default value is 40.")
@Default.Integer(40)
ValueProvider<Integer>getEarlyIndexCreateThreshold();
voidsetEarlyIndexCreateThreshold(ValueProvider<Integer>value);
}
publicstaticvoidmain(String[]args){
Options options=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options .class);
Pipelinep=Pipeline.create(options);
SpannerConfigspannerConfig=
SpannerConfig.create()
// Temporary fix explicitly setting SpannerConfig.projectId to the default project
// if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
// which no longer accepts null label values on metrics, and SpannerIO#setup() has
// a bug resulting in the label value being set to the original parameter value,
// with no fallback to the default project.
// TODO: remove NestedValueProvider when this is fixed in Beam.
.withProjectId(
NestedValueProvider.of(
options .getSpannerProjectId(),
(SerializableFunction<String,String>)
input->input!=null?input:SpannerOptions .getDefaultProjectId ()))
.withHost(options .getSpannerHost())
.withInstanceId(options .getInstanceId())
.withDatabaseId(options .getDatabaseId())
.withRpcPriority(options .getSpannerPriority());
p.apply(
newImportTransform(
spannerConfig,
options .getInputDir(),
options .getWaitForIndexes(),
options .getWaitForForeignKeys(),
options .getWaitForChangeStreams(),
options .getWaitForSequences(),
options .getEarlyIndexCreateFlag(),
options .getDdlCreationTimeoutInMinutes(),
options .getEarlyIndexCreateThreshold()));
PipelineResultresult=p.run();
if(options .getWaitUntilFinish()
&&
/* Only if template location is null, there is a dataflow job to wait for. Else it's
* template generation which doesn't start a dataflow job.
*/
options.as(DataflowPipelineOptions.class).getTemplateLocation()==null){
result.waitUntilFinish();
}
}
}
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.