add group agg spill #22527
PR Reviewer Guide 🔍
Here are some key observations to aid the review process:
🎫 Ticket compliance analysis 🔶
3433 - Partially compliant
Compliant requirements:
- Hash agg tracks memory usage.
- Hash agg spills when exceeding threshold.
Non-compliant requirements:
- Hash join must track memory usage and spill.
Requires further human verification:
- Validate spill behavior under various workloads and configurations in an integrated environment.
- Measure performance impact of spilling and merging on large datasets.
- Confirm configuration wiring for SpillThreshold from system/session settings.
Memory Accounting
Memory usage tracking only sums selected structures and may miss allocations (e.g., result1 internal buffers, hash table overhead, temporary vectors). Verify accounting is sufficient to trigger spill at the right time and avoid OOM.
func (group *Group) updateMemoryUsage(proc *process.Process) {
    usage := int64(0)
    if !group.ctr.hr.IsEmpty() && group.ctr.hr.Hash != nil {
        usage += int64(group.ctr.hr.Hash.Size())
    }
    for _, bat := range group.ctr.result1.ToPopped {
        if bat != nil {
            usage += int64(bat.Size())
        }
    }
    for _, agg := range group.ctr.result1.AggList {
        if agg != nil {
            usage += agg.Size()
        }
    }
    group.ctr.currentMemUsage = usage
}
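To make the accounting concern concrete, here is a minimal sketch of a spill trigger driven by summed component sizes. The `sizer`, `fixed`, and `memTracker` names are hypothetical stand-ins and not MatrixOne types; the `threshold` field is only an assumed analogue of `SpillThreshold`. The point is that any allocation left out of the sum delays the trigger.

```go
package main

import "fmt"

// sizer is a hypothetical stand-in for anything that reports its size in
// bytes (hash table, batch, aggregator). It is not a MatrixOne interface.
type sizer interface{ Size() int64 }

// fixed is a toy component with a constant size.
type fixed int64

func (f fixed) Size() int64 { return int64(f) }

// memTracker sums the tracked components and decides when to spill.
type memTracker struct {
	threshold int64 // assumed analogue of SpillThreshold; <= 0 disables spilling
	usage     int64
}

// update recomputes usage from scratch, skipping nil components.
func (m *memTracker) update(parts ...sizer) {
	var total int64
	for _, p := range parts {
		if p != nil {
			total += p.Size()
		}
	}
	m.usage = total
}

// shouldSpill reports whether tracked usage exceeds the threshold.
func (m *memTracker) shouldSpill() bool {
	return m.threshold > 0 && m.usage > m.threshold
}

func main() {
	m := &memTracker{threshold: 1000}

	// Only two components are tracked; 300 bytes of overhead are missed.
	m.update(fixed(400), fixed(500))
	fmt.Println(m.usage, m.shouldSpill()) // 900 false

	// With the overhead accounted for, the trigger fires.
	m.update(fixed(400), fixed(500), fixed(300))
	fmt.Println(m.usage, m.shouldSpill()) // 1200 true
}
```

In this toy setup the real footprint is 1200 bytes in both calls, but the first call under-reports it and never spills, which is the failure mode the reviewer note asks to rule out.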
Merge Correctness
Merging spilled aggregators iterates per-row and uses currentGroupCount offset; ensure alignment between batches and agg states, and that GroupGrow and Merge indices are correct for all agg types and group-by shapes.
currentGroupCount := 0
for _, bat := range group.ctr.result1.ToPopped {
    if bat != nil {
        currentGroupCount += bat.RowCount()
    }
}
for i, tempAgg := range tempAggs {
    if tempAgg == nil {
        continue
    }
    currentAgg := group.ctr.result1.AggList[i]
    if currentAgg == nil {
        continue
    }
    for spilledGroupIdx := 0; spilledGroupIdx < spillState.GroupCount; spilledGroupIdx++ {
        currentGroupIdx := currentGroupCount + spilledGroupIdx
        if err := currentAgg.Merge(tempAgg, currentGroupIdx, spilledGroupIdx); err != nil {
            return err
        }
    }
}
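The index alignment in question can be illustrated with a toy aggregator. This is a sketch only: `sumAgg`, `groupGrow`, `merge`, and `appendSpilled` are hypothetical names, not the PR's aggregator API. It shows that the destination must grow by the spilled group count before merging at `offset + i`, and that an out-of-range destination should surface as an error rather than a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// sumAgg is a hypothetical aggregator keeping one running sum per group.
type sumAgg struct{ sums []int64 }

// groupGrow appends n zero-initialised group slots.
func (a *sumAgg) groupGrow(n int) {
	a.sums = append(a.sums, make([]int64, n)...)
}

// merge folds group srcIdx of other into group dstIdx of a.
func (a *sumAgg) merge(other *sumAgg, dstIdx, srcIdx int) error {
	if dstIdx < 0 || dstIdx >= len(a.sums) || srcIdx < 0 || srcIdx >= len(other.sums) {
		return errors.New("merge: group index out of range")
	}
	a.sums[dstIdx] += other.sums[srcIdx]
	return nil
}

// appendSpilled places every spilled group after the existing groups.
// It does not deduplicate group keys; it only mirrors the offset pattern.
func appendSpilled(cur, spilled *sumAgg) error {
	offset := len(cur.sums) // plays the role of currentGroupCount
	cur.groupGrow(len(spilled.sums))
	for i := range spilled.sums {
		if err := cur.merge(spilled, offset+i, i); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cur := &sumAgg{sums: []int64{10, 20}}
	spilled := &sumAgg{sums: []int64{1, 2, 3}}
	if err := appendSpilled(cur, spilled); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cur.sums) // [10 20 1 2 3]
}
```

Because the sketch appends rather than looks up keys, two spilled runs containing the same group key would produce two output rows; whether the PR handles that case elsewhere is something the review would need to confirm.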
Serialization Robustness
Custom binary format for vectors and types needs compatibility and error handling; confirm all vector encodings and null bitmaps are preserved and future-proof across versions.
func (s *SpillableAggState) Serialize() ([]byte, error) {
    buf := bytes.NewBuffer(nil)
    if err := binary.Write(buf, binary.LittleEndian, int32(s.GroupCount)); err != nil {
        return nil, err
    }
    if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectors))); err != nil {
        return nil, err
    }
    if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectorTypes))); err != nil {
        return nil, err
    }
    for _, typ := range s.GroupVectorTypes {
        typBytes, err := typ.MarshalBinary()
        if err != nil {
            return nil, err
        }
        if err := binary.Write(buf, binary.LittleEndian, int32(len(typBytes))); err != nil {
            return nil, err
        }
        if _, err := buf.Write(typBytes); err != nil {
            return nil, err
        }
    }
    for i, vec := range s.GroupVectors {
        if vec == nil {
            if err := binary.Write(buf, binary.LittleEndian, int32(0)); err != nil {
                return nil, err
            }
            continue
        }
        vecBytes, err := vec.MarshalBinary()
        if err != nil {
            return nil, err
        }
        if err := binary.Write(buf, binary.LittleEndian, int32(len(vecBytes))); err != nil {
            return nil, err
        }
        if _, err := buf.Write(vecBytes); err != nil {
            return nil, err
        }
        if i >= len(s.GroupVectorTypes) {
            s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType())
        }
    }
    if err := binary.Write(buf, binary.LittleEndian, int32(len(s.MarshaledAggStates))); err != nil {
        return nil, err
    }
    for _, aggState := range s.MarshaledAggStates {
        if err := binary.Write(buf, binary.LittleEndian, int32(len(aggState))); err != nil {
            return nil, err
        }
        if _, err := buf.Write(aggState); err != nil {
            return nil, err
        }
    }
    return buf.Bytes(), nil
}
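For the robustness point, a round-trip sketch of the same length-prefixed layout may help. It is written against plain byte slices rather than MatrixOne vectors, and `encodeChunks`, `decodeChunks`, `writeChunk`, and `readChunk` are hypothetical helpers, not functions from the PR. The reader validates every `int32` prefix against the bytes that remain, so truncated or corrupt spill data yields an error instead of a huge allocation or a panic.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
)

// writeChunk writes a little-endian int32 length followed by the payload.
func writeChunk(buf *bytes.Buffer, p []byte) error {
	if len(p) > math.MaxInt32 {
		return errors.New("writeChunk: payload too large for int32 prefix")
	}
	if err := binary.Write(buf, binary.LittleEndian, int32(len(p))); err != nil {
		return err
	}
	_, err := buf.Write(p)
	return err
}

// readChunk reads one chunk, rejecting negative or oversized lengths.
func readChunk(r *bytes.Reader) ([]byte, error) {
	var n int32
	if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
		return nil, err
	}
	if n < 0 || int(n) > r.Len() {
		return nil, errors.New("readChunk: invalid length prefix")
	}
	p := make([]byte, n)
	if _, err := io.ReadFull(r, p); err != nil {
		return nil, err
	}
	return p, nil
}

// encodeChunks writes a chunk count followed by each chunk.
func encodeChunks(chunks ...[]byte) ([]byte, error) {
	buf := bytes.NewBuffer(nil)
	if err := binary.Write(buf, binary.LittleEndian, int32(len(chunks))); err != nil {
		return nil, err
	}
	for _, c := range chunks {
		if err := writeChunk(buf, c); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// decodeChunks is the inverse of encodeChunks.
func decodeChunks(data []byte) ([][]byte, error) {
	r := bytes.NewReader(data)
	var count int32
	if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
		return nil, err
	}
	// Each chunk needs at least a 4-byte prefix, which bounds the count.
	if count < 0 || int(count) > r.Len()/4 {
		return nil, errors.New("decodeChunks: invalid chunk count")
	}
	out := make([][]byte, 0, count)
	for i := int32(0); i < count; i++ {
		c, err := readChunk(r)
		if err != nil {
			return nil, err
		}
		out = append(out, c)
	}
	return out, nil
}

func main() {
	data, err := encodeChunks([]byte("abc"), nil, []byte{1, 2})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	chunks, err := decodeChunks(data)
	fmt.Println(len(chunks), err)

	// Dropping the last byte must be detected, not silently accepted.
	_, err = decodeChunks(data[:len(data)-1])
	fmt.Println(err != nil)
}
```

A versioned header (for example a leading format byte) would be one way to address the future-proofing concern, though nothing in the quoted snippet indicates whether the PR includes one.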
This reverts commit 426ddfc.
8656601 to 1d48563
User description
What type of PR is this?
Which issue(s) this PR fixes:
issue #3433
What this PR does / why we need it:
add group aggregation spill
PR Type
Enhancement
Description
Add memory spill functionality to group aggregation operator
Implement spillable data structures and memory management
Add spill threshold configuration and memory usage tracking
Create comprehensive test coverage for spill scenarios
Diagram Walkthrough
File Walkthrough
exec.go
Add spill initialization and integration logic
pkg/sql/colexec/group/exec.go
- SpillManager and SpillThreshold in Prepare method
group_spill.go
Core spill implementation with state management
pkg/sql/colexec/group/group_spill.go
- spillPartialResults method to serialize and store aggregation state
- mergeSpilledResults and restoreAndMergeSpilledAggregators methods
spill.go
Define spill interfaces and types
pkg/sql/colexec/group/spill.go
- SpillableData interface for serializable data structures
- SpillManager interface for spill operations
- SpillID type for spill identification
spill_memory.go
Memory-based spill manager implementation
pkg/sql/colexec/group/spill_memory.go
- MemorySpillManager for in-memory spill storage
spillable_agg_state.go
Spillable aggregation state implementation
pkg/sql/colexec/group/spillable_agg_state.go
- SpillableAggState struct for aggregation state serialization
types.go
Add spill configuration to group types
pkg/sql/colexec/group/types.go
- SpillManager and SpillThreshold fields to Group struct
- Free and cleanup methods to handle spill resources
exec_test.go
Update test mock with size method
pkg/sql/colexec/group/exec_test.go
- Size() method to test aggregation executor mock
spill_test.go
Comprehensive spill functionality tests
pkg/sql/colexec/group/spill_test.go
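The walkthrough names a spill manager interface and an in-memory implementation without showing their shape. The following is a rough sketch of what such a pair could look like; `spillStore`, `memStore`, `spillID`, and all three method signatures are assumptions made for illustration and may differ from the PR's `SpillManager`, `MemorySpillManager`, and `SpillID`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// spillID identifies one spilled payload (hypothetical analogue of SpillID).
type spillID uint64

// spillStore is an assumed minimal contract for a spill backend.
type spillStore interface {
	Spill(data []byte) (spillID, error)
	Retrieve(id spillID) ([]byte, error)
	Delete(id spillID) error
}

// memStore keeps spilled payloads in a map guarded by a mutex.
type memStore struct {
	mu     sync.Mutex
	nextID spillID
	data   map[spillID][]byte
}

func newMemStore() *memStore {
	return &memStore{data: make(map[spillID][]byte)}
}

// Spill stores a private copy of data and returns its new ID.
func (m *memStore) Spill(data []byte) (spillID, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.nextID++
	m.data[m.nextID] = append([]byte(nil), data...)
	return m.nextID, nil
}

// Retrieve returns the payload stored under id.
func (m *memStore) Retrieve(id spillID) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	p, ok := m.data[id]
	if !ok {
		return nil, errors.New("spill: unknown id")
	}
	return p, nil
}

// Delete drops the payload stored under id.
func (m *memStore) Delete(id spillID) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.data[id]; !ok {
		return errors.New("spill: unknown id")
	}
	delete(m.data, id)
	return nil
}

func main() {
	var store spillStore = newMemStore()
	id, _ := store.Spill([]byte("partial agg state"))
	p, err := store.Retrieve(id)
	fmt.Println(string(p), err)
	fmt.Println(store.Delete(id))
}
```

An in-memory store does not by itself lower the process footprint, so a backend of this kind is presumably most useful for tests or as a placeholder until a disk-backed store is wired in.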