Problem:
Whenever new data are inserted, extract the BigQuery tables as CSV files and store them in Cloud Storage.
My plan:
Set up an Eventarc trigger on the event method
google.cloud.bigquery.v2.JobService.InsertJob
so that a Cloud Function can identify when new data are inserted into BigQuery.
from google.cloud import bigquery
import functions_framework

client = bigquery.Client()
bucket_name = "bucket"
project = "astute-coda-410816"
dataset_id = "dataset"

def move_data(table):
    #extract BigQuery table
    destination_uri = f"gs://{bucket_name}/{table}/{table}-*.csv"
    dataset_ref = bigquery.DatasetReference(project,dataset_id)
    table_ref = dataset_ref.table(f"{table}")
    job_config = bigquery.job.ExtractJobConfig(print_header=False)
    client.extract_table(table_ref, destination_uri, location="US",job_config=job_config)
    print(f"Exported {project}:{dataset_id}.{table} to {destination_uri}")

@functions_framework.cloud_event
def transfer(cloudevent):
    payload = cloudevent.data.get("protoPayload")
    status = payload.get("status")
    if not status: #if status is empty, the insert job is successful and tables should be extracted to Cloud Storage
        move_data("table_1")
        move_data("table_2")
        move_data("table_3")
My questions are:
- Anything else I should do to improve my code?
- Is there a way to run in parallel the data transfer for all 3 tables?
1 Answer
Looks good.
optional type annotation
This would be a better signature:
def move_data(table: str) -> None:
It turns out we're really passing in a table name, rather than some fancy table object. I didn't learn that detail until I got down to the calling code. This line misled me:
... = dataset_ref.table(f"{table}")
I have no idea why you're calling str(table), given that table should already be a str.
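With that in mind, the lookup can simply pass the name straight through, with no f-string involved:

table_ref = dataset_ref.table(table)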
In the interest of naming consistency, consider calling it just bucket rather than bucket_name.
nit: Running $ black -S *.py on this wouldn't hurt, to tidy up the spacing a bit.
meaningful identifier
Thank you for this helpful comment, I appreciate it.
if not status: # if status is empty, the insert job is successful and tables should be extracted to Cloud Storage
I imagine Google's docs refer to the return value as a status.
Here, it might be more helpful to name it errors, and then there would be no need for that comment.
Let's talk about the missing else: clause. If there are errors, wouldn't you like for a logger to report the details?
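A minimal sketch of what that could look like, assuming the stdlib logging module (whose output Cloud Functions forwards to Cloud Logging); it is a drop-in rewrite of the transfer handler above, with status renamed to errors:

import logging

@functions_framework.cloud_event
def transfer(cloudevent):
    payload = cloudevent.data.get("protoPayload")
    errors = payload.get("status")
    if not errors:
        for table in ("table_1", "table_2", "table_3"):
            move_data(table)
    else:
        # surface the failed insert job's details instead of silently returning
        logging.error("insert job failed: %s", errors)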
multiprocessing
a way to run in parallel the data transfer for all 3 tables?
import multiprocessing
...
tables = [f"table_{i}" for i in range(1, 4)]
with multiprocessing.Pool() as pool:
    pool.map(move_data, tables)
Consider using one of the variant mappers, such as imap_unordered(), which grant greater latitude to the scheduler by relaxing the ordering constraints.
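For example, a sketch with the unordered variant; the iterator has to be consumed so the pool waits for every extract to finish:

tables = [f"table_{i}" for i in range(1, 4)]
with multiprocessing.Pool() as pool:
    # results arrive in whatever order the extracts complete
    for _ in pool.imap_unordered(move_data, tables):
        pass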
This code achieves its design goals.
I would be willing to delegate or accept maintenance tasks on it.
- You're correct, table is already a string. For the f"{table}" line, it was originally f"{table}_i" with i being part of for i in range(len(table_list)). I later changed this but forgot to remove the f-string. – hashaf, Feb 26, 2024 at 12:42