I am trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own.
For small files, it works perfectly, but for large blobs (around 3 GB) the Azure connection times out after a few minutes.
I have also tried setting all the timeout values to their maximum, but that didn't help either.
Code snippet:
HttpClient httpClient = new NettyAsyncHttpClientBuilder()
        .readTimeout(Duration.ofDays(365))
        .responseTimeout(Duration.ofDays(365))
        .connectTimeout(Duration.ofDays(365))
        .build();
RequestRetryOptions retryOptions = new RequestRetryOptions(
        RetryPolicyType.EXPONENTIAL,
        3,
        Duration.ofSeconds(30),
        null,
        null,
        null
);
HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));
HttpClientOptions httpClientOptions = new HttpClientOptions()
        .setReadTimeout(Duration.ofDays(365))
        .setResponseTimeout(Duration.ofDays(365))
        .setConnectionIdleTimeout(Duration.ofDays(365))
        .setConnectTimeout(Duration.ofDays(365))
        .setWriteTimeout(Duration.ofDays(365));
// Building the Azure client
BlobServiceClient azureClient = new BlobServiceClientBuilder()
        .connectionString("My_Connection_String_For_Authentication")
        .httpClient(httpClient)
        .addPolicy(timeoutPolicy)
        .clientOptions(httpClientOptions)
        .retryOptions(retryOptions)
        .buildClient();
BlobContainerClient containerClient = azureClient.getBlobContainerClient("{azureContainerName}");
PagedResponse<BlobItem> pagedResponse = containerClient.listBlobs(options, continuationToken, null).iterableByPage().iterator().next();
List<BlobItem> pageItems = pagedResponse.getValue();
for (BlobItem blob : pageItems) {
    // Opening the InputStream of the blob
    try (InputStream blobInputStream = containerClient.getBlobVersionClient(blob.getName(), blob.getVersionId()).openInputStream();
         BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {
        // Reading the bufferedStream and piping it to my own cloud storage.
    }
}
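For reference, the piping part is essentially just a copy loop along these lines (uploadStream here is a simplified stand-in for the output stream my own storage client gives me, not the exact code):
// Simplified sketch of the piping step: uploadStream stands in for the
// output stream provided by my own cloud storage client.
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = bufferedStream.read(buffer)) != -1) {
    uploadStream.write(buffer, 0, bytesRead);
}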
When reading the bytes from the InputStream, I'm getting the error below after a few minutes.
java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:346)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1213)
at okio.OutputStreamSink.write(JvmOkio.kt:56)
at okio.AsyncTimeout$sink1ドル.write(AsyncTimeout.kt:102)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSink.write(Http1ExchangeCodec.kt:311)
at okio.ForwardingSink.write(ForwardingSink.kt:29)
at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.writeAll(RealBufferedSink.kt:195)
at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient2ドル.writeTo(Okhttp3HttpClient.java:284)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:62)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient.execute(Okhttp3HttpClient.java:89)
... 36 more
Suppressed: java.net.SocketException: Operation timed out (Read failed)
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1333)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:976)
at okio.InputStreamSource.read(JvmOkio.kt:93)
at okio.AsyncTimeout$source1ドル.read(AsyncTimeout.kt:128)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:430)
at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.kt:323)
at okhttp3.internal.http1.HeadersReader.readLine(HeadersReader.kt:29)
at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.kt:180)
at okhttp3.internal.connection.Exchange.readResponseHeaders(Exchange.kt:110)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:93)
Please assist me on how to fix this and explain what I am doing wrong.
Dependencies used:
azure-storage-blob-12.22.3.jar
azure-storage-common-12.21.2.jar
azure-core-1.40.0.jar
azure-core-http-netty-1.13.4.jar
reactor-netty-http-1.0.31.jar
reactor-netty-core-1.0.31.jar
netty-resolver-dns-4.1.89.Final.jar
azure-json-1.0.1.jar
1 Answer
"trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own."
You can use the code below to read large files from Azure Blob Storage in chunks using the Azure Java SDK.
Code:
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.policy.TimeoutPolicy;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

public class LargeBlobReader {

    private static final String CONNECTION_STRING = "xxxx";
    private static final String CONTAINER_NAME = "xxxx";
    private static final String BLOB_NAME = "xxxx";

    public static void main(String[] args) {
        HttpClient httpClient = new NettyAsyncHttpClientBuilder()
                .readTimeout(Duration.ofDays(365))
                .responseTimeout(Duration.ofDays(365))
                .connectTimeout(Duration.ofDays(365))
                .build();

        RequestRetryOptions retryOptions = new RequestRetryOptions(
                RetryPolicyType.EXPONENTIAL,
                3,
                Duration.ofSeconds(30),
                null,
                null,
                null
        );

        HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));

        BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                .connectionString(CONNECTION_STRING)
                .httpClient(httpClient)
                .addPolicy(timeoutPolicy)
                .retryOptions(retryOptions)
                .buildClient();

        BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(CONTAINER_NAME);
        BlobClient blobClient = containerClient.getBlobClient(BLOB_NAME);

        readBlobInChunks(blobClient);
    }

    // Reads the blob in fixed-size ranges so that no single HTTP connection
    // has to stay open for the whole multi-GB download.
    private static void readBlobInChunks(BlobClient blobClient) {
        final long chunkSize = 8 * 1024 * 1024; // 8 MB chunks
        try {
            BlobProperties properties = blobClient.getProperties();
            long blobSize = properties.getBlobSize();

            for (long offset = 0; offset < blobSize; offset += chunkSize) {
                long count = Math.min(chunkSize, blobSize - offset);
                // Each iteration opens a fresh stream over just this byte range.
                try (InputStream blobInputStream = blobClient.openInputStream(new BlobRange(offset, count), null);
                     BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {
                    processChunk(bufferedStream, count);
                }
            }
            System.out.println("Blob read successfully in chunks.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Consumes one chunk; currently it just writes the bytes to the console.
    private static void processChunk(InputStream inputStream, long count) throws IOException {
        byte[] buffer = new byte[8192];
        int bytesRead;
        long totalBytesRead = 0;
        while (totalBytesRead < count && (bytesRead = inputStream.read(buffer)) != -1) {
            totalBytesRead += bytesRead;
            System.out.write(buffer, 0, bytesRead);
        }
    }
}
The readBlobInChunks method reads the large file from Azure Blob Storage in 8 MB ranges and hands each range to processChunk, which currently just writes the data to the console.
You can change processChunk to upload the data to your own cloud storage instead of printing it, and you can also modify the chunked read so that each range is uploaded to your cloud storage in parallel, as sketched below.
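For example, here is a minimal sketch of the first change, assuming your storage SDK can hand you an OutputStream for the destination object; the uploadStream parameter is a placeholder for your own SDK's upload API, not something provided by the Azure SDK:
// Copies one range of the blob into an OutputStream supplied by your own
// storage client (placeholder; use whatever upload API your SDK provides).
// Requires an additional java.io.OutputStream import in the class above.
private static void processChunk(InputStream inputStream, long count,
                                 OutputStream uploadStream) throws IOException {
    byte[] buffer = new byte[8192];
    int bytesRead;
    long totalBytesRead = 0;
    while (totalBytesRead < count && (bytesRead = inputStream.read(buffer)) != -1) {
        totalBytesRead += bytesRead;
        uploadStream.write(buffer, 0, bytesRead);
    }
}
For the parallel variant, each worker can call blobClient.openInputStream(new BlobRange(offset, count), null) for its own range and upload that range independently (for example via an ExecutorService), since every range uses its own short-lived connection.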
Output:
Blob read successfully in chunks.