core/txpool/blobpool: allow gaps in blobpool #32717

Open
cskiraly wants to merge 14 commits into ethereum:master from cskiraly:blobpool-gapped-queue

Commits (14)
29dd549
core/txpool/blobpool: add queue to blobpool
cskiraly Sep 22, 2025
f9aa51e
core/txpool/blobpool: fix removal from gapped queue
cskiraly Sep 23, 2025
d11de18
core/txpool/blobpool: accept tx if stored in gapped queue
cskiraly Sep 23, 2025
c860b83
core/txpool/blobpool: move tx notification event inside add
cskiraly Sep 23, 2025
337af57
fix comments
cskiraly Sep 23, 2025
4c9b5a1
core/txpool/blobpool: limit gapped allowance based on sender
cskiraly Sep 23, 2025
33493ac
core/txpool/blobpool: limit reorder buffer size for DoS protection
cskiraly Sep 23, 2025
0dfc464
core/txpoo/blobpool: expose gapped txs in Has and Status
cskiraly Sep 23, 2025
a0b7fad
core/txpool/blobpool: limit tx lifetime in gapped reorg buffer
cskiraly Sep 24, 2025
3c0649d
core/txpool/blobpool: evict stale transactions from reorg buffer
cskiraly Sep 24, 2025
4a1f509
core/txpool/blobpool: fix reorder buffer logic
cskiraly Sep 24, 2025
d3a6bb7
fix lint
cskiraly Sep 25, 2025
7da8e27
core/txpool/blobpool: fix blob tx replacement
cskiraly Sep 25, 2025
bfd7edf
core/txpool/blobpool: test gapped blob tx acceptance
cskiraly Oct 24, 2025
159 changes: 152 additions & 7 deletions core/txpool/blobpool/blobpool.go
@@ -101,6 +101,16 @@ const (
// the pool will still accept and convert legacy blob transactions. After this
// window, all legacy blob transactions will be rejected.
conversionTimeWindow = time.Hour * 2

// gappedLifetime is the approximate duration for which nonce-gapped transactions
// are kept before being dropped. Since the gapped set is only a reorder buffer
// and the original transactions are expected to have entered the mempool in
// nonce order, the duration is kept short to avoid DoS vectors.
gappedLifetime = 1 * time.Minute

// maxGapped is the maximum number of gapped transactions kept overall.
// This is a safety limit to avoid DoS vectors.
maxGapped = 128
)

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@@ -337,6 +347,9 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs

gapped map[common.Address][]*gappedTx // Transactions that are currently gapped (nonce too high)
gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion

signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
@@ -356,6 +369,11 @@ type BlobPool struct {
lock sync.RWMutex // Mutex protecting the pool during reorg handling
}

type gappedTx struct {
tx *types.Transaction
timestamp time.Time
}

// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool {
@@ -372,6 +390,8 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
gapped: make(map[common.Address][]*gappedTx),
gappedSource: make(map[common.Hash]common.Address),
}
}

@@ -841,6 +861,9 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
resettimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())

// Handle reorg buffer timeouts by evicting old gapped transactions
p.evictGapped()

statedb, err := p.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset blobpool state", "err", err)
@@ -1369,7 +1392,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error {
State: p.state,

FirstNonceGap: func(addr common.Address) uint64 {
// Nonce gaps are not permitted in the blob pool, the first gap will
// Nonce gaps are permitted in the blob pool, but only as part of the
// in-memory 'gapped' buffer. We expose the gap here to validateTx,
// then handle the error by adding to the buffer. The first gap will
// be the next nonce shifted by however many transactions we already
// have pooled.
return p.state.GetNonce(addr) + uint64(len(p.index[addr]))
@@ -1448,7 +1473,9 @@ func (p *BlobPool) Has(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.lookup.exists(hash)
poolHas := p.lookup.exists(hash)
_, gapped := p.gappedSource[hash]
return poolHas || gapped
}

func (p *BlobPool) getRLP(hash common.Hash) []byte {
@@ -1696,10 +1723,6 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}

@@ -1718,6 +1741,13 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
addtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())

return p.addLocked(tx, true)
}

// addLocked inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions). It must be called with the pool lock held.
// Only for internal use.
func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error) {
// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
@@ -1730,6 +1760,19 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
addStaleMeter.Mark(1)
case errors.Is(err, core.ErrNonceTooHigh):
@healthykim (Contributor) commented on Sep 25, 2025 (edited):

I have been thinking about what form the gapped blobpool should take in the evolution of the blobpool. In my view, we have two possible options. The first is to store a transaction in the gapped set only when "nonce too high" is the sole rejection reason. The second is to store a transaction in the gapped set when "nonce too high" is one of the rejection reasons (so there can be other reasons). This PR seems to be implementing the second option, and given the current state of the blobpool, I think this is reasonable.

If cells are delivered separately from transactions in the future, the first option can be beneficial in some cases in terms of bandwidth. If a transaction in the buffer cannot be included even after the gap is resolved, then by the time the client discovers it needs to be discarded (when all cells are collected and revalidation occurs through the add routine below), it will already have consumed bandwidth to fetch its cells. The first design would help reduce this overhead.

However, I am not certain this first option is necessary when we have this gapped buffer, since bandwidth can easily be wasted if someone submits gapped transactions that pass other validations and never fill the gap. I am also not sure how often this can happen, or why a submitter would do such a thing.
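
To make the two options concrete, here is a minimal self-contained sketch (hypothetical names, not the pool's actual API; the real validateTx returns a single error and short-circuits, so option 2 in practice means buffering on the nonce-gap error and deferring the remaining checks until revalidation):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for core.ErrNonceTooHigh and an unrelated failure.
var (
	errNonceTooHigh      = errors.New("nonce too high")
	errInsufficientFunds = errors.New("insufficient funds")
)

// Option 1: buffer only when the nonce gap is the sole rejection reason;
// any other validation failure drops the transaction immediately.
func bufferOption1(errs []error) bool {
	sawGap := false
	for _, err := range errs {
		if !errors.Is(err, errNonceTooHigh) {
			return false
		}
		sawGap = true
	}
	return sawGap
}

// Option 2 (what this PR does): buffer as soon as a nonce gap is seen;
// the remaining checks are deferred until the gap closes and the tx is
// revalidated through the normal add path.
func bufferOption2(errs []error) bool {
	for _, err := range errs {
		if errors.Is(err, errNonceTooHigh) {
			return true
		}
	}
	return false
}

func main() {
	mixed := []error{errNonceTooHigh, errInsufficientFunds}
	fmt.Println(bufferOption1(mixed)) // false: the gap is not the sole reason
	fmt.Println(bufferOption2(mixed)) // true: the gap alone triggers buffering
}
```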

@healthykim (Contributor) commented on Sep 25, 2025:

https://github.com/ethereum/go-ethereum/compare/master...healthykim:go-ethereum:bs/cell-blobpool/sparse?expand=1

In my prototype, the buffer currently stores transactions that pass all validation checks but are simply missing their cells. I still need to do some debugging, but the overall structure will remain like this.
It also handles two replacement cases: one for replacing a buffered tx and the other for replacing a pooled tx.
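
Roughly, the two replacement paths could look like this; an illustrative, runnable sketch with made-up types, not the prototype's actual code:

```go
package main

import "fmt"

type tx struct {
	from  string
	nonce uint64
	tip   uint64 // stand-in for the effective fee used in bump checks
}

type pool struct {
	buffered map[string]map[uint64]*tx // reorder buffer, keyed by sender and nonce
	pooled   map[string]map[uint64]*tx // pool proper, keyed by sender and nonce
}

// bumps reports whether newTx pays enough of a fee bump over oldTx
// (a real pool would apply a percentage threshold here).
func bumps(oldTx, newTx *tx) bool { return newTx.tip > oldTx.tip }

// add routes an incoming tx through the two replacement cases: a
// same-nonce replacement in the buffer, then one in the pool proper.
func (p *pool) add(t *tx) string {
	if old, ok := p.buffered[t.from][t.nonce]; ok {
		if !bumps(old, t) {
			return "rejected: underpriced buffer replacement"
		}
		p.buffered[t.from][t.nonce] = t // cheap: no persistent state to rewrite
		return "replaced in buffer"
	}
	if old, ok := p.pooled[t.from][t.nonce]; ok {
		if !bumps(old, t) {
			return "rejected: underpriced pool replacement"
		}
		p.pooled[t.from][t.nonce] = t // a real pool also rewrites the billy store
		return "replaced in pool"
	}
	return "added as new" // normal insertion path, elided here
}

func main() {
	p := &pool{
		buffered: map[string]map[uint64]*tx{"alice": {7: {from: "alice", nonce: 7, tip: 1}}},
		pooled:   map[string]map[uint64]*tx{"alice": {5: {from: "alice", nonce: 5, tip: 1}}},
	}
	fmt.Println(p.add(&tx{from: "alice", nonce: 7, tip: 2})) // replaced in buffer
	fmt.Println(p.add(&tx{from: "alice", nonce: 5, tip: 2})) // replaced in pool
	fmt.Println(p.add(&tx{from: "alice", nonce: 9, tip: 1})) // added as new
}
```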

@cskiraly (Contributor, Author) commented:

I've selected option 2 (there can be other reasons) here, since that's the option that is a pure reorder buffer, acting before any real blobpool logic. So, at least in theory, that's the least invasive version.

However, I can imagine moving to a more complex evaluation before storing the gapped transaction. The queue in the legacypool already does something more elaborate; I think we should check that as well before deciding on the future evolution.
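
For reference, the slow-start allowance in gappedAllowance further down in this diff grows with log10(nonce+1). A quick standalone computation of the base values, before the per-account caps and the already-buffered count are applied:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Base allowance from the patch: int(math.Log10(float64(nonce + 1))).
	for _, nonce := range []uint64{0, 9, 10, 99, 100, 999, 10000} {
		fmt.Printf("nonce %5d -> base allowance %d\n",
			nonce, int(math.Log10(float64(nonce+1))))
	}
}
```

A fresh account therefore gets no gap allowance, while eve in the new test (nonce 10) gets exactly one, which is why her nonce-11 transaction is accepted as gapped.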

addGappedMeter.Mark(1)
// Store the tx in memory, and revalidate later
from, _ := types.Sender(p.signer, tx)
allowance := p.gappedAllowance(from)
if allowance >= 1 && len(p.gapped) < maxGapped {
// If maxGapped is reached, it is better to give the already buffered
// transactions time by keeping the old ones and dropping this one.
p.gapped[from] = append(p.gapped[from], &gappedTx{tx: tx, timestamp: time.Now()})
p.gappedSource[tx.Hash()] = from
log.Trace("blobpool:add added to Gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
return nil
} else {
log.Trace("blobpool:add no Gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
}
case errors.Is(err, core.ErrInsufficientFunds):
addOverdraftedMeter.Mark(1)
case errors.Is(err, txpool.ErrAccountLimitExceeded):
@@ -1867,6 +1910,58 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
p.updateStorageMetrics()

addValidMeter.Mark(1)

// Notify all listeners of the new arrival
p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})
p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})

// Check the gapped queue for this account and try to promote
if gtxs, ok := p.gapped[from]; checkGapped && ok && len(gtxs) > 0 {
// We have to add in nonce order, but we want a stable sort to cater for situations
// where transactions are replaced, keeping the original receive order for the same nonce.
sort.SliceStable(gtxs, func(i, j int) bool {
return gtxs[i].tx.Nonce() < gtxs[j].tx.Nonce()
})
for len(gtxs) > 0 {
stateNonce := p.state.GetNonce(from)
firstgap := stateNonce + uint64(len(p.index[from]))

if gtxs[0].tx.Nonce() > firstgap {
// Anything beyond the first gap is not addable yet
break
}

// Drop any buffered transactions that became stale in the meantime (included in the chain or replaced).
// If we arrive at a transaction in the pending range (between the state nonce and the first gap),
// we try to add it now while removing it from the buffer.
tx := gtxs[0].tx
gtxs[0] = nil
gtxs = gtxs[1:]
delete(p.gappedSource, tx.Hash())

if tx.Nonce() < stateNonce {
// Stale, drop it. Eventually we could add to limbo here if hash matches.
log.Trace("Gapped blob transaction became stale", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "state", stateNonce, "qlen", len(p.gapped[from]))
continue
}

if tx.Nonce() <= firstgap {
// If we hit the pending range, including the first gap, add it and continue to try to add more.
// We do not recurse here, but continue to loop instead.
// We are under lock, so we can add the transaction directly.
if err := p.addLocked(tx, false); err == nil {
log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
} else {
log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err)
}
}
}
if len(gtxs) == 0 {
delete(p.gapped, from)
} else {
p.gapped[from] = gtxs
}
}
return nil
}

@@ -2098,6 +2193,50 @@ func (p *BlobPool) Nonce(addr common.Address) uint64 {
return p.state.GetNonce(addr)
}

// gappedAllowance returns the number of gapped transactions still
// allowed for the given account. The allowance is based on slow-start
// logic, granting more gaps (resource usage) to accounts with a
// higher nonce. It can also return negative values.
func (p *BlobPool) gappedAllowance(addr common.Address) int {
// Gaps happen, but we don't want to allow too many.
// Use log10(nonce+1) as the allowance, with a minimum of 0.
nonce := p.state.GetNonce(addr)
allowance := int(math.Log10(float64(nonce + 1)))
// Cap the allowance to the remaining per-account pool space, minus what is already buffered
return min(allowance, maxTxsPerAccount-len(p.index[addr])) - len(p.gapped[addr])
}

// evictGapped removes the old transactions from the gapped reorder buffer.
// Concurrency: The caller must hold the pool lock before calling this function.
func (p *BlobPool) evictGapped() {
cutoff := time.Now().Add(-gappedLifetime)
for from, txs := range p.gapped {
nonce := p.state.GetNonce(from)
// Reuse the original slice to avoid extra allocations.
// This is safe because we only keep references to the original gappedTx objects,
// and we overwrite the slice for this account after filtering.
keep := txs[:0]
for i, gtx := range txs {
if gtx.timestamp.Before(cutoff) || gtx.tx.Nonce() < nonce {
// Evict old or stale transactions
// Should we add stale to limbo here if it would belong?
delete(p.gappedSource, gtx.tx.Hash())
txs[i] = nil // Explicitly nil out evicted element
} else {
keep = append(keep, gtx)
}
}
if len(keep) < len(txs) {
log.Trace("Evicting old gapped blob transactions", "count", len(txs)-len(keep), "from", from)
}
if len(keep) == 0 {
delete(p.gapped, from)
} else {
p.gapped[from] = keep
}
}
}

// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *BlobPool) Stats() (int, int) {
@@ -2132,9 +2271,15 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash.
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
if p.Has(hash) {
p.lock.RLock()
defer p.lock.RUnlock()

if p.lookup.exists(hash) {
return txpool.TxStatusPending
}
if _, gapped := p.gappedSource[hash]; gapped {
return txpool.TxStatusQueued
}
return txpool.TxStatusUnknown
}

25 changes: 19 additions & 6 deletions core/txpool/blobpool/blobpool_test.go
@@ -1375,6 +1375,7 @@ func TestAdd(t *testing.T) {
"bob": {balance: 21100 + blobSize, nonce: 1},
"claire": {balance: 21100 + blobSize},
"dave": {balance: 21100 + blobSize, nonce: 1},
"eve": {balance: 21100 + blobSize, nonce: 10}, // High nonce to test gapped acceptance
},
adds: []addtx{
{ // New account, no previous txs: accept nonce 0
Expand Down Expand Up @@ -1402,6 +1403,11 @@ func TestAdd(t *testing.T) {
tx: makeUnsignedTx(2, 1, 1, 1),
err: core.ErrNonceTooHigh,
},
{ // Old account, 10 txs in chain, 0 pending: accept nonce 11 as gapped
from: "eve",
tx: makeUnsignedTx(11, 1, 1, 1),
err: nil,
},
},
},
// Transactions from already pooled accounts should only be accepted if
@@ -1762,13 +1768,20 @@
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err)
}
if add.err == nil {
size, exist := pool.lookup.sizeOfTx(signed.Hash())
if !exist {
t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j)
// First check that the tx is known to the pool (pooled or gapped)
if !pool.Has(signed.Hash()) {
t.Errorf("test %d, tx %d: added transaction not found in pool", i, j)
}
if size != signed.Size() {
t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v",
i, j, size, signed.Size())
// If it is pending, check that the size matches
if pool.Status(signed.Hash()) == txpool.TxStatusPending {
size, exist := pool.lookup.sizeOfTx(signed.Hash())
if !exist {
t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j)
}
if size != signed.Size() {
t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v",
i, j, size, signed.Size())
}
}
}
verifyPoolInternals(t, pool)