Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 8f68f63

Browse files
authored
[DE-511] Retriable cursor (#505)
* removed MetaAware * retriable cursor * tests * resilience test * test fix * resume cursor * doc * doc
1 parent 1eb2902 commit 8f68f63

File tree

16 files changed

+352
-266
lines changed

16 files changed

+352
-266
lines changed

‎core/src/main/java/com/arangodb/ArangoCursor.java‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222

2323
import com.arangodb.entity.CursorStats;
2424
import com.arangodb.entity.CursorWarning;
25+
import com.arangodb.model.AqlQueryOptions;
2526

2627
import java.io.Closeable;
2728
import java.util.Collection;
2829
import java.util.List;
30+
import java.util.NoSuchElementException;
2931

3032
/**
3133
* @author Mark Vollmary
@@ -76,4 +78,27 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
7678
*/
7779
boolean isPotentialDirtyRead();
7880

81+
/**
82+
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
83+
* 1 with every batch. Only set if the allowRetry query option is enabled.
84+
* @since ArangoDB 3.11
85+
*/
86+
String getNextBatchId();
87+
88+
/**
89+
* Returns the next element in the iteration.
90+
* <p/>
91+
* If the cursor allows retries (see {@link AqlQueryOptions#allowRetry(Boolean)}), then it is safe to retry invoking
92+
* this method in case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException} with
93+
* cause {@link java.io.IOException}).
94+
* <p/>
95+
* If the cursor does not allow retries (default), then it is not safe to retry invoking this method in case of I/O
96+
* exceptions, since the request to fetch the next batch is not idempotent (i.e. the cursor may advance multiple
97+
* times on the server).
98+
*
99+
* @return the next element in the iteration
100+
* @throws NoSuchElementException if the iteration has no more elements
101+
*/
102+
@Override
103+
T next();
79104
}

‎core/src/main/java/com/arangodb/ArangoDatabase.java‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
310310
*/
311311
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type);
312312

313+
/**
314+
* Return an cursor from the given cursor-ID if still existing
315+
*
316+
* @param cursorId The ID of the cursor
317+
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
318+
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
319+
* {@link AqlQueryOptions#allowRetry(Boolean)}
320+
* @return cursor of the results
321+
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
322+
* .html#read-next-batch-from-cursor">API Documentation</a>
323+
* @since ArangoDB 3.11
324+
*/
325+
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);
326+
313327
/**
314328
* Explain an AQL query and return information about it
315329
*

‎core/src/main/java/com/arangodb/entity/MetaAware.java‎

Lines changed: 0 additions & 14 deletions
This file was deleted.

‎core/src/main/java/com/arangodb/internal/ArangoCursorExecute.java‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222

2323
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
2424

25-
import java.util.Map;
2625

2726
/**
2827
* @author Mark Vollmary
2928
*/
3029
public interface ArangoCursorExecute {
3130

32-
InternalCursorEntity next(String id, Map<String, String> meta);
31+
InternalCursorEntity next(String id, StringnextBatchId);
3332

34-
void close(String id, Map<String, String> meta);
33+
void close(String id);
3534

3635
}

‎core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java‎

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public <T> ArangoCursor<T> query(
161161
final String query, final Class<T> type, final Map<String, Object> bindVars, final AqlQueryOptions options) {
162162
final InternalRequest request = queryRequest(query, bindVars, options);
163163
final HostHandle hostHandle = new HostHandle();
164-
final InternalCursorEntity result = executor.execute(request, InternalCursorEntity.class, hostHandle);
164+
final InternalCursorEntity result = executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
165165
return createCursor(result, type, options, hostHandle);
166166
}
167167

@@ -183,8 +183,20 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {
183183
@Override
184184
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
185185
final HostHandle hostHandle = new HostHandle();
186-
final InternalCursorEntity result = executor
187-
.execute(queryNextRequest(cursorId, null, null), InternalCursorEntity.class, hostHandle);
186+
final InternalCursorEntity result = executor.execute(
187+
queryNextRequest(cursorId, null),
188+
internalCursorEntityDeserializer(),
189+
hostHandle);
190+
return createCursor(result, type, null, hostHandle);
191+
}
192+
193+
@Override
194+
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
195+
final HostHandle hostHandle = new HostHandle();
196+
final InternalCursorEntity result = executor.execute(
197+
queryNextByBatchIdRequest(cursorId, nextBatchId, null),
198+
internalCursorEntityDeserializer(),
199+
hostHandle);
188200
return createCursor(result, type, null, hostHandle);
189201
}
190202

@@ -196,13 +208,15 @@ private <T> ArangoCursor<T> createCursor(
196208

197209
final ArangoCursorExecute execute = new ArangoCursorExecute() {
198210
@Override
199-
public InternalCursorEntity next(final String id, Map<String, String> meta) {
200-
return executor.execute(queryNextRequest(id, options, meta), InternalCursorEntity.class, hostHandle);
211+
public InternalCursorEntity next(final String id, final String nextBatchId) {
212+
InternalRequest request = nextBatchId == null ?
213+
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
214+
return executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
201215
}
202216

203217
@Override
204-
public void close(final String id, Map<String, String> meta) {
205-
executor.execute(queryCloseRequest(id, options, meta), Void.class, hostHandle);
218+
public void close(final String id) {
219+
executor.execute(queryCloseRequest(id, options), Void.class, hostHandle);
206220
}
207221
};
208222

‎core/src/main/java/com/arangodb/internal/ArangoExecutorSync.java‎

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@
2121
package com.arangodb.internal;
2222

2323
import com.arangodb.ArangoDBException;
24-
import com.arangodb.entity.MetaAware;
2524
import com.arangodb.internal.config.ArangoConfig;
2625
import com.arangodb.internal.net.CommunicationProtocol;
2726
import com.arangodb.internal.net.HostHandle;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3027

3128
import java.io.IOException;
3229
import java.lang.reflect.Type;
@@ -36,8 +33,6 @@
3633
*/
3734
public class ArangoExecutorSync extends ArangoExecutor {
3835

39-
private static final Logger LOG = LoggerFactory.getLogger(ArangoExecutorSync.class);
40-
4136
private final CommunicationProtocol protocol;
4237

4338
public ArangoExecutorSync(final CommunicationProtocol protocol, final ArangoConfig config) {
@@ -64,14 +59,7 @@ public <T> T execute(
6459

6560
final InternalResponse response = protocol.execute(interceptRequest(request), hostHandle);
6661
interceptResponse(response);
67-
T deserialize = responseDeserializer.deserialize(response);
68-
69-
if (deserialize instanceof MetaAware) {
70-
LOG.debug("Response is MetaAware {}", deserialize.getClass().getName());
71-
((MetaAware) deserialize).setMeta(response.getMeta());
72-
}
73-
74-
return deserialize;
62+
return responseDeserializer.deserialize(response);
7563
}
7664

7765
public void disconnect() {

‎core/src/main/java/com/arangodb/internal/InternalArangoDatabase.java‎

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.arangodb.entity.*;
2424
import com.arangodb.entity.arangosearch.analyzer.SearchAnalyzer;
2525
import com.arangodb.internal.ArangoExecutor.ResponseDeserializer;
26+
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
2627
import com.arangodb.internal.util.RequestUtils;
2728
import com.arangodb.model.*;
2829
import com.arangodb.model.arangosearch.*;
@@ -156,27 +157,30 @@ protected InternalRequest queryRequest(final String query, final Map<String, Obj
156157
return request;
157158
}
158159

159-
protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {
160-
160+
protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options) {
161161
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id);
162-
request.putHeaderParams(meta);
162+
return completeQueryNextRequest(request, options);
163+
}
163164

164-
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
165+
protected InternalRequest queryNextByBatchIdRequest(final String id,
166+
final String nextBatchId,
167+
final AqlQueryOptions options) {
168+
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
169+
return completeQueryNextRequest(request, options);
170+
}
165171

172+
private InternalRequest completeQueryNextRequest(final InternalRequest request, final AqlQueryOptions options) {
173+
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
166174
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
167175
RequestUtils.allowDirtyRead(request);
168176
}
169177
request.putHeaderParam(TRANSACTION_ID, opt.getStreamTransactionId());
170178
return request;
171179
}
172180

173-
protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {
174-
181+
protected InternalRequest queryCloseRequest(final String id, final AqlQueryOptions options) {
175182
final InternalRequest request = request(name, RequestType.DELETE, PATH_API_CURSOR, id);
176-
request.putHeaderParams(meta);
177-
178183
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
179-
180184
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
181185
RequestUtils.allowDirtyRead(request);
182186
}
@@ -243,6 +247,15 @@ protected InternalRequest deleteAqlFunctionRequest(final String name, final AqlF
243247
return request;
244248
}
245249

250+
protected ResponseDeserializer<InternalCursorEntity> internalCursorEntityDeserializer() {
251+
return response -> {
252+
InternalCursorEntity e = getSerde().deserialize(response.getBody(), InternalCursorEntity.class);
253+
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta("X-Arango-Potential-Dirty-Read"));
254+
e.setPontentialDirtyRead(potentialDirtyRead);
255+
return e;
256+
};
257+
}
258+
246259
protected ResponseDeserializer<Integer> deleteAqlFunctionResponseDeserializer() {
247260
return response -> getSerde().deserialize(response.getBody(), "/deletedCount", Integer.class);
248261
}

‎core/src/main/java/com/arangodb/internal/cursor/AbstractArangoIterable.java‎

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /