Spanner Change Streams to Source Database template

Streaming pipeline. Reads change records from Spanner change streams and writes them to a source database.

Template parameters

Parameter Description
changeStreamName The name of the Spanner change stream that the pipeline reads from.
instanceId The name of the Spanner instance where the change stream is present.
databaseId The name of the Spanner database that the change stream monitors.
spannerProjectId The name of the Spanner project.
metadataInstance The instance to store the metadata used by the connector to control the consumption of the change stream API data.
metadataDatabase The database to store the metadata used by the connector to control the consumption of the change stream API data.
sourceShardsFilePath Path to a Cloud Storage file containing connection profile information for source shards.
startTimestamp Optional: The starting timestamp for reading changes. Defaults to empty.
endTimestamp Optional: The end timestamp for reading changes. If no timestamp provided, reads indefinitely. Defaults to empty.
shadowTablePrefix Optional: The prefix used to name shadow tables. Defaults to rev_shadow_.
sessionFilePath Optional: Session path in Cloud Storage that contains mapping information from HarbourBridge.
filtrationMode Optional: Mode of filtration. Specifies how to drop certain records based on a criterion. Supported modes are: none (filter nothing), forward_migration (filter records written using the forward migration pipeline). Defaults to forward_migration.
shardingCustomJarPath Optional: Custom JAR file location in Cloud Storage that contains the customization logic for fetching the shard id. If you set this parameter, also set the shardingCustomClassName parameter. Defaults to empty.
shardingCustomClassName Optional: Fully qualified class name having the custom shard id implementation. If shardingCustomJarPath is specified, this parameter is required. Defaults to empty.
shardingCustomParameters Optional: String containing any custom parameters to be passed to the custom sharding class. Defaults to empty.
sourceDbTimezoneOffset Optional: The timezone offset from UTC for the source database. Example value: +10:00. Defaults to: +00:00.
dlqGcsPubSubSubscription Optional: The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ retry directory when running in regular mode. The name should be in the format of projects/<project-id>/subscriptions/<subscription-name>. When set, the deadLetterQueueDirectory and dlqRetryMinutes are ignored.
skipDirectoryName Optional: Records skipped from reverse replication are written to this directory. Default directory name is skip.
maxShardConnections Optional: The maximum number of connections that a given shard can accept. Defaults to: 10000.
deadLetterQueueDirectory Optional: The path used when storing the error queue output. The default path is a directory under the Dataflow job's temp location.
dlqMaxRetryCount Optional: The maximum number of times that temporary errors can be retried through the dead-letter queue. Defaults to 500.
runMode Optional: The run mode type. Supported values: regular, retryDLQ. Default: regular. Specify retryDLQ to retry severe dead-letter queue records only.
dlqRetryMinutes Optional: The number of minutes between dead-letter queue retries. Defaults to 10.
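
Two of these parameters have constraints that are easy to check before launching a job. The following is a minimal sketch, not part of the template: the class and method names are hypothetical, and parsing the offset with java.time.ZoneOffset is an assumption about the accepted format, based only on the example value +10:00. The per-worker calculation mirrors the integer division that appears in the template source listed on this page, where maxShardConnections is divided by the maximum number of Dataflow workers and the job fails if the result is less than 1.

```java
import java.time.ZoneOffset;

/** Hypothetical pre-launch checks; the names here are illustrative only. */
final class ParameterChecks {
  private ParameterChecks() {}

  /**
   * Splits the per-shard connection budget across workers using integer
   * division, and rejects configurations that leave a worker with no connection.
   */
  static int poolSizePerWorker(long maxShardConnections, int maxWorkers) {
    // A non-positive worker count is treated as a single worker.
    int workers = maxWorkers > 0 ? maxWorkers : 1;
    int perWorker = (int) (maxShardConnections / workers);
    if (perWorker < 1) {
      throw new IllegalArgumentException(
          "Max workers " + workers + " exceeds max shard connections " + maxShardConnections);
    }
    return perWorker;
  }

  /** Parses an offset such as +10:00 and returns it in seconds east of UTC. */
  static int offsetSeconds(String sourceDbTimezoneOffset) {
    // Assumption: the value follows the +HH:MM form shown in the parameter description.
    return ZoneOffset.of(sourceDbTimezoneOffset).getTotalSeconds();
  }

  public static void main(String[] args) {
    System.out.println(poolSizePerWorker(10000, 10));
    System.out.println(offsetSeconds("+10:00"));
  }
}
```

With the default maxShardConnections of 10000, the sketch leaves each worker at least one connection as long as the job runs with at most 10000 workers.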

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Spanner Change Streams to Source Database template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud CLI

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
changeStreamName=CHANGE_STREAM_NAME,\
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
spannerProjectId=SPANNER_PROJECT_ID,\
metadataInstance=METADATA_INSTANCE,\
metadataDatabase=METADATA_DATABASE,\
sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • CHANGE_STREAM_NAME: the name of the change stream to read from
  • INSTANCE_ID: the Spanner instance ID
  • DATABASE_ID: the Spanner database ID
  • SPANNER_PROJECT_ID: the Spanner project ID
  • METADATA_INSTANCE: the Spanner instance that stores metadata when reading from change streams
  • METADATA_DATABASE: the Spanner database that stores metadata when reading from change streams
  • SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file that contains the source shard details
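
When the command is assembled programmatically, the value of --parameters is a single comma-separated list of key=value pairs. The sketch below shows one way to build the argument list in Java without running it. The class and method names are hypothetical, and the sketch simply rejects values that contain commas instead of handling gcloud's escaping rules.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles the gcloud arguments shown above. */
final class GcloudCommandSketch {
  private GcloudCommandSketch() {}

  /** Joins template parameters as key=value pairs separated by commas. */
  static String joinParameters(Map<String, String> parameters) {
    return parameters.entrySet().stream()
        .map(
            e -> {
              // Assumption: values with commas need gcloud escaping, which is out of scope here.
              if (e.getValue().contains(",")) {
                throw new IllegalArgumentException("Comma in value of " + e.getKey());
              }
              return e.getKey() + "=" + e.getValue();
            })
        .collect(Collectors.joining(","));
  }

  /** Builds the full argument list; the caller decides whether and how to execute it. */
  static List<String> buildCommand(
      String jobName,
      String projectId,
      String region,
      String version,
      Map<String, String> parameters) {
    List<String> cmd = new ArrayList<>();
    cmd.add("gcloud");
    cmd.add("dataflow");
    cmd.add("flex-template");
    cmd.add("run");
    cmd.add(jobName);
    cmd.add(
        "--template-file-gcs-location=gs://dataflow-templates-"
            + region + "/" + version + "/flex/Spanner_to_SourceDb");
    cmd.add("--project=" + projectId);
    cmd.add("--region=" + region);
    cmd.add("--parameters");
    cmd.add(joinParameters(parameters));
    return cmd;
  }

  public static void main(String[] args) {
    // Placeholder values only; replace them as described in the list above.
    Map<String, String> parameters = new LinkedHashMap<>();
    parameters.put("changeStreamName", "CHANGE_STREAM_NAME");
    parameters.put("instanceId", "INSTANCE_ID");
    parameters.put("databaseId", "DATABASE_ID");
    parameters.put("spannerProjectId", "SPANNER_PROJECT_ID");
    parameters.put("metadataInstance", "METADATA_INSTANCE");
    parameters.put("metadataDatabase", "METADATA_DATABASE");
    parameters.put("sourceShardsFilePath", "SOURCE_SHARDS_FILE_PATH");
    System.out.println(
        String.join(" ", buildCommand("JOB_NAME", "PROJECT_ID", "REGION_NAME", "VERSION", parameters)));
  }
}
```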

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "changeStreamName": "CHANGE_STREAM_NAME",
      "instanceId": "INSTANCE_ID",
      "databaseId": "DATABASE_ID",
      "spannerProjectId": "SPANNER_PROJECT_ID",
      "metadataInstance": "METADATA_INSTANCE",
      "metadataDatabase": "METADATA_DATABASE",
      "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
    "environment": {"maxWorkers": "10"}
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • CHANGE_STREAM_NAME: the name of the change stream to read from
  • INSTANCE_ID: the Spanner instance ID
  • DATABASE_ID: the Spanner database ID
  • SPANNER_PROJECT_ID: the Spanner project ID
  • METADATA_INSTANCE: the Spanner instance that stores metadata when reading from change streams
  • METADATA_DATABASE: the Spanner database that stores metadata when reading from change streams
  • SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file that contains the source shard details
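
The request above can also be constructed from Java. The following sketch uses the JDK's java.net.http API to build the POST request without sending it. The class and method names are hypothetical, the JSON is assembled by hand with minimal escaping to keep the example dependency-free, and the bearer token is assumed to come from whatever credential mechanism you already use.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the flexTemplates:launch request. */
final class LaunchRequestSketch {
  private LaunchRequestSketch() {}

  static String launchUrl(String projectId, String location) {
    return "https://dataflow.googleapis.com/v1b3/projects/"
        + projectId + "/locations/" + location + "/flexTemplates:launch";
  }

  /** Quotes a JSON string, escaping only backslashes and double quotes. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Builds the request body with the same fields as the example request. */
  static String launchBody(
      String jobName, String location, String version, Map<String, String> parameters) {
    String params =
        parameters.entrySet().stream()
            .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
            .collect(Collectors.joining(","));
    String containerSpec =
        "gs://dataflow-templates-" + location + "/" + version + "/flex/Spanner_to_SourceDb";
    return "{\"launchParameter\":{"
        + "\"jobName\":" + quote(jobName) + ","
        + "\"parameters\":{" + params + "},"
        + "\"containerSpecGcsPath\":" + quote(containerSpec) + ","
        + "\"environment\":{\"maxWorkers\":\"10\"}"
        + "}}";
  }

  /** Builds, but does not send, the authorized POST request. */
  static HttpRequest buildRequest(String url, String body, String accessToken) {
    return HttpRequest.newBuilder(URI.create(url))
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder values only; replace them as described in the list above.
    Map<String, String> parameters = new LinkedHashMap<>();
    parameters.put("changeStreamName", "CHANGE_STREAM_NAME");
    parameters.put("sourceShardsFilePath", "SOURCE_SHARDS_FILE_PATH");
    String body = launchBody("JOB_NAME", "LOCATION", "VERSION", parameters);
    HttpRequest request = buildRequest(launchUrl("PROJECT_ID", "LOCATION"), body, "ACCESS_TOKEN");
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
  }
}
```

To send the request, pass it to java.net.http.HttpClient.send with a string body handler and inspect the response status.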

Template source code

Java

/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper;
import com.google.cloud.teleport.v2.spanner.migrations.schema.IdentityMapper;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesBasedMapper;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesBasedMapper;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchemaScanner;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
import com.google.cloud.teleport.v2.templates.transforms.FilterRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.PreprocessRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.SourceWriterTransform;
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
import com.google.cloud.teleport.v2.templates.utils.ShadowTableCreator;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerServiceFactoryImpl;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This pipeline reads Spanner Change streams data and writes them to a source DB. */
@Template(
    name = "Spanner_to_SourceDb",
    category = TemplateCategory.STREAMING,
    displayName = "Spanner Change Streams to Source Database",
    description =
        "Streaming pipeline. Reads data from Spanner Change Streams and"
            + " writes them to a source.",
    optionsClass = Options.class,
    flexContainerName = "spanner-to-sourcedb",
    contactInformation = "https://cloud.google.com/support",
    hidden = false,
    streaming = true)
public class SpannerToSourceDb {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {

    @TemplateParameter.Text(
        order = 1,
        optional = false,
        description = "Name of the change stream to read from",
        helpText =
            "This is the name of the Spanner change stream that the pipeline will read from.")
    String getChangeStreamName();

    void setChangeStreamName(String value);

    @TemplateParameter.Text(
        order = 2,
        optional = false,
        description = "Cloud Spanner Instance Id.",
        helpText =
            "This is the name of the Cloud Spanner instance where the changestream is present.")
    String getInstanceId();

    void setInstanceId(String value);

    @TemplateParameter.Text(
        order = 3,
        optional = false,
        description = "Cloud Spanner Database Id.",
        helpText =
            "This is the name of the Cloud Spanner database that the changestream is monitoring")
    String getDatabaseId();

    void setDatabaseId(String value);

    @TemplateParameter.ProjectId(
        order = 4,
        optional = false,
        description = "Cloud Spanner Project Id.",
        helpText = "This is the name of the Cloud Spanner project.")
    String getSpannerProjectId();

    void setSpannerProjectId(String projectId);

    @TemplateParameter.Text(
        order = 5,
        optional = false,
        description = "Cloud Spanner Instance to store metadata when reading from changestreams",
        helpText =
            "This is the instance to store the metadata used by the connector to control the"
                + " consumption of the change stream API data.")
    String getMetadataInstance();

    void setMetadataInstance(String value);

    @TemplateParameter.Text(
        order = 6,
        optional = false,
        description = "Cloud Spanner Database to store metadata when reading from changestreams",
        helpText =
            "This is the database to store the metadata used by the connector to control the"
                + " consumption of the change stream API data.")
    String getMetadataDatabase();

    void setMetadataDatabase(String value);

    @TemplateParameter.Text(
        order = 7,
        optional = true,
        description = "Changes are read from the given timestamp",
        helpText = "Read changes from the given timestamp.")
    @Default.String("")
    String getStartTimestamp();

    void setStartTimestamp(String value);

    @TemplateParameter.Text(
        order = 8,
        optional = true,
        description = "Changes are read until the given timestamp",
        helpText =
            "Read changes until the given timestamp. If no timestamp provided, reads indefinitely.")
    @Default.String("")
    String getEndTimestamp();

    void setEndTimestamp(String value);

    @TemplateParameter.Text(
        order = 9,
        optional = true,
        description = "Cloud Spanner shadow table prefix.",
        helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
    @Default.String("rev_shadow_")
    String getShadowTablePrefix();

    void setShadowTablePrefix(String value);

    @TemplateParameter.GcsReadFile(
        order = 10,
        optional = false,
        description = "Path to GCS file containing the Source shard details",
        helpText = "Path to GCS file containing connection profile info for source shards.")
    String getSourceShardsFilePath();

    void setSourceShardsFilePath(String value);

    @TemplateParameter.GcsReadFile(
        order = 11,
        optional = true,
        description = "Session File Path in Cloud Storage",
        helpText =
            "Session file path in Cloud Storage that contains mapping information from"
                + " HarbourBridge")
    String getSessionFilePath();

    void setSessionFilePath(String value);

    @TemplateParameter.Enum(
        order = 12,
        optional = true,
        enumOptions = {@TemplateEnumOption("none"), @TemplateEnumOption("forward_migration")},
        description = "Filtration mode",
        helpText =
            "Mode of Filtration, decides how to drop certain records based on a criteria. Currently"
                + " supported modes are: none (filter nothing), forward_migration (filter records"
                + " written via the forward migration pipeline). Defaults to forward_migration.")
    @Default.String("forward_migration")
    String getFiltrationMode();

    void setFiltrationMode(String value);

    @TemplateParameter.GcsReadFile(
        order = 13,
        optional = true,
        description = "Custom jar location in Cloud Storage",
        helpText =
            "Custom jar location in Cloud Storage that contains the customization logic"
                + " for fetching shard id.")
    @Default.String("")
    String getShardingCustomJarPath();

    void setShardingCustomJarPath(String value);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Custom class name",
        helpText =
            "Fully qualified class name having the custom shard id implementation. It is a"
                + " mandatory field in case shardingCustomJarPath is specified")
    @Default.String("")
    String getShardingCustomClassName();

    void setShardingCustomClassName(String value);

    @TemplateParameter.Text(
        order = 15,
        optional = true,
        description = "Custom sharding logic parameters",
        helpText =
            "String containing any custom parameters to be passed to the custom sharding class.")
    @Default.String("")
    String getShardingCustomParameters();

    void setShardingCustomParameters(String value);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        description = "SourceDB timezone offset",
        helpText =
            "This is the timezone offset from UTC for the source database. Example value: +10:00")
    @Default.String("+00:00")
    String getSourceDbTimezoneOffset();

    void setSourceDbTimezoneOffset(String value);

    @TemplateParameter.PubsubSubscription(
        order = 17,
        optional = true,
        description =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode.",
        helpText =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode. The name should be in the format"
                + " of projects/<project-id>/subscriptions/<subscription-name>. When set, the"
                + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
    String getDlqGcsPubSubSubscription();

    void setDlqGcsPubSubSubscription(String value);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        description = "Directory name for holding skipped records",
        helpText =
            "Records skipped from reverse replication are written to this directory. Default"
                + " directory name is skip.")
    @Default.String("skip")
    String getSkipDirectoryName();

    void setSkipDirectoryName(String value);

    @TemplateParameter.Long(
        order = 19,
        optional = true,
        description = "Maximum connections per shard.",
        helpText = "This will come from shard file eventually.")
    @Default.Long(10000)
    Long getMaxShardConnections();

    void setMaxShardConnections(Long value);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        description = "Dead letter queue directory.",
        helpText =
            "The file path used when storing the error queue output. "
                + "The default file path is a directory under the Dataflow job's temp location.")
    @Default.String("")
    String getDeadLetterQueueDirectory();

    void setDeadLetterQueueDirectory(String value);

    @TemplateParameter.Integer(
        order = 21,
        optional = true,
        description = "Dead letter queue maximum retry count",
        helpText =
            "The max number of times temporary errors can be retried through DLQ. Defaults to 500.")
    @Default.Integer(500)
    Integer getDlqMaxRetryCount();

    void setDlqMaxRetryCount(Integer value);

    @TemplateParameter.Enum(
        order = 22,
        optional = true,
        description = "Run mode - currently supported are : regular or retryDLQ",
        enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
        helpText =
            "This is the run mode type, whether regular or with retryDLQ. Default is regular."
                + " retryDLQ is used to retry the severe DLQ records only.")
    @Default.String("regular")
    String getRunMode();

    void setRunMode(String value);

    @TemplateParameter.Integer(
        order = 23,
        optional = true,
        description = "Dead letter queue retry minutes",
        helpText = "The number of minutes between dead letter queue retries. Defaults to 10.")
    @Default.Integer(10)
    Integer getDlqRetryMinutes();

    void setDlqRetryMinutes(Integer value);

    @TemplateParameter.Enum(
        order = 24,
        optional = true,
        description = "Source database type, ex: mysql",
        enumOptions = {@TemplateEnumOption("mysql"), @TemplateEnumOption("cassandra")},
        helpText = "The type of source database to reverse replicate to.")
    @Default.String("mysql")
    String getSourceType();

    void setSourceType(String value);

    @TemplateParameter.GcsReadFile(
        order = 25,
        optional = true,
        description = "Custom transformation jar location in Cloud Storage",
        helpText =
            "Custom jar location in Cloud Storage that contains the custom transformation logic for processing records"
                + " in reverse replication.")
    @Default.String("")
    String getTransformationJarPath();

    void setTransformationJarPath(String value);

    @TemplateParameter.Text(
        order = 26,
        optional = true,
        description = "Custom class name for transformation",
        helpText =
            "Fully qualified class name having the custom transformation logic. It is a"
                + " mandatory field in case transformationJarPath is specified")
    @Default.String("")
    String getTransformationClassName();

    void setTransformationClassName(String value);

    @TemplateParameter.Text(
        order = 27,
        optional = true,
        description = "Custom parameters for transformation",
        helpText =
            "String containing any custom parameters to be passed to the custom transformation class.")
    @Default.String("")
    String getTransformationCustomParameters();

    void setTransformationCustomParameters(String value);

    @TemplateParameter.Text(
        order = 28,
        optional = true,
        description = "Table name overrides from spanner to source",
        regexes =
            "^\\[([[:space:]]*\\{[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example = "[{Singers, Vocalists}, {Albums, Records}]",
        helpText =
            "These are the table name overrides from spanner to source. They are written in the"
                + " following format: [{SpannerTableName1, SourceTableName1}, {SpannerTableName2, SourceTableName2}]."
                + " This example shows mapping Singers table to Vocalists and Albums table to Records.")
    @Default.String("")
    String getTableOverrides();

    void setTableOverrides(String value);

    @TemplateParameter.Text(
        order = 29,
        optional = true,
        description = "Column name overrides from spanner to source",
        regexes =
            "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example =
            "[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
        helpText =
            "These are the column name overrides from spanner to source. They are written in the"
                + " following format: [{SpannerTableName1.SpannerColumnName1, SpannerTableName1.SourceColumnName1}, {SpannerTableName2.SpannerColumnName1, SpannerTableName2.SourceColumnName1}]."
                + " Note that the SpannerTableName should remain the same in both the spanner and source pair. To override table names, use tableOverrides."
                + " The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
    @Default.String("")
    String getColumnOverrides();

    void setColumnOverrides(String value);

    @TemplateParameter.GcsReadFile(
        order = 30,
        optional = true,
        description = "File based overrides from spanner to source",
        helpText =
            "A file which specifies the table and the column name overrides from spanner to source.")
    @Default.String("")
    String getSchemaOverridesFilePath();

    void setSchemaOverridesFilePath(String value);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        description = "Directory name for holding filtered records",
        helpText =
            "Records filtered from reverse replication are written to this directory. Default"
                + " directory name is filteredEvents.")
    @Default.String("filteredEvents")
    String getFilterEventsDirectoryName();

    void setFilterEventsDirectoryName(String value);

    @TemplateParameter.Boolean(
        order = 32,
        optional = true,
        description = "Boolean setting if reverse migration is sharded",
        helpText =
            "Sets the template to a sharded migration. If source shard template contains more"
                + " than one shard, the value will be set to true. This value defaults to false.")
    @Default.Boolean(false)
    Boolean getIsShardedMigration();

    void setIsShardedMigration(Boolean value);

    @TemplateParameter.Text(
        order = 33,
        optional = true,
        description = "Failure injection parameter",
        helpText = "Failure injection parameter. Only used for testing.")
    @Default.String("")
    String getFailureInjectionParameter();

    void setFailureInjectionParameter(String value);

    @TemplateParameter.Enum(
        order = 34,
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Cloud Spanner calls. The value must be one of:"
                + " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
    @Default.Enum("HIGH")
    RpcPriority getSpannerPriority();

    void setSpannerPriority(RpcPriority value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Spanner change streams to sink");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .getOptions()
        .as(DataflowPipelineWorkerPoolOptions.class)
        .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

    // calculate the max connections per worker
    int maxNumWorkers =
        pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers() > 0
            ? pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers()
            : 1;
    int connectionPoolSizePerWorker = (int) (options.getMaxShardConnections() / maxNumWorkers);
    if (connectionPoolSizePerWorker < 1) {
      // This can happen when the number of workers exceeds the max shard connections.
      // This can cause overload on the source database. Error out and let the user know.
      LOG.error(
          "Max workers {} is more than max shard connections {}, this can lead to more database"
              + " connections than desired",
          maxNumWorkers,
          options.getMaxShardConnections());
      throw new IllegalArgumentException(
          "Max Dataflow workers "
              + maxNumWorkers
              + " is more than max per shard connections: "
              + options.getMaxShardConnections()
              + " this can lead to more"
              + " database connections than desired. Either reduce the max allowed workers or"
              + " increase the max shard connections");
    }

    // Prepare Spanner config
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));

    // Create shadow tables
    // Note that there is a limit on the number of tables that can be created per DB: 5000.
    // If we create shadow tables per shard, there will be an explosion of tables.
    // Anyway the shadow table has Spanner PK so no need to again separate by the shard
    // Lookup by the Spanner PK should be sufficient.

    // Prepare Spanner config
    SpannerConfig spannerMetadataConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));

    ShadowTableCreator shadowTableCreator =
        new ShadowTableCreator(
            spannerConfig,
            spannerMetadataConfig,
            SpannerAccessor.getOrCreate(spannerMetadataConfig)
                .getDatabaseAdminClient()
                .getDatabase(
                    spannerMetadataConfig.getInstanceId().get(),
                    spannerMetadataConfig.getDatabaseId().get())
                .getDialect(),
            options.getShadowTablePrefix());

    DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);

    shadowTableCreator.createShadowTablesInSpanner();
    Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
    Ddl shadowTableDdl = SpannerSchema.getInformationSchemaAsDdl(spannerMetadataConfig);

    List<Shard> shards;
    String shardingMode;
    if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
      ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
      shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
      shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
    } else {
      CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
      shards = cassandraConfigFileReader.getCassandraShard(options.getSourceShardsFilePath());
      LOG.info("Cassandra config is: {}", shards.get(0));
      shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
    }

    SourceSchema sourceSchema = fetchSourceSchema(options, shards);
    LOG.info("Source schema: {}", sourceSchema);

    if (shards.size() == 1 && !options.getIsShardedMigration()) {
      shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
      Shard shard = shards.get(0);
      if (shard.getLogicalShardId() == null) {
        shard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
        LOG.info(
            "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
      }
    }

    boolean isRegularMode = "regular".equals(options.getRunMode());
    PCollectionTuple reconsumedElements = null;
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    int reshuffleBucketSize =
        maxNumWorkers
            * (debugOptions.getNumberOfWorkerHarnessThreads() > 0
                ? debugOptions.getNumberOfWorkerHarnessThreads()
                : Constants.DEFAULT_WORKER_HARNESS_THREAD_COUNT);

    if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
      reconsumedElements =
          dlqManager.getReconsumerDataTransformForFiles(
              pipeline.apply(
                  "Read retry from PubSub",
                  new PubSubNotifiedDlqIO(
                      options.getDlqGcsPubSubSubscription(),
                      // file paths to ignore when re-consuming for retry
                      new ArrayList<String>(
                          Arrays.asList(
                              "/severe/",
                              "/tmp_retry",
                              "/tmp_severe/",
                              ".temp",
                              "/tmp_skip/",
                              "/" + options.getSkipDirectoryName())))));
    } else {
      reconsumedElements =
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }

PCollection<FailsafeElement<String,String>>dlqJsonStrRecords=
reconsumedElements
.get(DeadLetterQueueManager.RETRYABLE_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of()));
PCollection<TrimmedShardedDataChangeRecord>dlqRecords=
dlqJsonStrRecords
.apply(
"Convert DLQ records to TrimmedShardedDataChangeRecord",
ParDo.of(newConvertDlqRecordToTrimmedShardedDataChangeRecordFn()))
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
PCollection<TrimmedShardedDataChangeRecord>mergedRecords=null;
if(options .getFailureInjectionParameter()!=null
 && !options .getFailureInjectionParameter().isBlank()){
spannerConfig=
SpannerServiceFactoryImpl.createSpannerService(
spannerConfig,options .getFailureInjectionParameter());
}
    if (isRegularMode) {
      PCollection<TrimmedShardedDataChangeRecord> changeRecordsFromDB =
          pipeline
              // This emits PCollection<DataChangeRecord> which is Spanner change stream data
              .apply(getReadChangeStreamDoFn(options, spannerConfig))
              .apply("Reshuffle", Reshuffle.viaRandomKey())
              .apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
              .apply("Preprocess", ParDo.of(new PreprocessRecordsFn()))
              .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
      mergedRecords =
          PCollectionList.of(changeRecordsFromDB)
              .and(dlqRecords)
              .apply("Flatten", Flatten.pCollections())
              .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
    } else {
      mergedRecords = dlqRecords;
    }
    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
            .setCustomParameters(options.getTransformationCustomParameters())
            .build();
    ISchemaMapper schemaMapper = getSchemaMapper(options, ddl);
    if (options.getFailureInjectionParameter() != null
        && !options.getFailureInjectionParameter().isBlank()) {
      spannerMetadataConfig =
          SpannerServiceFactoryImpl.createSpannerService(
              spannerMetadataConfig, options.getFailureInjectionParameter());
    }
    SourceWriterTransform.Result sourceWriterOutput =
        mergedRecords
            // This emits PCollection<KV<Long, TrimmedShardedDataChangeRecord>> which is Spanner
            // change stream data with key as PK mod number of parallelism
            .apply(
                "AssignShardId",
                ParDo.of(
                    new AssignShardIdFn(
                        spannerConfig,
                        schemaMapper,
                        ddl,
                        sourceSchema,
                        shardingMode,
                        shards.get(0).getLogicalShardId(),
                        options.getSkipDirectoryName(),
                        options.getShardingCustomJarPath(),
                        options.getShardingCustomClassName(),
                        options.getShardingCustomParameters(),
                        options.getMaxShardConnections() * shards.size(),
                        options.getSourceType())))
            // currently assuming that all shards accept the same
            .setCoder(
                KvCoder.of(
                    VarLongCoder.of(), SerializableCoder.of(TrimmedShardedDataChangeRecord.class)))
            .apply("Reshuffle2", Reshuffle.of())
            .apply(
                "Write to source",
                new SourceWriterTransform(
                    shards,
                    schemaMapper,
                    spannerMetadataConfig,
                    options.getSourceDbTimezoneOffset(),
                    ddl,
                    shadowTableDdl,
                    sourceSchema,
                    options.getShadowTablePrefix(),
                    options.getSkipDirectoryName(),
                    connectionPoolSizePerWorker,
                    options.getSourceType(),
                    customTransformation));
    PCollection<FailsafeElement<String, String>> dlqPermErrorRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    PCollection<FailsafeElement<String, String>> permErrorsFromSourceWriter =
        sourceWriterOutput
            .permanentErrors()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle3", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert permanent errors from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqPermErrorRecords)
            .and(permErrorsFromSourceWriter)
            .apply(Flatten.pCollections())
            .apply("Reshuffle", Reshuffle.viaRandomKey());
    permanentErrors
        .apply("Update DLQ metrics", ParDo.of(new UpdateDlqMetricsFn(isRegularMode)))
        .apply(
            "DLQ: Write Severe errors to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ for severe errors",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_severe/")
                .setIncludePaneInfo(true)
                .build());
    PCollection<FailsafeElement<String, String>> retryErrors =
        sourceWriterOutput
            .retryableErrors()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle4", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert retryable errors from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    retryErrors
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ for retryable errors",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());
    PCollection<FailsafeElement<String, String>> skippedRecords =
        sourceWriterOutput
            .skippedSourceWrites()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle5", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert skipped records from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    skippedRecords
        .apply(
            "Write skipped records to GCS", MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Writing skipped records to GCS",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(
                    options.getDeadLetterQueueDirectory() + "/" + options.getSkipDirectoryName())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_skip/")
                .setIncludePaneInfo(true)
                .build());
    return pipeline.run();
  }
  public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
      Options options, SpannerConfig spannerConfig) {
    Timestamp startTime = Timestamp.now();
    if (!options.getStartTimestamp().equals("")) {
      startTime = Timestamp.parseTimestamp(options.getStartTimestamp());
    }
    SpannerIO.ReadChangeStream readChangeStreamDoFn =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withChangeStreamName(options.getChangeStreamName())
            .withMetadataInstance(options.getMetadataInstance())
            .withMetadataDatabase(options.getMetadataDatabase())
            .withInclusiveStartAt(startTime)
            .withRpcPriority(options.getSpannerPriority());
    if (!options.getEndTimestamp().equals("")) {
      return readChangeStreamDoFn.withInclusiveEndAt(
          Timestamp.parseTimestamp(options.getEndTimestamp()));
    }
    return readChangeStreamDoFn;
  }
  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();
    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    options.setDeadLetterQueueDirectory(dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
    } else {
      String retryDlqUri =
          FileSystems.matchNewResource(dlqDirectory, true)
              .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
              .toString();
      LOG.info("Dead-letter retry directory: {}", retryDlqUri);
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
    }
  }
  private static Connection createJdbcConnection(Shard shard) {
    try {
      String sourceConnectionUrl =
          "jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName();
      HikariConfig config = new HikariConfig();
      config.setJdbcUrl(sourceConnectionUrl);
      config.setUsername(shard.getUserName());
      config.setPassword(shard.getPassword());
      config.setDriverClassName("com.mysql.cj.jdbc.Driver");
      HikariDataSource ds = new HikariDataSource(config);
      return ds.getConnection();
    } catch (java.sql.SQLException e) {
      LOG.error("Sql error while discovering mysql schema: {}", e);
      throw new RuntimeException(e);
    }
  }
  /**
   * Creates a {@link CqlSession} for the given {@link CassandraShard}.
   *
   * @param cassandraShard The shard containing connection details.
   * @return A {@link CqlSession} instance.
   */
  private static CqlSession createCqlSession(CassandraShard cassandraShard) {
    CqlSessionBuilder builder = CqlSession.builder();
    DriverConfigLoader configLoader =
        CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
    builder.withConfigLoader(configLoader);
    return builder.build();
  }
  private static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
    SourceSchemaScanner scanner = null;
    SourceSchema sourceSchema = null;
    try {
      if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
        Connection connection = createJdbcConnection(shards.get(0));
        scanner = new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName());
        sourceSchema = scanner.scan();
        connection.close();
      } else {
        try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
          scanner =
              new CassandraInformationSchemaScanner(
                  session, ((CassandraShard) shards.get(0)).getKeySpaceName());
          sourceSchema = scanner.scan();
        }
      }
    } catch (SQLException e) {
      throw new RuntimeException("Unable to discover jdbc schema", e);
    }
    return sourceSchema;
  }
  private static ISchemaMapper getSchemaMapper(Options options, Ddl ddl) {
    // Check if config types are specified
    boolean hasSessionFile =
        options.getSessionFilePath() != null && !options.getSessionFilePath().equals("");
    boolean hasSchemaOverridesFile =
        options.getSchemaOverridesFilePath() != null
            && !options.getSchemaOverridesFilePath().equals("");
    boolean hasStringOverrides =
        (options.getTableOverrides() != null && !options.getTableOverrides().equals(""))
            || (options.getColumnOverrides() != null && !options.getColumnOverrides().equals(""));
    int overrideTypesCount = 0;
    if (hasSessionFile) {
      overrideTypesCount++;
    }
    if (hasSchemaOverridesFile) {
      overrideTypesCount++;
    }
    if (hasStringOverrides) {
      overrideTypesCount++;
    }
    if (overrideTypesCount > 1) {
      throw new IllegalArgumentException(
          "Only one type of schema override can be specified. Please use only one of: sessionFilePath, "
              + "schemaOverridesFilePath, or tableOverrides/columnOverrides.");
    }
    ISchemaMapper schemaMapper = new IdentityMapper(ddl);
    if (hasSessionFile) {
      schemaMapper = new SessionBasedMapper(options.getSessionFilePath(), ddl);
    } else if (hasSchemaOverridesFile) {
      schemaMapper = new SchemaFileOverridesBasedMapper(options.getSchemaOverridesFilePath(), ddl);
    } else if (hasStringOverrides) {
      schemaMapper =
          new SchemaStringOverridesBasedMapper(
              options.getTableOverrides(), options.getColumnOverrides(), ddl);
    }
    return schemaMapper;
  }
}

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025-11-07 UTC.