Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 97b6192

Browse files
samridh90slandelle
authored andcommitted
Support InputStream based multipart part (#1593), close #857
1 parent 3b3a7da commit 97b6192

File tree

9 files changed

+368
-4
lines changed

9 files changed

+368
-4
lines changed

‎README.md‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ Use the `addBodyPart` method to add a multipart part to the request.
110110
This part can be of type:
111111
* `ByteArrayPart`
112112
* `FilePart`
113+
* `InputStreamPart`
113114
* `StringPart`
114115

115116
### Dealing with Responses

‎client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public long getContentLength() {
5353
public void write(final Channel channel, NettyResponseFuture<?> future) {
5454

5555
Object msg;
56-
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy()) {
56+
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy() && getContentLength() > 0) {
5757
msg = new BodyFileRegion((RandomAccessBody) body);
5858

5959
} else {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.multipart;
15+
16+
import java.io.InputStream;
17+
import java.nio.charset.Charset;
18+
19+
import static org.asynchttpclient.util.Assertions.assertNotNull;
20+
21+
public class InputStreamPart extends FileLikePart {
22+
23+
private final InputStream inputStream;
24+
private final long contentLength;
25+
26+
public InputStreamPart(String name, InputStream inputStream, String fileName) {
27+
this(name, inputStream, fileName, -1);
28+
}
29+
30+
public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength) {
31+
this(name, inputStream, fileName, contentLength, null);
32+
}
33+
34+
public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType) {
35+
this(name, inputStream, fileName, contentLength, contentType, null);
36+
}
37+
38+
public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset) {
39+
this(name, inputStream, fileName, contentLength, contentType, charset, null);
40+
}
41+
42+
public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
43+
String contentId) {
44+
this(name, inputStream, fileName, contentLength, contentType, charset, contentId, null);
45+
}
46+
47+
public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
48+
String contentId, String transferEncoding) {
49+
super(name,
50+
contentType,
51+
charset,
52+
fileName,
53+
contentId,
54+
transferEncoding);
55+
this.inputStream = assertNotNull(inputStream, "inputStream");
56+
this.contentLength = contentLength;
57+
}
58+
59+
public InputStream getInputStream() {
60+
return inputStream;
61+
}
62+
63+
public long getContentLength() {
64+
return contentLength;
65+
}
66+
}

‎client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartUtils.java‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public static List<MultipartPart<? extends Part>> generateMultipartParts(List<Pa
7575
} else if (part instanceof StringPart) {
7676
multipartParts.add(new StringMultipartPart((StringPart) part, boundary));
7777

78+
} else if (part instanceof InputStreamPart) {
79+
multipartParts.add(new InputStreamMultipartPart((InputStreamPart) part, boundary));
80+
7881
} else {
7982
throw new IllegalArgumentException("Unknown part type: " + part);
8083
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.multipart.part;
15+
16+
import io.netty.buffer.ByteBuf;
17+
import org.asynchttpclient.netty.request.body.BodyChunkedInput;
18+
import org.asynchttpclient.request.body.multipart.InputStreamPart;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.Channels;
24+
import java.nio.channels.ReadableByteChannel;
25+
import java.nio.channels.WritableByteChannel;
26+
27+
import static org.asynchttpclient.util.MiscUtils.closeSilently;
28+
29+
public class InputStreamMultipartPart extends FileLikeMultipartPart<InputStreamPart> {
30+
31+
private long position = 0L;
32+
private ByteBuffer buffer;
33+
private ReadableByteChannel channel;
34+
35+
public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) {
36+
super(part, boundary);
37+
}
38+
39+
private ByteBuffer getBuffer() {
40+
if (buffer == null) {
41+
buffer = ByteBuffer.allocateDirect(BodyChunkedInput.DEFAULT_CHUNK_SIZE);
42+
}
43+
return buffer;
44+
}
45+
46+
private ReadableByteChannel getChannel() {
47+
if (channel == null) {
48+
channel = Channels.newChannel(part.getInputStream());
49+
}
50+
return channel;
51+
}
52+
53+
@Override
54+
protected long getContentLength() {
55+
return part.getContentLength();
56+
}
57+
58+
@Override
59+
protected long transferContentTo(ByteBuf target) throws IOException {
60+
InputStream inputStream = part.getInputStream();
61+
int transferred = target.writeBytes(inputStream, target.writableBytes());
62+
if (transferred > 0) {
63+
position += transferred;
64+
}
65+
if (position == getContentLength() || transferred < 0) {
66+
state = MultipartState.POST_CONTENT;
67+
inputStream.close();
68+
}
69+
return transferred;
70+
}
71+
72+
@Override
73+
protected long transferContentTo(WritableByteChannel target) throws IOException {
74+
ReadableByteChannel channel = getChannel();
75+
ByteBuffer buffer = getBuffer();
76+
77+
int transferred = 0;
78+
int read = channel.read(buffer);
79+
80+
if (read > 0) {
81+
buffer.flip();
82+
while (buffer.hasRemaining()) {
83+
transferred += target.write(buffer);
84+
}
85+
buffer.compact();
86+
position += transferred;
87+
}
88+
if (position == getContentLength() || read < 0) {
89+
state = MultipartState.POST_CONTENT;
90+
if (channel.isOpen()) {
91+
channel.close();
92+
}
93+
}
94+
95+
return transferred;
96+
}
97+
98+
@Override
99+
public void close() {
100+
super.close();
101+
closeSilently(part.getInputStream());
102+
closeSilently(channel);
103+
}
104+
105+
}

‎client/src/main/java/org/asynchttpclient/request/body/multipart/part/MultipartPart.java‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ public abstract class MultipartPart<T extends PartBase> implements Closeable {
106106
}
107107

108108
public long length() {
109+
long contentLength = getContentLength();
110+
if (contentLength < 0) {
111+
return contentLength;
112+
}
109113
return preContentLength + postContentLength + getContentLength();
110114
}
111115

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body;
15+
16+
import org.asynchttpclient.AbstractBasicTest;
17+
import org.asynchttpclient.AsyncHttpClient;
18+
import org.asynchttpclient.Response;
19+
import org.asynchttpclient.request.body.multipart.InputStreamPart;
20+
import org.eclipse.jetty.server.Request;
21+
import org.eclipse.jetty.server.handler.AbstractHandler;
22+
import org.testng.annotations.Test;
23+
24+
import javax.servlet.ServletInputStream;
25+
import javax.servlet.http.HttpServletRequest;
26+
import javax.servlet.http.HttpServletResponse;
27+
import java.io.*;
28+
29+
import static java.nio.charset.StandardCharsets.UTF_8;
30+
import static org.asynchttpclient.Dsl.asyncHttpClient;
31+
import static org.asynchttpclient.Dsl.config;
32+
import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_FILE;
33+
import static org.asynchttpclient.test.TestUtils.createTempFile;
34+
import static org.testng.Assert.assertEquals;
35+
36+
public class InputStreamPartLargeFileTest extends AbstractBasicTest {
37+
38+
@Override
39+
public AbstractHandler configureHandler() throws Exception {
40+
return new AbstractHandler() {
41+
42+
public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse resp) throws IOException {
43+
44+
ServletInputStream in = req.getInputStream();
45+
byte[] b = new byte[8192];
46+
47+
int count;
48+
int total = 0;
49+
while ((count = in.read(b)) != -1) {
50+
b = new byte[8192];
51+
total += count;
52+
}
53+
resp.setStatus(200);
54+
resp.addHeader("X-TRANSFERRED", String.valueOf(total));
55+
resp.getOutputStream().flush();
56+
resp.getOutputStream().close();
57+
58+
baseRequest.setHandled(true);
59+
}
60+
};
61+
}
62+
63+
@Test
64+
public void testPutImageFile() throws Exception {
65+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
66+
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
67+
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
68+
assertEquals(response.getStatusCode(), 200);
69+
}
70+
}
71+
72+
@Test
73+
public void testPutImageFileUnknownSize() throws Exception {
74+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
75+
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
76+
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
77+
assertEquals(response.getStatusCode(), 200);
78+
}
79+
}
80+
81+
@Test
82+
public void testPutLargeTextFile() throws Exception {
83+
File file = createTempFile(1024 * 1024);
84+
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
85+
86+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
87+
Response response = client.preparePut(getTargetUrl())
88+
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
89+
assertEquals(response.getStatusCode(), 200);
90+
}
91+
}
92+
93+
@Test
94+
public void testPutLargeTextFileUnknownSize() throws Exception {
95+
File file = createTempFile(1024 * 1024);
96+
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
97+
98+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
99+
Response response = client.preparePut(getTargetUrl())
100+
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
101+
assertEquals(response.getStatusCode(), 200);
102+
}
103+
}
104+
}

‎client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java‎

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
import org.asynchttpclient.request.body.Body.BodyState;
2020
import org.testng.annotations.Test;
2121

22-
import java.io.File;
23-
import java.io.IOException;
22+
import java.io.*;
2423
import java.net.URISyntaxException;
2524
import java.net.URL;
2625
import java.nio.ByteBuffer;
@@ -63,7 +62,15 @@ private static File getTestfile() throws URISyntaxException {
6362
}
6463

6564
private static MultipartBody buildMultipart() {
66-
return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE);
65+
List<Part> parts = new ArrayList<>(PARTS);
66+
try {
67+
File testFile = getTestfile();
68+
InputStream inputStream = new BufferedInputStream(new FileInputStream(testFile));
69+
parts.add(new InputStreamPart("isPart", inputStream, testFile.getName(), testFile.length()));
70+
} catch (URISyntaxException | FileNotFoundException e) {
71+
throw new ExceptionInInitializerError(e);
72+
}
73+
return MultipartUtils.newMultipartBody(parts, EmptyHttpHeaders.INSTANCE);
6774
}
6875

6976
private static long transferWithCopy(MultipartBody multipartBody, int bufferSize) throws IOException {

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /