注册

注册实现了服务发现。

服务将自己注册到注册器中,然后客户端可以通过注册器找到这些服务。

有很多流行的产品可以用作注册器,例如zookeeper、etcd、consul。很多公司实现了他们自己的注册中心。

rpcx为zookeeper、etcd、consul、mDNS提供的开箱即用的注册功能,同时也提供了一对一和一对多的注册。处于测试的目的,rpcx也有一个in-process注册。

peer2peer

例程102basic

实际上一对一并没有注册。客户端直接连接服务器。它可以用于小规模的系统,一个服务只有一个节点的情况。你可以在系统缩放的时候调整到其他的注册方式。

服务器不需要做更多的配置。

客户端使用Peer2PeerDiscovery,只需要设置该服务的网络和地址。

因为只有一个节点,因此选择器也是没有意义的。

d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()

注意:rpcx使用network@Host:port格式来指明一个服务。nerwork可以是tcphttpunixquic或者kcpHost可以是主机名或者ip地址。

NewXClient必须使用服务名作为第一个参数,然后是失败模式,选择器,发现模块和其他选项。

peer2multiple

例程multiple

如果你有多个服务但是没有中心注册,你可以在客户端中通过程序配置服务的地址。

服务器不需要做更多的配置。

客户端使用MultipleServersDiscovery,只需要设置服务的地址。

d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()

你必须在MultipleServersDiscovery设置服务信息和元数据。如果增加或着移除某些服务,你可以调用MultipleServersDiscovery.Update来动态更新服务。

func (d *MultipleServersDiscovery) Update(pairs []*KVPair)

zookeeper

例程zookeeper

Apache Zookeeper是Apache Software Foundation的一个软件项目。它实质上是一个分布式的分层键值对存储,用来给大型分布式系统提供一个分布式的配置服务、同步服务以及名字注册。

服务必须使用ZookeeperRegisterPlugin插件来把他们的信息注册到zookeeper中,客户端必须使用ZookeeperDiscovery来获取服务信息。

你必须设置服务地址(newtwork@address)和zookeeper地址。你应该设置基地址以免你的服务与其他人开发的服务冲突。

你可以设置Metrics参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加[Metrics]插件到服务器。

你可以设置UpdateInterval参数来更新租期。rpcx设置租期为UpdateInterval*3,这意味着如果服务器没有在UpdateInterval*3的时间内更新租期,该节点会从zookeeper中移除。

// go server.go 

func main() { 
    flag.Parse()
    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {
    r := &serverplugin.ZooKeeperRegisterPlugin {
        ServiceAddress:   "tcp@" + *addr,
        ZooKeeperServers: []string{*zkAddr},
        BasePath:         *basePath,
        Metrics:          metrics.NewRegistry(),
        UpdateInterval:   time.Minute,
    }
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}

客户端的配置也很简单。

你需要配置zookeeper地址和服务器相同的基地址。

// go client.go

d := client.NewZookeeperDiscovery(*basePath, "Arith",[]string{*zkAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()

etcd

例程etcd

etcd是一个分布式的、可靠的key-value存储,可以用于关键数据的分布式系统。

服务必须使用EtcdRegisterPlugin插件来把他们的信息注册到etcd中,客户端必须使用EtcdDiscovery来获取服务信息。

你必须设置服务地址(network@address)和etcd地址。你应该设置基地址来确保你的服务不会跟其他人开发的服务冲突。

你可以设置Metrics,rpcx会周期性更新流量。如果你想更新测量数据,你必须在服务器中添加【Metrics】插件。

你可以设置UpdateInterval来刷新租期。rpcx设置租期为UpdateInterval * 3。这意味着如果服务器没有在UpdateInterval * 3时间内更新租期,该节点会从etcd中移除。

// go server.go

func main() {
    flag.Parse()
    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {
    r := &serverplugin.EtcdRegisterPlugin{
        ServiceAddress: "tcp@" + *addr,
        EtcdServers:    []string{*etcdAddr},
        BasePath:       *basePath,
        Metrics:        metrics.NewRegistry(),
        UpdateInterval: time.Minute,
    }
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}

客户端的配置很简单。

你需要设置etcd地址和相同的服务基地址。

```go client.go

    d := client.NewEtcdDiscovery(*basePath, "Arith",[]string{*etcdAddr}, nil)
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

consul

例程consul

Consul是一个高可靠的分布式服务发现和KV存储软件,设计用于支持现代数据中心以简化分布式系统及配置。

服务必须使用ConsulRegisterPlugin插件来注册它们的信息到consul中,客户端必须使用ConsulDiscovery来获取服务信息。

你必须设置服务地址(network@address)和consul地址。你应该设置基地址以免你的服务与其他人开发的服务冲突。

你可以设置Metrics参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加[Metrics]插件到服务器。

你可以设置UpdateInterval参数来更新租期。rpcx设置租期为UpdateInterval*3,这意味着如果服务器没有在UpdateInterval*3的时间内更新租期,该节点会从consul中移除。

// go server.go
func main() { 
    flag.Parse()
    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {
    r := &serverplugin.ConsulRegisterPlugin{
        ServiceAddress: "tcp@" + *addr,
        ConsulServers:  []string{*consulAddr},
        BasePath:       *basePath,
        Metrics:        metrics.NewRegistry(),
        UpdateInterval: time.Minute,
    }
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r) 
}

客户端的配置也很简单。

你需要配置zookeeper地址和服务器相同的基地址。

// go client.go

d := client.NewConsulDiscovery(*basePath, "Arith",[]string{*consulAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()

mDNS

例程mDNS

mDNS用于在一个没有本地名称服务器的小型网络中解析主机名为IP地址。它是一个零配置的服务,本质上和单播域名系统使(DNS)用相同的编程接口、包格式和操作语义。尽管Stuart Cheshire设计mDNS可以独立工作,它也可以单播DNS服务器协调工作。

mDNS协议作为RFC 6762发布,使用IP组播用户数据报协议(UDP)数据包,由Apple Bonjour,Spotify Connect,Philips Hue,Google Chromecast和open source Avahi (software)软件包实现。安卓包含mDNS实现。mDNS也在windows 10中实现,不过它仅限于发现网络打印机中使用。

mDNS可以与DNS服务发现(DNS-SD)协同工作。DNS-SD是一个RFC 6763单独规定的协同零配置技术。

服务必须使用MDNSRegisterPlugin插件来注册它们的信息到mDNS中,客户端必须使用MDNSDiscovery来获取服务信息。

你必须设置服务地址(network@address)。你应该设置基地址以免你的服务与其他人开发的服务冲突。

你可以设置Metrics参数,这样rpcx会自动周期性地更新流量信息。如果你想更新测量你必须添加【Metrics】插件到服务器。

你可以设置UpdateInterval参数来更新tps。如果服务器宕机,它会被自动移出mDNS。

```go server.go

func main() {
    flag.Parse()

    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {
    r := serverplugin.NewMDNSRegisterPlugin("tcp@"+*addr, 8972, metrics.NewRegistry(), time.Minute, "")
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}

客户端的配置也很简单。

你需要配置mDNS地址和服务器相同的基地址。

 // go client.go
    d := client.NewMDNSDiscovery("Arith", 10*time.Second, 10*time.Second, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

进程内调度

例程inprocess

进程内注册是以测试为目的注册。你不应该在生产环境中使用它。

当你开发客户端时,你可能想创建一个mock服务来集成测试。你可以使用Inprocess注册,然后在客户端部署到生产环境中的时候改为其他注册。

这个插件使用反射来调用同一个进程中的服务。

你在服务的插件容器中添加client.InprocessClient。客户端中使用InpreocessDiscovery来寻找已注册的服务来调调用。

// go server.go

func main() {
    flag.Parse()
    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")

    go func() {
        s.Serve("tcp", *addr)
    }()

    d := client.NewInpreocessDiscovery()
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 100; i++ {
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
}

func addRegistryPlugin(s *server.Server) {
    r := client.InprocessClient
    s.Plugins.Add(r)
}

results matching ""

    No results matching ""