分享
  1. 首页
  2. 文章

区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

兄弟连区块链 · · 1286 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端)

5.2、commImpl结构体方法

// GossipStream services the incoming stream via conn.serviceConnection().
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
// Ping replies with: return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)
func (c *commImpl) GetPKIid() common.PKIidType
// Send sends a message to the specified peers.
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
// Probe checks whether the remote peer responds: _, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
// Handshake performs a handshake to verify the remote peer: _, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()
// NewCommInstanceWithServer creates and starts a gRPC server and registers the GossipServer instance on it.
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
// NewCommInstance registers the GossipServer instance with the peer's gRPC server (peerServer).
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) 
// createGRPCLayer creates the gRPC server: grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)
// createConnection creates a connection to the server side (the remote endpoint).
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
// sendToEndpoint sends a message to the specified peer.
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
// isStopping: return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
// The code is located in gossip/comm/comm_impl.go

5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)

创建并启动gRPC Server,以及注册GossipServer实例

// NewCommInstanceWithServer builds a commImpl and, when port > 0, also creates
// a dedicated gRPC server for it, registers the instance as the Gossip service
// on that server and starts serving in a background goroutine.
//
// Parameters:
//   - port: when > 0, createGRPCLayer(port) supplies the server, the listener,
//     the secure dial options and the certificate hash; otherwise no server is
//     created and the caller-supplied secureDialOpts are kept.
//   - idMapper: used to derive this instance's PKI-ID from peerIdentity.
//   - peerIdentity: the identity of the local peer.
//   - secureDialOpts: dial options provider; overridden when port > 0.
//   - dialOpts: gRPC dial options; when none are given, a single dial timeout
//     read from "peer.gossip.dialTimeout" (default defDialTimeout) is used.
//
// It returns the new instance; the error result is always nil here.
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
	secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {
	var ll net.Listener
	var s *grpc.Server
	var certHash []byte
	if len(dialOpts) == 0 {
		// peer.gossip.dialTimeout: timeout for dialing gRPC connections.
		dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
	}
	if port > 0 {
		// Create the gRPC server (grpc.NewServer(serverOpts...)) and its listener.
		s, ll, secureDialOpts, certHash = createGRPCLayer(port)
	}
	commInst := &commImpl{
		selfCertHash:   certHash,
		PKIID:          idMapper.GetPKIidOfCert(peerIdentity),
		idMapper:       idMapper,
		logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
		peerIdentity:   peerIdentity,
		opts:           dialOpts,
		secureDialOpts: secureDialOpts,
		port:           port,
		lsnr:           ll,
		gSrv:           s,
		msgPublisher:   NewChannelDemultiplexer(),
		lock:           &sync.RWMutex{},
		deadEndpoints:  make(chan common.PKIidType, 100),
		stopping:       int32(0),
		exitChan:       make(chan struct{}, 1),
		subscriptions:  make([]chan proto.ReceivedMessage, 0),
	}
	commInst.connStore = newConnStore(commInst, commInst.logger)
	if port > 0 {
		// Register the service BEFORE Serve starts: grpc requires every
		// service to be registered before Serve is invoked, and registering
		// concurrently with a running server is a race.
		proto.RegisterGossipServer(s, commInst)
		commInst.stopWG.Add(1)
		go func() {
			defer commInst.stopWG.Done()
			// NOTE(review): Serve's return value is dropped, as in the
			// original; presumably it returns once the server is stopped.
			s.Serve(ll)
		}()
	}
	return commInst, nil
}
//代码在gossip/comm/comm_impl.go

5.2.2、func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)

将GossipServer实例注册至peerServer

// NewCommInstance builds a commImpl without a gRPC server of its own and
// registers it as the Gossip service on the caller-supplied server s.
//
// Parameters:
//   - s: an existing gRPC server on which the Gossip service is registered.
//   - cert: optional TLS certificate; when non-nil, the hash of its first raw
//     certificate becomes the instance's selfCertHash.
//   - idStore, peerIdentity, secureDialOpts: forwarded to
//     NewCommInstanceWithServer.
//   - dialOpts: gRPC dial options; a dial timeout read from
//     "peer.gossip.dialTimeout" (default defDialTimeout) is always appended.
//
// It returns the instance, or the error reported by
// NewCommInstanceWithServer.
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
	peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
	dialOpts ...grpc.DialOption) (Comm, error) {
	dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))
	// A port of -1 makes NewCommInstanceWithServer skip creating a server.
	commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
	if err != nil {
		return nil, err
	}
	// Assert once and reuse the concrete instance below.
	inst := commInst.(*commImpl)
	if cert != nil {
		inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])
	}
	proto.RegisterGossipServer(s, inst)
	return commInst, nil
}
//代码在gossip/comm/comm_impl.go

//创建与服务端连接

5.2.3、func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)

// createConnection dials endpoint, pings it, opens a gossip stream,
// authenticates the remote peer and wraps the result in a *connection whose
// handler forwards every received message to c.msgPublisher.
//
// Parameters:
//   - endpoint: the address passed to grpc.Dial.
//   - expectedPKIID: the PKI-ID the remote peer is expected to present; when
//     empty, no identity check is performed.
//
// On any failure the stream context is cancelled, the client connection is
// closed, and (nil, err) is returned.
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
	var dialOpts []grpc.DialOption
	dialOpts = append(dialOpts, c.secureDialOpts()...)
	dialOpts = append(dialOpts, grpc.WithBlock())
	dialOpts = append(dialOpts, c.opts...)

	cc, err := grpc.Dial(endpoint, dialOpts...)
	if err != nil {
		// cc is not usable on error; building a client on it would panic later.
		return nil, err
	}

	cl := proto.NewGossipClient(cc)
	if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
		cc.Close()
		return nil, err
	}

	ctx, cf := context.WithCancel(context.Background())
	// abort releases everything acquired so far before reporting the failure.
	abort := func(cause error) (*connection, error) {
		cf()
		cc.Close()
		return nil, cause
	}

	stream, err := cl.GossipStream(ctx)
	if err != nil {
		return abort(err)
	}
	connInfo, err := c.authenticateRemotePeer(stream)
	if err != nil {
		return abort(err)
	}

	var pkiID common.PKIidType = connInfo.ID
	// Refuse a peer that authenticates under an identity other than the one
	// the caller asked to connect to.
	if len(expectedPKIID) > 0 && string(pkiID) != string(expectedPKIID) {
		return abort(fmt.Errorf("authentication failure: remote endpoint %s presented an unexpected PKI-ID", endpoint))
	}

	conn := newConnection(cl, cc, stream, nil)
	conn.pkiID = pkiID
	conn.info = connInfo
	conn.logger = c.logger
	conn.cancel = cf
	// Every message read from this connection is published to subscribers.
	conn.handler = func(m *proto.SignedGossipMessage) {
		c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
			conn:                conn,
			lock:                conn,
			SignedGossipMessage: m,
			connInfo:            connInfo,
		})
	}
	return conn, nil
}
//代码在gossip/comm/comm_impl.go

6、connectionStore和connection结构体及方法

6.1、connection结构体及方法

// connection holds the state of a single gossip link to a remote endpoint:
// the gRPC handles, the client- or server-side stream, the outgoing message
// buffer and the stop signalling.
type connection struct {
  cancel context.CancelFunc // cancel function of the stream's context; set by createConnection
  info *proto.ConnectionInfo // connection info obtained when the remote peer was authenticated
  outBuff chan *msgSending // outgoing messages queued by send and consumed by writeToStream
  logger *logging.Logger // logger
  pkiID common.PKIidType // pkiID of the remote endpoint
  handler handler // function to invoke upon a message reception
  conn *grpc.ClientConn // gRPC connection to remote endpoint
  cl proto.GossipClient // gRPC stub of remote endpoint
  clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
  serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
  stopFlag int32 // indicates whether this connection is in process of stopping
  stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine
  sync.RWMutex // synchronizes access to shared variables
}
// newConnection constructs a connection.
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
// close closes the connection.
func (conn *connection) close()
// toDie: atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool
// send enqueues the message: conn.outBuff <- m, where m is msgSending{envelope: msg.Envelope, onErr: onErr}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
// serviceConnection runs go conn.readFromStream(errChan, msgChan) and go conn.writeToStream(), while handling each msg := <-msgChan with conn.handler(msg)
func (conn *connection) serviceConnection() error
// writeToStream loops continuously, taking messages from conn.outBuff and calling stream.Send(m.envelope)
func (conn *connection) writeToStream()
// readFromStream loops continuously: envelope, err := stream.Recv(); msg, err := envelope.ToGossipMessage(); msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
// getStream returns conn.serverStream
func (conn *connection) getStream() stream
// The code is located in gossip/comm/conn.go

6.2、connectionStore结构体及方法

// connectionStore keeps the connections to remote peers, indexed by PKI-ID,
// and creates missing ones through connFactory.
type connectionStore struct {
  logger *logging.Logger // logger
  isClosing bool // whether this connection store is shutting down
  connFactory connFactory // creates a connection to remote peer
  sync.RWMutex // synchronize access to shared variables
  pki2Conn map[string]*connection // connection map: key is the pkiID, value is the connection
  destinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,
  // used to prevent concurrent connection establishment to the same remote endpoint
}
// newConnStore constructs a connectionStore.
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
// getConnection fetches the connection from the connection map; if absent, it creates and starts one and stores it in the map.
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
// connNum returns the number of connections.
func (cs *connectionStore) connNum() int
// closeConn closes the connection to the specified peer.
func (cs *connectionStore) closeConn(peer *RemotePeer)
// shutdown closes all connections.
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
// registerConn registers a connection.
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
// closeByPKIid closes the connection with the specified pkiID.
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) 
// The code is located in gossip/comm/conn.go

6.2.1、func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

// getConnection returns the connection to peer, creating, registering and
// starting one when the store does not hold it yet.
//
// Connection establishment to the same PKI-ID is serialized through a
// per-destination lock. After dialing, the store is re-checked under the write
// lock: if another connection to the same peer was registered in the
// meantime, that one is returned and the freshly created one is closed.
//
// It returns an error when the store is shutting down, or when no connection
// exists and creating one failed.
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
	pkiID := peer.PKIID
	endpoint := peer.Endpoint
	key := string(pkiID)

	cs.RLock()
	isClosing := cs.isClosing
	cs.RUnlock()
	if isClosing {
		return nil, fmt.Errorf("connection store is shutting down, not connecting to %s", endpoint)
	}

	// Fetch (or lazily create) the lock guarding this destination.
	cs.Lock()
	destinationLock, hasConnected := cs.destinationLocks[key]
	if !hasConnected {
		destinationLock = &sync.RWMutex{}
		cs.destinationLocks[key] = destinationLock
	}
	cs.Unlock()

	destinationLock.Lock()

	// Look the connection up in the connection map first.
	cs.RLock()
	conn, exists := cs.pki2Conn[key]
	cs.RUnlock()
	if exists {
		destinationLock.Unlock()
		return conn, nil
	}

	// Not found: create the connection.
	createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
	destinationLock.Unlock()

	// All further map access happens under the write lock; writing pki2Conn
	// without it races with every reader of the map.
	cs.Lock()
	defer cs.Unlock()

	// NOTE(review): close() is invoked while cs is locked below; this assumes
	// connection.close does not call back into the store - confirm.
	if cs.isClosing {
		if createdConnection != nil {
			createdConnection.close()
		}
		return nil, fmt.Errorf("connection store is shutting down, dropping connection to %s", endpoint)
	}

	// A connection to this peer may have been registered while we were dialing.
	if existing, ok := cs.pki2Conn[key]; ok {
		if createdConnection != nil {
			createdConnection.close()
		}
		return existing, nil
	}

	// Nobody else connected and our own attempt failed.
	if err != nil {
		return nil, err
	}

	cs.pki2Conn[string(createdConnection.pkiID)] = createdConnection
	// Start receiving and handling messages, and sending to the remote peer.
	go createdConnection.serviceConnection()
	return createdConnection, nil
}
//代码在gossip/comm/conn.go

7、ChannelDeMultiplexer结构体及方法(多路复用器)

// ChannelDeMultiplexer fans a message out to the channels registered on it.
type ChannelDeMultiplexer struct {
  channels []*channel // registered channels (added through AddChannel)
  lock *sync.RWMutex // presumably guards channels - confirm against the methods
  closed int32 // read atomically by isClosed; the value 1 means closed
}
// NewChannelDemultiplexer constructs a ChannelDeMultiplexer.
func NewChannelDemultiplexer() *ChannelDeMultiplexer
// isClosed: atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
// Close closes the demultiplexer.
func (m *ChannelDeMultiplexer) Close() 
// AddChannel adds a channel.
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} 
// DeMultiplex sends the message to the channels one by one.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) 

欢迎继续关注兄弟连区块链教程分享!


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:兄弟连区块链

查看原文:区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1286 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏