Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a87be5d

Browse files
committed
refactor: extract incremental graph to separate file
Replicates graphql/graphql-js@cb43c83
1 parent 8d6fd0b commit a87be5d

File tree

3 files changed

+270
-175
lines changed

3 files changed

+270
-175
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
"""Incremental Graphs."""
2+
3+
from __future__ import annotations
4+
5+
from asyncio import Event, Task, ensure_future
6+
from typing import (
7+
TYPE_CHECKING,
8+
Any,
9+
Awaitable,
10+
Iterator,
11+
Sequence,
12+
cast,
13+
)
14+
15+
from graphql.execution.types import (
16+
is_deferred_fragment_record,
17+
is_deferred_grouped_field_set_record,
18+
)
19+
20+
from ..pyutils.is_awaitable import is_awaitable
21+
22+
if TYPE_CHECKING:
23+
from graphql.execution.types import (
24+
DeferredFragmentRecord,
25+
DeferredGroupedFieldSetResult,
26+
IncrementalDataRecord,
27+
IncrementalDataRecordResult,
28+
ReconcilableDeferredGroupedFieldSetResult,
29+
StreamItemsRecord,
30+
StreamItemsResult,
31+
SubsequentResultRecord,
32+
)
33+
34+
__all__ = ["IncrementalGraph"]
35+
36+
37+
class IncrementalGraph:
38+
"""Helper class to execute incremental Graphs.
39+
40+
For internal use only.
41+
"""
42+
43+
_pending: dict[SubsequentResultRecord, None]
44+
_new_pending: dict[SubsequentResultRecord, None]
45+
_completed_result_queue: list[IncrementalDataRecordResult]
46+
47+
_resolve: Event | None
48+
_tasks: set[Task[Any]]
49+
50+
def __init__(self) -> None:
51+
"""Initialize the IncrementalGraph."""
52+
self._pending = {}
53+
self._new_pending = {}
54+
self._completed_result_queue = []
55+
self._resolve = None # lazy initialization
56+
self._tasks = set()
57+
58+
def add_incremental_data_records(
59+
self, incremental_data_records: Sequence[IncrementalDataRecord]
60+
) -> None:
61+
"""Add incremental data records."""
62+
for incremental_data_record in incremental_data_records:
63+
if is_deferred_grouped_field_set_record(incremental_data_record):
64+
for (
65+
deferred_fragment_record
66+
) in incremental_data_record.deferred_fragment_records:
67+
deferred_fragment_record.expected_reconcilable_results += 1
68+
self._add_deferred_fragment_record(deferred_fragment_record)
69+
70+
deferred_result = incremental_data_record.result
71+
if is_awaitable(deferred_result):
72+
73+
async def enqueue_deferred(
74+
deferred_result: Awaitable[DeferredGroupedFieldSetResult],
75+
) -> None:
76+
self._enqueue_completed_deferred_grouped_field_set(
77+
await deferred_result
78+
)
79+
80+
self._add_task(enqueue_deferred(deferred_result))
81+
else:
82+
self._enqueue_completed_deferred_grouped_field_set(
83+
deferred_result, # type: ignore
84+
)
85+
continue
86+
87+
incremental_data_record = cast("StreamItemsRecord", incremental_data_record)
88+
stream_record = incremental_data_record.stream_record
89+
if stream_record.id is None:
90+
self._new_pending[stream_record] = None
91+
92+
stream_result = incremental_data_record.result
93+
if is_awaitable(stream_result):
94+
95+
async def enqueue_stream(
96+
stream_result: Awaitable[StreamItemsResult],
97+
) -> None:
98+
self._enqueue_completed_stream_items(await stream_result)
99+
100+
self._add_task(enqueue_stream(stream_result))
101+
else:
102+
self._enqueue_completed_stream_items(stream_result) # type: ignore
103+
104+
def get_new_pending(self) -> list[SubsequentResultRecord]:
105+
"""Get new pending subsequent result records."""
106+
maybe_empty_new_pending = self._new_pending
107+
pending = self._pending
108+
add_non_empty_new_pending = self._add_non_empty_new_pending
109+
new_pending: list[SubsequentResultRecord] = []
110+
append_new_pending = new_pending.append
111+
for node in maybe_empty_new_pending:
112+
if is_deferred_fragment_record(node):
113+
if node.expected_reconcilable_results:
114+
pending[node] = None
115+
append_new_pending(node)
116+
continue
117+
for child in node.children:
118+
add_non_empty_new_pending(child, new_pending)
119+
else:
120+
pending[node] = None
121+
append_new_pending(node)
122+
self._new_pending.clear()
123+
return new_pending
124+
125+
def completed_results(self) -> Iterator[IncrementalDataRecordResult]:
126+
"""Yield completed incremental data record results."""
127+
queue = self._completed_result_queue
128+
while queue:
129+
completed_result = queue.pop(0)
130+
yield completed_result
131+
132+
def has_next(self) -> bool:
133+
"""Check if there are more results to process."""
134+
return bool(self._pending)
135+
136+
def complete_deferred_fragment(
137+
self,
138+
deferred_fragment_record: DeferredFragmentRecord,
139+
) -> list[ReconcilableDeferredGroupedFieldSetResult] | None:
140+
"""Complete a deferred fragment."""
141+
reconcilable_results = deferred_fragment_record.reconcilable_results
142+
if deferred_fragment_record.expected_reconcilable_results != len(
143+
reconcilable_results
144+
):
145+
return None
146+
del self._pending[deferred_fragment_record]
147+
new_pending = self._new_pending
148+
extend = self._completed_result_queue.extend
149+
for child in deferred_fragment_record.children:
150+
new_pending[child] = None
151+
extend(child.results)
152+
return reconcilable_results
153+
154+
def remove_subsequent_result_record(
155+
self,
156+
subsequent_result_record: SubsequentResultRecord,
157+
) -> None:
158+
"""Remove a subsequent result record as no longer pending."""
159+
del self._pending[subsequent_result_record]
160+
161+
def _add_deferred_fragment_record(
162+
self, deferred_fragment_record: DeferredFragmentRecord
163+
) -> None:
164+
"""Add deferred fragment record."""
165+
parent = deferred_fragment_record.parent
166+
if parent is None:
167+
if deferred_fragment_record.id is not None:
168+
return
169+
self._new_pending[deferred_fragment_record] = None
170+
return
171+
if deferred_fragment_record in parent.children:
172+
return
173+
parent.children[deferred_fragment_record] = None
174+
self._add_deferred_fragment_record(parent)
175+
176+
def _add_non_empty_new_pending(
177+
self,
178+
deferred_fragment_record: DeferredFragmentRecord,
179+
new_pending: list[SubsequentResultRecord],
180+
) -> None:
181+
"""Add non-empty new pending deferred fragment record."""
182+
if deferred_fragment_record.expected_reconcilable_results:
183+
self._pending[deferred_fragment_record] = None
184+
new_pending.append(deferred_fragment_record)
185+
return
186+
add = self._add_non_empty_new_pending # pragma: no cover
187+
for child in deferred_fragment_record.children: # pragma: no cover
188+
add(child, new_pending)
189+
190+
def _enqueue_completed_deferred_grouped_field_set(
191+
self, result: DeferredGroupedFieldSetResult
192+
) -> None:
193+
"""Enqueue completed deferred grouped field set result."""
194+
has_pending_parent = False
195+
for deferred_fragment_record in result.deferred_fragment_records:
196+
if deferred_fragment_record.id is not None:
197+
has_pending_parent = True
198+
deferred_fragment_record.results.append(result)
199+
append = self._completed_result_queue.append
200+
if has_pending_parent:
201+
append(result)
202+
self._trigger()
203+
204+
def _enqueue_completed_stream_items(self, result: StreamItemsResult) -> None:
205+
"""Enqueue completed stream items result."""
206+
self._completed_result_queue.append(result)
207+
self._trigger()
208+
209+
def _trigger(self) -> None:
210+
"""Trigger the resolve event."""
211+
resolve = self._resolve
212+
if resolve is not None:
213+
resolve.set()
214+
self._resolve = Event()
215+
216+
async def new_completed_result_available(self) -> None:
217+
"""Get an awaitable that resolves when a new completed result is available."""
218+
resolve = self._resolve
219+
if resolve is None:
220+
self._resolve = resolve = Event()
221+
await resolve.wait()
222+
223+
def _add_task(self, awaitable: Awaitable[Any]) -> None:
224+
"""Add the given task to the tasks set for later execution."""
225+
tasks = self._tasks
226+
task = ensure_future(awaitable)
227+
tasks.add(task)
228+
task.add_done_callback(tasks.discard)

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /