分享
  1. 首页
  2. 文章

Derek解读Bytom源码-P2P网络 地址簿

比原链Bytom · · 1218 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

作者:Derek

简介

Github地址:https://github.com/Bytom/bytom

Gitee地址:https://gitee.com/BytomBlockc...

本章介绍bytom代码P2P网络中addrbook地址簿

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

addrbook介绍

addrbook用于存储P2P网络中保留最近的对端节点地址
在MacOS下,默认的地址簿路径存储在~/Library/Bytom/addrbook.json

地址簿格式

~/Library/Bytom/addrbook.json

{
 "Key": "359be6d08bc0c6e21c84bbb2",
 "Addrs": [
 {
 "Addr": {
 "IP": "122.224.11.144",
 "Port": 46657
 },
 "Src": {
 "IP": "198.74.61.131",
 "Port": 46657
 },
 "Attempts": 0,
 "LastAttempt": "2018年05月04日T12:58:23.894057702+08:00",
 "LastSuccess": "0001年01月01日T00:00:00Z",
 "BucketType": 1,
 "Buckets": [
 181,
 10
 ]
 }
 ]
}

地址类型

在addrbook中存储的地址有两种:
p2p/addrbook.go

const (
 bucketTypeNew = 0x01 // 标识新地址,不可靠地址(未成功连接过)。只存储在一个bucket中
 bucketTypeOld = 0x02 // 标识旧地址,可靠地址(已成功连接过)。可以存储在多个bucket中,最多为maxNewBucketsPerAddress个
)

<font color=red>注意: 一个地址的类型变更不在此文章中做介绍,后期的文章会讨论该问题</font>

地址簿相关结构体

地址簿

type AddrBook struct {
 cmn.BaseService
 mtx sync.Mutex
 filePath string // 地址簿路径
 routabilityStrict bool // 是否可路由,默认为true
 rand *rand.Rand 
 key string // 地址簿标识,用于计算addrNew和addrOld的索引
 ourAddrs map[string]*NetAddress // 存储本地网络地址,用于添加p2p地址时做排除使用
 addrLookup map[string]*knownAddress // 存储新、旧地址集,用于查询
 addrNew []map[string]*knownAddress // 存储新地址
 addrOld []map[string]*knownAddress // 存储旧地址
 wg sync.WaitGroup
 nOld int // 旧地址数量
 nNew int // 新地址数量
}

已知地址

type knownAddress struct {
 Addr *NetAddress // 已知peer的addr
 Src *NetAddress // 已知peer的addr的来源addr
 Attempts int32 // 连接peer的重试次数
 LastAttempt time.Time // 最近一次尝试连接的时间
 LastSuccess time.Time // 最近一次尝试成功连接的时间
 BucketType byte // 地址的类型(表示可靠地址或不可靠地址)
 Buckets []int // 当前addr所属的buckets
}

routabilityStrict参数表示地址簿是否存储的ip是否可路由。可路由是根据RFC划分,具体参考资料:RFC标准

初始化地址簿

// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
 am := &AddrBook{
 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
 ourAddrs: make(map[string]*NetAddress),
 addrLookup: make(map[string]*knownAddress),
 filePath: filePath,
 routabilityStrict: routabilityStrict,
 }
 am.init()
 am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
 return am
}
// When modifying this, don't forget to update loadFromFile()
func (a *AddrBook) init() {
 // 地址簿唯一标识
 a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
 // New addr buckets, 默认为256个大小
 a.addrNew = make([]map[string]*knownAddress, newBucketCount)
 for i := range a.addrNew {
 a.addrNew[i] = make(map[string]*knownAddress)
 }
 // Old addr buckets,默认为64个大小
 a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
 for i := range a.addrOld {
 a.addrOld[i] = make(map[string]*knownAddress)
 }
}

bytomd启动时加载本地地址簿

loadFromFile在bytomd启动时,首先会加载本地的地址簿

// OnStart implements Service.
func (a *AddrBook) OnStart() error {
 a.BaseService.OnStart()
 a.loadFromFile(a.filePath)
 a.wg.Add(1)
 go a.saveRoutine()
 return nil
}
// Returns false if file does not exist.
// cmn.Panics if file is corrupt.
func (a *AddrBook) loadFromFile(filePath string) bool {
 // If doesn't exist, do nothing.
 // 如果本地地址簿不存在则直接返回
 _, err := os.Stat(filePath)
 if os.IsNotExist(err) {
 return false
 }
 // 加载地址簿json内容
 // Load addrBookJSON{}
 r, err := os.Open(filePath)
 if err != nil {
 cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
 }
 defer r.Close()
 aJSON := &addrBookJSON{}
 dec := json.NewDecoder(r)
 err = dec.Decode(aJSON)
 if err != nil {
 cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
 }
 // 填充addrNew、addrOld等
 // Restore all the fields...
 // Restore the key
 a.key = aJSON.Key
 // Restore .addrNew & .addrOld
 for _, ka := range aJSON.Addrs {
 for _, bucketIndex := range ka.Buckets {
 bucket := a.getBucket(ka.BucketType, bucketIndex)
 bucket[ka.Addr.String()] = ka
 }
 a.addrLookup[ka.Addr.String()] = ka
 if ka.BucketType == bucketTypeNew {
 a.nNew++
 } else {
 a.nOld++
 }
 }
 return true
}

定时更新地址簿

bytomd会定时更新本地地址簿,默认2分钟一次

func (a *AddrBook) saveRoutine() {
 dumpAddressTicker := time.NewTicker(dumpAddressInterval)
out:
 for {
 select {
 case <-dumpAddressTicker.C:
 a.saveToFile(a.filePath)
 case <-a.Quit:
 break out
 }
 }
 dumpAddressTicker.Stop()
 a.saveToFile(a.filePath)
 a.wg.Done()
 log.Info("Address handler done")
}
func (a *AddrBook) saveToFile(filePath string) {
 log.WithField("size", a.Size()).Info("Saving AddrBook to file")
 a.mtx.Lock()
 defer a.mtx.Unlock()
 // Compile Addrs
 addrs := []*knownAddress{}
 for _, ka := range a.addrLookup {
 addrs = append(addrs, ka)
 }
 aJSON := &addrBookJSON{
 Key: a.key,
 Addrs: addrs,
 }
 jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
 if err != nil {
 log.WithField("err", err).Error("Failed to save AddrBook to file")
 return
 }
 err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
 if err != nil {
 log.WithFields(log.Fields{
 "file": filePath,
 "err": err,
 }).Error("Failed to save AddrBook to file")
 }
}

添加新地址

当peer之间交换addr时,节点会收到对端节点已知的地址信息,这些信息会被当前节点添加到地址簿中

func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
 a.mtx.Lock()
 defer a.mtx.Unlock()
 log.WithFields(log.Fields{
 "addr": addr,
 "src": src,
 }).Debug("Add address to book")
 a.addAddress(addr, src)
}
func (a *AddrBook) addAddress(addr, src *NetAddress) {
 // 验证地址是否为可路由地址
 if a.routabilityStrict && !addr.Routable() {
 log.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
 return
 }
 // 验证地址是否为本地节点地址
 if _, ok := a.ourAddrs[addr.String()]; ok {
 // Ignore our own listener address.
 return
 }
 // 验证地址是否存在地址集中
 // 如果存在:则判断该地址是否为old可靠地址、是否超过了最大buckets中。否则根据该地址已经被ka.Buckets引用的个数来随机决定是否添加到地址集中
 // 如果不存在:则添加到地址集中。并标识为bucketTypeNew地址类型
 ka := a.addrLookup[addr.String()]
 if ka != nil {
 // Already old.
 if ka.isOld() {
 return
 }
 // Already in max new buckets.
 if len(ka.Buckets) == maxNewBucketsPerAddress {
 return
 }
 // The more entries we have, the less likely we are to add more.
 factor := int32(2 * len(ka.Buckets))
 if a.rand.Int31n(factor) != 0 {
 return
 }
 } else {
 ka = newKnownAddress(addr, src)
 }
 // 找到该地址在地址集的索引位置并添加
 bucket := a.calcNewBucket(addr, src)
 a.addToNewBucket(ka, bucket)
 log.Info("Added new address ", "address:", addr, " total:", a.size())
}

选择最优节点

地址簿中存储众多地址,在p2p网络中需选择最优的地址去连接
PickAddress(newBias int)函数中newBias是由pex_reactor产生的地址评分。如何计算地址分数在其他章节中再讲
根据地址评分随机选择地址可增加区块链安全性

// Pick an address to connect to with new/old bias.
func (a *AddrBook) PickAddress(newBias int) *NetAddress {
 a.mtx.Lock()
 defer a.mtx.Unlock()
 if a.size() == 0 {
 return nil
 }
 // newBias地址分数限制在0-100分数之间
 if newBias > 100 {
 newBias = 100
 }
 if newBias < 0 {
 newBias = 0
 }
 // Bias between new and old addresses.
 oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
 newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
 // 根据地址分数计算是否从addrOld或addrNew中随机选择一个地址
 if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
 // pick random Old bucket.
 var bucket map[string]*knownAddress = nil
 num := 0
 for len(bucket) == 0 && num < oldBucketCount {
 bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
 num++
 }
 if num == oldBucketCount {
 return nil
 }
 // pick a random ka from bucket.
 randIndex := a.rand.Intn(len(bucket))
 for _, ka := range bucket {
 if randIndex == 0 {
 return ka.Addr
 }
 randIndex--
 }
 cmn.PanicSanity("Should not happen")
 } else {
 // pick random New bucket.
 var bucket map[string]*knownAddress = nil
 num := 0
 for len(bucket) == 0 && num < newBucketCount {
 bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
 num++
 }
 if num == newBucketCount {
 return nil
 }
 // pick a random ka from bucket.
 randIndex := a.rand.Intn(len(bucket))
 for _, ka := range bucket {
 if randIndex == 0 {
 return ka.Addr
 }
 randIndex--
 }
 cmn.PanicSanity("Should not happen")
 }
 return nil
}

移除一个地址

当一个地址被标记为Bad时则从地址集中移除。目前bytomd的代码版本并未调用过

func (a *AddrBook) MarkBad(addr *NetAddress) {
 a.RemoveAddress(addr)
}
// RemoveAddress removes the address from the book.
func (a *AddrBook) RemoveAddress(addr *NetAddress) {
 a.mtx.Lock()
 defer a.mtx.Unlock()
 ka := a.addrLookup[addr.String()]
 if ka == nil {
 return
 }
 log.WithField("addr", addr).Info("Remove address from book")
 a.removeFromAllBuckets(ka)
}
func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
 for _, bucketIdx := range ka.Buckets {
 bucket := a.getBucket(ka.BucketType, bucketIdx)
 delete(bucket, ka.Addr.String())
 }
 ka.Buckets = nil
 if ka.BucketType == bucketTypeNew {
 a.nNew--
 } else {
 a.nOld--
 }
 delete(a.addrLookup, ka.Addr.String())
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:比原链Bytom

查看原文:Derek解读Bytom源码-P2P网络 地址簿

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1218 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏