Expand Up
@@ -5,6 +5,7 @@
package io.modelcontextprotocol.client;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.experimental.tasks.TaskStore;
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
Expand Down
Expand Up
@@ -184,6 +185,8 @@ class SyncSpec {
private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();
private final List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers = new ArrayList<>();
private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;
private Function<ElicitRequest, ElicitResult> elicitationHandler;
Expand All
@@ -194,6 +197,10 @@ class SyncSpec {
private boolean enableCallToolSchemaCaching = false; // Default to false
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
private Duration taskPollTimeout; // null = use default (5 minutes)
private SyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
Expand Down
Expand Up
@@ -317,6 +324,44 @@ public SyncSpec elicitation(Function<ElicitRequest, ElicitResult> elicitationHan
return this;
}
/**
* Sets the task store for client-side task hosting. When set, the client can host
* tasks for task-augmented sampling and elicitation requests from the server.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param taskStore The task store implementation. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStore is null
*/
public SyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
Assert.notNull(taskStore, "Task store must not be null");
this.taskStore = taskStore;
return this;
}
/**
* Sets the maximum time to wait for a task to reach a terminal state during task
* result polling.
*
* <p>
* When using task-augmented requests (e.g., long-running tool calls), the client
* polls the server for task status updates. This timeout limits how long the
* client will wait for the task to complete, fail, or be cancelled.
*
* <p>
* If not set, defaults to 5 minutes to prevent infinite polling loops.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param timeout maximum poll duration, or null to use the default (5 minutes)
* @return This builder instance for method chaining
*/
public SyncSpec taskPollTimeout(Duration timeout) {
this.taskPollTimeout = timeout;
return this;
}
/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down
Expand Up
@@ -428,14 +473,42 @@ public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progre
* @param progressConsumers A list of consumers that receives progress
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if progressConsumer is null
* @throws IllegalArgumentException if progressConsumers is null
*/
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
Assert.notNull(progressConsumers, "Progress consumers must not be null");
this.progressConsumers.addAll(progressConsumers);
return this;
}
/**
* Adds a consumer to be notified of task status notifications from the server.
* This enables clients to receive updates about task progress and status changes.
* @param taskStatusConsumer A consumer that receives task status notifications.
* Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumer is null
*/
public SyncSpec taskStatusConsumer(Consumer<McpSchema.TaskStatusNotification> taskStatusConsumer) {
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
this.taskStatusConsumers.add(taskStatusConsumer);
return this;
}
/**
* Adds multiple consumers to be notified of task status notifications from the
* server.
* @param taskStatusConsumers A list of consumers that receive task status
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumers is null
*/
public SyncSpec taskStatusConsumers(List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers) {
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
this.taskStatusConsumers.addAll(taskStatusConsumers);
return this;
}
/**
* Add a provider of {@link McpTransportContext}, providing a context before
* calling any client operation. This allows to extract thread-locals and hand
Expand Down
Expand Up
@@ -486,14 +559,15 @@ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching)
public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
this.elicitationHandler, this.enableCallToolSchemaCaching);
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null);
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(),
asyncFeatures ), this.contextProvider);
jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(), asyncFeatures,
this.taskStore ), this.contextProvider);
}
}
Expand Down
Expand Up
@@ -540,6 +614,8 @@ class AsyncSpec {
private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
private final List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers = new ArrayList<>();
private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;
private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
Expand All
@@ -548,6 +624,10 @@ class AsyncSpec {
private boolean enableCallToolSchemaCaching = false; // Default to false
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
private Duration taskPollTimeout; // null = use default (5 minutes)
private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
Expand Down
Expand Up
@@ -671,6 +751,22 @@ public AsyncSpec elicitation(Function<ElicitRequest, Mono<ElicitResult>> elicita
return this;
}
/**
* Sets the task store for client-side task hosting. When set, the client can host
* tasks for task-augmented sampling and elicitation requests from the server.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param taskStore The task store implementation. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStore is null
*/
public AsyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
Assert.notNull(taskStore, "Task store must not be null");
this.taskStore = taskStore;
return this;
}
/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down
Expand Up
@@ -785,7 +881,7 @@ public AsyncSpec progressConsumer(Function<McpSchema.ProgressNotification, Mono<
* @param progressConsumers A list of consumers that receives progress
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if progressConsumer is null
* @throws IllegalArgumentException if progressConsumers is null
*/
public AsyncSpec progressConsumers(
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
Expand All
@@ -794,6 +890,35 @@ public AsyncSpec progressConsumers(
return this;
}
/**
* Adds a consumer to be notified of task status notifications from the server.
* This enables clients to receive updates about task progress and status changes.
* @param taskStatusConsumer A consumer that receives task status notifications.
* Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumer is null
*/
public AsyncSpec taskStatusConsumer(Function<McpSchema.TaskStatusNotification, Mono<Void>> taskStatusConsumer) {
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
this.taskStatusConsumers.add(taskStatusConsumer);
return this;
}
/**
* Adds multiple consumers to be notified of task status notifications from the
* server.
* @param taskStatusConsumers A list of consumers that receive task status
* notifications. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if taskStatusConsumers is null
*/
public AsyncSpec taskStatusConsumers(
List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers) {
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
this.taskStatusConsumers.addAll(taskStatusConsumers);
return this;
}
/**
* Sets the JSON schema validator to use for validating tool responses against
* output schemas.
Expand All
@@ -819,6 +944,21 @@ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching
return this;
}
/**
* Sets the maximum duration to poll for task completion in
* {@code callToolStream()}. If not set, defaults to 5 minutes to prevent infinite
* polling loops.
*
* <p>
* This is an experimental feature that may change in future releases.
* @param timeout maximum poll duration, or null to use the default (5 minutes)
* @return This builder instance for method chaining
*/
public AsyncSpec taskPollTimeout(Duration timeout) {
this.taskPollTimeout = timeout;
return this;
}
/**
* Create an instance of {@link McpAsyncClient} with the provided configurations
* or sensible defaults.
Expand All
@@ -830,9 +970,11 @@ public McpAsyncClient build() {
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
jsonSchemaValidator,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
this.toolsChangeConsumers, this.taskStatusConsumers, this.resourcesChangeConsumers,
this.resourcesUpdateConsumers, this.promptsChangeConsumers, this.loggingConsumers,
this.progressConsumers, this.samplingHandler, this.elicitationHandler,
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null),
this.taskStore);
}
}
Expand Down