Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0e19673

Browse files
authored
ref(core): Improve promise buffer (#17788)
Extracted this out of #17782, this improved our promise buffer class a bit: 1. Remove the code path without a `limit`, as we never use this (there is a default limit used). There is also really no reason to use this without a limit, the limit is the whole purpose of this class. 2. Use a `Set` instead of an array for the internal buffer handling, this should slightly streamline stuff. 3. For `drain`, we can simplify the implementation without a timeout drastically. We can use `Promise.race()` to handle this more gracefully, which should be supported everywhere. 4. Some slight refactorings, actually improving timing semantics slightly.
1 parent 80d5ff2 commit 0e19673

File tree

2 files changed

+180
-70
lines changed

2 files changed

+180
-70
lines changed

‎packages/core/src/utils/promisebuffer.ts‎

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { rejectedSyncPromise, resolvedSyncPromise,SyncPromise } from './syncpromise';
1+
import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';
22

33
export interface PromiseBuffer<T> {
44
// exposes the internal array so tests can assert on the state of it.
55
// XXX: this really should not be public api.
6-
$: Array<PromiseLike<T>>;
6+
$: PromiseLike<T>[];
77
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
88
drain(timeout?: number): PromiseLike<boolean>;
99
}
@@ -14,11 +14,11 @@ export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
1414
* Creates an new PromiseBuffer object with the specified limit
1515
* @param limit max number of promises that can be stored in the buffer
1616
*/
17-
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
18-
const buffer: Array<PromiseLike<T>> = [];
17+
export function makePromiseBuffer<T>(limit: number=100): PromiseBuffer<T> {
18+
const buffer: Set<PromiseLike<T>> = newSet();
1919

2020
function isReady(): boolean {
21-
return limit===undefined||buffer.length < limit;
21+
return buffer.size < limit;
2222
}
2323

2424
/**
@@ -27,8 +27,8 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
2727
* @param task Can be any PromiseLike<T>
2828
* @returns Removed promise.
2929
*/
30-
function remove(task: PromiseLike<T>): PromiseLike<T|void> {
31-
returnbuffer.splice(buffer.indexOf(task),1)[0]||Promise.resolve(undefined);
30+
function remove(task: PromiseLike<T>): void {
31+
buffer.delete(task);
3232
}
3333

3434
/**
@@ -48,19 +48,11 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
4848

4949
// start the task and add its promise to the queue
5050
const task = taskProducer();
51-
if (buffer.indexOf(task) === -1) {
52-
buffer.push(task);
53-
}
54-
void task
55-
.then(() => remove(task))
56-
// Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
57-
// rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
58-
// have promises, so TS has to polyfill when down-compiling.)
59-
.then(null, () =>
60-
remove(task).then(null, () => {
61-
// We have to add another catch here because `remove()` starts a new promise chain.
62-
}),
63-
);
51+
buffer.add(task);
52+
void task.then(
53+
() => remove(task),
54+
() => remove(task),
55+
);
6456
return task;
6557
}
6658

@@ -74,34 +66,28 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
7466
* `false` otherwise
7567
*/
7668
function drain(timeout?: number): PromiseLike<boolean> {
77-
return new SyncPromise<boolean>((resolve, reject) => {
78-
let counter = buffer.length;
69+
if (!buffer.size) {
70+
return resolvedSyncPromise(true);
71+
}
7972

80-
if (!counter) {
81-
return resolve(true);
82-
}
73+
// We want to resolve even if one of the promises rejects
74+
const drainPromise = Promise.allSettled(Array.from(buffer)).then(() => true);
75+
76+
if (!timeout) {
77+
return drainPromise;
78+
}
8379

84-
// wait for `timeout` ms and then resolve to `false` (if not cancelled first)
85-
const capturedSetTimeout = setTimeout(() => {
86-
if (timeout && timeout > 0) {
87-
resolve(false);
88-
}
89-
}, timeout);
80+
const promises = [drainPromise, new Promise<boolean>(resolve => setTimeout(() => resolve(false), timeout))];
9081

91-
// if all promises resolve in time, cancel the timer and resolve to `true`
92-
buffer.forEach(item => {
93-
void resolvedSyncPromise(item).then(() => {
94-
if (!--counter) {
95-
clearTimeout(capturedSetTimeout);
96-
resolve(true);
97-
}
98-
}, reject);
99-
});
100-
});
82+
// Promise.race will resolve to the first promise that resolves or rejects
83+
// So if the drainPromise resolves, the timeout promise will be ignored
84+
return Promise.race(promises);
10185
}
10286

10387
return {
104-
$: buffer,
88+
get $(): PromiseLike<T>[] {
89+
return Array.from(buffer);
90+
},
10591
add,
10692
drain,
10793
};

‎packages/core/test/lib/utils/promisebuffer.test.ts‎

Lines changed: 152 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,163 @@
11
import { describe, expect, test, vi } from 'vitest';
22
import { makePromiseBuffer } from '../../../src/utils/promisebuffer';
3-
import { SyncPromise } from '../../../src/utils/syncpromise';
3+
import { rejectedSyncPromise,resolvedSyncPromise } from '../../../src/utils/syncpromise';
44

55
describe('PromiseBuffer', () => {
66
describe('add()', () => {
7-
test('no limit', () => {
8-
const buffer = makePromiseBuffer();
9-
const p = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
10-
void buffer.add(p);
11-
expect(buffer.$.length).toEqual(1);
7+
test('enforces limit of promises', async () => {
8+
const buffer = makePromiseBuffer(5);
9+
10+
const producer1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
11+
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
12+
const producer3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
13+
const producer4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
14+
const producer5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
15+
const producer6 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
16+
17+
void buffer.add(producer1);
18+
void buffer.add(producer2);
19+
void buffer.add(producer3);
20+
void buffer.add(producer4);
21+
void buffer.add(producer5);
22+
await expect(buffer.add(producer6)).rejects.toThrowError();
23+
24+
expect(producer1).toHaveBeenCalledTimes(1);
25+
expect(producer2).toHaveBeenCalledTimes(1);
26+
expect(producer3).toHaveBeenCalledTimes(1);
27+
expect(producer4).toHaveBeenCalledTimes(1);
28+
expect(producer5).toHaveBeenCalledTimes(1);
29+
expect(producer6).not.toHaveBeenCalled();
30+
31+
expect(buffer.$.length).toEqual(5);
32+
33+
await buffer.drain();
34+
35+
expect(buffer.$.length).toEqual(0);
36+
37+
expect(producer1).toHaveBeenCalledTimes(1);
38+
expect(producer2).toHaveBeenCalledTimes(1);
39+
expect(producer3).toHaveBeenCalledTimes(1);
40+
expect(producer4).toHaveBeenCalledTimes(1);
41+
expect(producer5).toHaveBeenCalledTimes(1);
42+
expect(producer6).not.toHaveBeenCalled();
43+
});
44+
45+
test('sync promises', async () => {
46+
const buffer = makePromiseBuffer(1);
47+
let task1;
48+
const producer1 = vi.fn(() => {
49+
task1 = resolvedSyncPromise();
50+
return task1;
51+
});
52+
const producer2 = vi.fn(() => resolvedSyncPromise());
53+
expect(buffer.add(producer1)).toEqual(task1);
54+
const add2 = buffer.add(producer2);
55+
56+
// This is immediately executed and removed again from the buffer
57+
expect(buffer.$.length).toEqual(0);
58+
59+
await expect(add2).resolves.toBeUndefined();
60+
61+
expect(producer1).toHaveBeenCalled();
62+
expect(producer2).toHaveBeenCalled();
1263
});
1364

14-
test('with limit', () => {
65+
test('async promises',async () => {
1566
const buffer = makePromiseBuffer(1);
1667
let task1;
1768
const producer1 = vi.fn(() => {
18-
task1 = new SyncPromise(resolve => setTimeout(resolve));
69+
task1 = new Promise(resolve => setTimeout(resolve,1));
1970
return task1;
2071
});
21-
const producer2 = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
72+
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve,1)));
2273
expect(buffer.add(producer1)).toEqual(task1);
23-
void expect(buffer.add(producer2)).rejects.toThrowError();
74+
const add2 = buffer.add(producer2);
75+
2476
expect(buffer.$.length).toEqual(1);
77+
78+
await expect(add2).rejects.toThrowError();
79+
2580
expect(producer1).toHaveBeenCalled();
2681
expect(producer2).not.toHaveBeenCalled();
2782
});
83+
84+
test('handles multiple equivalent promises', async () => {
85+
const buffer = makePromiseBuffer(10);
86+
87+
const promise = new Promise(resolve => setTimeout(resolve, 1));
88+
89+
const producer = vi.fn(() => promise);
90+
const producer2 = vi.fn(() => promise);
91+
92+
expect(buffer.add(producer)).toEqual(promise);
93+
expect(buffer.add(producer2)).toEqual(promise);
94+
95+
expect(buffer.$.length).toEqual(1);
96+
97+
expect(producer).toHaveBeenCalled();
98+
expect(producer2).toHaveBeenCalled();
99+
100+
await buffer.drain();
101+
102+
expect(buffer.$.length).toEqual(0);
103+
});
28104
});
29105

30106
describe('drain()', () => {
31-
test('without timeout', async () => {
107+
test('drains all promises without timeout', async () => {
32108
const buffer = makePromiseBuffer();
33-
for (let i = 0; i < 5; i++) {
34-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve)));
35-
}
109+
110+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
111+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
112+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
113+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
114+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
115+
116+
[p1, p2, p3, p4, p5].forEach(p => {
117+
void buffer.add(p);
118+
});
119+
36120
expect(buffer.$.length).toEqual(5);
37121
const result = await buffer.drain();
38122
expect(result).toEqual(true);
39123
expect(buffer.$.length).toEqual(0);
124+
125+
expect(p1).toHaveBeenCalled();
126+
expect(p2).toHaveBeenCalled();
127+
expect(p3).toHaveBeenCalled();
128+
expect(p4).toHaveBeenCalled();
129+
expect(p5).toHaveBeenCalled();
40130
});
41131

42-
test('with timeout', async () => {
132+
test('drains all promises with timeout', async () => {
43133
const buffer = makePromiseBuffer();
44-
for (let i = 0; i < 5; i++) {
45-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100)));
46-
}
134+
135+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2)));
136+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 4)));
137+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 6)));
138+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 8)));
139+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 10)));
140+
141+
[p1, p2, p3, p4, p5].forEach(p => {
142+
void buffer.add(p);
143+
});
144+
145+
expect(p1).toHaveBeenCalled();
146+
expect(p2).toHaveBeenCalled();
147+
expect(p3).toHaveBeenCalled();
148+
expect(p4).toHaveBeenCalled();
149+
expect(p5).toHaveBeenCalled();
150+
47151
expect(buffer.$.length).toEqual(5);
48-
const result = await buffer.drain(50);
152+
const result = await buffer.drain(8);
49153
expect(result).toEqual(false);
154+
// p5 is still in the buffer
155+
expect(buffer.$.length).toEqual(1);
156+
157+
// Now drain final item
158+
const result2 = await buffer.drain();
159+
expect(result2).toEqual(true);
160+
expect(buffer.$.length).toEqual(0);
50161
});
51162

52163
test('on empty buffer', async () => {
@@ -56,11 +167,26 @@ describe('PromiseBuffer', () => {
56167
expect(result).toEqual(true);
57168
expect(buffer.$.length).toEqual(0);
58169
});
170+
171+
test('resolves even if one of the promises rejects', async () => {
172+
const buffer = makePromiseBuffer();
173+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
174+
const p2 = vi.fn(() => new Promise((_, reject) => setTimeout(() => reject(new Error('whoops')), 1)));
175+
void buffer.add(p1);
176+
void buffer.add(p2);
177+
178+
const result = await buffer.drain();
179+
expect(result).toEqual(true);
180+
expect(buffer.$.length).toEqual(0);
181+
182+
expect(p1).toHaveBeenCalled();
183+
expect(p2).toHaveBeenCalled();
184+
});
59185
});
60186

61187
test('resolved promises should not show up in buffer length', async () => {
62188
const buffer = makePromiseBuffer();
63-
const producer = () => new SyncPromise(resolve => setTimeout(resolve));
189+
const producer = () => new Promise(resolve => setTimeout(resolve,1));
64190
const task = buffer.add(producer);
65191
expect(buffer.$.length).toEqual(1);
66192
await task;
@@ -69,20 +195,18 @@ describe('PromiseBuffer', () => {
69195

70196
test('rejected promises should not show up in buffer length', async () => {
71197
const buffer = makePromiseBuffer();
72-
const producer = () => new SyncPromise((_, reject) => setTimeout(reject));
198+
const error = new Error('whoops');
199+
const producer = () => new Promise((_, reject) => setTimeout(() => reject(error), 1));
73200
const task = buffer.add(producer);
74201
expect(buffer.$.length).toEqual(1);
75-
try {
76-
await task;
77-
} catch {
78-
// no-empty
79-
}
202+
203+
await expect(task).rejects.toThrow(error);
80204
expect(buffer.$.length).toEqual(0);
81205
});
82206

83207
test('resolved task should give an access to the return value', async () => {
84208
const buffer = makePromiseBuffer<string>();
85-
const producer = () => newSyncPromise<string>(resolve=>setTimeout(()=>resolve('test')));
209+
const producer = () => resolvedSyncPromise('test');
86210
const task = buffer.add(producer);
87211
const result = await task;
88212
expect(result).toEqual('test');
@@ -91,7 +215,7 @@ describe('PromiseBuffer', () => {
91215
test('rejected task should give an access to the return value', async () => {
92216
expect.assertions(1);
93217
const buffer = makePromiseBuffer<string>();
94-
const producer = () => newSyncPromise<string>((_,reject)=>setTimeout(()=>reject(new Error('whoops'))));
218+
const producer = () => rejectedSyncPromise(new Error('whoops'));
95219
const task = buffer.add(producer);
96220
try {
97221
await task;

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /