Batch load data using the Storage Write API
This document describes how to use the BigQuery Storage Write API to batch load data into BigQuery.
In batch-load scenarios, an application writes data and commits it as a single atomic transaction. When using the Storage Write API to batch load data, create one or more streams in pending type. Pending type supports stream-level transactions. Records are buffered in a pending state until you commit the stream.
For batch workloads, also consider using the Storage Write API through the Apache Spark SQL connector for BigQuery using Dataproc, rather than writing custom Storage Write API code.
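For orientation, the following is a minimal PySpark sketch of that approach. It assumes the spark-bigquery connector is available on your Dataproc cluster and that the connector's direct write method (which uses the Storage Write API) is selected with the writeMethod option; the dataset and table names are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-load-example").getOrCreate()

# Build a small example DataFrame. In a real batch job, this would be the
# data read from your source system.
df = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    ["customer_number", "customer_name"],
)

# writeMethod "direct" tells the connector to use the Storage Write API
# rather than staging files in Cloud Storage.
(
    df.write.format("bigquery")
    .option("writeMethod", "direct")
    .mode("append")
    .save("mydataset.mytable")  # placeholder dataset and table
)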
The Storage Write API is well-suited to a data pipeline architecture. A main process creates a number of streams. For each stream, it assigns a worker thread or a separate process to write a portion of the batch data. Each worker creates a connection to its stream, writes data, and finalizes its stream when it's done. After all of the workers signal successful completion to the main process, the main process commits the data. If a worker fails, its assigned portion of the data will not show up in the final results, and the whole worker can be safely retried. In a more sophisticated pipeline, workers checkpoint their progress by reporting the last offset written to the main process. This approach can result in a robust pipeline that is resilient to failures.
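As a rough illustration of this architecture, here is a schematic Python sketch, not a complete implementation: write_batch_to_stream is a hypothetical placeholder for your own serialization and append logic, and error handling is reduced to letting a failed worker's stream go uncommitted.

from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types


def worker(write_client, parent, batch):
    # Each worker owns one pending stream: create it, write its portion of
    # the batch, then finalize it so no more rows can be appended.
    stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=stream)
    write_batch_to_stream(write_client, stream.name, batch)  # hypothetical helper
    write_client.finalize_write_stream(name=stream.name)
    return stream.name


def run_pipeline(project_id, dataset_id, table_id, batches):
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)

    # Fan the batches out to worker threads. If any worker raises, the
    # commit below never runs, so no partial data becomes visible and the
    # job can be retried safely.
    with ThreadPoolExecutor(max_workers=4) as pool:
        stream_names = list(
            pool.map(lambda batch: worker(write_client, parent, batch), batches)
        )

    # Commit all finalized streams in one atomic operation.
    write_client.batch_commit_write_streams(
        types.BatchCommitWriteStreamsRequest(
            parent=parent, write_streams=stream_names
        )
    )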
Batch load data using pending type
To use pending type, the application does the following:
- Call CreateWriteStream to create one or more streams in pending type.
- For each stream, call AppendRows in a loop to write batches of records.
- For each stream, call FinalizeWriteStream. After you call this method, you cannot write any more rows to the stream. If you call AppendRows after calling FinalizeWriteStream, it returns a StorageError with StorageErrorCode.STREAM_FINALIZED in the google.rpc.Status error. For more information about the google.rpc.Status error model, see Errors.
- Call BatchCommitWriteStreams to commit the streams. After you call this method, the data becomes available for reading. If there is an error committing any of the streams, the error is returned in the stream_errors field of the BatchCommitWriteStreamsResponse.
Committing is an atomic operation, and you can commit multiple streams at once. A stream can only be committed once, so if the commit operation fails, it is safe to retry it. Until you commit a stream, the data is pending and not visible to reads.
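Because a failed commit is safe to retry with the same stream list, the commit step can be wrapped in a simple loop. The following is an illustrative Python sketch using the client shown in the Python sample below; the retry policy (three attempts, no backoff) is an arbitrary choice for the example.

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types


def commit_streams_with_retry(write_client, parent, stream_names, max_attempts=3):
    for attempt in range(max_attempts):
        response = write_client.batch_commit_write_streams(
            types.BatchCommitWriteStreamsRequest(
                parent=parent, write_streams=stream_names
            )
        )
        # Per-stream failures are reported in the stream_errors field; an
        # empty list means the atomic commit succeeded.
        if not response.stream_errors:
            return response
        for error in response.stream_errors:
            print(error.error_message)
    raise RuntimeError("Commit failed after retries.")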
After the stream is finalized and before it is committed, the data can remain in the buffer for up to 4 hours. Pending streams must be committed within 24 hours. There is a quota limit on the total size of the pending stream buffer.
The following code shows how to write data in pending type:
C#
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery C# API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
using Google.Api.Gax.Grpc;
using Google.Cloud.BigQuery.Storage.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static Google.Cloud.BigQuery.Storage.V1.AppendRowsRequest.Types;

public class AppendRowsPendingSample
{
    /// <summary>
    /// This code sample demonstrates how to write records in pending mode.
    /// Create a write stream, write some sample data, and commit the stream to append the rows.
    /// The CustomerRecord proto used in the sample can be seen in Resources folder and generated C# is placed in Data folder in
    /// https://github.com/GoogleCloudPlatform/dotnet-docs-samples/tree/main/bigquery-storage/api/BigQueryStorage.Samples
    /// </summary>
    public async Task AppendRowsPendingAsync(string projectId, string datasetId, string tableId)
    {
        BigQueryWriteClient bigQueryWriteClient = await BigQueryWriteClient.CreateAsync();

        // Initialize a write stream for the specified table.
        // When creating the stream, choose the type. Use the Pending type to wait
        // until the stream is committed before it is visible. See:
        // https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
        WriteStream stream = new WriteStream { Type = WriteStream.Types.Type.Pending };
        TableName tableName = TableName.FromProjectDatasetTable(projectId, datasetId, tableId);
        stream = await bigQueryWriteClient.CreateWriteStreamAsync(tableName, stream);

        // Initialize streaming call, retrieving the stream object
        BigQueryWriteClient.AppendRowsStream rowAppender = bigQueryWriteClient.AppendRows();

        // Sending requests and retrieving responses can be arbitrarily interleaved.
        // Exact sequence will depend on client/server behavior.
        // Create task to do something with responses from server.
        Task appendResultsHandlerTask = Task.Run(async () =>
        {
            AsyncResponseStream<AppendRowsResponse> appendRowResults = rowAppender.GetResponseStream();
            while (await appendRowResults.MoveNextAsync())
            {
                AppendRowsResponse responseItem = appendRowResults.Current;
                // Do something with responses.
                if (responseItem.AppendResult != null)
                {
                    Console.WriteLine($"Appending rows resulted in: {responseItem.AppendResult}");
                }
                if (responseItem.Error != null)
                {
                    Console.Error.WriteLine($"Appending rows resulted in an error: {responseItem.Error.Message}");
                    foreach (RowError rowError in responseItem.RowErrors)
                    {
                        Console.Error.WriteLine($"Row Error: {rowError}");
                    }
                }
            }
            // The response stream has completed.
        });

        // List of records to be appended in the table.
        List<CustomerRecord> records = new List<CustomerRecord>
        {
            new CustomerRecord { CustomerNumber = 1, CustomerName = "Alice" },
            new CustomerRecord { CustomerNumber = 2, CustomerName = "Bob" }
        };

        // Create a batch of row data by appending serialized bytes to the
        // SerializedRows repeated field.
        ProtoData protoData = new ProtoData
        {
            WriterSchema = new ProtoSchema { ProtoDescriptor = CustomerRecord.Descriptor.ToProto() },
            Rows = new ProtoRows { SerializedRows = { records.Select(r => r.ToByteString()) } }
        };

        // Initialize the append row request.
        AppendRowsRequest appendRowRequest = new AppendRowsRequest
        {
            WriteStreamAsWriteStreamName = stream.WriteStreamName,
            ProtoRows = protoData
        };

        // Stream a request to the server.
        await rowAppender.WriteAsync(appendRowRequest);

        // Append a second batch of data.
        protoData = new ProtoData
        {
            Rows = new ProtoRows { SerializedRows = { new CustomerRecord { CustomerNumber = 3, CustomerName = "Charles" }.ToByteString() } }
        };

        // Since this is the second request, you only need to include the row data.
        // The name of the stream and protocol buffers descriptor is only needed in
        // the first request.
        appendRowRequest = new AppendRowsRequest
        {
            // If Offset is not present, the write is performed at the current end of stream.
            ProtoRows = protoData
        };

        await rowAppender.WriteAsync(appendRowRequest);

        // Complete writing requests to the stream.
        await rowAppender.WriteCompleteAsync();
        // Await the handler. This will complete once all server responses have been processed.
        await appendResultsHandlerTask;

        // A Pending type stream must be "finalized" before being committed. No new
        // records can be written to the stream after this method has been called.
        await bigQueryWriteClient.FinalizeWriteStreamAsync(stream.Name);

        BatchCommitWriteStreamsRequest batchCommitWriteStreamsRequest = new BatchCommitWriteStreamsRequest
        {
            Parent = tableName.ToString(),
            WriteStreams = { stream.Name }
        };
        BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
            await bigQueryWriteClient.BatchCommitWriteStreamsAsync(batchCommitWriteStreamsRequest);

        if (batchCommitWriteStreamsResponse.StreamErrors?.Count > 0)
        {
            // Handle errors here.
            Console.WriteLine("Error committing write streams. Individual errors:");
            foreach (StorageError error in batchCommitWriteStreamsResponse.StreamErrors)
            {
                Console.WriteLine(error.ErrorMessage);
            }
        }
        else
        {
            Console.WriteLine($"Writes to stream {stream.Name} have been committed.");
        }
    }
}
Go
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Go API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"time"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
	"google.golang.org/protobuf/proto"
)

// generateExampleMessages generates a slice of serialized protobuf messages using a statically defined
// and compiled protocol buffer file, and returns the binary serialized representation.
func generateExampleMessages(numMessages int) ([][]byte, error) {
	msgs := make([][]byte, numMessages)
	for i := 0; i < numMessages; i++ {
		random := rand.New(rand.NewSource(time.Now().UnixNano()))

		// Our example data embeds an array of structs, so we'll construct that first.
		sList := make([]*exampleproto.SampleStruct, 5)
		for i := 0; i < int(random.Int63n(5)+1); i++ {
			sList[i] = &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			}
		}

		m := &exampleproto.SampleData{
			BoolCol:    proto.Bool(true),
			BytesCol:   []byte("some bytes"),
			Float64Col: proto.Float64(3.14),
			Int64Col:   proto.Int64(123),
			StringCol:  proto.String("example string value"),

			// These types require special encoding/formatting to transmit.

			// DATE values are number of days since the Unix epoch.
			DateCol: proto.Int32(int32(time.Now().UnixNano() / 86400000000000)),

			// DATETIME uses the literal format.
			DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),

			// GEOGRAPHY uses Well-Known-Text (WKT) format.
			GeographyCol: proto.String("POINT(-122.350220 47.649154)"),

			// NUMERIC and BIGNUMERIC can be passed as string, or more efficiently
			// using a packed byte representation.
			NumericCol:    proto.String("99999999999999999999999999999.999999999"),
			BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),

			// TIME also uses literal format.
			TimeCol: proto.String("12:13:14.000000"),

			// TIMESTAMP uses microseconds since Unix epoch.
			TimestampCol: proto.Int64(time.Now().UnixNano() / 1000),

			// Int64List is an array of INT64 types.
			Int64List: []int64{2, 4, 6, 8},

			// This is a required field, and thus must be present.
			RowNum: proto.Int64(23),

			// StructCol is a single nested message.
			StructCol: &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			},

			// StructList is a repeated array of a nested message.
			StructList: sList,
		}
		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("error generating message %d: %w", i, err)
		}
		msgs[i] = b
	}
	return msgs, nil
}

// appendToPendingStream demonstrates using the managedwriter package to write some example data
// to a pending stream, and then committing it to a table.
func appendToPendingStream(w io.Writer, projectID, datasetID, tableID string) error {
	// projectID := "myproject"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()

	// Instantiate a managedwriter client to handle interactions with the service.
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("managedwriter.NewClient: %w", err)
	}
	// Close the client when we exit the function.
	defer client.Close()

	// Create a new pending stream. We'll use the stream name to construct a writer.
	pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		return fmt.Errorf("CreateWriteStream: %w", err)
	}

	// We need to communicate the descriptor of the protocol buffer message we're using, which
	// is analogous to the "schema" for the message. Both SampleData and SampleStruct are
	// two distinct messages in the compiled proto file, so we'll use adapt.NormalizeDescriptor
	// to unify them into a single self-contained descriptor representation.
	m := &exampleproto.SampleData{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		return fmt.Errorf("NormalizeDescriptor: %w", err)
	}

	// Instantiate a ManagedStream, which manages low level details like connection state and provides
	// additional features like a future-like callback for appends, etc. NewManagedStream can also create
	// the stream on your behalf, but in this example we're being explicit about stream creation.
	managedStream, err := client.NewManagedStream(ctx, managedwriter.WithStreamName(pendingStream.GetName()),
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		return fmt.Errorf("NewManagedStream: %w", err)
	}
	defer managedStream.Close()

	// First, we'll append a single row.
	rows, err := generateExampleMessages(1)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}

	// We'll keep track of the current offset in the stream with curOffset.
	var curOffset int64
	// We can append data asynchronously, so we'll check our appends at the end.
	var results []*managedwriter.AppendResult

	result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
	if err != nil {
		return fmt.Errorf("AppendRows first call error: %w", err)
	}
	results = append(results, result)

	// Advance our current offset.
	curOffset = curOffset + 1

	// This time, we'll append three more rows in a single request.
	rows, err = generateExampleMessages(3)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows second call error: %w", err)
	}
	results = append(results, result)

	// Advance our offset again.
	curOffset = curOffset + 3

	// Finally, we'll append two more rows.
	rows, err = generateExampleMessages(2)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows third call error: %w", err)
	}
	results = append(results, result)

	// Now, we'll check that our batch of three appends all completed successfully.
	// Monitoring the results could also be done out of band via a goroutine.
	for k, v := range results {
		// GetResult blocks until we receive a response from the API.
		recvOffset, err := v.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("append %d returned error: %w", k, err)
		}
		fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
	}

	// We're now done appending to this stream. We now mark the pending stream finalized, which blocks
	// further appends.
	rowCount, err := managedStream.Finalize(ctx)
	if err != nil {
		return fmt.Errorf("error during Finalize: %w", err)
	}

	fmt.Fprintf(w, "Stream %s finalized with %d rows.\n", managedStream.StreamName(), rowCount)

	// To commit the data to the table, we need to run a batch commit. You can commit several streams
	// atomically as a group, but in this instance we'll only commit the single stream.
	req := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(managedStream.StreamName()),
		WriteStreams: []string{managedStream.StreamName()},
	}

	resp, err := client.BatchCommitWriteStreams(ctx, req)
	if err != nil {
		return fmt.Errorf("client.BatchCommit: %w", err)
	}
	if len(resp.GetStreamErrors()) > 0 {
		return fmt.Errorf("stream errors present: %v", resp.GetStreamErrors())
	}

	fmt.Fprintf(w, "Table data committed at %s\n", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))

	return nil
}
Java
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");

    // Once all streams are done, if all writes were successful, commit all of them in one request.
    // This example only has the one stream. If any streams failed, their workload may be
    // retried on a new stream, and then only the successful stream should be included in the
    // commit.
    BatchCommitWriteStreamsRequest commitRequest =
        BatchCommitWriteStreamsRequest.newBuilder()
            .setParent(parentTable.toString())
            .addWriteStreams(writer.getStreamName())
            .build();
    BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
    // If the response does not have a commit time, it means the commit operation failed.
    if (commitResponse.hasCommitTime() == false) {
      for (StorageError err : commitResponse.getStreamErrorsList()) {
        System.out.println(err.getErrorMessage());
      }
      throw new RuntimeException("Error committing the streams");
    }
    System.out.println("Appended and committed records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
Node.js
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, Writer} = managedwriter;
const customer_record_pb = require('./customer_record_pb.js');
const {CustomerRecord} = customer_record_pb;
const protobufjs = require('protobufjs');
require('protobufjs/ext/descriptor');

async function appendRowsPending() {
  /**
   * If you make updates to the customer_record.proto protocol buffers definition,
   * run:
   *   pbjs customer_record.proto -t static-module -w commonjs -o customer_record.js
   *   pbjs customer_record.proto -t json --keep-case -o customer_record.json
   * from the /samples directory to generate the customer_record module.
   */

  // So that BigQuery knows how to parse the serialized_rows, create a
  // protocol buffer representation of your message descriptor.
  const root = protobufjs.loadSync('./customer_record.json');
  const descriptor = root.lookupType('CustomerRecord').toDescriptor('proto2');
  const protoDescriptor = adapt.normalizeDescriptor(descriptor).toJSON();

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId});
  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new Writer({
      connection,
      protoDescriptor,
    });

    let serializedRows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      rowNum: 1,
      customerName: 'Octavia',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Row 2
    row = {
      rowNum: 2,
      customerName: 'Turing',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    serializedRows = [];

    // Row 3
    row = {
      rowNum: 3,
      customerName: 'Bell',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 2;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });
    console.log(response);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
Python
This example shows a simple record with two fields. For a longer example that shows how to send different data types, including STRUCT types, see the append_rows_proto2 sample on GitHub.
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Python API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""
fromgoogle.cloudimport bigquery_storage_v1
fromgoogle.cloud.bigquery_storage_v1import types , writer
fromgoogle.protobufimport descriptor_pb2
# If you update the customer_record.proto protocol buffer definition, run:
#
# protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from.import customer_record_pb2
defcreate_row_data(row_num: int, name: str):
row = customer_record_pb2.CustomerRecord()
row.row_num = row_num
row.customer_name = name
return row.SerializeToString()
defappend_rows_pending(project_id: str, dataset_id: str, table_id: str):
"""Create a write stream, write some sample data, and commit the stream."""
write_client = bigquery_storage_v1 .BigQueryWriteClient()
parent = write_client.table_path (project_id, dataset_id, table_id)
write_stream = types .WriteStream ()
# When creating the stream, choose the type. Use the PENDING type to wait
# until the stream is committed before it is visible. See:
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream.type_ = types .WriteStream .Type.PENDING
write_stream = write_client.create_write_stream (
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
# Create a template with fields needed for the first request.
request_template = types .AppendRowsRequest ()
# The initial request must contain the stream name.
request_template.write_stream = stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema = types .ProtoSchema ()
proto_descriptor = descriptor_pb2.DescriptorProto()
customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types .AppendRowsRequest .ProtoData ()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer .AppendRowsStream (write_client, request_template)
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows = types .ProtoRows ()
proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
proto_rows.serialized_rows.append(create_row_data(2, "Bob"))
# Set an offset to allow resuming this stream if the connection breaks.
# Keep track of which requests the server has acknowledged and resume the
# stream at the first non-acknowledged message. If the server has already
# processed a message with that offset, it will return an ALREADY_EXISTS
# error, which can be safely ignored.
#
# The first request must always have an offset of 0.
request = types .AppendRowsRequest ()
request.offset = 0
proto_data = types .AppendRowsRequest .ProtoData ()
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future_1 = append_rows_stream.send (request)
# Send another batch.
proto_rows = types .ProtoRows ()
proto_rows.serialized_rows.append(create_row_data(3, "Charles"))
# Since this is the second request, you only need to include the row data.
# The name of the stream and protocol buffers DESCRIPTOR is only needed in
# the first request.
request = types .AppendRowsRequest ()
proto_data = types .AppendRowsRequest .ProtoData ()
proto_data.rows = proto_rows
request.proto_rows = proto_data
# Offset must equal the number of rows that were previously sent.
request.offset = 2
response_future_2 = append_rows_stream.send (request)
print(response_future_1.result ())
print(response_future_2.result ())
# Shutdown background threads and close the streaming connection.
append_rows_stream.close ()
# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client.finalize_write_stream (name=write_stream.name)
# Commit the stream you created earlier.
batch_commit_write_streams_request = types .BatchCommitWriteStreamsRequest ()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
write_client.batch_commit_write_streams (batch_commit_write_streams_request)
print(f"Writes to stream: '{write_stream.name}' have been committed.")
This code example depends on a compiled protocol module, customer_record_pb2.py. To create the compiled module, execute protoc --python_out=. customer_record.proto, where protoc is the protocol buffer compiler. The customer_record.proto file defines the format of the messages used in the Python example.
// The BigQuery Storage API expects protocol buffer data to be encoded in the
// proto2 wire format. This allows it to disambiguate missing optional fields
// from default values without the need for wrapper types.
syntax = "proto2";

// Define a message type representing the rows in your table. The message
// cannot contain fields which are not present in the table.
message CustomerRecord {

  optional string customer_name = 1;

  // Use the required keyword for client-side validation of required fields.
  required int64 row_num = 2;
}