hello world

stay foolish, stay hungry

consul做grpc服务注册和服务发现

分布式服务通常有多个实例,按负载均衡在服务端侧或客户端侧可以将服务的负载均衡分为服务端负载均衡和客户端负载均衡。

服务端负载

服务端通常依赖 HA Proxy 组件实现负载均衡,比如 nginx 等,而且 nginx 可以做 grpc 的反向代理,整体结构如图 1 所示。

图1 nginx 做 grpc 反向代理

这种方式不是本文探讨的重点,具体实现和配置可以参考相关 nginx blog

客户端负载

客户端负载依赖服务注册中心,服务端启动时,向注册中心注册服务的地址和端口,客户端启动时从注册中心拉取服务的地址,整体结构如图 2 所示。

图2 服务注册和服务发现

grpc 提供了 helloworld demo。

客户端通过 grpc.DialContext 创建 grpc 连接,grpc 的 resolver 解析 target 参数获得服务端地址。这个 demo 的 target 为 localhost:50051,这种格式的 target 会使用默认的 passthrough 解析器解析,即将 target 地址直接返回给r.cc, grpc 客户端利用这个地址建立 grpc 链接。

target 的解析过程:

  • 解析 target 的 scheme
  • 通过 scheme 获取对应的 resolver builder
  • 通过 resolver builder 创建 resolver
  • resolver 解析 target 获取服务端服务(ip + port)列表

grpc 提供了 “passthrough”(默认) 和 “dns” 两种 scheme resolver实现,可以参考name resolve 文档。重新实现 resolver 即可自定义 target 解析策略,从而实现服务发现。

consul 做 grpc 服务注册发现中心

target 格式

设计 target URL 格式:

consul://ip:port/${group}/${service_name}

grpc target 结构体定义:

type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
    URL       url.URL
}

target 的 scheme 为 “consul”, Endpoint 为 “ip:port”。注册到 consul 的服务必须包含「服务名」,这里是 “service_name”,服务可以分组(tag)。

服务端实现

服务端启动后,需要向 consul 注册服务的 ip、端口、分组(tag) 和 服务名。

consul, _ := api.NewClient(&api.Config{Address: "127.0.0.1:8500", Transport: cleanhttp.DefaultPooledTransport()})

_ = consul.Agent().ServiceRegister(&api.AgentServiceRegistration{
    ID:      uuid.String(),
    Name:    service,
    Tags:    []string{s.group},
    Port:    s.port,
    Address: s.ip,
})

consul 支持 grpc health check,可以参考 health check 文档,实现 health check 服务。

客户端实现

客户端需要实现相应的 resolver。这里需要实现 resolver.Builder 接口和 resolver.Resolver 接口。

Builder 接口实现如下:

func init() {
    resolver.Register(&consulBuilder{})
}

type consulBuilder struct {
}

func (*consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    consulHost := target.URL.Host
    urlPath := target.URL.Path

    urlPath = strings.Trim(urlPath, "/")
    ps := strings.Split(urlPath, "/")
    group := ps[0]
    serviceName := ps[1]

    consulClient, err := api.NewClient(&api.Config{Address: consulHost, Transport: cleanhttp.DefaultPooledTransport()})
    if err != nil {
        return nil, fmt.Errorf("resolve consul path error %v", err)
    }

    r := &consulResolver{
        client:               consulClient,
        cc:                   cc,
        disableServiceConfig: opts.DisableServiceConfig,
        group:                group,
        serviceName:          serviceName,
    }

    // subscribe and watch
    go r.watch()

    return r, nil
}

func (*consulBuilder) Scheme() string {
    return "consul"
}

builder 接口返回 resolver,在 ResolveNow 方法中调用 consul api 获取服务端地址。

services, _, _ := r.client.Health().Service(r.serviceName, r.group, true,
        &api.QueryOptions{WaitIndex: r.lastIndex})

addresses := make([]resolver.Address, 0, len(services))
for _, service := range services {
    addr := resolver.Address{
        Addr:       fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port),
        ServerName: r.serviceName,
    }
    addresses = append(addresses, addr)
}

r.cc.UpdateState(resolver.State{Addresses: addresses})

详细代码可以参考「代码实现」章节。

测试一下

  • 启动 consul:
consul agent --dev
  • 启动服务端:
go run ./example/server/server.go

  • 启动客户端:
go run ./example/client/client.go

代码实现

参考

  1. https://www.nginx.com/blog/nginx-1-13-10-grpc/
  2. https://github.com/grpc/grpc-go/tree/master/examples/helloworld
  3. https://github.com/grpc/grpc/blob/master/doc/naming.md
  4. https://github.com/grpc/grpc/blob/master/doc/health-checking.md