
add group agg spill #22527

Open

reusee wants to merge 57 commits into matrixorigin:main from reusee:spillmvp

Conversation

Contributor

@reusee commented Sep 19, 2025 (edited by qodo-merge-pro bot)

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #3433

What this PR does / why we need it:

add group aggregation spill


PR Type

Enhancement


Description

  • Add memory spill functionality to group aggregation operator

  • Implement spillable data structures and memory management

  • Add spill threshold configuration and memory usage tracking

  • Create comprehensive test coverage for spill scenarios


Diagram Walkthrough

```mermaid
flowchart LR
 A["Group Operator"] --> B["Memory Usage Monitor"]
 B --> C["Spill Threshold Check"]
 C --> D["SpillManager"]
 D --> E["SpillableAggState"]
 E --> F["Serialized Data"]
 F --> G["Memory Recovery"]
 G --> H["Merge Spilled Results"]
```

File Walkthrough

Relevant files
Enhancement
exec.go
Add spill initialization and integration logic

pkg/sql/colexec/group/exec.go

  • Initialize SpillManager and SpillThreshold in Prepare method
  • Add memory usage tracking and spill logic in batch consumption
  • Integrate spilled results merging in final result retrieval
+19/-0
group_spill.go
Core spill implementation with state management

pkg/sql/colexec/group/group_spill.go

  • Implement core spill functionality with memory usage monitoring
  • Add spillPartialResults method to serialize and store aggregation
    state
  • Create mergeSpilledResults and restoreAndMergeSpilledAggregators
    methods
  • Handle batch reconstruction and aggregator state restoration
+364/-0
spill.go
Define spill interfaces and types

pkg/sql/colexec/group/spill.go

  • Define SpillableData interface for serializable data structures
  • Create SpillManager interface for spill operations
  • Establish SpillID type for spill identification (see the interface sketch after this walkthrough)
+33/-0
spill_memory.go
Memory-based spill manager implementation

pkg/sql/colexec/group/spill_memory.go

  • Implement MemorySpillManager for in-memory spill storage
  • Add atomic memory tracking and thread-safe operations
  • Provide spill, retrieve, and delete functionality
+75/-0
spillable_agg_state.go
Spillable aggregation state implementation

pkg/sql/colexec/group/spillable_agg_state.go

  • Implement SpillableAggState struct for aggregation state serialization
  • Add binary serialization and deserialization methods
  • Handle vector marshaling and type preservation
  • Provide memory estimation and cleanup functionality
+204/-0
types.go
Add spill configuration to group types

pkg/sql/colexec/group/types.go

  • Add SpillManager and SpillThreshold fields to Group struct
  • Extend container with spill-related state tracking fields
  • Update Free and cleanup methods to handle spill resources
+21/-0
Tests
exec_test.go
Update test mock with size method

pkg/sql/colexec/group/exec_test.go

  • Add Size() method to test aggregation executor mock
+4/-0
spill_test.go
Comprehensive spill functionality tests

pkg/sql/colexec/group/spill_test.go

  • Create comprehensive test cases for spill functionality
  • Test single spill cycle and multiple spill cycles
  • Verify memory cleanup and aggregation correctness
  • Include memory leak detection and validation
+160/-0
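
For orientation, here is a minimal sketch of the spill.go surface as it can be inferred from this walkthrough and the code excerpts quoted later on this page. The Spill/Retrieve/Delete/Free signatures appear verbatim in the review diffs below; the Deserialize signature and EstimateSize are assumptions, not the PR's declarations.

```go
package group

import "github.com/matrixorigin/matrixone/pkg/common/mpool"

// SpillID identifies one spilled chunk; MemorySpillManager mints values
// like "spill_1", "spill_2", ...
type SpillID string

// SpillableData is a data structure that can round-trip through bytes.
// EstimateSize and the exact Deserialize signature are assumptions based
// on the "memory estimation" and "deserialization" bullets above.
type SpillableData interface {
	Serialize() ([]byte, error)
	Deserialize(data []byte, mp *mpool.MPool) error
	EstimateSize() int64
	Free(mp *mpool.MPool)
}

// SpillManager stores, retrieves, and deletes spilled chunks; these
// signatures match the MemorySpillManager methods quoted below.
type SpillManager interface {
	Spill(data SpillableData) (SpillID, error)
	Retrieve(id SpillID, mp *mpool.MPool) (SpillableData, error)
	Delete(id SpillID) error
	Free()
}
```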

@matrix-meow added the size/L label (denotes a PR that changes [500,999] lines) on Sep 19, 2025

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

#3433 - Partially compliant

Compliant requirements:

  • Hash agg tracks memory usage.
  • Hash agg spills when exceeding threshold.

Non-compliant requirements:

  • Hash join must track memory usage and spill.

Requires further human verification:

  • Validate spill behavior under various workloads and configurations in an integrated environment.
  • Measure performance impact of spilling and merging on large datasets.
  • Confirm configuration wiring for SpillThreshold from system/session settings.
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
Recommended focus areas for review

Memory Accounting

Memory usage tracking only sums selected structures and may miss allocations (e.g., result1 internal buffers, hash table overhead, temporary vectors). Verify accounting is sufficient to trigger spill at the right time and avoid OOM.

```go
func (group *Group) updateMemoryUsage(proc *process.Process) {
	usage := int64(0)
	if !group.ctr.hr.IsEmpty() && group.ctr.hr.Hash != nil {
		usage += int64(group.ctr.hr.Hash.Size())
	}
	for _, bat := range group.ctr.result1.ToPopped {
		if bat != nil {
			usage += int64(bat.Size())
		}
	}
	for _, agg := range group.ctr.result1.AggList {
		if agg != nil {
			usage += agg.Size()
		}
	}
	group.ctr.currentMemUsage = usage
}
```
Merge Correctness

Merging spilled aggregators iterates per-row and uses currentGroupCount offset; ensure alignment between batches and agg states, and that GroupGrow and Merge indices are correct for all agg types and group-by shapes.

```go
currentGroupCount := 0
for _, bat := range group.ctr.result1.ToPopped {
	if bat != nil {
		currentGroupCount += bat.RowCount()
	}
}
for i, tempAgg := range tempAggs {
	if tempAgg == nil {
		continue
	}
	currentAgg := group.ctr.result1.AggList[i]
	if currentAgg == nil {
		continue
	}
	for spilledGroupIdx := 0; spilledGroupIdx < spillState.GroupCount; spilledGroupIdx++ {
		currentGroupIdx := currentGroupCount + spilledGroupIdx
		if err := currentAgg.Merge(tempAgg, currentGroupIdx, spilledGroupIdx); err != nil {
			return err
		}
	}
}
```
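
To make the offset reasoning concrete, here is a tiny standalone illustration with hypothetical numbers (not the PR's code):

```go
package main

import "fmt"

func main() {
	// Say the in-memory result already holds 100 groups and one spilled
	// chunk holds 3 groups (spillState.GroupCount == 3). Spilled group j
	// must land in in-memory group 100+j, which requires the current
	// aggregator to have been grown to 103 groups before merging.
	currentGroupCount := 100
	spilledGroupCount := 3
	for j := 0; j < spilledGroupCount; j++ {
		fmt.Printf("Merge(target=%d, source=%d)\n", currentGroupCount+j, j)
	}
}
```

Any misalignment between the popped batches and the aggregator state shifts every target index by the same amount, which is why the reviewer asks to verify GroupGrow and Merge indices across agg types and group-by shapes.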
Serialization Robustness

Custom binary format for vectors and types needs compatibility and error handling; confirm all vector encodings and null bitmaps are preserved and future-proof across versions.

```go
func (s *SpillableAggState) Serialize() ([]byte, error) {
	buf := bytes.NewBuffer(nil)
	if err := binary.Write(buf, binary.LittleEndian, int32(s.GroupCount)); err != nil {
		return nil, err
	}
	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectors))); err != nil {
		return nil, err
	}
	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectorTypes))); err != nil {
		return nil, err
	}
	for _, typ := range s.GroupVectorTypes {
		typBytes, err := typ.MarshalBinary()
		if err != nil {
			return nil, err
		}
		if err := binary.Write(buf, binary.LittleEndian, int32(len(typBytes))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(typBytes); err != nil {
			return nil, err
		}
	}
	for i, vec := range s.GroupVectors {
		if vec == nil {
			if err := binary.Write(buf, binary.LittleEndian, int32(0)); err != nil {
				return nil, err
			}
			continue
		}
		vecBytes, err := vec.MarshalBinary()
		if err != nil {
			return nil, err
		}
		if err := binary.Write(buf, binary.LittleEndian, int32(len(vecBytes))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(vecBytes); err != nil {
			return nil, err
		}
		if i >= len(s.GroupVectorTypes) {
			s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType())
		}
	}
	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.MarshaledAggStates))); err != nil {
		return nil, err
	}
	for _, aggState := range s.MarshaledAggStates {
		if err := binary.Write(buf, binary.LittleEndian, int32(len(aggState))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(aggState); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```
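
The format above is a sequence of little-endian int32 length prefixes followed by payload bytes, with a zero length marking a nil vector. As a reading aid, a minimal helper for the mirror-image Deserialize might look like this (a sketch; readBlock is a hypothetical name, not the PR's code):

```go
import (
	"bytes"
	"encoding/binary"
	"io"
)

// readBlock reads one little-endian int32 length prefix and then that many
// payload bytes. A zero length (the nil-vector marker used by Serialize
// above) yields a nil slice.
func readBlock(r *bytes.Reader) ([]byte, error) {
	var n int32
	if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
		return nil, err
	}
	if n == 0 {
		return nil, nil
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}
```

Each type, vector, and agg-state blob read this way would then go through the corresponding UnmarshalBinary call; versioning the format (for example, a magic number plus a version byte) would address the compatibility concern raised above.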

qodo-merge-pro bot commented Sep 19, 2025 (edited)

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: High-level
Implement disk-based spilling instead of in-memory

The current MemorySpillManager stores spilled data in memory, which fails to
reduce the overall memory footprint. A disk-based spill manager should be
implemented to properly offload data to disk.

Examples:

pkg/sql/colexec/group/spill_memory.go [24-46]
```go
type MemorySpillManager struct {
	data     map[SpillID][]byte
	nextID   int64
	totalMem int64
}

func NewMemorySpillManager() *MemorySpillManager {
	return &MemorySpillManager{
		data: make(map[SpillID][]byte),
	}
	// ... (clipped 13 lines)
```
pkg/sql/colexec/group/exec.go [67-69]
```go
if group.SpillManager == nil {
	group.SpillManager = NewMemorySpillManager()
}
```

Solution Walkthrough:

Before:

```go
// pkg/sql/colexec/group/spill_memory.go
type MemorySpillManager struct {
	data map[SpillID][]byte
	// ...
}

func (m *MemorySpillManager) Spill(data SpillableData) (SpillID, error) {
	serialized, err := data.Serialize()
	if err != nil {
		return "", err
	}
	id := ... // generate new ID
	m.data[id] = serialized // Data is stored in an in-memory map
	return id, nil
}
```

After:

```go
// A conceptual disk-based spill manager
type DiskSpillManager struct {
	dirPath string
	// ...
}

func (m *DiskSpillManager) Spill(data SpillableData) (SpillID, error) {
	serialized, err := data.Serialize()
	if err != nil {
		return "", err
	}
	id := ... // generate new ID
	filePath := path.Join(m.dirPath, string(id))
	err = os.WriteFile(filePath, serialized, 0600) // Write data to disk
	if err != nil {
		return "", err
	}
	return id, nil
}
```
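
Fleshing that concept out, here is a self-contained sketch of a disk-backed manager (illustrative assumptions: the directory layout, file permissions, and helper names; a full implementation would also provide Retrieve and Free to satisfy the SpillManager interface, with deserialization mirroring MemorySpillManager.Retrieve):

```go
package group

import (
	"fmt"
	"os"
	"path/filepath"
	"sync/atomic"
)

// DiskSpillManager writes each spilled chunk to its own file under dir,
// so process memory holds only SpillID bookkeeping.
type DiskSpillManager struct {
	dir    string
	nextID int64
}

func NewDiskSpillManager(dir string) (*DiskSpillManager, error) {
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return nil, err
	}
	return &DiskSpillManager{dir: dir}, nil
}

func (m *DiskSpillManager) Spill(data SpillableData) (SpillID, error) {
	serialized, err := data.Serialize()
	if err != nil {
		return "", err
	}
	id := SpillID(fmt.Sprintf("spill_%d", atomic.AddInt64(&m.nextID, 1)))
	// One file per chunk, so Delete can reclaim space independently.
	if err := os.WriteFile(filepath.Join(m.dir, string(id)), serialized, 0o600); err != nil {
		return "", err
	}
	return id, nil
}

// retrieveBytes reads a chunk back from disk; turning the bytes into a
// concrete SpillableData would mirror MemorySpillManager.Retrieve.
func (m *DiskSpillManager) retrieveBytes(id SpillID) ([]byte, error) {
	return os.ReadFile(filepath.Join(m.dir, string(id)))
}

func (m *DiskSpillManager) Delete(id SpillID) error {
	return os.Remove(filepath.Join(m.dir, string(id)))
}
```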
Suggestion importance[1-10]: 9


Why: This suggestion correctly identifies a critical design flaw where the in-memory MemorySpillManager defeats the primary purpose of spilling, which is to reduce process memory pressure by offloading data to persistent storage.

Impact: High
Category: Possible issue
Ensure thread-safe map access
Suggestion Impact: A sync.Mutex field was added to MemorySpillManager, and Lock/Unlock (with defer) were introduced around all accesses to the m.data map in Spill, Retrieve, Delete, and Free.

code diff:

```diff
 import (
 	"fmt"
+	"sync"
 	"sync/atomic"
 
 	"github.com/matrixorigin/matrixone/pkg/common/mpool"
@@ -25,6 +26,7 @@
 	data map[SpillID][]byte
 	nextID int64
 	totalMem int64
+	mu sync.Mutex
 }
 
 func NewMemorySpillManager() *MemorySpillManager {
@@ -40,12 +42,17 @@
 	}
 
 	id := SpillID(fmt.Sprintf("spill_%d", atomic.AddInt64(&m.nextID, 1)))
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.data[id] = serialized
 	atomic.AddInt64(&m.totalMem, int64(len(serialized)))
 	return id, nil
 }
 
 func (m *MemorySpillManager) Retrieve(id SpillID, mp *mpool.MPool) (SpillableData, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	serialized, exists := m.data[id]
 	if !exists {
 		return nil, fmt.Errorf("spill data not found: %s", id)
@@ -60,6 +67,9 @@
 }
 
 func (m *MemorySpillManager) Delete(id SpillID) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	if serialized, exists := m.data[id]; exists {
 		atomic.AddInt64(&m.totalMem, -int64(len(serialized)))
 		delete(m.data, id)
@@ -68,6 +78,9 @@
 }
 
 func (m *MemorySpillManager) Free() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	for id := range m.data {
 		m.Delete(id)
 	}
```

Add a sync.RWMutex to MemorySpillManager to protect the m.data map from
concurrent access and prevent race conditions.

pkg/sql/colexec/group/spill_memory.go [36-46]

```diff
 func (m *MemorySpillManager) Spill(data SpillableData) (SpillID, error) {
 	serialized, err := data.Serialize()
 	if err != nil {
 		return "", err
 	}
 
 	id := SpillID(fmt.Sprintf("spill_%d", atomic.AddInt64(&m.nextID, 1)))
+	m.mu.Lock()
 	m.data[id] = serialized
+	m.mu.Unlock()
 	atomic.AddInt64(&m.totalMem, int64(len(serialized)))
 	return id, nil
 }
```

[Suggestion processed]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a race condition due to unprotected concurrent access to a map, which could lead to crashes. Adding a mutex is a critical fix for thread safety.

Impact: Medium
Avoid using a fallback type

In the Deserialize method of SpillableAggState, instead of falling back to
types.T_any.ToType() when type information is missing for a vector, return an
error to prevent potential data corruption.

pkg/sql/colexec/group/spillable_agg_state.go [148-153]

```diff
 		var vecType types.Type
 		if i < len(s.GroupVectorTypes) {
 			vecType = s.GroupVectorTypes[i]
 		} else {
-			vecType = types.T_any.ToType()
+			return fmt.Errorf("missing type information for group vector at index %d", i)
 		}
```
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that using a fallback type T_any during deserialization can hide data corruption issues; failing fast by returning an error is a much safer and more robust approach.

Impact: Medium
Remove side effect from serialization

In the Serialize method of SpillableAggState, remove the code that modifies
s.GroupVectorTypes to eliminate side effects during serialization.

pkg/sql/colexec/group/spillable_agg_state.go [79-81]

```diff
-		if i >= len(s.GroupVectorTypes) {
-			s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType())
-		}
+
```
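
One way to honor this while keeping Deserialize working: record the types when the state is built rather than during serialization. A sketch under that assumption (newSpillableAggState is a hypothetical constructor, not in the PR; a zero-valued types.Type keeps nil-vector slots index-aligned with GroupVectors):

```go
// Hypothetical constructor: capture GroupVectorTypes up front so that
// Serialize never mutates the state it is writing out.
func newSpillableAggState(groupCount int, vecs []*vector.Vector, aggStates [][]byte) *SpillableAggState {
	s := &SpillableAggState{
		GroupCount:         groupCount,
		GroupVectors:       vecs,
		MarshaledAggStates: aggStates,
		GroupVectorTypes:   make([]types.Type, len(vecs)),
	}
	for i, v := range vecs {
		if v != nil {
			s.GroupVectorTypes[i] = *v.GetType()
		}
	}
	return s
}
```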
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies that the Serialize method has a side effect, which is poor practice and can lead to bugs. Removing it improves code quality and predictability.

Impact: Low

Reviewers

@ouyuanning: awaiting requested review (code owner)

@aunjgr: awaiting requested review (code owner)

@zhangxu19830126: awaiting requested review

At least 1 approving review is required to merge this pull request.

Assignees

No one assigned

Labels

kind/enhancement, Review effort 4/5, size/XL (denotes a PR that changes [1000, 1999] lines)

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues:

#3433
