Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit da2cac8

Browse files
nirinchevblva
andauthored
fix: don't wait for telemetry events MCP-179 (#521)
Co-authored-by: Bianca Lisle <bianca.vianadeaguiar@mongodb.com>
1 parent 14176ba commit da2cac8

File tree

8 files changed

+140
-74
lines changed

8 files changed

+140
-74
lines changed

‎src/common/connectionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & {
8282

8383
export abstract class ConnectionManager {
8484
protected clientName: string;
85-
protected readonly _events;
85+
protected readonly _events: EventEmitter<ConnectionManagerEvents>;
8686
readonly events: Pick<EventEmitter<ConnectionManagerEvents>, "on" | "off" | "once">;
8787
private state: AnyConnectionState;
8888

‎src/common/logger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export const LogId = {
3535
telemetryMetadataError: mongoLogId(1_002_005),
3636
deviceIdResolutionError: mongoLogId(1_002_006),
3737
deviceIdTimeout: mongoLogId(1_002_007),
38+
telemetryClose: mongoLogId(1_002_008),
3839

3940
toolExecute: mongoLogId(1_003_001),
4041
toolExecuteFailure: mongoLogId(1_003_002),

‎src/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ export class Server {
193193
}
194194
}
195195

196-
this.telemetry.emitEvents([event]).catch(()=>{});
196+
this.telemetry.emitEvents([event]);
197197
}
198198

199199
private registerTools(): void {

‎src/telemetry/eventCache.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,18 @@ export class EventCache {
3434
}
3535

3636
/**
37-
* Gets a copy of the currently cached events
37+
* Gets the number of currently cached events
38+
*/
39+
public get size(): number {
40+
return this.cache.size;
41+
}
42+
43+
/**
44+
* Gets a copy of the currently cached events along with their ids
3845
* @returns Array of cached BaseEvent objects
3946
*/
40-
public getEvents(): BaseEvent[] {
41-
return Array.from(this.cache.values());
47+
public getEvents(): {id: number;event: BaseEvent}[] {
48+
return Array.from(this.cache.entries()).map(([id,event])=>({ id, event }));
4249
}
4350

4451
/**
@@ -53,10 +60,11 @@ export class EventCache {
5360
}
5461

5562
/**
56-
* Clears all cached events
63+
* Removes cached events by their ids
5764
*/
58-
public clearEvents(): void {
59-
this.cache.clear();
60-
this.nextId = 0;
65+
public removeEvents(ids: number[]): void {
66+
for (const id of ids) {
67+
this.cache.delete(id);
68+
}
6169
}
6270
}

‎src/telemetry/telemetry.ts

Lines changed: 81 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,25 @@ import { MACHINE_METADATA } from "./constants.js";
77
import { EventCache } from "./eventCache.js";
88
import { detectContainerEnv } from "../helpers/container.js";
99
import type { DeviceId } from "../helpers/deviceId.js";
10+
import { EventEmitter } from "events";
1011

1112
type EventResult = {
1213
success: boolean;
1314
error?: Error;
1415
};
1516

17+
export interface TelemetryEvents {
18+
"events-emitted": [];
19+
"events-send-failed": [];
20+
"events-skipped": [];
21+
}
22+
1623
export class Telemetry {
1724
private isBufferingEvents: boolean = true;
1825
/** Resolves when the setup is complete or a timeout occurs */
1926
public setupPromise: Promise<[string, boolean]> | undefined;
27+
public readonly events: EventEmitter<TelemetryEvents> = new EventEmitter();
28+
2029
private eventCache: EventCache;
2130
private deviceId: DeviceId;
2231

@@ -57,6 +66,12 @@ export class Telemetry {
5766

5867
private async setup(): Promise<void> {
5968
if (!this.isTelemetryEnabled()) {
69+
this.session.logger.info({
70+
id: LogId.telemetryEmitFailure,
71+
context: "telemetry",
72+
message: "Telemetry is disabled.",
73+
noRedaction: true,
74+
});
6075
return;
6176
}
6277

@@ -71,34 +86,47 @@ export class Telemetry {
7186

7287
public async close(): Promise<void> {
7388
this.isBufferingEvents = false;
74-
await this.emitEvents(this.eventCache.getEvents());
89+
90+
this.session.logger.debug({
91+
id: LogId.telemetryClose,
92+
message: `Closing telemetry and flushing ${this.eventCache.size} events`,
93+
context: "telemetry",
94+
});
95+
96+
// Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out
97+
const flushMaxWaitTime = 5000;
98+
let flushTimeout: NodeJS.Timeout | undefined;
99+
await Promise.race([
100+
new Promise<void>((resolve) => {
101+
flushTimeout = setTimeout(() => {
102+
this.session.logger.debug({
103+
id: LogId.telemetryClose,
104+
message: `Failed to flush remaining events within ${flushMaxWaitTime}ms timeout`,
105+
context: "telemetry",
106+
});
107+
resolve();
108+
}, flushMaxWaitTime);
109+
flushTimeout.unref();
110+
}),
111+
this.emit([]),
112+
]);
113+
114+
clearTimeout(flushTimeout);
75115
}
76116

77117
/**
78118
* Emits events through the telemetry pipeline
79119
* @param events - The events to emit
80120
*/
81-
public async emitEvents(events: BaseEvent[]): Promise<void> {
82-
try {
83-
if (!this.isTelemetryEnabled()) {
84-
this.session.logger.info({
85-
id: LogId.telemetryEmitFailure,
86-
context: "telemetry",
87-
message: "Telemetry is disabled.",
88-
noRedaction: true,
89-
});
90-
return;
91-
}
92-
93-
await this.emit(events);
94-
} catch {
95-
this.session.logger.debug({
96-
id: LogId.telemetryEmitFailure,
97-
context: "telemetry",
98-
message: "Error emitting telemetry events.",
99-
noRedaction: true,
100-
});
121+
public emitEvents(events: BaseEvent[]): void {
122+
if (!this.isTelemetryEnabled()) {
123+
this.events.emit("events-skipped");
124+
return;
101125
}
126+
127+
// Don't wait for events to be sent - we should not block regular server
128+
// operations on telemetry
129+
void this.emit(events);
102130
}
103131

104132
/**
@@ -144,32 +172,44 @@ export class Telemetry {
144172
return;
145173
}
146174

147-
const cachedEvents = this.eventCache.getEvents();
148-
const allEvents = [...cachedEvents, ...events];
175+
try {
176+
const cachedEvents = this.eventCache.getEvents();
177+
const allEvents = [...cachedEvents.map((e) => e.event), ...events];
149178

150-
this.session.logger.debug({
151-
id: LogId.telemetryEmitStart,
152-
context: "telemetry",
153-
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
154-
});
179+
this.session.logger.debug({
180+
id: LogId.telemetryEmitStart,
181+
context: "telemetry",
182+
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
183+
});
184+
185+
const result = await this.sendEvents(this.session.apiClient, allEvents);
186+
if (result.success) {
187+
this.eventCache.removeEvents(cachedEvents.map((e) => e.id));
188+
this.session.logger.debug({
189+
id: LogId.telemetryEmitSuccess,
190+
context: "telemetry",
191+
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`,
192+
});
193+
this.events.emit("events-emitted");
194+
return;
195+
}
155196

156-
const result = await this.sendEvents(this.session.apiClient, allEvents);
157-
if (result.success) {
158-
this.eventCache.clearEvents();
159197
this.session.logger.debug({
160-
id: LogId.telemetryEmitSuccess,
198+
id: LogId.telemetryEmitFailure,
161199
context: "telemetry",
162-
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents,null,2)}`,
200+
message: `Error sending event to client: ${result.errorinstanceofError ? result.error.message : String(result.error)}`,
163201
});
164-
return;
202+
this.eventCache.appendEvents(events);
203+
this.events.emit("events-send-failed");
204+
} catch (error) {
205+
this.session.logger.debug({
206+
id: LogId.telemetryEmitFailure,
207+
context: "telemetry",
208+
message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`,
209+
noRedaction: true,
210+
});
211+
this.events.emit("events-send-failed");
165212
}
166-
167-
this.session.logger.debug({
168-
id: LogId.telemetryEmitFailure,
169-
context: "telemetry",
170-
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
171-
});
172-
this.eventCache.appendEvents(events);
173213
}
174214

175215
/**

‎src/tools/tool.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,14 @@ export abstract class ToolBase {
8282
});
8383

8484
const result = await this.execute(...args);
85-
await this.emitToolEvent(startTime, result, ...args).catch(() => {});
85+
this.emitToolEvent(startTime, result, ...args);
86+
87+
this.session.logger.debug({
88+
id: LogId.toolExecute,
89+
context: "tool",
90+
message: `Executed tool ${this.name}`,
91+
noRedaction: true,
92+
});
8693
return result;
8794
} catch (error: unknown) {
8895
this.session.logger.error({
@@ -91,7 +98,7 @@ export abstract class ToolBase {
9198
message: `Error executing ${this.name}: ${error as string}`,
9299
});
93100
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>);
94-
awaitthis.emitToolEvent(startTime, toolResult, ...args).catch(()=>{});
101+
this.emitToolEvent(startTime, toolResult, ...args);
95102
return toolResult;
96103
}
97104
};
@@ -200,11 +207,11 @@ export abstract class ToolBase {
200207
* @param result - Whether the command succeeded or failed
201208
* @param args - The arguments passed to the tool
202209
*/
203-
private asyncemitToolEvent(
210+
private emitToolEvent(
204211
startTime: number,
205212
result: CallToolResult,
206213
...args: Parameters<ToolCallback<typeof this.argsShape>>
207-
): Promise<void> {
214+
): void {
208215
if (!this.telemetry.isTelemetryEnabled()) {
209216
return;
210217
}
@@ -230,7 +237,7 @@ export abstract class ToolBase {
230237
event.properties.project_id = metadata.projectId;
231238
}
232239

233-
awaitthis.telemetry.emitEvents([event]);
240+
this.telemetry.emitEvents([event]);
234241
}
235242
}
236243

‎src/transports/streamableHttp.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
124124
// eslint-disable-next-line @typescript-eslint/no-misused-promises
125125
keepAliveLoop = setInterval(async () => {
126126
try {
127-
this.logger.debug({
127+
server.session.logger.debug({
128128
id: LogId.streamableHttpTransportKeepAlive,
129129
context: "streamableHttpTransport",
130130
message: "Sending ping",
@@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
138138
} catch (err) {
139139
try {
140140
failedPings++;
141-
this.logger.warning({
141+
server.session.logger.warning({
142142
id: LogId.streamableHttpTransportKeepAliveFailure,
143143
context: "streamableHttpTransport",
144144
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
@@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
162162
this.logger.error({
163163
id: LogId.streamableHttpTransportSessionCloseFailure,
164164
context: "streamableHttpTransport",
165-
message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`,
165+
message: `Error closing session${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
166166
});
167167
}
168168
},

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /