Use Ray on Vertex AI with BigQuery

When you run a Ray application on Vertex AI, you can use BigQuery as your cloud database. This section covers how to read from and write to BigQuery from your Ray cluster on Vertex AI. The steps in this section assume that you use the Vertex AI SDK for Python.

To read from a BigQuery dataset, create a new BigQuery dataset or use an existing dataset.

Import and initialize the Ray on Vertex AI client

If you're already connected to your Ray cluster on Vertex AI, restart your kernel and run the following code. The runtime_env variable is necessary at connection time to run BigQuery commands.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip": ["google-cloud-aiplatform[ray]", "ray==2.47.1"]
}

ray.init(address=address, runtime_env=runtime_env)
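The connection settings above can be collected in a small helper so that runtime_env is not forgotten when you reconnect. This is only a sketch: build_connection_config is a hypothetical name, not part of the Vertex AI SDK, and the resource name in the usage line is a made-up placeholder for the value returned by vertex_ray.create_ray_cluster.

```python
def build_connection_config(cluster_resource_name: str,
                            ray_version: str = "2.47.1") -> dict:
    """Return keyword arguments for ray.init() (hypothetical helper)."""
    if not cluster_resource_name:
        raise ValueError("cluster_resource_name must be a non-empty string")
    return {
        # Same address scheme as the snippet above.
        "address": f"vertex_ray://{cluster_resource_name}",
        # Packages installed on the cluster at connection time.
        "runtime_env": {
            "pip": ["google-cloud-aiplatform[ray]", f"ray=={ray_version}"],
        },
    }


# Placeholder resource name, for illustration only.
config = build_connection_config(
    "projects/123/locations/us-central1/persistentResources/my-cluster"
)
print(config["address"])
# With ray installed and a running cluster: ray.init(**config)
```

Keeping the Ray version as a parameter makes it easier to match the version that your cluster runs.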

Read data from BigQuery

Read data from your BigQuery dataset. A Ray Task must perform the read operation.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Where:

  • PROJECT_ID: Google Cloud project ID. Find the project ID in the Google Cloud console welcome page.

  • LOCATION: The location where the dataset is stored. For example, us-central1.

  • DATASET: BigQuery dataset. It must be in the format dataset.table. Set to None if you provide a query.

  • PARALLELISM: An integer that influences how many read tasks are created in parallel. There may be fewer read streams created than you requested.

  • QUERY: A string containing a SQL query to run against BigQuery. Set to None if no query is required.
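Because DATASET and QUERY are alternatives, it can help to check the arguments locally before you submit the Ray task. The sketch below assumes that exactly one of the two is set and that a dataset name has exactly two dot-separated parts, as described above. check_read_source is a hypothetical helper, not an SDK function.

```python
from typing import Optional


def check_read_source(dataset: Optional[str], query: Optional[str]) -> None:
    """Raise ValueError if the dataset/query pair looks inconsistent."""
    # Exactly one of the two should be provided; the other stays None.
    if (dataset is None) == (query is None):
        raise ValueError("Provide either dataset or query, and set the other to None")
    if dataset is not None:
        parts = dataset.split(".")
        # Expect two non-empty parts: dataset and table.
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"dataset must look like 'dataset.table', got {dataset!r}")


check_read_source("my_dataset.my_table", None)           # passes silently
check_read_source(None, "SELECT 1 AS one")               # passes silently
```

Calling the check before run_remotely.remote() surfaces a malformed name immediately instead of inside a remote task.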

Transform data

Update and delete rows and columns from your BigQuery tables using pyarrow or pandas. If you want to use pandas transformations, keep the input type as pyarrow and convert to pandas within the user-defined function (UDF) so you can catch any pandas conversion type errors within the UDF. A Ray Task must perform the transformation.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Write data to BigQuery

Insert data into your BigQuery dataset. A Ray Task must perform the write.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset = DATASET

    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Where:

  • DATASET: BigQuery dataset. The dataset must be in the format dataset.table.
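Because the read, transform, and write steps share the ds variable, they have to run inside the same task. The following sketch combines them in one function. copy_table and its parameters are hypothetical names, and the function only runs on a connected cluster where vertex_ray is installed; it uses only the read_bigquery and write_bigquery arguments shown on this page.

```python
def copy_table(source: str, destination: str,
               parallelism: int, num_blocks: int = 4) -> int:
    """Read a table, repartition it, and write it out. Returns the row count."""
    # Imported inside the function so that it resolves on the cluster worker.
    import vertex_ray

    ds = vertex_ray.data.read_bigquery(
        dataset=source,
        parallelism=parallelism,
        query=None,
    )
    # Optional transformations (for example ds.map_batches(...)) would go here.
    ds = ds.repartition(num_blocks).materialize()
    vertex_ray.data.write_bigquery(ds, dataset=destination)
    return ds.count()


# On a connected cluster, after ray.init(...) as shown earlier:
#   import ray
#   task = ray.remote(copy_table)
#   rows = ray.get(task.remote("my_dataset.src", "my_dataset.dst", 4))
```

Returning a small value such as the row count lets the caller confirm that the task finished without pulling the dataset back to the client.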


Last updated 2025-10-13 UTC.