注册
注册实现了服务发现。
服务将自己注册到注册器中,然后客户端可以通过注册器找到这些服务。
有很多流行的产品可以用作注册器,例如zookeeper、etcd、consul。很多公司实现了他们自己的注册中心。
rpcx为zookeeper、etcd、consul、mDNS提供的开箱即用的注册功能,同时也提供了一对一和一对多的注册。处于测试的目的,rpcx也有一个in-process注册。
peer2peer
例程:102basic
实际上一对一并没有注册。客户端直接连接服务器。它可以用于小规模的系统,一个服务只有一个节点的情况。你可以在系统缩放的时候调整到其他的注册方式。
服务器不需要做更多的配置。
客户端使用Peer2PeerDiscovery
,只需要设置该服务的网络和地址。
因为只有一个节点,因此选择器也是没有意义的。
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
注意:rpcx使用network@Host:port
格式来指明一个服务。nerwork
可以是tcp
、http
、unix
、quic
或者kcp
。Host
可以是主机名或者ip地址。
NewXClient
必须使用服务名作为第一个参数,然后是失败模式,选择器,发现模块和其他选项。
peer2multiple
例程:multiple
如果你有多个服务但是没有中心注册,你可以在客户端中通过程序配置服务的地址。
服务器不需要做更多的配置。
客户端使用MultipleServersDiscovery
,只需要设置服务的地址。
d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
你必须在MultipleServersDiscovery
设置服务信息和元数据。如果增加或着移除某些服务,你可以调用MultipleServersDiscovery.Update来动态更新服务。
func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
zookeeper
例程:zookeeper
Apache Zookeeper是Apache Software Foundation的一个软件项目。它实质上是一个分布式的分层键值对存储,用来给大型分布式系统提供一个分布式的配置服务、同步服务以及名字注册。
服务必须使用ZookeeperRegisterPlugin
插件来把他们的信息注册到zookeeper中,客户端必须使用ZookeeperDiscovery
来获取服务信息。
你必须设置服务地址(newtwork@address
)和zookeeper地址。你应该设置基地址以免你的服务与其他人开发的服务冲突。
你可以设置Metrics
参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加[Metrics]插件到服务器。
你可以设置UpdateInterval
参数来更新租期。rpcx设置租期为UpdateInterval*3
,这意味着如果服务器没有在UpdateInterval*3
的时间内更新租期,该节点会从zookeeper中移除。
// go server.go
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Serve("tcp", *addr)
}
func addRegistryPlugin(s *server.Server) {
r := &serverplugin.ZooKeeperRegisterPlugin {
ServiceAddress: "tcp@" + *addr,
ZooKeeperServers: []string{*zkAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
客户端的配置也很简单。
你需要配置zookeeper地址和服务器相同的基地址。
// go client.go
d := client.NewZookeeperDiscovery(*basePath, "Arith",[]string{*zkAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
etcd
例程:etcd
etcd是一个分布式的、可靠的key-value存储,可以用于关键数据的分布式系统。
服务必须使用EtcdRegisterPlugin
插件来把他们的信息注册到etcd中,客户端必须使用EtcdDiscovery
来获取服务信息。
你必须设置服务地址(network@address)和etcd地址。你应该设置基地址来确保你的服务不会跟其他人开发的服务冲突。
你可以设置Metrics
,rpcx会周期性更新流量。如果你想更新测量数据,你必须在服务器中添加【Metrics】插件。
你可以设置UpdateInterval
来刷新租期。rpcx设置租期为UpdateInterval * 3
。这意味着如果服务器没有在UpdateInterval * 3
时间内更新租期,该节点会从etcd中移除。
// go server.go
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Serve("tcp", *addr)
}
func addRegistryPlugin(s *server.Server) {
r := &serverplugin.EtcdRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
EtcdServers: []string{*etcdAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
客户端的配置很简单。
你需要设置etcd地址和相同的服务基地址。
```go client.go
d := client.NewEtcdDiscovery(*basePath, "Arith",[]string{*etcdAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
consul
例程:consul
Consul是一个高可靠的分布式服务发现和KV存储软件,设计用于支持现代数据中心以简化分布式系统及配置。
服务必须使用ConsulRegisterPlugin
插件来注册它们的信息到consul中,客户端必须使用ConsulDiscovery
来获取服务信息。
你必须设置服务地址(network@address
)和consul地址。你应该设置基地址以免你的服务与其他人开发的服务冲突。
你可以设置Metrics
参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加[Metrics]插件到服务器。
你可以设置UpdateInterval
参数来更新租期。rpcx设置租期为UpdateInterval*3
,这意味着如果服务器没有在UpdateInterval*3
的时间内更新租期,该节点会从consul中移除。
// go server.go
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Serve("tcp", *addr)
}
func addRegistryPlugin(s *server.Server) {
r := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
ConsulServers: []string{*consulAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
客户端的配置也很简单。
你需要配置zookeeper地址和服务器相同的基地址。
// go client.go
d := client.NewConsulDiscovery(*basePath, "Arith",[]string{*consulAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
mDNS
例程:mDNS
mDNS用于在一个没有本地名称服务器的小型网络中解析主机名为IP地址。它是一个零配置的服务,本质上和单播域名系统使(DNS)用相同的编程接口、包格式和操作语义。尽管Stuart Cheshire设计mDNS可以独立工作,它也可以单播DNS服务器协调工作。
mDNS协议作为RFC 6762发布,使用IP组播用户数据报协议(UDP)数据包,由Apple Bonjour,Spotify Connect,Philips Hue,Google Chromecast和open source Avahi (software)软件包实现。安卓包含mDNS实现。mDNS也在windows 10中实现,不过它仅限于发现网络打印机中使用。
mDNS可以与DNS服务发现(DNS-SD)协同工作。DNS-SD是一个RFC 6763单独规定的协同零配置技术。
服务必须使用MDNSRegisterPlugin
插件来注册它们的信息到mDNS中,客户端必须使用MDNSDiscovery
来获取服务信息。
你必须设置服务地址(network@address
)。你应该设置基地址以免你的服务与其他人开发的服务冲突。
你可以设置Metrics
参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加【Metrics】插件到服务器。
你可以设置UpdateInterval
参数来更新tps。如果服务器宕机,它会被自动移出mDNS。
```go server.go
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Serve("tcp", *addr)
}
func addRegistryPlugin(s *server.Server) {
r := serverplugin.NewMDNSRegisterPlugin("tcp@"+*addr, 8972, metrics.NewRegistry(), time.Minute, "")
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
客户端的配置也很简单。
你需要配置mDNS地址和服务器相同的基地址。
// go client.go
d := client.NewMDNSDiscovery("Arith", 10*time.Second, 10*time.Second, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
进程内调度
例程:inprocess
进程内注册是以测试为目的注册。你不应该在生产环境中使用它。
当你开发客户端时,你可能想创建一个mock服务来集成测试。你可以使用Inprocess
注册,然后在客户端部署到生产环境中的时候改为其他注册。
这个插件使用反射来调用同一个进程中的服务。
你在服务的插件容器中添加client.InprocessClient
。客户端中使用InpreocessDiscovery
来寻找已注册的服务来调调用。
// go server.go
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
go func() {
s.Serve("tcp", *addr)
}()
d := client.NewInpreocessDiscovery()
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for i := 0; i < 100; i++ {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
}
func addRegistryPlugin(s *server.Server) {
r := client.InprocessClient
s.Plugins.Add(r)
}