交易内存池底层实现

本文最后更新于:2023年6月19日 晚上

既然要从头搭建一个区块链网络,交易池是绕不开的一个核心底层。对于它,以下几个问题是需要解决的:

  1. 用户发起的交易请求(不论以何种方式)是如何转变为实际的交易的;
  2. 产生的交易如果不能被及时处理,是如何进入交易池的;
  3. 本地产生的交易和从网络中接收到的交易进入交易池是否有区别;
  4. 交易池的基本数据结构是怎么样的(优先队列?)
  5. 从交易池中提取交易进行打包时顺序是怎么样的(调度算法)

    网络中很多文章提到内存池,指的就是是 txpool

交易形成

以下均为作品赛项目的内容。
首先,我们的交易请求会赋值到 TxRaw 结构体的一个实例中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//用于接收客户端发来的序列化post原始信息
type TxRaw struct {
//公钥hash
Address []byte `json:"address"`

Fee int `json:"int"`

//交易核心
RangeBloom bloom.RangeBloom `json:"rbloom"`
TimeBloom bloom.TimeBloom `json:"tbloom"`

//签名相关
Pubkey sm2.PublicKey `json:"pubkey"`
Sign []byte `json:"sign"`

//交易同态密文拼接,用于k-top查找
Cipher string `json:"cipher"`
}

然后这个实例被传递到一个 SendTransaction 函数,用来创建一个交易 Tx。创建交易的过程如下:

  1. 根据 Address 字段找到当前账户
  2. 设置交易默认参数
  3. 检查交易签名是否合法
  4. 检查交易是否冲突
  5. 对交易进行序列化,变为可存储和传输的形式。
  6. 提交交易到交易池

交易管理

以太坊将交易按状态分为两部分:可执行交易非可执行交易。分别记录在 pending 容器中和 queue 容器中。

如上图所示,交易池先采用一个 txLookup (内部为 map)跟踪所有交易。同时将交易根据本地优先,价格优先原则将交易划分为两部分 queue 和 pending。而这两部交易则按账户分别跟踪。
为了不丢失未完成的本地交易,以太坊交易池通过 journal 文件存储和管理当前交易池中的本地交易,并定期更新存储。
下图是交易池对本地待处理交易的磁盘存储管理流程,涉及加载、实时写入和定期更新维护。

存储交易


当交易池新交易来自于本地账户时 ❶,如果已开启记录本地交易,则将此交易加入 journal ❷。到交易池时,将实时存储到 journal 文件中。

1
2
3
4
5
6
7
8
9
10
//core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {//❶
return
}
if err := pool.journal.insert(tx); err != nil { //❷
log.Warn("Failed to journal local transaction", "err", err)
}
}

而 journal.insert 则将交易实时写入文件流中 ❸,相当于实时存储到磁盘。而在写入时,是将交易进行 RLP 编码。

1
2
3
4
5
6
7
8
9
10
//core/tx_journal.go:120
func (journal *txJournal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {//❸
return err
}
return nil
}

插一嘴,为什么要用 RLP 编码?

什么是 RLP

RLP(Recursive Length Prefix) 递归长度前缀编码是以太坊中最常使用的序列化格式方法。
RLP 旨在成为高度简约的序列化方法唯一目标就是存储嵌套的字节数组
RLP 只是以嵌套数组形式存储结构型数据,由上层协议来确定数组的含义。

从图中可以看出,不同类型的数据,将有不同的前缀标识。 前缀也可以理解为报文头,通过报文头可准确获取报文内容。 图中灰色部分则为 RLP 编码输出前缀。

加载已存储交易

在交易池首次启动 journal 时,将主动将该文件已存储的交易加载到交易池。

1
2
3
4
5
6
7
8
9
10
//core/tx_journal.go:61
if _, err := os.Stat(journal.path); os.IsNotExist(err) { //❶
return nil
}
// Open the journal for loading any past transactions
input, err := os.Open(journal.path) //❷
if err != nil {
return err
}
defer input.Close()

处理时,如果文件不存在则退出 ❶,否则 Open 文件,获得 input 文件流 ❷。

1
2
3
//core/tx_journal.go:76
stream := rlp.NewStream(input, 0)//❸
total, dropped := 0, 0

因为存储的内容格式是 rlp 编码内容,因此可以直接初始化 rlp 内容流 ❸,为连续解码做准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var (
failure error
batch types.Transactions
)
for {
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil { //❹
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {//❼
loadBatch(batch)
}
break
}
total++

if batch = append(batch, tx); batch.Len() > 1024 {//❺
loadBatch(batch)//❻
batch = batch[:0]
}
}

loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++ //❽
}
}
}

直接进入 for 循环遍历,不断从 stream 中一笔笔地解码出交易 ❹。
但交易并非单笔直接载入交易池,而是采用批量提交模式,每 1024 笔交易提交一次 ❺。
批量写入,有利于降低交易池在每次写入交易后的更新。一个批次只需要更新(排序与超限处理等)一次。当然在遍历结束时(err==io.EOF),也需要将当前批次中的交易载入 ❼。
loadBatch 就是将交易一批次加入到交易池,并获得交易池的每笔交易的处理情况。如果交易加入失败,则进行计数 ❽。最终在 load 方法执行完毕时,显示交易载入情况。

交易添加到交易池

SendTransaction 最后调用 SubmitTransaction 函数将交易提交到交易池,不过,更底层的调用是 AddLocals 函数。
这里需要注意,本地交易时调用 AddLocals,而从对等 p2p 节点发来的交易调用的是另一个函数,AddRemotes 函数。调用这两个函数之前都应该验证交易的有效性。

因为交易时通过节点的 API 接收,因此此交易被视为一笔来自本地(local)(图中用红球表示),在经过一系列校验和处理后。交易成功进入交易池,随后向已连接的邻近节点发送此交易。
当邻近节点,如矿工节点从邻近节点接收到此交易时,在进入交易池之前。会将交易标记为来自远方(remote)的交易(图中用绿球表示)。也需要经过校验和处理后,进入矿工节点的交易池,等待矿工打包到区块中。
如果邻近节点,不是矿工,也无妨。因为任何节点会默认将接受到得合法交易及时发送给邻近节点。得益于 P2P 网络,一笔交易平均在 6s 内扩散到整个以太坊公链网络的各个节点中。


下面我们来看一下交易池的结构:

这是以太坊的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.

currentState *state.StateDB // Current state in the blockchain head
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
}

其中最核心的是两个字段,pending 和 queue
queued 存放未来的、当前无法执行的交易。以太坊使用 nonce 值决定某个账户的交易顺序,多条交易值 nonce 值必须连续,如果和过去的交易不连续,则无法执行,我们不妨使用 nonce 值,标记交易的号码,nonce 为 10 的交易,称为第 10 号交易。举个例子,当前账户的 nonce 是 10,txpool 中有该账户的第 100 号交易,但 txpool 中没有第 1199 号交易,这些交易的缺失,造成第 100 号交易无法执行,所以第 100 号交易就是未来的交易、不可执行的交易,存放在 queue 中。
pending 存放可执行的交易。比如我们把上面的 11
99 号交易补全了,那么 11~100 号交易都可以进入到 pending,因为这些交易都是连续的,都可以打包进区块。
当节点收到交易(本地节点发起的或 peer 广播来的)时,会先存放到 queued,txpool 在某些情况下,把 queued 中可执行的交易,转移到 pending 中。
其中 config 的结构体为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

AccountSlots uint64 // Number of executable transaction slots guaranteed per account
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

// Quorum
TransactionSizeLimit uint64 // Maximum size allowed for valid transaction (in KB)
MaxCodeSize uint64 // Maximum size allowed of contract code that can be deployed (in KB)

}

var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,

PriceLimit: 1,
PriceBump: 10,

AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,

// Quorum
TransactionSizeLimit: 64,
MaxCodeSize: 24,
}
  • Locals: 定义了一组视为 local 交易的账户地址。任何来自此清单的交易均被视为 local 交易。
  • NoLocals: 是否禁止 local 交易处理。默认为 fasle,允许 local 交易。如果禁止,则来自 local 的交易均视为 remote 交易处理。
  • Journal: 存储 local 交易记录的文件名,默认是./transactions.rlp。
  • Rejournal:定期将 local 交易存储文件中的时间间隔。默认为每小时一次。
  • PriceLimit: remote 交易进入交易池的最低 Price 要求。此设置对 local 交易无效。默认值 1。
  • PriceBump:替换交易时所要求的价格上调涨幅比例最低要求。任何低于要求的替换交易均被拒绝。
  • AccountSlots: 当交易池中可执行交易(是已在等待矿工打包的交易)量超标时,允许每个账户可以保留在交易池最低交易数。默认值是 16 笔。
  • GlobalSlots: 交易池中所允许的可执行交易量上限,高于上限时将释放部分交易。默认是 4096 笔交易。
  • AccountQueue:交易池中单个账户非可执行交易上限,默认是 64 笔。
  • GlobalQueue: 交易池中所有非可执行交易上限,默认 1024 笔。
  • Lifetime: 允许 remote 的非可执行交易可在交易池存活的最长时间。交易池每分钟检查一次,一旦发现有超期的 remote 账户,则移除该账户下的所有非可执行交易。默认为 3 小时。

    参考:https://learnblockchain.cn/books/geth/part2/txpool/txpool.html

上面配置中,包含两个重要概念可执行交易非可执行交易。可执行交易是指从交易池中择优选出的一部分交易可以被执行,打包到区块中。非可执行交易则相反,任何刚进入交易池的交易均属于非可执行状态,在某一个时刻才会提升为可执行状态。

这是 dag 的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type TxPool struct {
// The following variables must only be used atomically.
lastUpdated int64 // last time pool was updated

mtx sync.RWMutex
cfg Config
pool map[chainhash.Hash]*TxDesc
orphans map[chainhash.Hash]*orphanTx
orphansByPrev map[wire.OutPoint]map[chainhash.Hash]*soterutil.Tx
outpoints map[wire.OutPoint]*soterutil.Tx
pennyTotal float64 // exponentially decaying total for penny spends.
lastPennyUnix int64 // unix time of last ``penny spend''

// nextExpireScan is the time after which the orphan pool will be
// scanned in order to evict orphans. This is NOT a hard deadline as
// the scan will only run when an orphan is added to the pool as opposed
// to on an unconditional timer.
nextExpireScan time.Time
}

交易提交到交易池还需要一个 add 函数,逻辑很简单:

  1. 验证交易的有效性
  2. 如果 nonce 已存在,忽略。
  3. 如果 nonce 不存在,不可以替换 pending 中的任何交易,此时将新的交易插入 queue 的末尾

注:交易中的 nonce 指的是 from 账户发出交易的次数, 从 0 开始递增,同一账户的交易会被依次确认,所以同一个 nonce 代表是同一个交易,会优先选择 price 更高的交易。

这是我们的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type TxPool struct {
mtx sync.RWMutex
config TxPoolConfig
currentState *state.StateDB // 区块的当前状态
locals *accountSet //本地交易
journal *txJournal // 用于存储local交易记录的文件

pending map[common.Address]*txList // 存放可执行的交易
queue map[common.Address]*txList // queued存放未来的、当前无法执行的交易

chainHeadSub event.Subscription//用于订阅区块
reqPromoteCh chan *accountSet//账户集合
queueTxEventCh chan *dag.Tx//Tx队列事件
}

type TxPoolConfig struct {
Locals []common.Address
Journal string
Rejournal time.Duration

AccountSlots uint64
GlobalSlots uint64
AccountQueue uint64
GlobalQueue uint64

Lifetime time.Duration
}

var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,

AccountSlots: 16,
GlobalSlots: 8192,
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
}

当节点收到交易(本地节点发起的或 peer 广播来的)时,会先存放到 queued,txpool 在某些情况下,把 queued 中可执行的交易,转移到 pending 中。

  • Locals: 定义了一组视为 local 交易的账户地址。任何来自此清单的交易均被视为 local 交易。
  • Journal: 存储 local 交易记录的文件名,默认是./transactions.rlp。
  • Rejournal:定期将 local 交易存储文件中的时间间隔。默认为每小时一次。
  • AccountSlots: 当交易池中可执行交易(是已在等待矿工打包的交易)量超标时,允许每个账户可以保留在交易池最低交易数。默认值是 16 笔。
  • GlobalSlots: 交易池中所允许的可执行交易量上限,高于上限时将释放部分交易。默认是 8192 笔交易。
  • AccountQueue:交易池中单个账户非可执行交易上限,默认是 64 笔。
  • GlobalQueue: 交易池中所有非可执行交易上限,默认 1024 笔。
  • Lifetime: 允许 remote 的非可执行交易可在交易池存活的最长时间。交易池每分钟检查一次,一旦发现有超期的 remote 账户,则移除该账户下的所有非可执行交易。默认为 3 小时。

链状态

在交易池启动后,将订阅链的区块头事件:

1
2
//core/tx_pool.go:274
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

并开始监听新事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//core/tx_pool.go:305
for {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block

pool.mu.Unlock()
}
//...
}
}

接收到事件后,将执行 func (pool *TxPool) reset(oldHead, newHead *types.Header)方法更新 state 和处理交易。核心是将交易池中已经不符合要求的交易删除并更新整理交易

本地交易

在交易池中将交易标记为 local 的有多种用途:

  1. 在本地磁盘存储已发送的交易。这样,本地交易不会丢失,重启节点时可以重新加载到交易池,实时广播出去。
  2. 可以作为外部程序和以太坊沟通的一个渠道。外部程序只需要监听文件内容变化,则可以获得交易清单。
  3. local 交易可优先于 remote 交易。对交易量的限制等操作,不影响 local 下的账户和交易。

对应本地交易存储,在启动交易池时根据配置开启本地交易存储能力:

1
2
3
4
5
6
7
8
//core/tx_pool.go:264
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
//...
}

并从磁盘中加载已有交易到交易池。在新的 local 交易进入交易池时,将被实时写入 journal 文件。

1
2
3
4
5
6
7
8
9
// core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}

从上可看到,只有属于 local 账户的交易才会被记录。你又没有注意到,如果仅仅是这样的话,journal 文件是否会跟随本地交易而无限增长?答案是否定的,虽然无法实时从 journal 中移除交易。但是支持定期更新 journal 文件
journal 并不是保存所有的本地交易以及历史,他仅仅是存储当前交易池中存在的本地交易。因此交易池会定期对 journal 文件执行 rotate,将交易池中的本地交易写入 journal 文件,并丢弃旧数据。

1
2
3
4
5
6
7
8
9
10
11
12
journal := time.NewTicker(pool.config.Rejournal)
//...
//core/tx_pool.go:353
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}

新交易信号

文章开头,有提到进入交易池的交易将被广播到网络中。这是依赖于交易池支持外部订阅新交易事件信号。任何订阅此事件的子模块,在交易池出现新的可执行交易时,均可实时接受到此事件通知,并获得新交易信息。

需要注意的是并非所有进入交易池的交易均被通知外部,而是只有交易从非可执行状态变成可执行状态后才会发送信号。

1
2
3
4
//core/tx_pool.go:705
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
//core/tx_pool.go:1022
go pool.txFeed.Send(NewTxsEvent{promoted})

在交易池中,有两处地方才会执行发送信号。
一是交易时用于替换已经存在的可执行交易时。
二是有新的一批交易从非可执行状态提升到可执行状态后
外部只需要订阅 SubscribeNewTxsEvent(ch chan<- NewTxsEvent)新可执行交易事件,则可实时接受交易。
在 geth 中网络层将订阅交易事件,以便实时广播。

1
2
3
4
5
6
7
8
9
10
11
12
//eth/handler.go:213
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
//eth/handler.go:781
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
pm.BroadcastTxs(event.Txs)
//...
}
}

另外是矿工实时订阅交易,以便将交易打包到区块中。

1
2
3
4
5
6
7
8
9
10
//miner/worker.go:207
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
//miner/worker.go:462
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
w.commitTransactions(txset, coinbase, nil)

清理交易池

交易池是完全存在内存中的,因此有大小限制,每当超过一定的阈值就需要清理。实际实现时,pending 的缓冲区容量默认为 4096,queue 的缓冲区容量默认为 1024。
清理的时机是交易池满的时候,清理的原则是价格较低的最先清理

但是本项目没有设计要清理交易池

惩罚恶意账号

这也是 txpool 很重要的一个属性,可以防止恶意账户以发起大量垃圾交易。防止恶意用户造成:

  1. 占用 txpool 空间
  2. 浪费节点大量内存和 CPU
  3. 降低打包性能

** 只有当交易的总数量超过缓冲区大小时,txpool 才会认为有恶意账户发起大量交易。**pending 和 queued 缓冲区大小不同,但处理策略类似:

  1. pending 的缓冲区容量是 4096,当 pending 的交易数量多于此时,就会运行检查,每个账号的交易数量是否多于 16,把这些账号搜集出来,进行循环依次清理,什么意思呢?就是每轮只删除(移动到 queued)这些账号的每个账号 1 条交易,然后看数量是否降下来了,不满足再进行下一轮,直到满足。
  2. queued 的缓冲区容量是 1024,超过之后清理策略和 pending 差不多,但这里可是真删除了。

该部分功能未抽象成单独的函数,而是在 promoteExecutables()中,就是在每次把 queued 交易转移到 pending 后执行的。
本地交易的特权,txpool 虽然对交易有诸多限制,但如果交易是本节点的账号发起的,以上数量限制等都对他无效。所以,如果你用本节点账号不停的发送交易,并不会被认为是攻击者

回答问题

  1. 用户发起的交易请求(不论以何种方式)是如何转变为实际的交易的;

所有与交易请求相关的参数被赋值到一个结构体中,然后进行序列化转变为可存储和传输的形式,最后生成交易并进行签名

  1. 产生的交易如果不能被及时处理,是如何进入交易池的?

最终是调用一个 add 函数,添加到了一个队列里

  1. 本地产生的交易和从网络中接收到的交易进入交易池是否有区别?

有区别,底层都是调用 add 函数

  1. 交易池的基本数据结构是怎么样的(优先队列?)

交易池是一个结构体,核心是 pending 和 queue 两个 map,map 的键是一个地址,值是一个交易链表形成的队列

  1. 从交易池中提取交易进行打包时顺序是怎么样的(调度算法)

本地优先级大于远程交易

我们可以理解为区块链底层利用交易池对并发产生的请求做了异步化交易产生的时刻交易被打包的时刻随机的。

在实现 TXpool 的时候为了保证数据的一致性会使用大量的锁