Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 7d050b5

Browse files
fix(core): Fix wrong async types when instrumenting anthropic's stream api (#18007)
The issue surfaced when `message.stream` was used in conjunction with the `stream: true` option which would lead to us returning async results instead of the expected MessageStream from anthropic ai. We now take this into account and tightened the types. Closes: #17977
1 parent 925a4ea commit 7d050b5

File tree

3 files changed

+111
-13
lines changed

3 files changed

+111
-13
lines changed

‎dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs‎

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,62 @@ function createMockStreamEvents(model = 'claude-3-haiku-20240307') {
3232
return generator();
3333
}
3434

35+
// Mimics Anthropic SDK's MessageStream class
36+
class MockMessageStream {
37+
constructor(model) {
38+
this._model = model;
39+
this._eventHandlers = {};
40+
}
41+
42+
on(event, handler) {
43+
if (!this._eventHandlers[event]) {
44+
this._eventHandlers[event] = [];
45+
}
46+
this._eventHandlers[event].push(handler);
47+
48+
// Start processing events asynchronously (don't await)
49+
if (event === 'streamEvent' && !this._processing) {
50+
this._processing = true;
51+
this._processEvents();
52+
}
53+
54+
return this;
55+
}
56+
57+
async _processEvents() {
58+
try {
59+
const generator = createMockStreamEvents(this._model);
60+
for await (const event of generator) {
61+
if (this._eventHandlers['streamEvent']) {
62+
for (const handler of this._eventHandlers['streamEvent']) {
63+
handler(event);
64+
}
65+
}
66+
}
67+
68+
// Emit 'message' event when done
69+
if (this._eventHandlers['message']) {
70+
for (const handler of this._eventHandlers['message']) {
71+
handler();
72+
}
73+
}
74+
} catch (error) {
75+
if (this._eventHandlers['error']) {
76+
for (const handler of this._eventHandlers['error']) {
77+
handler(error);
78+
}
79+
}
80+
}
81+
}
82+
83+
async *[Symbol.asyncIterator]() {
84+
const generator = createMockStreamEvents(this._model);
85+
for await (const event of generator) {
86+
yield event;
87+
}
88+
}
89+
}
90+
3591
class MockAnthropic {
3692
constructor(config) {
3793
this.apiKey = config.apiKey;
@@ -68,9 +124,9 @@ class MockAnthropic {
68124
};
69125
}
70126

71-
async_messagesStream(params){
72-
awaitnewPromise(resolve=>setTimeout(resolve,5));
73-
return createMockStreamEvents(params?.model);
127+
// This should return synchronously (like the real Anthropic SDK)
128+
_messagesStream(params){
129+
return newMockMessageStream(params?.model);
74130
}
75131
}
76132

@@ -90,13 +146,27 @@ async function run() {
90146
}
91147

92148
// 2) Streaming via messages.stream API
93-
const stream2 = awaitclient.messages.stream({
149+
const stream2 = client.messages.stream({
94150
model: 'claude-3-haiku-20240307',
95151
messages: [{ role: 'user', content: 'Stream this too' }],
96152
});
97153
for await (const _ of stream2) {
98154
void _;
99155
}
156+
157+
// 3) Streaming via messages.stream API with redundant stream: true param
158+
const stream3 = client.messages.stream({
159+
model: 'claude-3-haiku-20240307',
160+
messages: [{ role: 'user', content: 'Stream with param' }],
161+
stream: true, // This param is redundant but should not break synchronous behavior
162+
});
163+
// Verify it has .on() method immediately (not a Promise)
164+
if (typeof stream3.on !== 'function') {
165+
throw new Error('BUG: messages.stream() with stream: true did not return MessageStream synchronously!');
166+
}
167+
for await (const _ of stream3) {
168+
void _;
169+
}
100170
});
101171
}
102172

‎dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,23 @@ describe('Anthropic integration', () => {
308308
'gen_ai.usage.total_tokens': 25,
309309
}),
310310
}),
311+
// messages.stream with redundant stream: true param
312+
expect.objectContaining({
313+
description: 'messages claude-3-haiku-20240307 stream-response',
314+
op: 'gen_ai.messages',
315+
data: expect.objectContaining({
316+
'gen_ai.system': 'anthropic',
317+
'gen_ai.operation.name': 'messages',
318+
'gen_ai.request.model': 'claude-3-haiku-20240307',
319+
'gen_ai.request.stream': true,
320+
'gen_ai.response.streaming': true,
321+
'gen_ai.response.model': 'claude-3-haiku-20240307',
322+
'gen_ai.response.id': 'msg_stream_1',
323+
'gen_ai.usage.input_tokens': 10,
324+
'gen_ai.usage.output_tokens': 15,
325+
'gen_ai.usage.total_tokens': 25,
326+
}),
327+
}),
311328
]),
312329
};
313330

@@ -331,6 +348,14 @@ describe('Anthropic integration', () => {
331348
'gen_ai.response.text': 'Hello from stream!',
332349
}),
333350
}),
351+
expect.objectContaining({
352+
description: 'messages claude-3-haiku-20240307 stream-response',
353+
op: 'gen_ai.messages',
354+
data: expect.objectContaining({
355+
'gen_ai.response.streaming': true,
356+
'gen_ai.response.text': 'Hello from stream!',
357+
}),
358+
}),
334359
]),
335360
};
336361

‎packages/core/src/utils/anthropic-ai/index.ts‎

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ function handleStreamingError(error: unknown, span: Span, methodPath: string): n
205205
* Handle streaming cases with common logic
206206
*/
207207
function handleStreamingRequest<T extends unknown[], R>(
208-
originalMethod: (...args: T) => Promise<R>,
209-
target: (...args: T) => Promise<R>,
208+
originalMethod: (...args: T) => R|Promise<R>,
209+
target: (...args: T) => R|Promise<R>,
210210
context: unknown,
211211
args: T,
212212
requestAttributes: Record<string, unknown>,
@@ -215,15 +215,17 @@ function handleStreamingRequest<T extends unknown[], R>(
215215
params: Record<string, unknown> | undefined,
216216
options: AnthropicAiOptions,
217217
isStreamRequested: boolean,
218-
): Promise<R> {
218+
isStreamingMethod: boolean,
219+
): R | Promise<R> {
219220
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
220221
const spanConfig = {
221222
name: `${operationName} ${model} stream-response`,
222223
op: getSpanOperation(methodPath),
223224
attributes: requestAttributes as Record<string, SpanAttributeValue>,
224225
};
225226

226-
if (isStreamRequested) {
227+
// messages.stream() always returns a sync MessageStream, even with stream: true param
228+
if (isStreamRequested && !isStreamingMethod) {
227229
return startSpanManual(spanConfig, async span => {
228230
try {
229231
if (options.recordInputs && params) {
@@ -260,13 +262,13 @@ function handleStreamingRequest<T extends unknown[], R>(
260262
* @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
261263
*/
262264
function instrumentMethod<T extends unknown[], R>(
263-
originalMethod: (...args: T) => Promise<R>,
265+
originalMethod: (...args: T) => R|Promise<R>,
264266
methodPath: AnthropicAiInstrumentedMethod,
265267
context: unknown,
266268
options: AnthropicAiOptions,
267-
): (...args: T) => Promise<R> {
269+
): (...args: T) => R|Promise<R> {
268270
return new Proxy(originalMethod, {
269-
apply(target, thisArg, args: T): Promise<R> {
271+
apply(target, thisArg, args: T): R|Promise<R> {
270272
const requestAttributes = extractRequestAttributes(args, methodPath);
271273
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
272274
const operationName = getFinalOperationName(methodPath);
@@ -287,6 +289,7 @@ function instrumentMethod<T extends unknown[], R>(
287289
params,
288290
options,
289291
isStreamRequested,
292+
isStreamingMethod,
290293
);
291294
}
292295

@@ -320,7 +323,7 @@ function instrumentMethod<T extends unknown[], R>(
320323
},
321324
);
322325
},
323-
}) as (...args: T) => Promise<R>;
326+
}) as (...args: T) => R|Promise<R>;
324327
}
325328

326329
/**
@@ -333,7 +336,7 @@ function createDeepProxy<T extends object>(target: T, currentPath = '', options:
333336
const methodPath = buildMethodPath(currentPath, String(prop));
334337

335338
if (typeof value === 'function' && shouldInstrument(methodPath)) {
336-
return instrumentMethod(value as (...args: unknown[]) => Promise<unknown>, methodPath, obj, options);
339+
return instrumentMethod(value as (...args: unknown[]) => unknown|Promise<unknown>, methodPath, obj, options);
337340
}
338341

339342
if (typeof value === 'function') {

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /