-
Notifications
You must be signed in to change notification settings - Fork 285
add group agg spill #22527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add group agg spill #22527
Conversation
PR Reviewer Guide 🔍
Here are some key observations to aid the review process:
🎫 Ticket compliance analysis 🔶
3433 - Partially compliant
Compliant requirements:
- Hash agg tracks memory usage.
- Hash agg spills when exceeding threshold.
Non-compliant requirements:
- Hash join must track memory usage and spill.
Requires further human verification:
- Validate spill behavior under various workloads and configurations in an integrated environment.
- Measure performance impact of spilling and merging on large datasets.
- Confirm configuration wiring for SpillThreshold from system/session settings.
Memory Accounting
Memory usage tracking only sums selected structures and may miss allocations (e.g., result1 internal buffers, hash table overhead, temporary vectors). Verify accounting is sufficient to trigger spill at the right time and avoid OOM.
func (group *Group) updateMemoryUsage(proc *process.Process) { usage := int64(0) if !group.ctr.hr.IsEmpty() && group.ctr.hr.Hash != nil { usage += int64(group.ctr.hr.Hash.Size()) } for _, bat := range group.ctr.result1.ToPopped { if bat != nil { usage += int64(bat.Size()) } } for _, agg := range group.ctr.result1.AggList { if agg != nil { usage += agg.Size() } } group.ctr.currentMemUsage = usage }
Merge Correctness
Merging spilled aggregators iterates per-row and uses currentGroupCount offset; ensure alignment between batches and agg states, and that GroupGrow and Merge indices are correct for all agg types and group-by shapes.
currentGroupCount := 0 for _, bat := range group.ctr.result1.ToPopped { if bat != nil { currentGroupCount += bat.RowCount() } } for i, tempAgg := range tempAggs { if tempAgg == nil { continue } currentAgg := group.ctr.result1.AggList[i] if currentAgg == nil { continue } for spilledGroupIdx := 0; spilledGroupIdx < spillState.GroupCount; spilledGroupIdx++ { currentGroupIdx := currentGroupCount + spilledGroupIdx if err := currentAgg.Merge(tempAgg, currentGroupIdx, spilledGroupIdx); err != nil { return err } } }
Serialization Robustness
Custom binary format for vectors and types needs compatibility and error handling; confirm all vector encodings and null bitmaps are preserved and future-proof across versions.
func (s *SpillableAggState) Serialize() ([]byte, error) { buf := bytes.NewBuffer(nil) if err := binary.Write(buf, binary.LittleEndian, int32(s.GroupCount)); err != nil { return nil, err } if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectors))); err != nil { return nil, err } if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectorTypes))); err != nil { return nil, err } for _, typ := range s.GroupVectorTypes { typBytes, err := typ.MarshalBinary() if err != nil { return nil, err } if err := binary.Write(buf, binary.LittleEndian, int32(len(typBytes))); err != nil { return nil, err } if _, err := buf.Write(typBytes); err != nil { return nil, err } } for i, vec := range s.GroupVectors { if vec == nil { if err := binary.Write(buf, binary.LittleEndian, int32(0)); err != nil { return nil, err } continue } vecBytes, err := vec.MarshalBinary() if err != nil { return nil, err } if err := binary.Write(buf, binary.LittleEndian, int32(len(vecBytes))); err != nil { return nil, err } if _, err := buf.Write(vecBytes); err != nil { return nil, err } if i >= len(s.GroupVectorTypes) { s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType()) } } if err := binary.Write(buf, binary.LittleEndian, int32(len(s.MarshaledAggStates))); err != nil { return nil, err } for _, aggState := range s.MarshaledAggStates { if err := binary.Write(buf, binary.LittleEndian, int32(len(aggState))); err != nil { return nil, err } if _, err := buf.Write(aggState); err != nil { return nil, err } } return buf.Bytes(), nil }
PR Code Suggestions ✨Explore these optional code suggestions:
|
This reverts commit 426ddfc.
8656601
to
1d48563
Compare
Uh oh!
There was an error while loading. Please reload this page.
User description
What type of PR is this?
Which issue(s) this PR fixes:
issue #3433
What this PR does / why we need it:
add group aggregation spill
PR Type
Enhancement
Description
Add memory spill functionality to group aggregation operator
Implement spillable data structures and memory management
Add spill threshold configuration and memory usage tracking
Create comprehensive test coverage for spill scenarios
Diagram Walkthrough
File Walkthrough
exec.go
Add spill initialization and integration logic
pkg/sql/colexec/group/exec.go
SpillManager
andSpillThreshold
inPrepare
methodgroup_spill.go
Core spill implementation with state management
pkg/sql/colexec/group/group_spill.go
spillPartialResults
method to serialize and store aggregationstate
mergeSpilledResults
andrestoreAndMergeSpilledAggregators
methods
spill.go
Define spill interfaces and types
pkg/sql/colexec/group/spill.go
SpillableData
interface for serializable data structuresSpillManager
interface for spill operationsSpillID
type for spill identificationspill_memory.go
Memory-based spill manager implementation
pkg/sql/colexec/group/spill_memory.go
MemorySpillManager
for in-memory spill storagespillable_agg_state.go
Spillable aggregation state implementation
pkg/sql/colexec/group/spillable_agg_state.go
SpillableAggState
struct for aggregation state serializationtypes.go
Add spill configuration to group types
pkg/sql/colexec/group/types.go
SpillManager
andSpillThreshold
fields toGroup
structFree
and cleanup methods to handle spill resourcesexec_test.go
Update test mock with size method
pkg/sql/colexec/group/exec_test.go
Size()
method to test aggregation executor mockspill_test.go
Comprehensive spill functionality tests
pkg/sql/colexec/group/spill_test.go