- 
 
 - 
  Notifications
 
You must be signed in to change notification settings  - Fork 4.7k
 
InstantIndexedDataWriter. #8265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
 
Reviewer's GuideThis PR refactors InstantIndexedDataWriter to use async-native locking, restructures the periodic indexing task for better concurrency and error isolation, enhances trait signatures for robust error handling, and cleans up public APIs and logging for clarity and performance. Sequence diagram for periodic indexing task in InstantIndexedDataWritersequenceDiagram
 participant Provider as InstantIndexedDataWriter
 participant CollabMap as collab_by_object
 participant Consumers as consumers
 participant Collab as CollabIndexedData
 loop every 30 seconds
 Provider->>CollabMap: read object snapshots
 Provider->>Consumers: read consumer list
 alt for each object
 CollabMap->>Collab: upgrade Weak reference
 alt Collab is alive
 Collab->>Provider: get_unindexed_data(collab_type)
 Provider->>Consumers: consume_collab(workspace_id, data, object_id, collab_type)
 Consumers-->>Provider: Result<bool, FlowyError>
 else Collab dropped
 Provider->>CollabMap: mark for removal
 end
 end
 alt stale entries exist
 Provider->>CollabMap: remove stale entries
 end
 end
 Class diagram for refactored InstantIndexedDataWriter and related traitsclassDiagram
 class InstantIndexedDataWriter {
 +collab_by_object: Arc<RwLock<HashMap<String, WriteObject>>>
 +consumers: Arc<RwLock<Vec<Box<dyn InstantIndexedDataConsumer>>>>
 +new() InstantIndexedDataWriter
 +num_consumers() usize
 +clear_consumers()
 +register_consumer(consumer: Box<dyn InstantIndexedDataConsumer>)
 +spawn_instant_indexed_provider(runtime: &Runtime) FlowyResult<()>
 +support_collab_type(t: &CollabType) bool
 +index_encoded_collab(workspace_id: Uuid, object_id: Uuid, data: EncodedCollab, collab_type: CollabType) FlowyResult<()>
 +index_unindexed_collab(data: UnindexedCollab) FlowyResult<()>
 +queue_collab_embed(collab_object: CollabObject, collab: Weak<dyn CollabIndexedData>)
 }
 class WriteObject {
 +collab_object: CollabObject
 +collab: Weak<dyn CollabIndexedData>
 }
 class CollabIndexedData {
 <<interface>>
 +get_unindexed_data(collab_type: &CollabType) FlowyResult<Option<UnindexedData>>
 }
 class InstantIndexedDataConsumer {
 <<interface>>
 +consumer_id() String
 +consume_collab(workspace_id: &Uuid, data: Option<UnindexedData>, object_id: &Uuid, collab_type: CollabType) Result<bool, FlowyError>
 +did_delete_collab(workspace_id: &Uuid, object_id: &Uuid) Result<(), FlowyError>
 }
 InstantIndexedDataWriter --> WriteObject
 WriteObject --> CollabObject
 WriteObject --> "collab: Weak" CollabIndexedData
 InstantIndexedDataWriter --> "consumers: Vec<Box>" InstantIndexedDataConsumer
 CollabIndexedData <|.. CollabRwLock
 InstantIndexedDataConsumer <|.. SomeConsumerImpl
 File-Level Changes
 Tips and commandsInteracting with Sourcery
 Customizing Your ExperienceAccess your dashboard to: 
 Getting Help
  | 
 
updated
 
 
 
 CLAassistant
 
 
 
 commented
 Oct 9, 2025 
 
 
 
CLA assistant check 
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review: ## Individual Comments ### Comment 1 <location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:260` </location> <code_context> - None => Err(FlowyError::internal().with_context("Failed to create unindexed collab")), - Some(data) => { - self.index_unindexed_collab(data).await?; + pub async fn index_unindexed_collab(&self, data: UnindexedCollab) -> FlowyResult<()> { + let consumers_guard = self.consumers.read().await; + for consumer in consumers_guard.iter() { </code_context> <issue_to_address> **nitpick:** Error handling for consumer failures is inconsistent with periodic provider. Use the same log level for consumer errors as in the periodic provider to maintain consistency. </issue_to_address> ### Comment 2 <location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:69` </location> <code_context> - guard.push(consumer); - } - - pub async fn spawn_instant_indexed_provider(&self, runtime: &Runtime) -> FlowyResult<()> { - let weak_collab_by_object = Arc::downgrade(&self.collab_by_object); - let consumers_weak = Arc::downgrade(&self.consumers); </code_context> <issue_to_address> **issue (complexity):** Consider extracting the snapshot and cleanup logic into helper functions to simplify and clarify the core spawn loop. Consider moving the in‐loop "snapshot" and "cleanup" bits into small helpers to keep the spawn loop focused. For example, after you upgrade the Arcs you could: ```rust // new helper at impl InstantIndexedDataWriter async fn snapshot_entries( collab_by_object: &RwLock<HashMap<String, WriteObject>>, ) -> Vec<(String, CollabObject, Weak<dyn CollabIndexedData>)> { let guard = collab_by_object.read().await; guard .iter() .map(|(id, wo)| (id.clone(), wo.collab_object.clone(), wo.collab.clone())) .collect() } async fn remove_stale( collab_by_object: &RwLock<HashMap<String, WriteObject>>, to_remove: &[String], ) { if to_remove.is_empty() { return; } let mut guard = collab_by_object.write().await; guard.retain(|k, _| !to_remove.contains(k)); } ``` Then your spawn loop reduces to: ```rust // inside runtime.spawn(async move { ... }) let object_snapshots = Self::snapshot_entries(&collab_by_object).await; let consumers = consumers.read().await; if consumers.is_empty() { continue; } let mut to_remove = Vec::new(); for (id, collab_object, weak_rc) in object_snapshots { if let Some(rc) = weak_rc.upgrade() { if let Err(e) = Self::process_single_collab(id.clone(), collab_object, rc, &consumers).await { error!("[Indexing] process failed {}: {}", id, e); } } else { to_remove.push(id); } } Self::remove_stale(&collab_by_object, &to_remove).await; ``` This trims down the body of your core loop by ~40 lines, isolates snapshot/cleanup logic for easier testing, and keeps all functionality intact. </issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Error handling for consumer failures is inconsistent with periodic provider.
Use the same log level for consumer errors as in the periodic provider to maintain consistency.
Uh oh!
There was an error while loading. Please reload this page.
Feature Preview
PR Checklist
Summary by Sourcery
Introduce async-native locking and trait abstractions to improve the instant indexed data pipeline and refactor the periodic provider loop for clarity, robustness, and observability.
New Features:
Enhancements: