分享
  1. 首页
  2. 文章

用consul做grpc的服务发现

直抒胸臆 · · 4054 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

用consul做grpc的服务发现与健康检查

consul

服务发现与负载均衡

当server端是集群部署时,client调用server就需要用到服务发现与负载均衡。通常有两总方式:

  • 一种方式是在client与server之间加代理,由代理来做负载均衡
  • 一种方式是将服务注册到一个数据中心,client通过数据中心查询到所有服务的节点信息,然后自己选择负载均衡的策略。

第一种方式常见的就是用nginx给http服务做负载均衡,client端不直接与server交互,而是把请求并给nginx,nginx再转给后端的服务。
这种方式的优点是:

  • client和server无需做改造,client看不到server的集群,就像单点一样调用就可以

这种方式有几个缺点:

  • 所有的请求都必须经过代理,代理侧容易出现性能瓶颈
  • 代理不能出故障,一旦代理挂了服务就没法访问了。

第二种方式可以参考dubbo的rpc方式,所有的服务都注册在zookeeper上,client端从zookeeper订阅server的列表,然后自己选择把请求发送到哪个server上。对于上面提到的两个缺点,这种方式都很好的避免了:

  • client与server端是直接交互的,server可以做任意的水平扩展,不会出现性能瓶颈
  • 注册中心(zookeeper)通过raft算法实现分布式高可用,不用担心注册中心挂了服务信息丢失的情况。

这种方式的缺点就是实现起来比较复杂。

用第一种方式做grpc的负载均衡时可以有以下的选择:

用第二种方式时,可以选择的数据中心中间件有:

他们都实现了raft算法,都可以用来做注册中心,本篇文章选择consul是因为consul的特点就是做服务发现,有现成的api可以用。

用consul给golang的grpc做服务注册与发现

grpc的resolver

grpc的Dial()和DialContent()方法中都可以添加Load-Balance的选项,Dial方法已经被废弃了,本篇文章介绍使用DialContext的方法。

grpc官方实现了[dns_resolver]()用来做dns的负载均衡。我们通过例子看看grpc client端的代码是怎么写的,然后再理解dns_resolver的源码,最后参照dns_resolver来写自己的consul_resovler。

dns的负载均衡的例子:

package main
import (
 "context"
 "log"
 "google.golang.org/grpc"
 "google.golang.org/grpc/balancer/roundrobin"
 pb "google.golang.org/grpc/examples/helloworld/helloworld"
 "google.golang.org/grpc/resolver"
)
const (
 address = "dns:///dns-record-name:443"
 defaultName = "world"
)
func main() {
 // The secret sauce
 resolver.SetDefaultScheme("dns")
 // Set up a connection to the server.
 
 ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
 if err != nil {
 log.Fatalf("did not connect: %v", err)
 }
 defer conn.Close()
 c := pb.NewGreeterClient(conn)
 // Contact the servers in round-robin manner.
 for i := 0; i < 3; i++ {
 ctx := context.Background()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
 if err != nil {
 log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.Message)
 }
}

DialContext的定义如下:

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)

下面这行代码指明了用dns_resolver,实际上也可以不写,grpc会根据DialContext的第二个参数target来判断选用哪个resolver,例子中传给DialContext的target是 dns:///dns-record-name:443,grpc会自动选择dns_resolver

resolver.SetDefaultScheme("dns")

下面的这个选项,指明了grpc用轮询做为负载均衡的策略

grpc.WithBalancerName(roundrobin.Name)

调用grpc.DialContext之后,grpc会找到对应的resovler,拿到服务的地址列表,然后在调用服务提供的接口时,根据指定的轮询策略选择一个服务。

gRPC Name Resolution里面说了,可以实现自定义的resolver作为插件。

先看看resolver.go的源码,源码路径是$GOPATH/src/google.golang.org/grpc/resolver/resolver.go

m = make(map[string]Builder) //scheme到Builder的map
func Register(b Builder) { //用于resolver注册的接口,dns_resolver.go的init方中调用了这个方法,实际就是更新了map
 m[b.Scheme()] = b
}
type Resolver interface {
 ResolveNow(ResolveNowOption) //立即resolve,重新查询服务信息
 Close() //关闭这个Resolver
}
type Target struct {//uri解析之后的对象, uri的格式详见RFC3986
 Scheme string
 Authority string
 Endpoint string
}
type Address struct {//描述一个服务的地址信息
 Addr string //格式是 host:port
 Type AddressType
 ServerName string
 Metadata interface{}
}
type ClientConn interface {//定义了两个callback函数,用于通知服务信息的更新
 NewAddress(addresses []Address)
 NewServiceConfig(serviceConfig string)
}
type Builder interface { 
 Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) //返回一个Resolver
 Scheme() string //返回scheme如 "dns", "passthrough", "consul"
}
func Get(scheme string) Builder { //grpc.ClientConn会高用这个方法获取指定的Builder接口的实例
 if b, ok := m[scheme]; ok {
 return b
 }
 return nil
}

即使加了注释,估计也很难马上理解这个其中的具体含意,博主也是结合dns_resolver.go,反复读了好几遍才理解resolver.go。其大致的意思是,grpc.DialContext方法调用之后:

  • 解析target(例如dns:///dns-record-name:443)获取scheme
  • 调用resolver.Get方法根据scheme拿到对应的Builder
  • 调用Builder.Build方法

    • 解析target
    • 获取服务地址的信息
    • 调用ClientConn.NewAddress和NewServiceConfig这两个callback把服务信息传递给上层的调用方
    • 返回Resolver接口实例给上层
  • 上层可以通过Resolver.ResolveNow方法主动刷新服务信息

了解了resolver源码的意思之后,再看一下dns_resolver.go就比较清晰了

//注册一个Builder到resolver的map里面
//这个方法会被默认调用,了解go的init可以自行百度
func init() { 
 resolver.Register(NewBuilder())
}
func NewBuilder() resolver.Builder {//创建一个resolver.Builder的实例
 return &dnsBuilder{minFreq: defaultFreq}
}
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
 //解析target拿到ip和端口
 host, port, err := parseTarget(target.Endpoint, defaultPort)
 if err != nil {
 return nil, err
 }
 // IP address.
 if net.ParseIP(host) != nil {
 host, _ = formatIP(host)
 addr := []resolver.Address{{Addr: host + ":" + port}}
 i := &ipResolver{
 cc: cc,
 ip: addr,
 rn: make(chan struct{}, 1),
 q: make(chan struct{}),
 }
 cc.NewAddress(addr)
 go i.watcher()
 return i, nil
 }
 // DNS address (non-IP).
 ctx, cancel := context.WithCancel(context.Background())
 d := &dnsResolver{
 freq: b.minFreq,
 backoff: backoff.Exponential{MaxDelay: b.minFreq},
 host: host,
 port: port,
 ctx: ctx,
 cancel: cancel,
 cc: cc,
 t: time.NewTimer(0),
 rn: make(chan struct{}, 1),
 disableServiceConfig: opts.DisableServiceConfig,
 }
 if target.Authority == "" {
 d.resolver = defaultResolver
 } else {
 d.resolver, err = customAuthorityResolver(target.Authority)
 if err != nil {
 return nil, err
 }
 }
 d.wg.Add(1)
 go d.watcher()//起一个goroutine,因为watcher这个方法是个死循环,当定时器
 return d, nil
}
func (d *dnsResolver) watcher() {
 defer d.wg.Done()
 for {
 //这个select没有default,当没有case满足时会一直阻塞
 //结束阻塞的条件是定时器超时d.t.C,或者d.rn这个channel中有数据可读
 select { 
 case <-d.ctx.Done():
 return
 case <-d.t.C:
 case <-d.rn:
 }
 result, sc := d.lookup()
 // Next lookup should happen within an interval defined by d.freq. It may be
 // more often due to exponential retry on empty address list.
 if len(result) == 0 {
 d.retryCount++
 d.t.Reset(d.backoff.Backoff(d.retryCount))
 } else {
 d.retryCount = 0
 d.t.Reset(d.freq)
 }
 //resolver.ClientConn的两个callback的调用,实现服务信息传入上层
 d.cc.NewServiceConfig(sc)
 d.cc.NewAddress(result)
 }
}
//向channel中写入,用于结束watcher中那个select的阻塞状态,后面的代码就是重新查询服务信息的逻辑
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
 select {
 case i.rn <- struct{}{}:
 default:
 }
}

实现consul_resovler

上面我们了解了grpc的resolver的机制,接下来实现consul_resolver, 我们先把代码的架子搭起来

init() //返回一个resolver.Builder的实例
//实现resolver.Builder的接口中的所有方法就是一个resolver.Builder
type consulBuidler strcut {
}
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
 //TODO 解析target, 拿到consul的ip和端口
 
 //TODO 用consul的go api连接consul,查询服务结点信息,并且调用resolver.ClientConn的两个callback
}
func (cb *consulBuilder) Scheme() string {
 return "consul"
}
//ResolverNow方法什么也不做,因为和consul保持了发布订阅的关系
//不需要像dns_resolver那个定时的去刷新
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}
//暂时先什么也不做吧
func (cr *consulResolver) Close() {
}

现在来看,实现consul_resolver.go最大的问题就是怎么用consul提供的go api了,参考这篇文章就可以了,然后consul_resolver.go的代码就出来了

package consul
import (
 "errors"
 "fmt"
 "github.com/hashicorp/consul/api"
 "google.golang.org/grpc/resolver"
 "regexp"
 "sync"
)
const (
 defaultPort = "8500"
)
var (
 errMissingAddr = errors.New("consul resolver: missing address")
 errAddrMisMatch = errors.New("consul resolver: invalied uri")
 errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")
 regexConsul, _ = regexp.Compile("^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$")
)
func Init() {
 fmt.Printf("calling consul init\n")
 resolver.Register(NewBuilder())
}
type consulBuilder struct {
}
type consulResolver struct {
 address string
 wg sync.WaitGroup
 cc resolver.ClientConn
 name string
 disableServiceConfig bool
 lastIndex uint64
}
func NewBuilder() resolver.Builder {
 return &consulBuilder{}
}
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
 fmt.Printf("calling consul build\n")
 fmt.Printf("target: %v\n", target)
 host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
 if err != nil {
 return nil, err
 }
 cr := &consulResolver{
 address: fmt.Sprintf("%s%s", host, port),
 name: name,
 cc: cc,
 disableServiceConfig: opts.DisableServiceConfig,
 lastIndex: 0,
 }
 cr.wg.Add(1)
 go cr.watcher()
 return cr, nil
}
func (cr *consulResolver) watcher() {
 fmt.Printf("calling consul watcher\n")
 config := api.DefaultConfig()
 config.Address = cr.address
 client, err := api.NewClient(config)
 if err != nil {
 fmt.Printf("error create consul client: %v\n", err)
 return
 }
 for {
 services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
 if err != nil {
 fmt.Printf("error retrieving instances from Consul: %v", err)
 }
 cr.lastIndex = metainfo.LastIndex
 var newAddrs []resolver.Address
 for _, service := range services {
 addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
 newAddrs = append(newAddrs, resolver.Address{Addr: addr})
 }
 fmt.Printf("adding service addrs\n")
 fmt.Printf("newAddrs: %v\n", newAddrs)
 cr.cc.NewAddress(newAddrs)
 cr.cc.NewServiceConfig(cr.name)
 }
}
func (cb *consulBuilder) Scheme() string {
 return "consul"
}
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}
func (cr *consulResolver) Close() {
}
func parseTarget(target string) (host, port, name string, err error) {
 fmt.Printf("target uri: %v\n", target)
 if target == "" {
 return "", "", "", errMissingAddr
 }
 if !regexConsul.MatchString(target) {
 return "", "", "", errAddrMisMatch
 }
 groups := regexConsul.FindStringSubmatch(target)
 host = groups[1]
 port = groups[2]
 name = groups[3]
 if port == "" {
 port = defaultPort
 }
 return host, port, name, nil
}

到此,grpc客户端服务发现就搞定了。

consul的服务注册

服务注册直接用consul的go api就可以了,也是参考前一篇文章,简单的封装一下,consul_register.go的代码如下:

package consul
import (
 "fmt"
 "github.com/hashicorp/consul/api"
 "time"
)
type ConsulService struct {
 IP string
 Port int
 Tag []string
 Name string
}
func RegitserService(ca string, cs *ConsulService) {
 //register consul
 consulConfig := api.DefaultConfig()
 consulConfig.Address = ca
 client, err := api.NewClient(consulConfig)
 if err != nil {
 fmt.Printf("NewClient error\n%v", err)
 return
 }
 agent := client.Agent()
 interval := time.Duration(10) * time.Second
 deregister := time.Duration(1) * time.Minute
 reg := &api.AgentServiceRegistration{
 ID: fmt.Sprintf("%v-%v-%v", cs.Name, cs.IP, cs.Port), // 服务节点的名称
 Name: cs.Name, // 服务名称
 Tags: cs.Tag, // tag,可以为空
 Port: cs.Port, // 服务端口
 Address: cs.IP, // 服务 IP
 Check: &api.AgentServiceCheck{ // 健康检查
 Interval: interval.String(), // 健康检查间隔
 GRPC: fmt.Sprintf("%v:%v/%v", cs.IP, cs.Port, cs.Name), // grpc 支持,执行健康检查的地址,service 会传到 Health.Check 函数中
 DeregisterCriticalServiceAfter: deregister.String(), // 注销时间,相当于过期时间
 },
 }
 fmt.Printf("registing to %v\n", ca)
 if err := agent.ServiceRegister(reg); err != nil {
 fmt.Printf("Service Register error\n%v", err)
 return
 }
}

改造一下grpc的helloworld

把grpc的helloworld的demo改一下,用consul来做服务注册和发现。
server端代码:

package main
import (
 "context"
 "fmt"
 "google.golang.org/grpc"
 "google.golang.org/grpc/health/grpc_health_v1"
 "log"
 "net"
 "server/internal/consul"
 pb "server/proto/helloworld"
)
const (
 port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
 log.Printf("Received: %v", in.Name)
 return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func RegisterToConsul() {
 consul.RegitserService("127.0.0.1:8500", &consul.ConsulService{
 Name: "helloworld",
 Tag: []string{"helloworld"},
 IP: "127.0.0.1",
 Port: 50051,
 })
}
//health
type HealthImpl struct{}
// Check 实现健康检查接口,这里直接返回健康状态,这里也可以有更复杂的健康检查策略,比如根据服务器负载来返回
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
 fmt.Print("health checking\n")
 return &grpc_health_v1.HealthCheckResponse{
 Status: grpc_health_v1.HealthCheckResponse_SERVING,
 }, nil
}
func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
 return nil
}
func main() {
 lis, err := net.Listen("tcp", port)
 if err != nil {
 log.Fatalf("failed to listen: %v", err)
 }
 s := grpc.NewServer()
 pb.RegisterGreeterServer(s, &server{})
 grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
 RegisterToConsul()
 if err := s.Serve(lis); err != nil {
 log.Fatalf("failed to serve: %v", err)
 }
}

client端代码:

package main
import (
 "client/internal/consul"
 pb "client/proto/helloworld"
 "context"
 "google.golang.org/grpc"
 "log"
 "os"
 "time"
)
const (
 target = "consul://127.0.0.1:8500/helloworld"
 defaultName = "world"
)
func main() {
 consul.Init()
 // Set up a connection to the server.
 ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 conn, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
 if err != nil {
 log.Fatalf("did not connect: %v", err)
 }
 defer conn.Close()
 c := pb.NewGreeterClient(conn)
 // Contact the server and print out its response.
 name := defaultName
 if len(os.Args) > 1 {
 name = os.Args[1]
 }
 for {
 ctx, _ := context.WithTimeout(context.Background(), time.Second)
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
 if err != nil {
 log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.Message)
 time.Sleep(time.Second * 2)
 }
}

运行一把

启动consul

consul agent -dev

启动hello server

cd server
go run cmd/main.go

启动hello client

cd client
go run cmd/main.go

运行结果:

//client
2019年03月07日 17:22:04 Greeting: Hello world
2019年03月07日 17:22:06 Greeting: Hello world
//server
2019年03月07日 17:22:04 Received: world
2019年03月07日 17:22:06 Received: world

完整工程的git地址
工程使用方法:

cd server
go mod tidy
go run cmd/main.go
cd client
go mod tidy
go run cmd/main.go

请自行解决防火墙的问题

参考文章


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:直抒胸臆

查看原文:用consul做grpc的服务发现

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
4054 次点击
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏