原文链接为 https://tilde.town/~hut8/post/grpc-connections/ 使用kimi进行翻译
背景
gRPC是一项出色的现代技术,用于远程过程调用。它允许你在客户端创建一个“存根”对象,该对象的目的是调用服务器上的方法。它是许多情况下REST或GraphQL的绝佳替代品,通常值得学习这项技术。更多信息可以在官方文档中找到。
网络
概念上,我们对客户端和服务器有两种不同的想法。
-
TCP服务器和客户端 - gRPC在HTTP/2之上运行,HTTP/2在TCP之上运行,所以我将讨论TCP中“服务器”和“客户端”的含义。在TCP连接中,客户端是连接的发起者,服务器是连接的接收者。然而,一旦建立了连接,连接就是对称的;客户端和服务器都可以发送和接收消息,直到一方通过shutdown(2)关闭,或通过close(2)关闭连接。
-
gRPC服务器和客户端 - gRPC客户端通过存根调用在服务器上运行的方法。这不是对称的。服务器不能在客户端上调用方法。
服务器/客户端类型耦合问题
如果你有一个gRPC客户端,它也是TCP客户端(它调用connect(2))。如果你有一个gRPC服务器,它也是TCP服务器(它调用listen(2)和accept(2))。
因此,如果你想让两台机器可以相互调用gRPC,那么每台机器都是客户端和服务器(在TCP和gRPC两种意义上),现在你有了两个与彼此无关的TCP连接。在有两朵云实例的场景中,这种架构通常并不复杂。但TCP通常很混乱。防火墙、NAT和动态分配的IP地址可能会使客户端到服务器的单向连接变得复杂。
这里有一个这样的情况的例子:假设你的“服务器”是一个在家庭路由器后面的笔记本电脑上运行的程序,它必须接收命令(例如,远程控制)来自“客户端”(例如,云计算实例)。根据gRPC,客户端必须是云计算实例(这很容易被称为“服务器”),因为那是创建远程过程调用的一方。服务器是笔记本电脑,因为那是实际发生过程调用的地方。
当你在客户端(即云计算实例)上创建一个“存根”时,你必须创建一个TCP连接并连接到服务器(即笔记本电脑)。现在你遇到了几个可能的问题:
你不知道笔记本电脑的IP地址,所以你现在需要一个反向服务,让笔记本电脑告诉云它的IP地址。 笔记本电脑可能在NAT后面,所以一旦你找到了IP地址,笔记本电脑将不得不配置端口转发。这在企业环境中很可能是不可能的。 笔记本电脑可能在防火墙后面,这将禁止传入连接。这可能是本地机器上的,也可能是在网关路由器上的。
这些问题中的一些可能是无法解决的,所以我们需要另一种方法。
gRPC独有的解决方案
服务器不能在客户端gRPC上调用方法。也许你可以让客户端在服务器上调用一个方法,其唯一目的是接收描述服务器希望客户端运行的方法的消息。这可以通过流式响应来完成。但这很复杂,需要大量的代码来绕过gRPC的设计。我在其他地方看到过这个建议,但我认为这是一个丑陋的临时解决方案,它制造的问题比它解决的还要多。考虑一下,你将如何在静态类型的方式中实际调用这些“客户端方法”。
基于隧道的解决方案
将TCP客户端/服务器从gRPC客户端/服务器中解耦的一个好方法是实现某种隧道。在上面的例子中,笔记本电脑可以向云计算实例发起一个TCP连接,然后通过实现特定于语言的接口,“拨号”操作让云计算实例(gRPC客户端)连接到笔记本电脑(gRPC服务器)可以简单地使用现有的TCP连接。
SSH是一个不可思议的协议,它的用途比大多数用户知道的还要多。在我们的情况下,它是将我们的TCP连接从gRPC连接中解耦的完美方式。它还有其他好处:尽管gRPC提供认证和加密,但如果更方便,你可以使用SSH提供的。
这些例子是Go语言特有的,但你可以在任何语言中做类似的事情。gRPC服务器不需要监听端口;你可以传入任何实现了Go的net.Listener
的类型。所以我们可以做一个net.Listener
,它将接受SSH连接,任何时候请求我们的自定义类型的新SSH通道,我们将接受它并返回一个新的net.Conn,这是我们将实现的另一个类型,它只是通过我们的隧道传输数据。
让我们从SSHDataTunnel开始,它是我们的net.Conn
。
import (
"net"
"time"
"golang.org/x/crypto/ssh"
)
// SSHDataTunnel实现了net.Conn
type SSHDataTunnel struct {
Chan ssh.Channel
Conn net.Conn
}
func NewSSHDataTunnel(sshChan ssh.Channel, carrier net.Conn) *SSHDataTunnel {
return &SSHDataTunnel{
Chan: sshChan,
Conn: carrier,
}
}
func (c *SSHDataTunnel) Read(b []byte) (n int, err error) {
return c.Chan.Read(b)
}
func (c *SSHDataTunnel) Write(b []byte) (n int, err error) {
return c.Chan.Write(b)
}
func (c *SSHDataTunnel) Close() error {
return c.Chan.Close()
}
func (c *SSHDataTunnel) LocalAddr() net.Addr {
return c.Conn.LocalAddr()
}
func (c *SSHDataTunnel) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
func (c *SSHDataTunnel) SetDeadline(t time.Time) error {
return c.Conn.SetDeadline(t)
}
func (c *SSHDataTunnel) SetReadDeadline(t time.Time) error {
return c.Conn.SetReadDeadline(t)
}
func (c *SSHDataTunnel) SetWriteDeadline(t time.Time) error {
return c.Conn.SetWriteDeadline(t)
}
// 静态检查类型
var _ net.Conn = &SSHDataTunnel{}
最后一行只是确保我们的类型实际上实现了net.Conn
,如果它没有,它将无法编译。
现在,当我们读取和写入我们的新*SSHDataTunnel
时,数据直接通过SSH连接上的专用通道发送。
SSHChannelListener
是我们的net.Listener
类型,它产生了我们的SSHDataTunnel
。
type SSHChannelListener struct {
// 通道请求本质上与传入的TCP连接相同
Chans <-chan ssh.NewChannel
SSHConn ssh.Conn
TCPConn *net.TCPConn
}
// Accept等待并返回监听器的下一个连接。
func (l *SSHChannelListener) Accept() (net.Conn, error) {
chanRq := <-l.Chans
if chanRq == nil {
return nil, net.ErrClosed
}
if chanRq.ChannelType() != "grpc-tunnel" {
chanRq.Reject(ssh.UnknownChannelType, "unknown channel type")
return nil, errors.New("could not accept on ssh channel listener: unknown channel type")
}
channel, reqs, err := chanRq.Accept()
if err != nil {
return nil, err
}
go ssh.DiscardRequests(reqs)
return &SSHDataTunnel{
Chan: channel,
Conn: l.TCPConn,
}, nil
}
// Close关闭监听器。
// 任何阻塞的Accept操作将被取消阻塞并返回错误。
func (l *SSHChannelListener) Close() error {
return l.SSHConn.Close()
}
// Addr返回监听器的网络地址。
func (l *SSHChannelListener) Addr() net.Addr {
return l.SSHConn.LocalAddr()
}
现在我们有了用SSH代替net.Listener
和net.Conn
的适配器,这是gRPC所需要的。我们仍然需要设置到云计算实例的SSH连接本身,以便构建SSHChannelListener
。这将在TCP客户端上完成,即gRPC服务器(“笔记本电脑”)。
// Connect从TCP客户端(即gRPC服务器)到TCP服务器发起一个出站隧道连接
func Connect() (*SSHChannelListener, error) {
// 解析并建立TCP连接
addr, err := net.ResolveTCPAddr("tcp4", ServerAddress)
if err != nil {
// 真正的错误处理在这里
return nil, err
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
// 真正的错误处理在这里
return nil, err
}
// 使用我们的TCP连接,建立一个SSH连接
sshConn, chans, requests, err := ssh.NewClientConn(conn, ServerAddress, c.ClientConfig)
if err != nil {
// 真正的错误处理在这里
return nil, err
}
// 忽略所有请求(这不包括新通道)
go ssh.DiscardRequests(reqs)
return &SSHChannelListener{
Chans: chans,
SSHConn: sshConn,
TCPConn: conn,
}, nil
}
使用SSHChannelListener在笔记本电脑上使用gRPC很容易。假设你在协议包中有一个名为XServer的服务:
listener, err := Connect()
if err != nil {
// 真正的错误处理在这里
panic(err)
}
s := grpc.NewServer()
protocol.RegisterXServer(s, &xServer{})
s.Serve(listener)
在云计算实例上(TCP服务器/gRPC客户端),我们也可以很容易地连接gRPC。首先,我们将定义一个满足grpc.WithContextDialer
的函数:
func (c *Client) SSHConnDialer(context.Context, string) (net.Conn, error) {
sshChan, reqs, err := c.sshConn.OpenChannel("grpc-tunnel", nil)
if err != nil {
return nil, err
}
go ssh.DiscardRequests(reqs)
conn := &SSHConn{
Chan: sshChan,
Conn: nConn,
}
return conn, nil
}
现在我们将在云计算实例上使用该拨号器构建gRPC客户端:
grpcConn, err := grpc.Dial(ServerAddress,
grpc.WithContextDialer(sshConnDialer),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// 真正的错误处理在这里
panic(err)
}
defer grpcConn.Close()
svc := protocol.NewXClient(grpcConn)
// 现在你可以像使用普通客户端一样使用svc:
svc.Frobulate()
处理断开连接
检测和处理gRPC的断开连接可能有点棘手,所以我将在这里概述一种方法。与其在客户端调用方法并得到一个错误(这当然是检测断开连接的一种方式),我们可以使用grpc.Handler
在检测到断开连接时提供某种通知。使用chan
使得这变得非常容易。云计算实例(TCP服务器/gRPC客户端)将需要一些额外的代码。
type DisconnectDetector struct {
// 如果需要,还可以添加一个日志记录器
CloseChan chan struct{}
}
// NewDisconnectDetector返回一个有效的DisconnectDetector
func NewDisconnectDetector() *DisconnectDetector {
return &DisconnectDetector{
CloseChan: make(chan struct{}),
}
}
// TagRPC是一个无操作函数
func (h *DisconnectDetector) TagRPC(context.Context, *stats.RPCTagInfo) context.Context {
return context.Background()
}
// HandleRPC是一个无操作函数
func (h *DisconnectDetector) HandleRPC(context.Context, stats.RPCStats) {}
// TagConn是一个无操作函数
func (h *DisconnectDetector) TagConn(context.Context, *stats.ConnTagInfo) context.Context {
return context.Background()
}
// HandleConn处理Conn统计数据。
func (h *DisconnectDetector) HandleConn(c context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnEnd:
h.CloseChan <- struct{}{}
}
}
在上面的grpc.Dial调用中,我们可以添加另一个参数:
disconnectDetector := NewDisconnectDetector()
grpc.Dial( // 和之前一样,再加上:
grpc.WithStatsHandler(disconnectDetector))
现在,每当你想要在断开连接时得到通知(在单独的goroutine中),只需使用:
<- disconnectDetector.CloseChan
// ... 断开连接时运行的代码
其他参考文章,不属于原文,翻译时添加,以备参考