Remote Procedure Call¶
Go语言的RPC包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该RPC包是建立在net包基础之上的。
Go标准包中已经提供了对RPC的支持,而且支持三个级别的RPC:TCP、HTTP、JSONRPC。但Go的RPC包是独一无二的RPC,它和传统的RPC系统不同,它只支持Go开发的服务器与客户端之间的交互,因为在内部,它们采用了Gob来编码。
Go RPC的函数只有符合下面的条件才能被远程访问,不然会被忽略,详细的要求如下:
- 函数必须是导出的(首字母大写)
- 必须有两个导出类型的参数,
- 第一个参数是接收的参数,第二个参数是返回给客户端的参数,第二个参数必须是指针类型的
- 函数还要有一个返回值error
举个🌰,以下的RPC函数格式正确:
func (t *T) MethodName(argType T1, replyType *T2) error
T、T1和T2类型必须能被encoding/gob
包编解码。
任何的RPC都需要通过网络来传递数据,Go RPC可以利用HTTP和TCP来传递数据,利用HTTP的好处是可以直接复用net/http
里面的一些函数。
RPC版 Hello World¶
我们先构造一个HelloService类型,其中的Hello方法用于实现打印功能:
type HelloService struct {}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
其中Hello方法必须满足Go语言的RPC规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。
然后就可以将HelloService类型的对象注册为一个RPC服务:
func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
rpc.ServeConn(conn)
}
其中rpc.Register函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,所有注册的方法会放在“HelloService”服务空间之下。然后我们建立一个唯一的TCP链接,并且通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。
下面是客户端请求HelloService服务的代码:
func main() {
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
首先是通过rpc.Dial拨号RPC服务,然后通过client.Call调用具体的RPC方法。在调用client.Call时,第一个参数是用点号链接的RPC服务名字和方法名字,第二和第三个参数分别我们定义RPC方法的两个参数。
重构 Hello World¶
一般RPC开发会先定义接口规范,再定义两端的具体操作。
RPC服务的接口规范分为三个部分:首先是服务的名字,然后是服务要实现的详细方法列表,最后是注册该类型服务的函数。为了避免名字冲突,我们在RPC服务的名字中增加了包路径前缀(这个是RPC服务抽象的包路径,并非完全等价Go语言的包路径)。RegisterHelloService注册服务时,编译器会要求传入的对象满足HelloServiceInterface接口。
服务的名字和接口:
type HelloServiceClient struct {
*rpc.Client
}
var _ HelloServiceInterface = (*HelloServiceClient)(nil)
func DialHelloService(network, address string) (*HelloServiceClient, error) {
c, err := rpc.Dial(network, address)
if err != nil {
return nil, err
}
return &HelloServiceClient{Client: c}, nil
}
func (p *HelloServiceClient) Hello(request string, reply *string) error {
return p.Client.Call(HelloServiceName+".Hello", request, reply)
}
客户端代码:
func main() {
client, err := DialHelloService("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Hello("hello", &reply)
if err != nil {
log.Fatal(err)
}
}
服务端代码:
type HelloService struct {}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
func main() {
RegisterHelloService(new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go rpc.ServeConn(conn)
}
}
在新的RPC服务端实现中,我们用RegisterHelloService函数来注册函数,这样不仅可以避免命名服务名称的工作,同时也保证了传入的服务对象满足了RPC接口的定义。最后我们新的服务改为支持多个TCP链接,然后为每个TCP链接提供RPC服务。
语言无关¶
语言无关是指:无论采用何种编程语言,只要遵循同样的json结构,以同样的流程就可以和Go语言编写的RPC服务进行通信。这样我们就实现了语言无关的RPC。
标准库的RPC默认采用Go语言特有的gob编码,因此从其它语言调用Go语言实现的RPC服务将比较困难,而每个RPC以及服务的使用者都可能采用不同的编程语言。得益于RPC的框架设计,Go语言的RPC其实也是很容易实现跨语言支持的。
Go语言的RPC框架有两个比较有特色的设计:
- RPC数据打包时可以通过插件实现自定义的编码和解码。
- RPC建立在抽象的io.ReadWriteCloser接口之上的,我们可以将RPC架设在不同的通讯协议之上。
举个🌰,这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的RPC。
基于JSON编码重新实现RPC服务端:
func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) // 取代了rpc.ServeConn函数
}
}
代码中最大的变化是用rpc.ServeCodec函数替代了rpc.ServeConn函数,传入的参数是针对服务端的json编解码器。
JSON版本的客户端:
func main() {
conn, err := net.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("net.Dial:", err)
}
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
先手工调用net.Dial函数建立TCP链接,然后基于该链接建立针对客户端的json编解码器。
在确保客户端可以正常调用RPC服务的方法之后,我们在客户端用一个普通的TCP服务代替Go语言版本的RPC服务,这样可以查看客户端调用时发送的数据格式。比如在客户端通过nc命令nc -l 1234
在同样的端口启动一个TCP服务。然后再次执行一次RPC调用将会发现nc输出了以下的信息:
{"method":"HelloService.Hello","params":["hello"],"id":0}
method部分对应要调用的rpc服务和方法组合成的名字,params部分的第一个元素为参数,id是由调用端维护的一个唯一的调用编号。
请求的json数据对象在内部对应两个结构体:客户端是clientRequest,服务端是serverRequest。clientRequest和serverRequest结构体的内容基本是一致的:
type clientRequest struct {
Method string `json:"method"`
Params [1]interface{} `json:"params"`
Id uint64 `json:"id"`
}
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
Id *json.RawMessage `json:"id"`
}
在获取到RPC调用对应的json数据后,我们可以通过直接向架设了RPC服务的TCP服务器发送json数据模拟RPC方法调用:
$ echo -e '{"method":"HelloService.Hello","params":["hello"],"id":1}' | nc localhost 1234
返回的结果也是一个json格式的数据:
{"id":1,"result":"hello:hello","error":null}
其中id对应输入的id参数,result为返回的结果,error部分在出问题时表示错误信息。对于顺序调用来说,id不是必须的。但是Go语言的RPC框架支持异步调用,当返回结果的顺序和调用的顺序不一致时,可以通过id来识别对应的调用。
返回的json数据也是对应内部的两个结构体:客户端是clientResponse,服务端是serverResponse。两个结构体的内容同样也是类似的:
type clientResponse struct {
Id uint64 `json:"id"`
Result *json.RawMessage `json:"result"`
Error interface{} `json:"error"`
}
type serverResponse struct {
Id *json.RawMessage `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
}
Http上的RPC¶
Go语言内在的 RPC 框架已经支持在 Http 协议上提供 RPC 服务。但是框架的 http 服务同样采用了内置的 gob 协议,并且没有提供采用其它协议的接口,因此从其它语言依然无法访问的。
举个🌰
func main() {
rpc.RegisterName("HelloService", new(HelloService))
http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
ReadCloser: r.Body,
Writer: w,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
})
http.ListenAndServe(":1234", nil)
}
RPC 的服务架设在 “/jsonrpc” 路径,在处理函数中基于 http.ResponseWriter 和 http.Request 类型的参数构造一个 io.ReadWriteCloser 类型的 conn 通道。然后基于 conn 构建针对服务端的 json 编码解码器。最后通过 rpc.ServeRequest 函数为每次请求处理一次 RPC 方法调用。
模拟一次RPC调用的过程就是向该链接发送一个 json 字符串:
$ curl localhost:1234/jsonrpc -X POST \
--data '{"method":"HelloService.Hello","params":["hello"],"id":0}'
返回的结果依然是 json 字符串:
{"id":0,"result":"hello:hello","error":null}
更多 RPC 示例¶
HTTP RPC¶
服务端:
package main
import (
"errors"
"fmt"
"net/http"
"net/rpc"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
err := http.ListenAndServe(":1234", nil)
if err != nil {
fmt.Println(err.Error())
}
}
我们注册了一个 Arith 的 RPC 服务,然后通过rpc.HandleHTTP
函数把该服务注册到了 HTTP 协议上,就可以利用 http 的方式来传递数据了。
客户端:
package main
import (
"fmt"
"log"
"net/rpc"
"os"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: ", os.Args[0], "server")
os.Exit(1)
}
serverAddress := os.Args[1]
client, err := rpc.DialHTTP("tcp", serverAddress+":1234")
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := Args{17, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
var quot Quotient
err = client.Call("Arith.Divide", args, ")
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)
}
把上面的服务端和客户端的代码分别编译,然后先把服务端开启,然后开启客户端,输入代码,就会输出如下信息:
$ ./http_c localhost
Arith: 17*8=136
Arith: 17/8=2 remainder 1
通过上面的调用可以看到参数和返回值是自定义的 struct 类型,在服务端把它们当做调用函数的参数的类型,在客户端作为client.Call
的第2,3两个参数的类型。客户端的 Call 函数有3个参数,第1个要调用的函数的名字,第2个是要传递的参数,第3个要返回的参数(注意是指针类型)。
TCP RPC¶
服务端:
这个代码和 http 的服务器相比,不同在于采用了 TCP 协议,需要自己控制连接,当有客户端连接上来后,我们需要把这个连接交给 rpc 来处理。
package main
import (
"errors"
"fmt"
"net"
"net/rpc"
"os"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
arith := new(Arith)
rpc.Register(arith)
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
checkError(err)
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
rpc.ServeConn(conn)
}
}
func checkError(err error) {
if err != nil {
fmt.Println("Fatal error ", err.Error())
os.Exit(1)
}
}
客户端:
和 http 的客户端代码对比,唯一的区别是用 rpc.Dial 取代了 rpc.DialHTTP,其他处理一模一样。
package main
import (
"fmt"
"log"
"net/rpc"
"os"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: ", os.Args[0], "server:port")
os.Exit(1)
}
service := os.Args[1]
client, err := rpc.Dial("tcp", service)
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := Args{17, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
var quot Quotient
err = client.Call("Arith.Divide", args, ")
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)
}
JSON RPC¶
JSON RPC是数据编码采用了JSON,而不是gob编码,其他和上面介绍的RPC概念一模一样。详见 jsonrpc包
json-rpc是基于TCP协议实现的,目前它还不支持HTTP方式。
服务端:
package main
import (
"errors"
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
arith := new(Arith)
rpc.Register(arith)
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
checkError(err)
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
jsonrpc.ServeConn(conn)
}
}
func checkError(err error) {
if err != nil {
fmt.Println("Fatal error ", err.Error())
os.Exit(1)
}
}
客户端:
package main
import (
"fmt"
"log"
"net/rpc/jsonrpc"
"os"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: ", os.Args[0], "server:port")
log.Fatal(1)
}
service := os.Args[1]
client, err := jsonrpc.Dial("tcp", service)
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := Args{17, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
var quot Quotient
err = client.Call("Arith.Divide", args, ")
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)
}