Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit eff3e19

Browse files
committed
core, graph, store: Asyncify SubgraphStore.assignment_status
1 parent a5ef1bd commit eff3e19

File tree

4 files changed

+16
-9
lines changed

4 files changed

+16
-9
lines changed

‎core/src/subgraph/registrar.rs‎

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ where
121121
self.start_assigned_subgraphs().await?;
122122

123123
// Spawn a task to handle assignment events.
124-
// Blocking due to store interactions. Won't be blocking after #905.
125124
let assignment_event_stream_cancel_handle =
126125
self.assignment_event_stream_cancel_guard.handle();
127126

@@ -147,14 +146,17 @@ where
147146
}
148147
});
149148

150-
graph::spawn_blocking(fut);
149+
graph::spawn(fut);
151150
Ok(())
152151
}
153152

154153
/// Maps an assignment change to an assignment event by checking the
155154
/// current state in the database, ignoring changes that do not affect
156155
/// this node or do not require anything to change.
157-
fn map_assignment(&self, change: AssignmentChange) -> Result<Option<AssignmentEvent>, Error> {
156+
async fn map_assignment(
157+
&self,
158+
change: AssignmentChange,
159+
) -> Result<Option<AssignmentEvent>, Error> {
158160
let (deployment, operation) = change.into_parts();
159161

160162
trace!(self.logger, "Received assignment change";
@@ -167,6 +169,7 @@ where
167169
let assigned = self
168170
.store
169171
.assignment_status(&deployment)
172+
.await
170173
.map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?;
171174

172175
let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string()));
@@ -223,7 +226,7 @@ where
223226
let this = this.cheap_clone();
224227

225228
async move {
226-
match this.map_assignment(change) {
229+
match this.map_assignment(change).await {
227230
Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(),
228231
Ok(None) => stream::empty().boxed(),
229232
Err(e) => stream::once(futures03::future::err(e)).boxed(),

‎graph/src/components/store/traits.rs‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub trait SubgraphStore: Send + Sync + 'static {
121121
/// the subgraph is assigned to, and `is_paused` is true if the
122122
/// subgraph is paused.
123123
/// Returns None if the deployment does not exist.
124-
fn assignment_status(
124+
asyncfn assignment_status(
125125
&self,
126126
deployment: &DeploymentLocator,
127127
) -> Result<Option<(NodeId, bool)>, StoreError>;

‎store/postgres/src/primary.rs‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,8 +2088,12 @@ impl Mirror {
20882088
/// the subgraph is assigned to, and `is_paused` is true if the
20892089
/// subgraph is paused.
20902090
/// Returns None if the deployment does not exist.
2091-
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
2092-
self.read(|conn| queries::assignment_status(conn, site))
2091+
pub async fn assignment_status(
2092+
&self,
2093+
site: Arc<Site>,
2094+
) -> Result<Option<(NodeId, bool)>, StoreError> {
2095+
self.read_async(move |conn| queries::assignment_status(conn, &site))
2096+
.await
20932097
}
20942098

20952099
pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result<Option<Site>, StoreError> {

‎store/postgres/src/subgraph_store.rs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,12 +1455,12 @@ impl SubgraphStoreTrait for SubgraphStore {
14551455
/// the subgraph is assigned to, and `is_paused` is true if the
14561456
/// subgraph is paused.
14571457
/// Returns None if the deployment does not exist.
1458-
fn assignment_status(
1458+
asyncfn assignment_status(
14591459
&self,
14601460
deployment: &DeploymentLocator,
14611461
) -> Result<Option<(NodeId, bool)>, StoreError> {
14621462
let site = self.find_site(deployment.id.into())?;
1463-
self.mirror.assignment_status(site.as_ref())
1463+
self.mirror.assignment_status(site).await
14641464
}
14651465

14661466
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /