-
Couldn't load subscription status.
- Fork 21.5k
core/txpool/blobpool: allow gaps in blobpool #32717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
29dd549
f9aa51e
d11de18
c860b83
337af57
4c9b5a1
33493ac
0dfc464
a0b7fad
3c0649d
4a1f509
d3a6bb7
7da8e27
bfd7edf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,6 +101,16 @@ const ( | |
| // the pool will still accept and convert legacy blob transactions. After this | ||
| // window, all legacy blob transactions will be rejected. | ||
| conversionTimeWindow = time.Hour * 2 | ||
|
|
||
| // gappedLifetime is the approximate duration for which nonce-gapped transactions | ||
| // are kept before being dropped. Since gapped is only a reorder buffer and it | ||
| // is expected that the original transactions were inserted in the mempool in | ||
| // nonce order, the duration is kept short to avoid DoS vectors. | ||
| gappedLifetime = 1 * time.Minute | ||
|
|
||
| // maxGapped is the maximum number of gapped transactions kept overall. | ||
| // This is a safety limit to avoid DoS vectors. | ||
| maxGapped = 128 | ||
| ) | ||
|
|
||
| // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and | ||
|
|
@@ -337,6 +347,9 @@ type BlobPool struct { | |
| stored uint64 // Useful data size of all transactions on disk | ||
| limbo *limbo // Persistent data store for the non-finalized blobs | ||
|
|
||
| gapped map[common.Address][]*gappedTx // Transactions that are currently gapped (nonce too high) | ||
| gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion | ||
|
|
||
| signer types.Signer // Transaction signer to use for sender recovery | ||
| chain BlockChain // Chain object to access the state through | ||
| cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka) | ||
|
|
@@ -356,6 +369,11 @@ type BlobPool struct { | |
| lock sync.RWMutex // Mutex protecting the pool during reorg handling | ||
| } | ||
|
|
||
| type gappedTx struct { | ||
| tx *types.Transaction | ||
| timestamp time.Time | ||
| } | ||
|
|
||
| // New creates a new blob transaction pool to gather, sort and filter inbound | ||
| // blob transactions from the network. | ||
| func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool { | ||
|
|
@@ -372,6 +390,8 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo | |
| lookup: newLookup(), | ||
| index: make(map[common.Address][]*blobTxMeta), | ||
| spent: make(map[common.Address]*uint256.Int), | ||
| gapped: make(map[common.Address][]*gappedTx), | ||
| gappedSource: make(map[common.Hash]common.Address), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -841,6 +861,9 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { | |
| resettimeHist.Update(time.Since(start).Nanoseconds()) | ||
| }(time.Now()) | ||
|
|
||
| // Handle reorg buffer timeouts evicting old gapped transactions | ||
| p.evictGapped() | ||
|
|
||
| statedb, err := p.chain.StateAt(newHead.Root) | ||
| if err != nil { | ||
| log.Error("Failed to reset blobpool state", "err", err) | ||
|
|
@@ -1369,7 +1392,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error { | |
| State: p.state, | ||
|
|
||
| FirstNonceGap: func(addr common.Address) uint64 { | ||
| // Nonce gaps are not permitted in the blob pool, the first gap will | ||
| // Nonce gaps are permitted in the blob pool, but only as part of the | ||
| // in-memory 'gapped' buffer. We expose the gap here to validateTx, | ||
| // then handle the error by adding to the buffer. The first gap will | ||
| // be the next nonce shifted by however many transactions we already | ||
| // have pooled. | ||
| return p.state.GetNonce(addr) + uint64(len(p.index[addr])) | ||
|
|
@@ -1448,7 +1473,9 @@ func (p *BlobPool) Has(hash common.Hash) bool { | |
| p.lock.RLock() | ||
| defer p.lock.RUnlock() | ||
|
|
||
| return p.lookup.exists(hash) | ||
| poolHas := p.lookup.exists(hash) | ||
| _, gapped := p.gappedSource[hash] | ||
| return poolHas || gapped | ||
| } | ||
|
|
||
| func (p *BlobPool) getRLP(hash common.Hash) []byte { | ||
|
|
@@ -1696,10 +1723,6 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { | |
| adds = append(adds, tx.WithoutBlobTxSidecar()) | ||
| } | ||
| } | ||
| if len(adds) > 0 { | ||
| p.discoverFeed.Send(core.NewTxsEvent{Txs: adds}) | ||
| p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) | ||
| } | ||
| return errs | ||
| } | ||
|
|
||
|
|
@@ -1718,6 +1741,13 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { | |
| addtimeHist.Update(time.Since(start).Nanoseconds()) | ||
| }(time.Now()) | ||
|
|
||
| return p.addLocked(tx, true) | ||
| } | ||
|
|
||
| // addLocked inserts a new blob transaction into the pool if it passes validation (both | ||
| // consensus validity and pool restrictions). It must be called with the pool lock held. | ||
| // Only for internal use. | ||
| func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error) { | ||
| // Ensure the transaction is valid from all perspectives | ||
| if err := p.validateTx(tx); err != nil { | ||
| log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) | ||
|
|
@@ -1730,6 +1760,19 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { | |
| addStaleMeter.Mark(1) | ||
| case errors.Is(err, core.ErrNonceTooHigh): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have been thinking about what form the gapped blobpool should take in the evolution of the blobpool. In my view, we have two possible options. The first is to store a transaction in the gapped set only when "nonce too high" is the sole rejection reason. The second is to store a transaction in the gapped set when "nonce too high" is one of the rejection reasons (so there can other reasons) This PR seems to be implementing the second option, and given the current state of the blobpool, I think this is reasonable. If cells are delivered separately from transactions in the future, first option can be beneficial in some cases, in terms of bandwidth. If a transaction in the buffer cannot be included even after the gap is resolved, then by the time the client discover it needs to be discarded (when all cells are collected and revalidation occurs through the add routine below), it will already have consumed bandwidth to fetch its cells. The first design would help reduce this overhead. However, I am not certain this first option is necessary when we have this gapped buffer, since bandwidth can easily be wasted if someone submits gapped transactions that pass other validations and never fill the gap. I am also not sure about how often this can happen and why the submitter would do such thing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my prototype, the buffer currently stores transactions that pass all validation checks but are simply missing their cells. Still need to do some debugging, but overall structure will remain like this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
I've selected option 2 (there can be other reasons) here, since that's the option that is a pure reorder buffer, acting before any real blobpool logic. So, at least in theory, that's the least invasive version. However, I can imagine moving to a more complex evaluation before storing the gapped transaction. The |
||
| addGappedMeter.Mark(1) | ||
| // Store the tx in memory, and revalidate later | ||
| from, _ := types.Sender(p.signer, tx) | ||
| allowance := p.gappedAllowance(from) | ||
| if allowance >= 1 && len(p.gapped) < maxGapped { | ||
| // If maxGapped is reached, it is better to give time to gapped | ||
| // transactions by keeping the old ones and dropping this one. | ||
| // NOTE(review): len(p.gapped) counts accounts, not transactions, while | ||
| // maxGapped is documented as an overall transaction limit — confirm intent. | ||
| p.gapped[from] = append(p.gapped[from], &gappedTx{tx: tx, timestamp: time.Now()}) | ||
| p.gappedSource[tx.Hash()] = from | ||
| log.Trace("blobpool:add added to Gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) | ||
| return nil | ||
| } else { | ||
| log.Trace("blobpool:add no Gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) | ||
| } | ||
| case errors.Is(err, core.ErrInsufficientFunds): | ||
| addOverdraftedMeter.Mark(1) | ||
| case errors.Is(err, txpool.ErrAccountLimitExceeded): | ||
|
|
@@ -1867,6 +1910,58 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { | |
| p.updateStorageMetrics() | ||
|
|
||
| addValidMeter.Mark(1) | ||
|
|
||
| // Notify all listeners of the new arrival | ||
| p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) | ||
| p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) | ||
|
|
||
| // Check the gapped queue for this account and try to promote | ||
| if gtxs, ok := p.gapped[from]; checkGapped && ok && len(gtxs) > 0 { | ||
| // We have to add in nonce order, but we want to stable sort to cater for situations | ||
| // where transactions are replaced, keeping the original receive order for same nonce | ||
| sort.SliceStable(gtxs, func(i, j int) bool { | ||
| return gtxs[i].tx.Nonce() < gtxs[j].tx.Nonce() | ||
| }) | ||
| for len(gtxs) > 0 { | ||
cskiraly marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| stateNonce := p.state.GetNonce(from) | ||
| firstgap := stateNonce + uint64(len(p.index[from])) | ||
|
|
||
| if gtxs[0].tx.Nonce() > firstgap { | ||
| // Anything beyond the first gap is not addable yet | ||
| break | ||
| } | ||
|
|
||
| // Drop any buffered transactions that became stale in the meantime (included in chain or replaced) | ||
| // If we arrive at a transaction in the pending range (between the state nonce and first gap), we | ||
| // try to add them now while removing from here. | ||
| tx := gtxs[0].tx | ||
| gtxs[0] = nil | ||
| gtxs = gtxs[1:] | ||
| delete(p.gappedSource, tx.Hash()) | ||
|
|
||
| if tx.Nonce() < stateNonce { | ||
| // Stale, drop it. Eventually we could add to limbo here if hash matches. | ||
| log.Trace("Gapped blob transaction became stale", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "state", stateNonce, "qlen", len(p.gapped[from])) | ||
| continue | ||
| } | ||
|
|
||
| if tx.Nonce() <= firstgap { | ||
| // If we hit the pending range, including the first gap, add it and continue to try to add more. | ||
| // We do not recurse here, but continue to loop instead. | ||
| // We are under lock, so we can add the transaction directly. | ||
| if err := p.addLocked(tx, false); err == nil { | ||
| log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) | ||
| } else { | ||
| log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err) | ||
| } | ||
| } | ||
| } | ||
| if len(gtxs) == 0 { | ||
| delete(p.gapped, from) | ||
| } else { | ||
| p.gapped[from] = gtxs | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -2098,6 +2193,50 @@ func (p *BlobPool) Nonce(addr common.Address) uint64 { | |
| return p.state.GetNonce(addr) | ||
| } | ||
|
|
||
| // gappedAllowance returns the number of gapped transactions still | ||
| // allowed for the given account. Allowance is based on a slow-start | ||
| // logic, allowing more gaps (resource usage) to accounts with a | ||
| // higher nonce. Can also return negative values. | ||
| func (p *BlobPool) gappedAllowance(addr common.Address) int { | ||
| // Gaps happen, but we don't want to allow too many. | ||
| // Use log10(nonce+1) as the allowance, with a minimum of 0. | ||
| nonce := p.state.GetNonce(addr) | ||
| allowance := int(math.Log10(float64(nonce + 1))) | ||
| // Cap the allowance to the remaining pool space | ||
| return min(allowance, maxTxsPerAccount-len(p.index[addr])) - len(p.gapped[addr]) | ||
| } | ||
|
|
||
| // evictGapped removes the old transactions from the gapped reorder buffer. | ||
| // Concurrency: The caller must hold the pool lock before calling this function. | ||
| func (p *BlobPool) evictGapped() { | ||
| cutoff := time.Now().Add(-gappedLifetime) | ||
| for from, txs := range p.gapped { | ||
| nonce := p.state.GetNonce(from) | ||
| // Reuse the original slice to avoid extra allocations. | ||
| // This is safe because we only keep references to the original gappedTx objects, | ||
| // and we overwrite the slice for this account after filtering. | ||
| keep := txs[:0] | ||
| for i, gtx := range txs { | ||
| if gtx.timestamp.Before(cutoff) || gtx.tx.Nonce() < nonce { | ||
| // Evict old or stale transactions | ||
| // Should we add stale to limbo here if it would belong? | ||
| delete(p.gappedSource, gtx.tx.Hash()) | ||
| txs[i] = nil // Explicitly nil out evicted element | ||
| } else { | ||
| keep = append(keep, gtx) | ||
| } | ||
| } | ||
| if len(keep) < len(txs) { | ||
| log.Trace("Evicting old gapped blob transactions", "count", len(txs)-len(keep), "from", from) | ||
| } | ||
| if len(keep) == 0 { | ||
| delete(p.gapped, from) | ||
| } else { | ||
| p.gapped[from] = keep | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Stats retrieves the current pool stats, namely the number of pending and the | ||
| // number of queued (non-executable) transactions. | ||
| func (p *BlobPool) Stats() (int, int) { | ||
|
|
@@ -2132,9 +2271,15 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty | |
| // Status returns the known status (unknown/pending/queued) of a transaction | ||
| // identified by their hashes. | ||
| func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus { | ||
| if p.Has(hash) { | ||
| p.lock.RLock() | ||
| defer p.lock.RUnlock() | ||
|
|
||
| if p.lookup.exists(hash) { | ||
| return txpool.TxStatusPending | ||
| } | ||
| if _, gapped := p.gappedSource[hash]; gapped { | ||
| return txpool.TxStatusQueued | ||
| } | ||
| return txpool.TxStatusUnknown | ||
| } | ||
|
|
||
|
|
||