InstantIndexedDataWriter. #8265

Open
Aathish101 wants to merge 1 commit into AppFlowy-IO:main from Aathish101:patch-1

Conversation

@Aathish101 Aathish101 commented Oct 9, 2025
edited by sourcery-ai bot

Feature Preview


PR Checklist

  • My code adheres to AppFlowy's Conventions
  • I've listed at least one issue that this PR fixes in the description above.
  • I've added a test(s) to validate changes in this PR, or this PR only contains semantic changes.
  • All existing tests are passing.

Summary by Sourcery

Introduce async-native locking and trait abstractions to improve the instant indexed data pipeline and refactor the periodic provider loop for clarity, robustness, and observability.

New Features:

  • Define CollabIndexedData trait for asynchronous retrieval of unindexed data with error handling.
  • Introduce InstantIndexedDataConsumer trait to formalize the consumer interface for indexing collab data.

Enhancements:

  • Refactor InstantIndexedDataWriter to use tokio::sync::RwLock and simplify public API methods.
  • Extract single-collab processing into its own async function and snapshot entries to release locks sooner.
  • Enhance the spawn_instant_indexed_provider loop with consumer presence checks, stale entry cleanup, and timing instrumentation.
  • Simplify index_encoded_collab and index_unindexed_collab methods with streamlined error contexts and logging.

Contributor

sourcery-ai bot commented Oct 9, 2025
edited

Reviewer's Guide

This PR refactors InstantIndexedDataWriter to use async-native locking, restructures the periodic indexing task for better concurrency and error isolation, enhances trait signatures for robust error handling, and cleans up public APIs and logging for clarity and performance.

Sequence diagram for periodic indexing task in InstantIndexedDataWriter

sequenceDiagram
 participant Provider as InstantIndexedDataWriter
 participant CollabMap as collab_by_object
 participant Consumers as consumers
 participant Collab as CollabIndexedData
 loop every 30 seconds
 Provider->>CollabMap: read object snapshots
 Provider->>Consumers: read consumer list
 alt for each object
 CollabMap->>Collab: upgrade Weak reference
 alt Collab is alive
 Collab->>Provider: get_unindexed_data(collab_type)
 Provider->>Consumers: consume_collab(workspace_id, data, object_id, collab_type)
 Consumers-->>Provider: Result<bool, FlowyError>
 else Collab dropped
 Provider->>CollabMap: mark for removal
 end
 end
 alt stale entries exist
 Provider->>CollabMap: remove stale entries
 end
 end
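
The flow in the diagram above can be sketched as a single tick. This is a minimal, synchronous stand-in rather than the PR's code: it assumes `std::sync::RwLock` in place of `tokio::sync::RwLock` so it compiles without external crates, and the names `IndexedData`, `Doc`, `CollabMap`, and `run_tick` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};

/// Hypothetical stand-in for the `CollabIndexedData` trait in the diagram.
trait IndexedData: Send + Sync {
  fn get_unindexed_data(&self) -> Option<String>;
}

/// Hypothetical collab type used only for this sketch.
struct Doc(String);

impl IndexedData for Doc {
  fn get_unindexed_data(&self) -> Option<String> {
    Some(self.0.clone())
  }
}

type CollabMap = RwLock<HashMap<String, Weak<dyn IndexedData>>>;

/// Runs one tick and returns the (object id, data) pairs that would be
/// handed to the consumers.
fn run_tick(collab_by_object: &CollabMap) -> Vec<(String, String)> {
  // Snapshot under the read lock; the guard is dropped when this block ends.
  let snapshot: Vec<(String, Weak<dyn IndexedData>)> = {
    let guard = collab_by_object.read().unwrap();
    guard
      .iter()
      .map(|(id, weak)| (id.clone(), weak.clone()))
      .collect()
  };

  let mut indexed = Vec::new();
  let mut stale = Vec::new();
  for (id, weak) in snapshot {
    match weak.upgrade() {
      // Collab is alive: fetch its unindexed data, if any.
      Some(collab) => {
        if let Some(data) = collab.get_unindexed_data() {
          indexed.push((id, data));
        }
      },
      // Collab was dropped: mark the entry for removal.
      None => stale.push(id),
    }
  }

  // Remove stale entries in one batch, taking the write lock only if needed.
  if !stale.is_empty() {
    let mut guard = collab_by_object.write().unwrap();
    guard.retain(|id, _| !stale.contains(id));
  }
  indexed
}
```

Calling `run_tick` on a map that holds one live and one dropped collab should return only the live entry and leave the map with a single key.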

Class diagram for refactored InstantIndexedDataWriter and related traits

classDiagram
 class InstantIndexedDataWriter {
 +collab_by_object: Arc<RwLock<HashMap<String, WriteObject>>>
 +consumers: Arc<RwLock<Vec<Box<dyn InstantIndexedDataConsumer>>>>
 +new() InstantIndexedDataWriter
 +num_consumers() usize
 +clear_consumers()
 +register_consumer(consumer: Box<dyn InstantIndexedDataConsumer>)
 +spawn_instant_indexed_provider(runtime: &Runtime) FlowyResult<()>
 +support_collab_type(t: &CollabType) bool
 +index_encoded_collab(workspace_id: Uuid, object_id: Uuid, data: EncodedCollab, collab_type: CollabType) FlowyResult<()>
 +index_unindexed_collab(data: UnindexedCollab) FlowyResult<()>
 +queue_collab_embed(collab_object: CollabObject, collab: Weak<dyn CollabIndexedData>)
 }
 class WriteObject {
 +collab_object: CollabObject
 +collab: Weak<dyn CollabIndexedData>
 }
 class CollabIndexedData {
 <<interface>>
 +get_unindexed_data(collab_type: &CollabType) FlowyResult<Option<UnindexedData>>
 }
 class InstantIndexedDataConsumer {
 <<interface>>
 +consumer_id() String
 +consume_collab(workspace_id: &Uuid, data: Option<UnindexedData>, object_id: &Uuid, collab_type: CollabType) Result<bool, FlowyError>
 +did_delete_collab(workspace_id: &Uuid, object_id: &Uuid) Result<(), FlowyError>
 }
 InstantIndexedDataWriter --> WriteObject
 WriteObject --> CollabObject
 WriteObject --> "collab: Weak" CollabIndexedData
 InstantIndexedDataWriter --> "consumers: Vec<Box>" InstantIndexedDataConsumer
 CollabIndexedData <|.. CollabRwLock
 InstantIndexedDataConsumer <|.. SomeConsumerImpl
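
The `FlowyResult<Option<UnindexedData>>` return type shown for `get_unindexed_data` separates three outcomes: data to index, nothing to index, and a failure. A minimal sketch of that shape follows. `IndexError`, `IndexResult`, and `parse_object_id` are hypothetical stand-ins (for `FlowyError`, `FlowyResult`, and UUID parsing respectively), and the function is synchronous so the example needs no external crates.

```rust
use std::fmt;

/// Hypothetical error type standing in for `FlowyError`.
#[derive(Debug, PartialEq)]
struct IndexError {
  context: String,
}

impl fmt::Display for IndexError {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "index error: {}", self.context)
  }
}

impl std::error::Error for IndexError {}

type IndexResult<T> = Result<T, IndexError>;

/// Parses a 32-digit hex object id. This is a simplified stand-in for UUID
/// parsing that attaches context to the error on failure.
fn parse_object_id(raw: &str) -> IndexResult<u128> {
  if raw.len() != 32 {
    return Err(IndexError {
      context: format!("invalid object id length: {raw}"),
    });
  }
  u128::from_str_radix(raw, 16).map_err(|e| IndexError {
    context: format!("invalid object id {raw}: {e}"),
  })
}

/// Ok(Some(..)): data to index. Ok(None): nothing to index. Err(..): failure.
fn get_unindexed_data(raw_id: &str, text: &str) -> IndexResult<Option<(u128, String)>> {
  // `?` propagates the parse error together with its context.
  let object_id = parse_object_id(raw_id)?;
  if text.trim().is_empty() {
    return Ok(None);
  }
  Ok(Some((object_id, text.to_string())))
}
```

With this shape a caller can skip `Ok(None)` quietly and reserve logging for `Err`, which a plain `Option` return cannot express.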

File-Level Changes

Change Details Files
Adopt tokio::sync::RwLock and async-native patterns
  • Replaced std::sync locks with tokio::sync::RwLock inside Arc containers
  • Updated struct fields and constructors (new/default) to initialize with async locks
  • Converted sync read/write calls to async await syntax
frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs
Refactor spawn_instant_indexed_provider into a clean, nonblocking workflow
  • Moved ticker setup to skip initial tick and measure execution duration
  • Snapshot collab entries under a read lock then release it before processing
  • Extracted single-collab processing into a new async helper function
  • Batch removal of stale entries under a write lock at end of each tick
frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs
Enhance CollabIndexedData trait for error-rich data retrieval
  • Changed get_unindexed_data return type to FlowyResult<Option>
  • Implemented the trait on CollabRwLock using async read guard and ? operator
  • Added context to FlowyError on UUID parsing failures
frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs
Improve consumer loop and logging behavior
  • Early skip when no consumers are registered with a trace log
  • Use warn! for per-consumer failures to continue processing others
  • Log consumer successes with trace and failures without breaking the loop
frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs
Clean up public API methods and immediate indexing logic
  • Streamlined register_consumer, clear_consumers, num_consumers implementations
  • Unified index_encoded_collab to build UnindexedCollab then call index_unindexed_collab
  • Removed redundant match blocks and simplified error logging
frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs
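
The consumer-loop rows in the table above (early skip when no consumers are registered, and continuing past a failing consumer) can be illustrated as follows. This is a synchronous sketch under stated assumptions, not the PR's implementation: `Consumer`, `Writer`, `Fixed`, and `index` are hypothetical names, `std::sync::RwLock` replaces `tokio::sync::RwLock`, and `eprintln!` stands in for the `warn!` macro.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical, synchronous stand-in for `InstantIndexedDataConsumer`.
trait Consumer: Send + Sync {
  fn consumer_id(&self) -> String;
  /// Ok(true) if the data was indexed, Ok(false) if it was skipped.
  fn consume(&self, data: &str) -> Result<bool, String>;
}

struct Writer {
  consumers: Arc<RwLock<Vec<Box<dyn Consumer>>>>,
}

impl Writer {
  fn new() -> Self {
    Self { consumers: Arc::new(RwLock::new(Vec::new())) }
  }

  fn register_consumer(&self, consumer: Box<dyn Consumer>) {
    self.consumers.write().unwrap().push(consumer);
  }

  fn num_consumers(&self) -> usize {
    self.consumers.read().unwrap().len()
  }

  /// Offers `data` to every consumer and returns the number of consumers
  /// that succeeded plus the ids of those that failed.
  fn index(&self, data: &str) -> (usize, Vec<String>) {
    let guard = self.consumers.read().unwrap();
    // Early skip: nothing to do when no consumers are registered.
    if guard.is_empty() {
      return (0, Vec::new());
    }
    let mut ok = 0;
    let mut failed = Vec::new();
    for consumer in guard.iter() {
      match consumer.consume(data) {
        Ok(_) => ok += 1,
        // One failing consumer is logged and does not stop the others.
        Err(e) => {
          eprintln!("[Indexing] consumer {} failed: {}", consumer.consumer_id(), e);
          failed.push(consumer.consumer_id());
        },
      }
    }
    (ok, failed)
  }
}

/// Hypothetical consumer whose outcome is fixed at construction time.
struct Fixed {
  id: &'static str,
  fail: bool,
}

impl Consumer for Fixed {
  fn consumer_id(&self) -> String {
    self.id.to_string()
  }

  fn consume(&self, _data: &str) -> Result<bool, String> {
    if self.fail {
      Err("backend unavailable".to_string())
    } else {
      Ok(true)
    }
  }
}
```

Registering a failing consumer ahead of a working one and calling `index` should still reach the second consumer, which is the isolation property the table describes.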

Author

updated

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:260` </location>
<code_context>
-  None => Err(FlowyError::internal().with_context("Failed to create unindexed collab")),
-  Some(data) => {
-  self.index_unindexed_collab(data).await?;
+ pub async fn index_unindexed_collab(&self, data: UnindexedCollab) -> FlowyResult<()> {
+  let consumers_guard = self.consumers.read().await;
+  for consumer in consumers_guard.iter() {
</code_context>
<issue_to_address>
**nitpick:** Error handling for consumer failures is inconsistent with periodic provider.
Use the same log level for consumer errors as in the periodic provider to maintain consistency.
</issue_to_address>
### Comment 2
<location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:69` </location>
<code_context>
- guard.push(consumer);
- }
-
- pub async fn spawn_instant_indexed_provider(&self, runtime: &Runtime) -> FlowyResult<()> {
- let weak_collab_by_object = Arc::downgrade(&self.collab_by_object);
- let consumers_weak = Arc::downgrade(&self.consumers);
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the snapshot and cleanup logic into helper functions to simplify and clarify the core spawn loop.
Consider moving the in-loop "snapshot" and "cleanup" steps into small helpers to keep the spawn loop focused. For example, once the weak handles have been upgraded to Arcs, you could:
```rust
// new helper at impl InstantIndexedDataWriter
async fn snapshot_entries(
  collab_by_object: &RwLock<HashMap<String, WriteObject>>,
) -> Vec<(String, CollabObject, Weak<dyn CollabIndexedData>)> {
  let guard = collab_by_object.read().await;
  guard
    .iter()
    .map(|(id, wo)| (id.clone(), wo.collab_object.clone(), wo.collab.clone()))
    .collect()
}

async fn remove_stale(
  collab_by_object: &RwLock<HashMap<String, WriteObject>>,
  to_remove: &[String],
) {
  if to_remove.is_empty() {
    return;
  }
  let mut guard = collab_by_object.write().await;
  guard.retain(|k, _| !to_remove.contains(k));
}
```
Then your spawn loop reduces to:
```rust
// inside runtime.spawn(async move { ... })
let object_snapshots = Self::snapshot_entries(&collab_by_object).await;
let consumers = consumers.read().await;
if consumers.is_empty() {
  continue;
}
let mut to_remove = Vec::new();
for (id, collab_object, weak_rc) in object_snapshots {
  if let Some(rc) = weak_rc.upgrade() {
    if let Err(e) =
      Self::process_single_collab(id.clone(), collab_object, rc, &consumers).await
    {
      error!("[Indexing] process failed {}: {}", id, e);
    }
  } else {
    to_remove.push(id);
  }
}
Self::remove_stale(&collab_by_object, &to_remove).await;
```
This trims down the body of your core loop by ~40 lines, isolates snapshot/cleanup logic for easier testing, and keeps all functionality intact.
</issue_to_address>
