Bigtable to Cloud Storage Avro template

The Bigtable to Cloud Storage Avro template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Avro format. You can use the template to move data from Bigtable to Cloud Storage.

Pipeline requirements

  • The Bigtable table must exist.
  • The output Cloud Storage bucket must exist before running the pipeline.
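
Both requirements can be verified before the job is launched. The following is a minimal, hypothetical pre-flight check (not part of the template) written with the Cloud Bigtable admin and Cloud Storage client libraries; the project, instance, table, and bucket names are placeholders.

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class PreflightCheck {
  public static void main(String[] args) throws Exception {
    String projectId = "my-project";   // placeholder values
    String instanceId = "my-instance";
    String tableId = "my-table";
    String bucketName = "mybucket";

    // Check that the Bigtable table exists.
    BigtableTableAdminSettings settings =
        BigtableTableAdminSettings.newBuilder()
            .setProjectId(projectId)
            .setInstanceId(instanceId)
            .build();
    try (BigtableTableAdminClient adminClient = BigtableTableAdminClient.create(settings)) {
      if (!adminClient.exists(tableId)) {
        throw new IllegalStateException("Bigtable table does not exist: " + tableId);
      }
    }

    // Check that the output Cloud Storage bucket exists.
    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    Bucket bucket = storage.get(bucketName);
    if (bucket == null) {
      throw new IllegalStateException("Output bucket does not exist: " + bucketName);
    }
  }
}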

Template parameters

Required parameters

  • bigtableProjectId: The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.
  • bigtableInstanceId: The ID of the Bigtable instance that contains the table.
  • bigtableTableId: The ID of the Bigtable table to export.
  • outputDirectory: The Cloud Storage path where data is written. For example, gs://mybucket/somefolder.
  • filenamePrefix: The prefix of the Avro filename. For example, output-. Defaults to: part.
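
The outputDirectory and filenamePrefix values are joined into a single file-path prefix, and the pipeline then appends shard numbering and the .avro suffix. The snippet below is a small standalone sketch of that resolution step, using the same Beam FileSystems call as the template source; the bucket and folder names are placeholders.

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ResolvePrefixExample {
  public static void main(String[] args) {
    // Register the file systems available on the classpath (the gs:// scheme requires the
    // Beam Google Cloud Platform core module).
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    String outputDirectory = "gs://mybucket/somefolder"; // placeholder values
    String filenamePrefix = "output-";

    // Same resolution the template performs: treat outputDirectory as a directory and
    // resolve the filename prefix against it as a file.
    String filePathPrefix =
        FileSystems.matchNewResource(outputDirectory, /* isDirectory= */ true)
            .resolve(filenamePrefix, StandardResolveOptions.RESOLVE_FILE)
            .toString();

    // Prints gs://mybucket/somefolder/output-; the pipeline appends shard numbering and
    // the .avro suffix, producing names like output-00000-of-00003.avro.
    System.out.println(filePathPrefix);
  }
}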

Optional parameters

  • bigtableAppProfileId: The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile. Defaults to: default.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Bigtable to Avro Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Avro \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to read data from
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to export
  • OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
  • FILENAME_PREFIX: the prefix of the Avro filename, for example, output-

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to read data from
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to export
  • OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
  • FILENAME_PREFIX: the prefix of the Avro filename, for example, output-
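
As an alternative to sending the raw HTTP request yourself, the same launch call can be made from Java with the generated Dataflow API client (google-api-services-dataflow). The following is a hedged sketch rather than an official example; the project, region, and parameter values are placeholders, and Application Default Credentials are assumed.

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import java.util.Map;

public class LaunchTemplateExample {
  public static void main(String[] args) throws Exception {
    String projectId = "my-project";   // placeholder values
    String location = "us-central1";
    String gcsPath = "gs://dataflow-templates-" + location + "/latest/Cloud_Bigtable_to_GCS_Avro";

    // Authenticate with Application Default Credentials and build the API client.
    GoogleCredentials credentials =
        GoogleCredentials.getApplicationDefault()
            .createScoped("https://www.googleapis.com/auth/cloud-platform");
    Dataflow dataflow =
        new Dataflow.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                GsonFactory.getDefaultInstance(),
                new HttpCredentialsAdapter(credentials))
            .setApplicationName("bigtable-to-avro-launcher")
            .build();

    // Same request body as the REST example above, with placeholder parameter values.
    LaunchTemplateParameters launchParameters =
        new LaunchTemplateParameters()
            .setJobName("bigtable-to-avro-" + System.currentTimeMillis())
            .setParameters(
                Map.of(
                    "bigtableProjectId", "my-project",
                    "bigtableInstanceId", "my-instance",
                    "bigtableTableId", "my-table",
                    "outputDirectory", "gs://mybucket/somefolder",
                    "filenamePrefix", "output-"))
            .setEnvironment(new RuntimeEnvironment().setZone("us-central1-f"));

    LaunchTemplateResponse response =
        dataflow
            .projects()
            .locations()
            .templates()
            .launch(projectId, location, launchParameters)
            .setGcsPath(gcsPath)
            .execute();
    System.out.println("Launched job: " + response.getJob().getId());
  }
}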

Template source code

Java

/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToAvro.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.protobuf.ByteOutput;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS. Currently,
 * filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Avro.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Avro",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Avro Files in Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Avro template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Avro format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-avro",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToAvro {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText = "The Cloud Storage path where data is written.",
        example = "gs://mybucket/somefolder")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Avro file prefix",
        helpText = "The prefix of the Avro filename. For example, `output-`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Avro files in GCS.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Avro", MapElements.via(new BigtableToAvroFn()))
        .apply(
            "Write to Avro in GCS",
            AvroIO.write(BigtableRow.class).to(filePathPrefix).withSuffix(".avro"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to Avro {@link BigtableRow}. */
  static class BigtableToAvroFn extends SimpleFunction<Row, BigtableRow> {
    @Override
    public BigtableRow apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new BigtableRow(key, cells);
    }
  }

  /**
   * Extracts the byte array from the given {@link ByteString} without copy.
   *
   * @param byteString A {@link ByteString} from which to extract the array.
   * @return an array of byte.
   */
  protected static byte[] toByteArray(final ByteString byteString) {
    try {
      ZeroCopyByteOutput byteOutput = new ZeroCopyByteOutput();
      UnsafeByteOperations.unsafeWriteTo(byteString, byteOutput);
      return byteOutput.bytes;
    } catch (IOException e) {
      return byteString.toByteArray();
    }
  }

  private static final class ZeroCopyByteOutput extends ByteOutput {
    private byte[] bytes;

    @Override
    public void writeLazy(byte[] value, int offset, int length) {
      if (offset != 0 || length != value.length) {
        throw new UnsupportedOperationException();
      }
      bytes = value;
    }

    @Override
    public void write(byte value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(byte[] value, int offset, int length) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void writeLazy(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }
  }
}
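
The exported files can be read back with the same generated BigtableRow Avro class. The following is a small sketch (not part of the template) that counts the exported rows as a sanity check; it assumes BigtableRow is on the classpath from the DataflowTemplates project, and the file pattern shown matches the default part prefix, so adjust it to your outputDirectory and filenamePrefix.

import com.google.cloud.teleport.bigtable.BigtableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;

public class ReadExportedAvro {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Hypothetical file pattern: outputDirectory gs://mybucket/somefolder with the
        // default filenamePrefix "part" and the ".avro" suffix added by the template.
        .apply(
            "Read exported rows",
            AvroIO.read(BigtableRow.class).from("gs://mybucket/somefolder/part-*.avro"))
        // Count the exported rows as a simple sanity check.
        .apply("Count rows", Count.globally());

    pipeline.run().waitUntilFinish();
  }
}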
