Create a Dataflow pipeline using Go
This page shows you how to use the Apache Beam SDK for Go to build a program that defines a pipeline. Then, you run the pipeline locally and on the Dataflow service. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.
Before you begin
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  Roles required to select or create a project
  - Select a project: Selecting a project doesn't require a specific IAM role. You can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
  Roles required to enable APIs
  To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
  gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
- Create local authentication credentials for your user account:
  gcloud auth application-default login
  If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  Replace the following:
  - PROJECT_ID: Your project ID.
  - USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
  - ROLE: The IAM role that you grant to your user account.
- Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:
  - roles/dataflow.admin
  - roles/dataflow.worker
  - roles/storage.objectAdmin
  gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
  - Replace PROJECT_ID with your project ID.
  - Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
  - Replace SERVICE_ACCOUNT_ROLE with each individual role.
- Create a Cloud Storage bucket and configure it as follows:
  - Set the storage class to S (Standard).
  - Set the storage location to the following: US (United States).
  - Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
  gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
- Copy the Google Cloud project ID and the Cloud Storage bucket name. You need these values later in this quickstart.
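The service account step above asks you to run the same binding command once per role. As a rough illustration of that repetition, the following Go sketch prints one command line for each of the three roles. The helper name bindingCommands and the sample project values are hypothetical and not part of the gcloud CLI or the Beam SDK; the sketch only formats text and does not call any Google Cloud API.

```go
package main

import "fmt"

// bindingCommands is a hypothetical helper that returns one
// "gcloud projects add-iam-policy-binding" command line per role,
// targeting the Compute Engine default service account.
func bindingCommands(projectID, projectNumber string, roles []string) []string {
	member := fmt.Sprintf("serviceAccount:%s-compute@developer.gserviceaccount.com", projectNumber)
	cmds := make([]string, 0, len(roles))
	for _, role := range roles {
		cmds = append(cmds, fmt.Sprintf(
			"gcloud projects add-iam-policy-binding %s --member=%q --role=%s",
			projectID, member, role))
	}
	return cmds
}

func main() {
	// Sample values; substitute your own project ID and project number.
	roles := []string{
		"roles/dataflow.admin",
		"roles/dataflow.worker",
		"roles/storage.objectAdmin",
	}
	for _, cmd := range bindingCommands("my-project", "123456789", roles) {
		fmt.Println(cmd)
	}
}
```

Each printed line can be pasted into a terminal as is, after you replace the sample project ID and project number.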
Set up your development environment
The Apache Beam SDK is an open source programming model for data pipelines. You define a pipeline with an Apache Beam program and then choose a runner, such as Dataflow, to run your pipeline.
We recommend that you use the latest version of Go when working with the Apache Beam SDK for Go. If you don't have the latest version of Go installed, use Go's Download and install guide to download and install Go for your specific operating system.
To verify the version of Go that you have installed, run the following command in your local terminal:
go version
Run the Beam wordcount example
The Apache Beam SDK for Go includes a wordcount pipeline example. The wordcount example does the following:
- Reads a text file as input. By default, it reads a text file located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.
- Parses each line into words.
- Performs a frequency count on the tokenized words.
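To make the parse and count steps concrete, here is a minimal sketch in plain Go that does not use the Beam SDK. It tokenizes a few in-memory lines and tallies each word. The function name countWords and the regular expression are assumptions chosen for illustration; the actual example runs these steps as Beam transforms over a PCollection.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// wordRE is an assumed tokenizer for this sketch: a run of letters,
// optionally followed by an apostrophe and one lowercase letter.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// countWords splits each line into words and counts how often each
// distinct word occurs. Counting is case-sensitive.
func countWords(lines []string) map[string]int {
	counts := make(map[string]int)
	for _, line := range lines {
		for _, w := range wordRE.FindAllString(line, -1) {
			counts[w]++
		}
	}
	return counts
}

func main() {
	counts := countWords([]string{
		"Blow, winds, and crack your cheeks!",
		"Rage! Blow!",
	})

	// Sort the words so the printed order is stable.
	words := make([]string, 0, len(counts))
	for w := range counts {
		words = append(words, w)
	}
	sort.Strings(words)
	for _, w := range words {
		fmt.Printf("%s: %d\n", w, counts[w])
	}
}
```

In a Beam pipeline the same two stages are expressed as a ParDo that emits words and a counting transform, which lets the runner distribute the work instead of looping in one process.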
To run the latest version of the Beam wordcount example on your local machine, perform the following steps:
- Use the git clone command to clone the apache/beam GitHub repository:
  git clone https://github.com/apache/beam.git
- Switch to the beam/sdks/go directory:
  cd beam/sdks/go
- Use the following command to run the pipeline:
  go run examples/wordcount/wordcount.go \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output outputs
  The input flag specifies the file to read, and the output flag specifies the filename for the frequency count output.
After the pipeline completes, view the output results:
  more outputs*
To exit, press q.
Modify the pipeline code
The Beam wordcount pipeline distinguishes between uppercase and lowercase
words. The following steps show how to create your own Go module, modify the
wordcount pipeline so that the pipeline is not case-sensitive, and run it on
Dataflow.
Create a Go module
To make changes to the pipeline code, follow these steps.
- Create a directory for your Go module in a location of your choice:
  mkdir wordcount
  cd wordcount
- Create a Go module. For this example, use example/dataflow as the module path.
  go mod init example/dataflow
- Download the latest copy of the wordcount code from the Apache Beam GitHub repository. Put this file into the wordcount directory you created.
- If you are using a non-Linux operating system, you must get the Go unix package. This package is required to run pipelines on the Dataflow service.
  go get -u golang.org/x/sys/unix
- Ensure that the go.mod file matches the module's source code:
  go mod tidy
Run the unmodified pipeline
Verify the unmodified wordcount pipeline runs locally.
- From the terminal, build and run the pipeline locally:
  go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output outputs
- View the output results:
  more outputs*
  To exit, press q.
Change the pipeline code
To change the pipeline so that it is not case-sensitive, modify the code to
apply the strings.ToLower
function to all words.
- In an editor of your choice, open the wordcount.go file.
- Examine the init block (comments have been removed for clarity):

func init() {
	register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
	register.Function2x1(formatFn)
	register.Emitter1[string]()
}

- Add a new line to register the strings.ToLower function:

func init() {
	register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
	register.Function2x1(formatFn)
	register.Emitter1[string]()
	register.Function1x1(strings.ToLower)
}

- Examine the CountWords function:

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

- To lowercase the words, add a ParDo that applies strings.ToLower to every word:

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

	// Map all letters to lowercase.
	lowercaseWords := beam.ParDo(s, strings.ToLower, col)

	// Count the number of times each word occurs.
	return stats.Count(s, lowercaseWords)
}

- Save the file.
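The effect of the extra lowercase step can be seen without running a pipeline. The following plain-Go sketch counts the same small word list twice, once as is and once after applying strings.ToLower. The function name countWords, its foldCase parameter, and the sample words are hypothetical and stand in for the Beam transforms in the steps above.

```go
package main

import (
	"fmt"
	"strings"
)

// countWords tallies the given words. When foldCase is true, each word
// is lowercased first, so "King" and "king" share one count.
func countWords(words []string, foldCase bool) map[string]int {
	counts := make(map[string]int)
	for _, w := range words {
		if foldCase {
			w = strings.ToLower(w)
		}
		counts[w]++
	}
	return counts
}

func main() {
	words := []string{"The", "the", "KING", "King", "king"}

	// Case-sensitive: five distinct keys, each counted once.
	fmt.Println(countWords(words, false))

	// Case-insensitive: the variants collapse into two keys.
	fmt.Println(countWords(words, true))
}
```

With the sample input, the case-sensitive count has five entries of one each, while the lowercased count has "king" three times and "the" twice. This is the same change you should see in the pipeline output after the modification.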
Run the updated pipeline locally
Run your updated wordcount pipeline locally and verify the output has changed.
- Build and run the modified wordcount pipeline:
  go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output outputs
- View the output results of the modified pipeline. All words should be lowercase.
  more outputs*
  To exit, press q.
Run the pipeline on the Dataflow service
To run the updated wordcount example on the Dataflow service,
use the following command:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://BUCKET_NAME/results/outputs \
  --runner dataflow \
  --project PROJECT_ID \
  --region DATAFLOW_REGION \
  --staging_location gs://BUCKET_NAME/binaries/
Replace the following:
- BUCKET_NAME: the Cloud Storage bucket name.
- PROJECT_ID: the Google Cloud project ID.
- DATAFLOW_REGION: the region where you want to deploy the Dataflow job. For example, europe-west1. For a list of available locations, see Dataflow locations. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
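To show how the three placeholders map onto the flags, the sketch below assembles the argument list for the command above from a bucket name, project ID, and region. The helper name dataflowArgs and the sample values are hypothetical. The sketch only builds and prints the command line; it does not start a job.

```go
package main

import (
	"fmt"
	"strings"
)

// dataflowArgs is a hypothetical helper that returns the arguments to
// pass to the go tool for running wordcount.go on the Dataflow service.
func dataflowArgs(bucket, project, region string) []string {
	return []string{
		"run", "wordcount.go",
		"--input", "gs://dataflow-samples/shakespeare/kinglear.txt",
		"--output", "gs://" + bucket + "/results/outputs",
		"--runner", "dataflow",
		"--project", project,
		"--region", region,
		"--staging_location", "gs://" + bucket + "/binaries/",
	}
}

func main() {
	// Sample values; substitute your own bucket, project, and region.
	args := dataflowArgs("my-bucket", "my-project", "europe-west1")
	fmt.Println("go " + strings.Join(args, " "))
}
```

Note that the bucket name appears twice: once for the job output and once for the staging location.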
View your results
You can see a list of your Dataflow jobs in Google Cloud console. In the Google Cloud console, go to the Dataflow Jobs page.
The Jobs page displays details of your wordcount job, including a status
of Running at first, and then Succeeded.
When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. View the output results by using either the Google Cloud console or the local terminal.
Console
To view your results in the Google Cloud console, go to the Cloud Storage Buckets page.
From the list of buckets in your project, click the storage bucket that you
created earlier. The output files that your job created are displayed in the
results directory.
Terminal
View the results from your terminal or by using Cloud Shell.
- To list the output files, use the gcloud storage ls command:
  gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  Replace BUCKET_NAME with the name of the specified output Cloud Storage bucket.
- To view the results in the output files, use the gcloud storage cat command:
  gcloud storage cat gs://BUCKET_NAME/results/outputs*
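Once you have the output text, you might want to rank the words by frequency. The following sketch assumes each output line has the form "word: count"; check your own output files before relying on that, because the format is an assumption here. The type wordCount and the function parseCounts are hypothetical names, and the sample text is made up.

```go
package main

import (
	"bufio"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// wordCount pairs a word with the number of times it occurred.
type wordCount struct {
	Word  string
	Count int
}

// parseCounts reads lines of the assumed form "word: count" and returns
// them sorted by descending count, then by word. Lines that do not
// match the assumed form are skipped.
func parseCounts(text string) []wordCount {
	var out []wordCount
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		word, num, ok := strings.Cut(sc.Text(), ": ")
		if !ok {
			continue
		}
		n, err := strconv.Atoi(strings.TrimSpace(num))
		if err != nil {
			continue
		}
		out = append(out, wordCount{Word: word, Count: n})
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Count != out[j].Count {
			return out[i].Count > out[j].Count
		}
		return out[i].Word < out[j].Word
	})
	return out
}

func main() {
	// Made-up sample text standing in for the contents of an output file.
	sample := "king: 3\nthe: 10\nnot a count line\nlear: 3\n"
	for _, wc := range parseCounts(sample) {
		fmt.Printf("%-6s %d\n", wc.Word, wc.Count)
	}
}
```

To use it on real results, you could pipe the output of the gcloud storage cat command into a small program that reads standard input and passes the text to parseCounts.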
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
- Delete the bucket:
  gcloud storage buckets delete BUCKET_NAME
- If you keep your project, revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:
  - roles/dataflow.admin
  - roles/dataflow.worker
  - roles/storage.objectAdmin
  gcloud projects remove-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --role=SERVICE_ACCOUNT_ROLE
- Optional: Revoke the authentication credentials that you created, and delete the local credential file.
  gcloud auth application-default revoke
- Optional: Revoke credentials from the gcloud CLI.
  gcloud auth revoke