
Problem:

Whenever new data are inserted, extract BigQuery tables as CSV files and store them in Cloud Storage.

My plan: Set up an Eventarc trigger on the audit-log event method google.cloud.bigquery.v2.JobService.InsertJob so a Cloud Function can detect when new data are inserted into BigQuery.

from google.cloud import bigquery
import functions_framework
client = bigquery.Client()
bucket_name = "bucket"
project = "astute-coda-410816"
dataset_id = "dataset"
def move_data(table):
    # extract BigQuery table
    destination_uri = f"gs://{bucket_name}/{table}/{table}-*.csv"
    dataset_ref = bigquery.DatasetReference(project,dataset_id)
    table_ref = dataset_ref.table(f"{table}")
    job_config = bigquery.job.ExtractJobConfig(print_header=False)
    client.extract_table(table_ref, destination_uri, location="US",job_config=job_config)
    print(f"Exported {project}:{dataset_id}.{table} to {destination_uri}")

@functions_framework.cloud_event
def transfer(cloudevent):
    payload = cloudevent.data.get("protoPayload")
    status = payload.get("status")
    if not status: #if status is empty, the insert job is successful and tables should be extracted to Cloud Storage
        move_data("table_1")
        move_data("table_2")
        move_data("table_3")

My questions are:

  • Anything else I should do to improve my code?
  • Is there a way to run in parallel the data transfer for all 3 tables?
asked Feb 26, 2024 at 0:46

1 Answer


Looks good.

optional type annotation

This would be a better signature:

def move_data(table: str) -> None:

It turns out we're really passing in a table name, rather than some fancy table object. I didn't learn that detail until I got down to the calling code. This line misled me:

 ... = dataset_ref.table(f"{table}")

I have no idea why you're wrapping table in an f-string (which amounts to calling str(table)), given that table should already be a str.
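For a str input the f-string wrapper is a no-op; a quick check, using a stand-in value:

```python
table = "table_1"  # stand-in value; any str behaves the same

# formatting a str with an f-string just returns an equal str,
# so the wrapper adds nothing
assert f"{table}" == table

# the call can therefore be simplified to:
# table_ref = dataset_ref.table(table)
```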

In the interest of naming consistency, consider calling it just bucket rather than bucket_name.

nit: Running $ black -S *.py on this wouldn't hurt, to tidy up the spacing a bit.

meaningful identifier

Thank you for this helpful comment, I appreciate it.

 if not status: # if status is empty, the insert job is successful and tables should be extracted to Cloud Storage

I imagine Google's docs refer to the return value as a status.

Here, it might be more helpful to name it errors, and then there would be no need for that comment.

Let's talk about the missing else: clause. If there are errors, wouldn't you like for a logger to report the details?
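A sketch of what that might look like, pulling the check into a small helper (the helper name and the simulated payloads are mine, not part of the question's code):

```python
import logging

logger = logging.getLogger(__name__)

def insert_job_succeeded(payload: dict) -> bool:
    """Report whether the audit-log payload describes a successful job."""
    errors = payload.get("status")
    if not errors:
        return True
    # the else branch: surface the failure instead of silently dropping it
    logger.error("InsertJob failed: %s", errors)
    return False

# simulated protoPayload dicts, since no real CloudEvent is available here
assert insert_job_succeeded({"status": {}})
assert not insert_job_succeeded({"status": {"code": 3, "message": "bad row"}})
```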

multiprocessing

a way to run in parallel the data transfer for all 3 tables?

import multiprocessing
    ...
    tables = [f"table_{i}" for i in range(1, 4)]
    with multiprocessing.Pool() as pool:
        pool.map(move_data, tables)

Consider using one of the variant mappers, such as imap_unordered(), which grant greater latitude to the scheduler by relaxing the ordering constraints.
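Since extract jobs mostly block on network I/O, a thread pool is another option; it sidesteps pickling the module-level client that a process Pool would require. A sketch, with a stand-in move_data in place of the real extract:

```python
from multiprocessing.pool import ThreadPool

def move_data(table: str) -> None:
    # stand-in for the real extract job in the question's code
    print(f"extracting {table}")

tables = [f"table_{i}" for i in range(1, 4)]

# imap_unordered hands back results as workers finish, in any order
with ThreadPool() as pool:
    for _ in pool.imap_unordered(move_data, tables):
        pass
```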


This code achieves its design goals.

I would be willing to delegate or accept maintenance tasks on it.

answered Feb 26, 2024 at 1:53
  • You're correct, table is already a string. For the f"{table}" line, it was originally f"{table}_i" with i being part of for i in range(len(table_list)). I later changed this but forgot to remove the f-string. – Commented Feb 26, 2024 at 12:42
