End to end example for BigQuery TensorFlow reader

View on TensorFlow.org Run in Google Colab View source on GitHub Download notebook

Overview

This tutorial shows how to use the BigQuery TensorFlow reader to train a neural network with the Keras Sequential API.

Dataset

This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository. This dataset contains information about people from a 1994 Census database, including age, education, marital status, occupation, and whether they make more than $50,000 a year.

Setup

Set up your GCP project

The following steps are required, regardless of your notebook environment.

  1. Select or create a GCP project.
  2. Make sure that billing is enabled for your project.
  3. Enable the BigQuery Storage API
  4. Enter your project ID in the cell below. Then run the cell to make sure the Cloud SDK uses the right project for all the commands in this notebook.

Install required Packages, and restart runtime

try:
# Use the Colab's preinstalled TensorFlow 2.x
%tensorflow_version2.x
except:
pass
pipinstallfastavro
pipinstalltensorflow-io==0.9.0
pipinstallgoogle-cloud-bigquery-storage

Authenticate

# Authenticate the current Colab user.
from google.colab import auth

auth.authenticate_user()
print('Authenticated')

Set your PROJECT ID

PROJECT_ID="<YOUR PROJECT>"
!gcloudconfigsetproject$PROJECT_ID
%envGCLOUD_PROJECT=$PROJECT_ID

Import Python libraries, define constants

from__future__import absolute_import, division, print_function, unicode_literals
importos
fromsix.movesimport urllib
importtempfile
importnumpyasnp
importpandasaspd
importtensorflowastf
fromgoogle.cloudimport bigquery
fromgoogle.api_core.exceptionsimport GoogleAPIError
# Location assigned to the BigQuery dataset when it is created.
LOCATION = 'us'

# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '/'.join((DATA_URL, TRAINING_FILE))
EVAL_URL = '/'.join((DATA_URL, EVAL_FILE))

# BigQuery dataset and table names used throughout the tutorial.
DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

# (column name, BigQuery type) pairs, in the schema's column order.
_CSV_COLUMNS = (
    ("age", "FLOAT64"),
    ("workclass", "STRING"),
    ("fnlwgt", "FLOAT64"),
    ("education", "STRING"),
    ("education_num", "FLOAT64"),
    ("marital_status", "STRING"),
    ("occupation", "STRING"),
    ("relationship", "STRING"),
    ("race", "STRING"),
    ("gender", "STRING"),
    ("capital_gain", "FLOAT64"),
    ("capital_loss", "FLOAT64"),
    ("hours_per_week", "FLOAT64"),
    ("native_country", "STRING"),
    ("income_bracket", "STRING"),
)

# Schema applied to the CSV files when they are loaded into BigQuery.
CSV_SCHEMA = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in _CSV_COLUMNS
]

# Columns left out when the tables are read back into TensorFlow.
UNUSED_COLUMNS = ["fnlwgt", "education_num"]

Import census data into BigQuery

Define helper methods to load data into BigQuery

def create_bigquery_dataset_if_necessary(dataset_id):
    """Create the BigQuery dataset `dataset_id` in PROJECT_ID unless it exists.

    Args:
      dataset_id: ID of the dataset to create inside PROJECT_ID.

    Returns:
      True if this call created the dataset, False if the API reported a
      conflict (HTTP 409) for the create request.

    Raises:
      GoogleAPIError: for any API failure other than a 409 conflict.
    """
    # Construct a full Dataset object to send to the API.
    client = bigquery.Client(project=PROJECT_ID)
    dataset = bigquery.Dataset(
        bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
    dataset.location = LOCATION
    try:
        client.create_dataset(dataset)  # API request
    except GoogleAPIError as err:
        # Read `code` defensively: an error object without that attribute
        # must be re-raised as-is rather than masked by an AttributeError.
        if getattr(err, 'code', None) != 409:  # http_client.CONFLICT
            raise
        return False
    return True
def load_data_into_bigquery(url, table_id):
    """Load the CSV file at `url` into table `table_id` of DATASET_ID.

    The dataset is created first if needed. Existing table contents are
    replaced (WRITE_TRUNCATE) and rows are parsed using CSV_SCHEMA. Progress
    is printed: the job id, a completion message and the loaded row count.

    Args:
      url: URI of the CSV file to load.
      table_id: ID of the destination table inside DATASET_ID.
    """
    create_bigquery_dataset_if_necessary(DATASET_ID)

    client = bigquery.Client(project=PROJECT_ID)
    # Build the reference explicitly, as the dataset-creation helper does,
    # instead of going through the deprecated `client.dataset()` shortcut.
    dataset_ref = bigquery.dataset.DatasetReference(PROJECT_ID, DATASET_ID)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = CSV_SCHEMA

    load_job = client.load_table_from_uri(
        url, table_ref, job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(table_ref)
    print("Loaded {} rows.".format(destination_table.num_rows))

Load Census data in BigQuery.

# Load the training file first, then the evaluation file, each into its own
# BigQuery table.
for source_url, destination_table_id in (
        (TRAINING_URL, TRAINING_TABLE_ID),
        (EVAL_URL, EVAL_TABLE_ID),
):
    load_data_into_bigquery(source_url, destination_table_id)
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187
Job finished.
Loaded 32561 rows.
Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a
Job finished.
Loaded 16278 rows.

Confirm that data was imported

TODO: replace <YOUR PROJECT> with your PROJECT_ID

%%bigquery --use_bqstorage_api
-- Preview five rows of the training table (replace <YOUR PROJECT> first).
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
[Embedded frame showing the query results]

Load census data in TensorFlow DataSet using BigQuery reader

Read and transform census data from BigQuery into a TensorFlow Dataset

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
def transform_row(row_dict):
    """Turn one BigQuery row into a `(features, label)` training example.

    Args:
      row_dict: mapping from column name to tensor for a single row. It must
        contain an 'income_bracket' string entry.

    Returns:
      A `(features, label)` tuple. `features` is `row_dict` with every string
      tensor stripped of surrounding whitespace and 'income_bracket' removed.
      `label` is a float32 tensor: 1.0 when the stripped income bracket
      equals '>50K', otherwise 0.0.
    """
    # Trim all string tensors
    trimmed_dict = {
        column: tf.strings.strip(tensor) if tensor.dtype == tf.string else tensor
        for column, tensor in row_dict.items()
    }
    # Extract the label column
    income_bracket = trimmed_dict.pop('income_bracket')
    # Convert the label to 0.0/1.0. The value was already stripped above, so
    # a direct comparison followed by a cast is enough.
    income_bracket_float = tf.cast(
        tf.equal(income_bracket, '>50K'), tf.float32)
    return (trimmed_dict, income_bracket_float)
def read_bigquery(table_name):
    """Read a table of DATASET_ID as a dataset of `(features, label)` pairs.

    Columns listed in UNUSED_COLUMNS are not requested. FLOAT64 schema fields
    are read as `dtypes.double`, all other fields as `dtypes.string`.

    Args:
      table_name: ID of the table inside DATASET_ID to read.

    Returns:
      The rows of the table mapped through `transform_row`.
    """
    # Filter the schema once so the name list and the dtype list cannot
    # drift apart.
    selected_fields = [
        field for field in CSV_SCHEMA if field.name not in UNUSED_COLUMNS
    ]
    tensorflow_io_bigquery_client = BigQueryClient()
    read_session = tensorflow_io_bigquery_client.read_session(
        "projects/" + PROJECT_ID,
        PROJECT_ID, table_name, DATASET_ID,
        [field.name for field in selected_fields],
        [dtypes.double if field.field_type == 'FLOAT64' else dtypes.string
         for field in selected_fields],
        requested_streams=2)

    dataset = read_session.parallel_read_rows()
    transformed_ds = dataset.map(transform_row)
    return transformed_ds
# Number of examples per batch for both training and evaluation.
BATCH_SIZE = 32

# Training data: shuffle with a 10000-element buffer, then batch.
training_ds = read_bigquery(TRAINING_TABLE_ID)
training_ds = training_ds.shuffle(10000)
training_ds = training_ds.batch(BATCH_SIZE)

# Evaluation data: batch only, no shuffling.
eval_ds = read_bigquery(EVAL_TABLE_ID)
eval_ds = eval_ds.batch(BATCH_SIZE)

Define feature columns

def get_categorical_feature_values(column):
    """Return the distinct, whitespace-trimmed values of a training column.

    Runs `SELECT DISTINCT TRIM(column)` against the training table.

    Args:
      column: name of the column to query. It is interpolated into the SQL
        text unescaped, so pass only trusted column names.

    Returns:
      The first column of the query result DataFrame's `values` array.
    """
    query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(
        column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
    client = bigquery.Client(project=PROJECT_ID)
    query_job = client.query(query)
    result = query_job.to_dataframe()
    return result.values[:, 0]
from tensorflow import feature_column

# Feature columns consumed by the model's input layer.
feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
    feature_columns.append(feature_column.numeric_column(header))

# categorical cols: one-hot encoded over the distinct values found in the
# training table.
# NOTE(review): 'gender' is read from BigQuery but is not listed here, so it
# is not turned into a feature column - confirm that is intended.
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
    categorical_feature = feature_column.categorical_column_with_vocabulary_list(
        header, get_categorical_feature_values(header))
    categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
    feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(
    age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

Build and train model

Build model

Dense = tf.keras.layers.Dense

# Hidden stack: a 100-unit ReLU layer with the 'uniform' kernel initializer,
# followed by plain ReLU layers of 75, 50 and 25 units.
hidden_layers = [
    Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
]
hidden_layers += [
    Dense(units, activation=tf.nn.relu) for units in (75, 50, 25)
]

# A single sigmoid unit produces the binary prediction.
output_layer = Dense(1, activation=tf.nn.sigmoid)

model = tf.keras.Sequential([feature_layer] + hidden_layers + [output_layer])

# Compile Keras model
# NOTE(review): no optimizer is passed here, so whatever default Keras
# applies is used - confirm that is intended.
model.compile(
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

Train model

# Train for five epochs on the shuffled, batched training dataset.
model.fit(x=training_ds, epochs=5)
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2. The layer has dtype float32 because it's dtype defaults to floatx.
If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.
To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105
Epoch 2/5
1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324
Epoch 3/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393
Epoch 4/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435
Epoch 5/5
1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455
<tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>

Evaluate model

Evaluate model

# evaluate() yields the loss followed by the compiled metric (accuracy).
evaluation = model.evaluate(eval_ds)
loss, accuracy = evaluation
print("Accuracy", accuracy)
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398
Accuracy 0.8398452

Evaluate a couple of random samples

# Two hand-written examples, one dict per person.
sample_rows = [
    {
        'age': 56,
        'workclass': 'Local-gov',
        'education': 'Bachelors',
        'marital_status': 'Married-civ-spouse',
        'occupation': 'Tech-support',
        'relationship': 'Husband',
        'race': 'White',
        'gender': 'Male',
        'capital_gain': 0,
        'capital_loss': 0,
        'hours_per_week': 40,
        'native_country': 'United-States',
    },
    {
        'age': 36,
        'workclass': 'Private',
        'education': 'Bachelors',
        'marital_status': 'Married-civ-spouse',
        'occupation': 'Other-service',
        'relationship': 'Husband',
        'race': 'Black',
        'gender': 'Male',
        'capital_gain': 7298,
        'capital_loss': 0,
        'hours_per_week': 36,
        'native_country': 'United-States',
    },
]

# Pivot to the column-wise layout: one NumPy array per feature name.
sample_x = {
    feature_name: np.array([row[feature_name] for row in sample_rows])
    for feature_name in sample_rows[0]
}

model.predict(sample_x)
array([[0.5541261],
 [0.6209938]], dtype=float32)

Resources

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2022-01-10 UTC.