
I'm developing a Dataflow streaming job for CSV validation, triggered by the creation of an object in Cloud Storage (via a Pub/Sub notification).

I'm using Dataflow because it is a business requirement and for message de-duplication management (which could also be possible with Pub/Sub).

In each pipeline step I perform a particular type of check (the check rules are defined in a Google Sheet that I read in a dedicated step of the pipeline). If all checks pass, the file is copied to another bucket; otherwise an error email is sent. For this reason I need a global function that I can call from (possibly) all steps.

I've declared the function after the library imports:

import json
import logging, uuid
from datetime import datetime
import apache_beam as beam
from apache_beam import pvalue, window, GroupByKey
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

TOPIC = "TOPIC_PATH"

def test():
    # send an email
    print("Send email")

class ReadPubSubMessage(beam.DoFn):
    def __init__(self):
        self.prod_bucket = "prd-v2"
        self.invalid_bucket = "prd-error"

    def process(self, element, *args, **kwargs):
        import uuid, json
        from datetime import datetime
        # Creating a uuid for the ingested file identification
        try:
            uuidOne = uuid.uuid1()
            logging.info("Reading PubSub message")

            # Reading the PubSub json and extracting the main information
            res = json.loads(element)
            path_loaded_blob = res["id"]
            type_object = res["contentType"]

            # Getting the date from the blob path
            list_of_path_parts = path_loaded_blob.split("/")

            # ... other code ...

            yield final_list
        except Exception as e:
            test()  # send the error email
            logging.error("Error: " + str(e))
beam_options = PipelineOptions()
google_cloud_options = beam_options.view_as(GoogleCloudOptions)

with beam.Pipeline(options=beam_options) as pipeline:

    check_csv = (pipeline
        | "ReadData" >> beam.io.ReadFromPubSub(topic=TOPIC)  # Ok
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))  # Ok
        | "Extract informations from PubSub message" >> beam.ParDo(ReadPubSubMessage())  # Ok
        # ... other steps ...
        | beam.Map(lambda x: logging.info(x))
    )

The error that I receive is:

NameError: name 'external_functions' is not defined

I think it's because the workers don't have the scope of the whole code, but only the code of their own task.

How can I write a global function in a streaming Dataflow job? Or could you share a basic example of using a global function across multiple tasks in Dataflow?

Thank you for your time.

I created a little snippet of code to simulate the situation. I've created another Python file containing a function that I call (after importing the library), but I get the same error.

I've also tried to define the function inside the main file, but obviously that doesn't work.

main.py below

import apache_beam as beam
import logging
# import library_list as external_functions
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

# class stepGeneral(beam.DoFn):
#     def process(self, element):
#         variable = "External function"
#         logging.info("Called general method")
#         yield variable

TOPIC = "TOPIC NAME"

class step1(beam.DoFn):
    def process(self, element):
        variable = "first"
        logging.info("This is the " + variable + " step")
        yield variable

class step2(beam.DoFn):
    def process(self, element):
        variable = "second"
        logging.info("This is the " + variable + " step")
        # stepGeneral.process()
        external_functions.stepGeneral()
        yield variable
beam_options = PipelineOptions()
google_cloud_options = beam_options.view_as(GoogleCloudOptions)

with beam.Pipeline(options=beam_options) as pipeline:

    (pipeline
        | "ReadData" >> beam.io.ReadFromPubSub(topic=TOPIC)  # Ok
        | "First step" >> beam.ParDo(step1())
        | "Second step" >> beam.ParDo(step2())
        # | "window" >> beam.WindowInto(beam.window.FixedWindows(1))
        | beam.Map(lambda x: logging.info(x))
    )

And below, library_list.py:

import logging

def stepGeneral():
    variable = "External function"
    logging.info("Called general method")
    yield variable
asked Nov 15, 2022 at 14:47

2 Answers


See https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ for how to ship and manage pipeline dependencies for your Python code. You could also consider using cloudpickle, which may resolve some of these issues as well (see https://beam.apache.org/blog/beam-2.36.0/).
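For context, a minimal sketch of how those two suggestions are typically wired up via pipeline options (the flag values below are assumptions, not part of the original answer): --save_main_session pickles the globals of the main module for the workers, --pickle_library=cloudpickle (Beam 2.36.0+) switches the pickler, and --setup_file packages local modules such as library_list.py so they can be imported on the workers.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch (assumed values): options that make module-level functions and
# extra local modules available on Dataflow workers.
beam_options = PipelineOptions([
    "--runner=DataflowRunner",
    "--streaming",
    "--save_main_session",            # ship __main__ globals (e.g. a test() helper)
    "--pickle_library=cloudpickle",   # available from Beam 2.36.0 onwards
    "--setup_file=./setup.py",        # setup.py would list library_list.py as a py_module
])

with beam.Pipeline(options=beam_options) as pipeline:
    ...  # same steps as in the question

With options like these, import library_list as external_functions at module level in main.py should also resolve on the workers, as long as library_list.py is included by setup.py.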

answered Nov 15, 2022 at 22:49

1 Comment

Thanks for the answer. I had already seen the first page you linked, and I suppose I have to create my own library and call it following the linked instructions. Is that so? The second link can be useful; those are features I didn't know about. Helpful.

As your code grows beyond one single file (i.e. you start adding classes and modules), you need to pay more attention to building and submitting the container.

Pretty much, you need to start building your own custom container (https://cloud.google.com/dataflow/docs/guides/using-custom-containers) that inherits from the Python + Apache Beam container you want, meaning that your Dockerfile will have to start with:

FROM apache/beam_python<version>_sdk:<beam-version>

e.g. FROM apache/beam_python3.8_sdk:2.38.0

When using custom containers, you need to re-run the gcloud builds command every time you add a new file, to ensure your file makes it into the container.

gcloud --project my-google-project builds submit -t THEIMAGE . --timeout 20m

where THEIMAGE is the image location to use in gcr.io (you can provide a config.yaml file instead of the longer command, as in the satellites tutorial).
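For completeness, a minimal sketch (project, region and image path are placeholders/assumptions) of how the image built above is then referenced from the pipeline options so the Dataflow workers run it:

from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch: point the Dataflow job at the custom SDK container image.
options = PipelineOptions([
    "--runner=DataflowRunner",
    "--project=my-google-project",                               # placeholder project id
    "--region=us-central1",                                      # assumed region
    "--sdk_container_image=gcr.io/my-google-project/THEIMAGE",   # image built with gcloud builds
])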

The satellites tutorial is, in my opinion, quite illustrative of the whole process: https://cloud.google.com/dataflow/docs/tutorials/satellite-images-gpus

I hope this helps.

answered Dec 19, 2022 at 18:07

