Bigtable to Cloud Storage Avro template
The Bigtable to Cloud Storage Avro template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Avro format. You can use the template to move data from Bigtable to Cloud Storage.
Pipeline requirements
- The Bigtable table must exist.
- The output Cloud Storage bucket must exist before running the pipeline.
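If you want to confirm both prerequisites before launching a job, the following is a minimal Java sketch, not part of the template itself. It assumes the google-cloud-bigtable and google-cloud-storage client libraries are on the classpath and uses hypothetical my-project, my-instance, my-table, and my-bucket identifiers.

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CheckExportPrerequisites {
  public static void main(String[] args) throws Exception {
    // Verify that the Bigtable table exists (hypothetical project, instance, and table IDs).
    try (BigtableTableAdminClient adminClient =
        BigtableTableAdminClient.create("my-project", "my-instance")) {
      if (!adminClient.exists("my-table")) {
        throw new IllegalStateException("Bigtable table 'my-table' does not exist");
      }
    }

    // Verify that the output Cloud Storage bucket exists; Storage.get returns null if it doesn't.
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Bucket bucket = storage.get("my-bucket");
    if (bucket == null) {
      throw new IllegalStateException("Cloud Storage bucket 'my-bucket' does not exist");
    }
    System.out.println("Both export prerequisites are satisfied.");
  }
}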
Template parameters
Required parameters
- bigtableProjectId: The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.
- bigtableInstanceId: The ID of the Bigtable instance that contains the table.
- bigtableTableId: The ID of the Bigtable table to export.
- outputDirectory: The Cloud Storage path where data is written. For example, gs://mybucket/somefolder.
- filenamePrefix: The prefix of the Avro filename. For example, output-. Defaults to: part.
Optional parameters
- bigtableAppProfileId: The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Bigtable to Avro Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Avro \
    --region REGION_NAME \
    --parameters \
    bigtableProjectId=BIGTABLE_PROJECT_ID,\
    bigtableInstanceId=INSTANCE_ID,\
    bigtableTableId=TABLE_ID,\
    outputDirectory=OUTPUT_DIRECTORY,\
    filenamePrefix=FILENAME_PREFIX
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to read data from
- INSTANCE_ID: the ID of the Bigtable instance that contains the table
- TABLE_ID: the ID of the Bigtable table to export
- OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
- FILENAME_PREFIX: the prefix of the Avro filename, for example, output-
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Avro
{
  "jobName": "JOB_NAME",
  "parameters": {
    "bigtableProjectId": "BIGTABLE_PROJECT_ID",
    "bigtableInstanceId": "INSTANCE_ID",
    "bigtableTableId": "TABLE_ID",
    "outputDirectory": "OUTPUT_DIRECTORY",
    "filenamePrefix": "FILENAME_PREFIX"
  },
  "environment": { "zone": "us-central1-f" }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to read data from
- INSTANCE_ID: the ID of the Bigtable instance that contains the table
- TABLE_ID: the ID of the Bigtable table to export
- OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
- FILENAME_PREFIX: the prefix of the Avro filename, for example, output-
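The same launch request can also be sent from application code. The following is a minimal Java sketch, not an official client-library sample; it assumes the google-auth-library-oauth2-http dependency for Application Default Credentials, uses the standard java.net.http client, and fills in hypothetical placeholder values.

import com.google.auth.oauth2.GoogleCredentials;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class LaunchBigtableToAvro {
  public static void main(String[] args) throws Exception {
    // Obtain an OAuth 2.0 access token from Application Default Credentials.
    GoogleCredentials credentials =
        GoogleCredentials.getApplicationDefault()
            .createScoped(List.of("https://www.googleapis.com/auth/cloud-platform"));
    credentials.refreshIfExpired();
    String token = credentials.getAccessToken().getTokenValue();

    // Hypothetical project, location, and template parameters; replace with your own values.
    String project = "my-project";
    String location = "us-central1";
    String gcsPath = "gs://dataflow-templates-" + location + "/latest/Cloud_Bigtable_to_GCS_Avro";
    String body =
        "{\n"
            + "  \"jobName\": \"bigtable-to-avro-export\",\n"
            + "  \"parameters\": {\n"
            + "    \"bigtableProjectId\": \"my-project\",\n"
            + "    \"bigtableInstanceId\": \"my-instance\",\n"
            + "    \"bigtableTableId\": \"my-table\",\n"
            + "    \"outputDirectory\": \"gs://my-bucket/somefolder\",\n"
            + "    \"filenamePrefix\": \"output-\"\n"
            + "  }\n"
            + "}";

    // Issue the same templates:launch POST request shown above.
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create(
                "https://dataflow.googleapis.com/v1b3/projects/" + project
                    + "/locations/" + location + "/templates:launch?gcsPath=" + gcsPath))
            .header("Authorization", "Bearer " + token)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}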
Template source code
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToAvro.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.PipelineUtils;
import com.google.protobuf.ByteOutput;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS. Currently,
 * filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Avro.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Avro",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Avro Files in Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Avro template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Avro format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-avro",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToAvro {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText = "The Cloud Storage path where data is written.",
        example = "gs://mybucket/somefolder")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Avro file prefix",
        helpText = "The prefix of the Avro filename. For example, `output-`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }
  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Avro files in GCS.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Avro", MapElements.via(new BigtableToAvroFn()))
        .apply(
            "Write to Avro in GCS",
            AvroIO.write(BigtableRow.class).to(filePathPrefix).withSuffix(".avro"));

    return pipeline.run();
  }
  /** Translates Bigtable {@link Row} to Avro {@link BigtableRow}. */
  static class BigtableToAvroFn extends SimpleFunction<Row, BigtableRow> {
    @Override
    public BigtableRow apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new BigtableRow(key, cells);
    }
  }

  /**
   * Extracts the byte array from the given {@link ByteString} without copy.
   *
   * @param byteString A {@link ByteString} from which to extract the array.
   * @return an array of byte.
   */
  protected static byte[] toByteArray(final ByteString byteString) {
    try {
      ZeroCopyByteOutput byteOutput = new ZeroCopyByteOutput();
      UnsafeByteOperations.unsafeWriteTo(byteString, byteOutput);
      return byteOutput.bytes;
    } catch (IOException e) {
      return byteString.toByteArray();
    }
  }
  private static final class ZeroCopyByteOutput extends ByteOutput {
    private byte[] bytes;

    @Override
    public void writeLazy(byte[] value, int offset, int length) {
      if (offset != 0 || length != value.length) {
        throw new UnsupportedOperationException();
      }
      bytes = value;
    }

    @Override
    public void write(byte value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(byte[] value, int offset, int length) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void writeLazy(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }
  }
}
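To spot-check an export, the generated files can be read back with the standard Avro Java library. The following is a minimal sketch, assuming a file has been copied locally to the hypothetical path part-00000-of-00001.avro; it reads records generically, with field names matching the template's BigtableRow schema (a row key plus a list of cells), and the writer schema is taken from the file header.

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class InspectExportedAvro {
  public static void main(String[] args) throws Exception {
    // Hypothetical local copy of one exported Avro file.
    File avroFile = new File("part-00000-of-00001.avro");
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
      // The writer schema (the BigtableRow record) is embedded in the file header.
      System.out.println("Schema: " + reader.getSchema());
      // Each record holds the row key and its list of cells.
      for (GenericRecord row : reader) {
        System.out.println(row.get("key") + " -> " + row.get("cells"));
      }
    }
  }
}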
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.