Skip to content

Remote Procedure Call

Go语言的RPC包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该RPC包是建立在net包基础之上的。

Go标准包中已经提供了对RPC的支持,而且支持三个级别的RPC:TCP、HTTP、JSONRPC。但Go的RPC包是独一无二的RPC,它和传统的RPC系统不同,它只支持Go开发的服务器与客户端之间的交互,因为在内部,它们采用了Gob来编码。

Go RPC的函数只有符合下面的条件才能被远程访问,不然会被忽略,详细的要求如下:

  • 函数必须是导出的(首字母大写)
  • 必须有两个导出类型的参数,
  • 第一个参数是接收的参数,第二个参数是返回给客户端的参数,第二个参数必须是指针类型的
  • 函数还要有一个返回值error

举个🌰,以下的RPC函数格式正确:

func (t *T) MethodName(argType T1, replyType *T2) error

T、T1和T2类型必须能被encoding/gob包编解码。

任何的RPC都需要通过网络来传递数据,Go RPC可以利用HTTP和TCP来传递数据,利用HTTP的好处是可以直接复用net/http里面的一些函数。

RPC版 Hello World

我们先构造一个HelloService类型,其中的Hello方法用于实现打印功能:

type HelloService struct {}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello:" + request
    return nil
}

其中Hello方法必须满足Go语言的RPC规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。

然后就可以将HelloService类型的对象注册为一个RPC服务:

func main() {
    rpc.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    conn, err := listener.Accept()
    if err != nil {
        log.Fatal("Accept error:", err)
    }

    rpc.ServeConn(conn)
}

其中rpc.Register函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,所有注册的方法会放在“HelloService”服务空间之下。然后我们建立一个唯一的TCP链接,并且通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。

下面是客户端请求HelloService服务的代码:

func main() {
    client, err := rpc.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

首先是通过rpc.Dial拨号RPC服务,然后通过client.Call调用具体的RPC方法。在调用client.Call时,第一个参数是用点号链接的RPC服务名字和方法名字,第二和第三个参数分别我们定义RPC方法的两个参数。

重构 Hello World

一般RPC开发会先定义接口规范,再定义两端的具体操作。

RPC服务的接口规范分为三个部分:首先是服务的名字,然后是服务要实现的详细方法列表,最后是注册该类型服务的函数。为了避免名字冲突,我们在RPC服务的名字中增加了包路径前缀(这个是RPC服务抽象的包路径,并非完全等价Go语言的包路径)。RegisterHelloService注册服务时,编译器会要求传入的对象满足HelloServiceInterface接口。

服务的名字和接口:

type HelloServiceClient struct {
    *rpc.Client
}

var _ HelloServiceInterface = (*HelloServiceClient)(nil)

func DialHelloService(network, address string) (*HelloServiceClient, error) {
    c, err := rpc.Dial(network, address)
    if err != nil {
        return nil, err
    }
    return &HelloServiceClient{Client: c}, nil
}

func (p *HelloServiceClient) Hello(request string, reply *string) error {
    return p.Client.Call(HelloServiceName+".Hello", request, reply)
}

客户端代码:

func main() {
    client, err := DialHelloService("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.Hello("hello", &reply)
    if err != nil {
        log.Fatal(err)
    }
}

服务端代码:

type HelloService struct {}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello:" + request
    return nil
}

func main() {
    RegisterHelloService(new(HelloService))

    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go rpc.ServeConn(conn)
    }
}

在新的RPC服务端实现中,我们用RegisterHelloService函数来注册函数,这样不仅可以避免命名服务名称的工作,同时也保证了传入的服务对象满足了RPC接口的定义。最后我们新的服务改为支持多个TCP链接,然后为每个TCP链接提供RPC服务。

语言无关

语言无关是指:无论采用何种编程语言,只要遵循同样的json结构,以同样的流程就可以和Go语言编写的RPC服务进行通信。这样我们就实现了语言无关的RPC。

标准库的RPC默认采用Go语言特有的gob编码,因此从其它语言调用Go语言实现的RPC服务将比较困难,而每个RPC以及服务的使用者都可能采用不同的编程语言。得益于RPC的框架设计,Go语言的RPC其实也是很容易实现跨语言支持的。

Go语言的RPC框架有两个比较有特色的设计:

  1. RPC数据打包时可以通过插件实现自定义的编码和解码。
  2. RPC建立在抽象的io.ReadWriteCloser接口之上的,我们可以将RPC架设在不同的通讯协议之上。

举个🌰,这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的RPC。

基于JSON编码重新实现RPC服务端:

func main() {
    rpc.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))  // 取代了rpc.ServeConn函数
    }
}

代码中最大的变化是用rpc.ServeCodec函数替代了rpc.ServeConn函数,传入的参数是针对服务端的json编解码器。

JSON版本的客户端:

func main() {
    conn, err := net.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("net.Dial:", err)
    }

    client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

    var reply string
    err = client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

先手工调用net.Dial函数建立TCP链接,然后基于该链接建立针对客户端的json编解码器。

在确保客户端可以正常调用RPC服务的方法之后,我们在客户端用一个普通的TCP服务代替Go语言版本的RPC服务,这样可以查看客户端调用时发送的数据格式。比如在客户端通过nc命令nc -l 1234在同样的端口启动一个TCP服务。然后再次执行一次RPC调用将会发现nc输出了以下的信息:

{"method":"HelloService.Hello","params":["hello"],"id":0}

method部分对应要调用的rpc服务和方法组合成的名字,params部分的第一个元素为参数,id是由调用端维护的一个唯一的调用编号。

请求的json数据对象在内部对应两个结构体:客户端是clientRequest,服务端是serverRequest。clientRequest和serverRequest结构体的内容基本是一致的:

type clientRequest struct {
    Method string         `json:"method"`
    Params [1]interface{} `json:"params"`
    Id     uint64         `json:"id"`
}

type serverRequest struct {
    Method string           `json:"method"`
    Params *json.RawMessage `json:"params"`
    Id     *json.RawMessage `json:"id"`
}

在获取到RPC调用对应的json数据后,我们可以通过直接向架设了RPC服务的TCP服务器发送json数据模拟RPC方法调用:

$ echo -e '{"method":"HelloService.Hello","params":["hello"],"id":1}' | nc localhost 1234

返回的结果也是一个json格式的数据:

{"id":1,"result":"hello:hello","error":null}

其中id对应输入的id参数,result为返回的结果,error部分在出问题时表示错误信息。对于顺序调用来说,id不是必须的。但是Go语言的RPC框架支持异步调用,当返回结果的顺序和调用的顺序不一致时,可以通过id来识别对应的调用。

返回的json数据也是对应内部的两个结构体:客户端是clientResponse,服务端是serverResponse。两个结构体的内容同样也是类似的:

type clientResponse struct {
    Id     uint64           `json:"id"`
    Result *json.RawMessage `json:"result"`
    Error  interface{}      `json:"error"`
}

type serverResponse struct {
    Id     *json.RawMessage `json:"id"`
    Result interface{}      `json:"result"`
    Error  interface{}      `json:"error"`
}

Http上的RPC

Go语言内在的 RPC 框架已经支持在 Http 协议上提供 RPC 服务。但是框架的 http 服务同样采用了内置的 gob 协议,并且没有提供采用其它协议的接口,因此从其它语言依然无法访问的。

举个🌰

func main() {
    rpc.RegisterName("HelloService", new(HelloService))

    http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
        var conn io.ReadWriteCloser = struct {
            io.Writer
            io.ReadCloser
        }{
            ReadCloser: r.Body,
            Writer:     w,
        }

        rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
    })

    http.ListenAndServe(":1234", nil)
}

RPC 的服务架设在 “/jsonrpc” 路径,在处理函数中基于 http.ResponseWriter 和 http.Request 类型的参数构造一个 io.ReadWriteCloser 类型的 conn 通道。然后基于 conn 构建针对服务端的 json 编码解码器。最后通过 rpc.ServeRequest 函数为每次请求处理一次 RPC 方法调用。

模拟一次RPC调用的过程就是向该链接发送一个 json 字符串:

$ curl localhost:1234/jsonrpc -X POST \
    --data '{"method":"HelloService.Hello","params":["hello"],"id":0}'

返回的结果依然是 json 字符串:

{"id":0,"result":"hello:hello","error":null}

更多 RPC 示例

HTTP RPC

服务端:

package main

import (
    "errors"
    "fmt"
    "net/http"
    "net/rpc"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func main() {

    arith := new(Arith)
    rpc.Register(arith)
    rpc.HandleHTTP()

    err := http.ListenAndServe(":1234", nil)
    if err != nil {
        fmt.Println(err.Error())
    }
}

我们注册了一个 Arith 的 RPC 服务,然后通过rpc.HandleHTTP函数把该服务注册到了 HTTP 协议上,就可以利用 http 的方式来传递数据了。

客户端:

package main

import (
    "fmt"
    "log"
    "net/rpc"
    "os"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Usage: ", os.Args[0], "server")
        os.Exit(1)
    }
    serverAddress := os.Args[1]

    client, err := rpc.DialHTTP("tcp", serverAddress+":1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    // Synchronous call
    args := Args{17, 8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    var quot Quotient
    err = client.Call("Arith.Divide", args, &quot)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)

}

把上面的服务端和客户端的代码分别编译,然后先把服务端开启,然后开启客户端,输入代码,就会输出如下信息:

$ ./http_c localhost
Arith: 17*8=136
Arith: 17/8=2 remainder 1

通过上面的调用可以看到参数和返回值是自定义的 struct 类型,在服务端把它们当做调用函数的参数的类型,在客户端作为client.Call的第2,3两个参数的类型。客户端的 Call 函数有3个参数,第1个要调用的函数的名字,第2个是要传递的参数,第3个要返回的参数(注意是指针类型)。

TCP RPC

服务端:

这个代码和 http 的服务器相比,不同在于采用了 TCP 协议,需要自己控制连接,当有客户端连接上来后,我们需要把这个连接交给 rpc 来处理。

package main

import (
    "errors"
    "fmt"
    "net"
    "net/rpc"
    "os"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func main() {

    arith := new(Arith)
    rpc.Register(arith)

    tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
    checkError(err)

    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)

    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        rpc.ServeConn(conn)
    }

}

func checkError(err error) {
    if err != nil {
        fmt.Println("Fatal error ", err.Error())
        os.Exit(1)
    }
}

客户端:

和 http 的客户端代码对比,唯一的区别是用 rpc.Dial 取代了 rpc.DialHTTP,其他处理一模一样。

package main

import (
    "fmt"
    "log"
    "net/rpc"
    "os"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Usage: ", os.Args[0], "server:port")
        os.Exit(1)
    }
    service := os.Args[1]

    client, err := rpc.Dial("tcp", service)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    // Synchronous call
    args := Args{17, 8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    var quot Quotient
    err = client.Call("Arith.Divide", args, &quot)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)

}

JSON RPC

JSON RPC是数据编码采用了JSON,而不是gob编码,其他和上面介绍的RPC概念一模一样。详见 jsonrpc包

json-rpc是基于TCP协议实现的,目前它还不支持HTTP方式。

服务端:

package main

import (
    "errors"
    "fmt"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
    "os"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func main() {

    arith := new(Arith)
    rpc.Register(arith)

    tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
    checkError(err)

    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)

    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        jsonrpc.ServeConn(conn)
    }

}

func checkError(err error) {
    if err != nil {
        fmt.Println("Fatal error ", err.Error())
        os.Exit(1)
    }
}

客户端:

package main

import (
    "fmt"
    "log"
    "net/rpc/jsonrpc"
    "os"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Usage: ", os.Args[0], "server:port")
        log.Fatal(1)
    }
    service := os.Args[1]

    client, err := jsonrpc.Dial("tcp", service)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    // Synchronous call
    args := Args{17, 8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    var quot Quotient
    err = client.Call("Arith.Divide", args, &quot)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)

}