consul做grpc服务注册和服务发现
分布式服务通常有多个实例,按负载均衡在服务端侧或客户端侧可以将服务的负载均衡分为服务端负载均衡和客户端负载均衡。
服务端负载
服务端通常依赖 HA Proxy 组件实现负载均衡,比如 nginx 等,而且 nginx 可以做 grpc 的反向代理,整体结构如图 1 所示。
图1 nginx 做 grpc 反向代理
这种方式不是本文探讨的重点,具体实现和配置可以参考相关 nginx blog。
客户端负载
客户端负载依赖服务注册中心,服务端启动时,向注册中心注册服务的地址和端口,客户端启动时从注册中心拉取服务的地址,整体结构如图 2 所示。
图2 服务注册和服务发现
grpc 提供了 helloworld demo。
客户端通过 grpc.DialContext
创建 grpc 连接,grpc 的 resolver 解析 target 参数获得服务端地址。这个 demo 的 target 为 localhost:50051
,这种格式的 target 会使用默认的 passthrough 解析器解析,即将 target 地址直接返回给r.cc
, grpc 客户端利用这个地址建立 grpc 链接。
target 的解析过程:
- 解析 target 的 scheme
- 通过 scheme 获取对应的 resolver builder
- 通过 resolver builder 创建 resolver
- resolver 解析 target 获取服务端服务(ip + port)列表
grpc 提供了 “passthrough”(默认) 和 “dns” 两种 scheme resolver实现,可以参考name resolve 文档。重新实现 resolver 即可自定义 target 解析策略,从而实现服务发现。
consul 做 grpc 服务注册发现中心
target 格式
设计 target URL 格式:
consul://ip:port/${group}/${service_name}
grpc target 结构体定义:
type Target struct {
Scheme string
Authority string
Endpoint string
URL url.URL
}
target 的 scheme 为 “consul”, Endpoint 为 “ip:port”。注册到 consul 的服务必须包含「服务名」,这里是 “service_name”,服务可以分组(tag)。
服务端实现
服务端启动后,需要向 consul 注册服务的 ip、端口、分组(tag) 和 服务名。
consul, _ := api.NewClient(&api.Config{Address: "127.0.0.1:8500", Transport: cleanhttp.DefaultPooledTransport()})
_ = consul.Agent().ServiceRegister(&api.AgentServiceRegistration{
ID: uuid.String(),
Name: service,
Tags: []string{s.group},
Port: s.port,
Address: s.ip,
})
consul 支持 grpc health check,可以参考 health check 文档,实现 health check 服务。
客户端实现
客户端需要实现相应的 resolver。这里需要实现 resolver.Builder
接口和 resolver.Resolver
接口。
Builder
接口实现如下:
func init() {
resolver.Register(&consulBuilder{})
}
type consulBuilder struct {
}
func (*consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
consulHost := target.URL.Host
urlPath := target.URL.Path
urlPath = strings.Trim(urlPath, "/")
ps := strings.Split(urlPath, "/")
group := ps[0]
serviceName := ps[1]
consulClient, err := api.NewClient(&api.Config{Address: consulHost, Transport: cleanhttp.DefaultPooledTransport()})
if err != nil {
return nil, fmt.Errorf("resolve consul path error %v", err)
}
r := &consulResolver{
client: consulClient,
cc: cc,
disableServiceConfig: opts.DisableServiceConfig,
group: group,
serviceName: serviceName,
}
// subscribe and watch
go r.watch()
return r, nil
}
func (*consulBuilder) Scheme() string {
return "consul"
}
builder 接口返回 resolver,在 ResolveNow
方法中调用 consul api 获取服务端地址。
services, _, _ := r.client.Health().Service(r.serviceName, r.group, true,
&api.QueryOptions{WaitIndex: r.lastIndex})
addresses := make([]resolver.Address, 0, len(services))
for _, service := range services {
addr := resolver.Address{
Addr: fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port),
ServerName: r.serviceName,
}
addresses = append(addresses, addr)
}
r.cc.UpdateState(resolver.State{Addresses: addresses})
详细代码可以参考「代码实现」章节。
测试一下
- 启动 consul:
consul agent --dev
- 启动服务端:
go run ./example/server/server.go
- 启动客户端:
go run ./example/client/client.go
代码实现
- go 版本代码实现: https://github.com/sunpe/grpc_with_consul_go
- java 版本代码实现: https://github.com/sunpe/grpc_with_consul_java