聊一聊Go网络编程(一)--TCP通信
在网络分层的七层协议中,我们知道TCP处于HTTP层的下方,本质上HTTP应用层包体解析是基于底层的TCP连接建立的。
TCP协议概要
在网络分层的七层协议中,我们知道TCP处于HTTP层的下方,本质上HTTP包体解析是基于底层的TCP连接建立的。
TCP连接标识: 计算机之间在建立网络连接,也就是俗称的握手,本质上是两个文件句柄的关联,即fd,每个网络连接由四个属性唯一标识:<源IP,源端口,目标IP,目标端口>,因此一台机器的连接数受文件句柄ulimit
的限制。
操作系统接口: Socket套接字
长连接KeepAlive的对比
-
HTTP keepalive
众所周知,HTTP连接是无状态的,通常连接用完就销毁,开启keepalive可以告知其保持连接一段时间,避免频繁连接重建。 -
TCP keepalive
Many existing TCP protocols support this way of error handling by defining some sort of heartbeat mechanism that requires each endpoint to send PING/PONG probes at a regular interval in order to detect both networking problems, as well as service health.
TCP有别于HTTP,本身就是为了长连接而设定的,keepalive用于活性检测,可以理解为通过定义某种类型的心跳机制来支持这种错误处理方式,该心跳机制要求每个端点以规则的间隔发送ping/pong探测,以便检测网络问题以及服务健康。
Linux网络参数
在Linux机器可以通过下列网络参数设置TCP保活机制:
# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
# cat /proc/sys/net/ipv4/tcp_keepalive_intvl
75
# cat /proc/sys/net/ipv4/tcp_keepalive_probes
9
上述是默认设置,表示初始创建连接在两小时(7200秒)之后,每75秒重新发送一次。如果连续9次未收到ACK响应,则连接被标记为断开。
Go API介绍
在Go原生net包中,有下列函数可以干涉TCP连接的保活机制:
func (c *TCPConn) SetKeepAlive(keepalive bool) error
是否开启连接检测func (c *TCPConn) SetKeepAlivePeriod(d time.Duration) error
连接检测间隔,如果不设置默认使用所在操作系统参数设置
用例Demo
下面我们先用一个TCP连接demo进行交互,之后我们再用连接池把TCP连接进行集中管理。
传输结构
简单定义两个结构用于客户端与服务端交互,传输协议用json示范
type Message struct {
Uid string
Val string
}
type Resp struct {
Uid string
Val string
Ts string
}
server端
const TAG = "server: hello, "
func transfer(conn net.Conn) {
defer func() {
remoteAddr := conn.RemoteAddr().String()
log.Print("discard remove add:", remoteAddr)
conn.Close()
}()
// 设置10秒关闭连接
//conn.SetDeadline(time.Now().Add(10 * time.Second))
for {
var msg body.Message
if err := json.NewDecoder(conn).Decode(&msg); err != nil && err != io.EOF {
log.Printf("Decode from client err: %v", err)
// todo... 仿照redis协议写入err前缀符号`-`,通知client错误处理
return
}
if msg.Uid != "" || msg.Val != "" {
//conn.Write([]byte(msg.Val))
var rsp body.Resp
rsp.Uid = msg.Uid
rsp.Val = TAG + msg.Val
ser, _ := json.Marshal(msg)
conn.Write(append(ser, '\n'))
}
}
}
func ListenAndServer() {
log.Print("Start server...")
// 启动监听本地tcp端口3000
listen, err := net.Listen("tcp", "0.0.0.0:3000")
if err != nil {
log.Fatal("Listen failed. msg: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
log.Printf("accept failed, err: %v", err)
continue
}
go transfer(conn)
}
}
client端
定义一个Conn连接类型,用来包装原生tcp和其他额外属性,包括上下文,结果通道等。
type IConn interface {
Close() error
}
// Conn 对应每个连接
type Conn struct {
addr string // 地址
tcp *net.TCPConn // tcp连接实例, 可以是其他类型
ctx context.Context
writer *bufio.Writer
cnlFun context.CancelFunc // 用于通知ctx结束
retChan *sync.Map // 存放通道结果集合的map, 属于统一连接
err error
}
// 为Conn实现Close()函数签名 关闭连接, 关闭消息通道
func (c *Conn) Close() (err error) {
// 执行善后
if c.cnlFun != nil {
c.cnlFun()
}
// 关闭tcp连接
if c.tcp != nil {
err = c.tcp.Close()
}
// 关闭消息通道
if c.retChan != nil {
c.retChan.Range(func(key, value interface{}) bool {
// 根据具体业务断言转换通道类型
if ch, ok := value.(chan string); ok {
close(ch)
}
return true
})
}
return
}
定义连接的配置项option结构
type Option struct {
addr string
size int
readTimeout time.Duration
dialTimeout time.Duration
keepAlive time.Duration
}
紧接着创建连接代码如下:
func NewConn(opt *Option) (c *Conn, err error) {
// 初始化连接
c = &Conn{
addr: opt.addr,
retChan: new(sync.Map),
//err: nil,
}
defer func() {
if err != nil {
if c != nil {
c.Close()
}
}
}()
// 拨号
var conn net.Conn
if conn, err = net.DialTimeout("tcp", opt.addr, opt.dialTimeout); err != nil {
return
} else {
c.tcp = conn.(*net.TCPConn)
}
c.writer = bufio.NewWriter(c.tcp)
//if err = c.tcp.SetKeepAlive(true); err != nil {
if err = c.tcp.SetKeepAlive(false); err != nil {
return
}
if err = c.tcp.SetKeepAlivePeriod(opt.keepAlive); err != nil {
return
}
if err = c.tcp.SetLinger(0); err != nil {
return
}
// 创建上下文管理
c.ctx, c.cnlFun = context.WithCancel(context.Background())
// 异步接收结果到相应的结果集
go receiveResp(c)
return
}
异步接收结果
来看下异步接收结果的代码,其中的receiveResp()
函数,主要进行异步轮询,有几个作用:
- 感知上下文关闭,通常是连接的cancel()被执行
- 接收server端的数据并写入结果通道
retChan
,其类型是并发安全的sync.Map
- 监听server的错误,对异常情况关闭连接
// receiveResp 接收tcp连接的数据
func receiveResp(c *Conn) {
scanner := bufio.NewScanner(c.tcp)
for {
select {
case <-c.ctx.Done():
// c.cnlFun() 被执行了, 如连接池关闭
return
default:
if scanner.Scan() {
// 读取数据
rsp := new(body.Resp)
if err := json.Unmarshal(scanner.Bytes(), rsp); err != nil {
return
}
// 响应id与请求id对应
uid := rsp.Uid
if load, ok := c.retChan.Load(uid); ok {
c.retChan.Delete(uid)
// 消息通道
if ch, ok := load.(chan string); ok {
ch <- rsp.Ts + ": " + rsp.Val
// 在写入端关闭
close(ch)
}
}
} else {
// 错误, 合并了EOF
if scanner.Err() != nil {
c.err = scanner.Err()
} else {
c.err = errors.New("scanner done")
}
c.Close()
return
}
}
}
}
发送请求
/*
Send 发送请求, 返回具体业务通道
注意如果入参的msg消息体是interface{}类型, 最好根据业务进行
类型断言校验, 避免server端解析出错,返回err值用于后续判断
是否归还连接池。
*/
func (c *Conn) Send(ctx context.Context, msg *body.Message) (ch chan string, err error) {
ch = make(chan string)
c.retChan.Store(msg.Uid, ch)
// 请求
js, _ := json.Marshal(msg)
_, err = c.writer.Write(js)
if err != nil {
return
}
err = c.writer.Flush()
// 连接不关闭, 后续可以放入连接池
//c.tcp.CloseWrite()
return
}
实例:
- 启动server端监听:
=== RUN TestListenAndServer
2021/05/10 16:58:20 Start server...
- 发起请求:
var OPT = &Option{
addr: "0.0.0.0:3000",
size: 3,
readTimeout: 3 * time.Second,
dialTimeout: 3 * time.Second,
keepAlive: 1 * time.Second,
}
func createConn(opt *Option) *Conn {
c, err := NewConn(opt)
if err != nil {
panic(err)
}
return c
}
func TestSendMsg(t *testing.T) {
c := createConn(OPT)
msg := &body.Message{Uid: "pixel-1", Val: "pixelpig!"}
rec, err := c.Send(context.Background(), msg)
if err != nil {
t.Error(err)
} else {
t.Logf("rec1: %+v", <-rec)
}
msg.Val = "another pig!"
rec2, err := c.Send(context.Background(), msg)
if err != nil {
t.Error(err)
} else {
t.Logf("rec2: %+v", <-rec2)
}
t.Log("finished")
}
- 客户端输出如下:
=== RUN TestSendMsg
TestSendMsg: conn_test.go:56: rec1: : pixelpig!
TestSendMsg: conn_test.go:64: rec2: : another pig!
TestSendMsg: conn_test.go:66: finished
--- PASS: TestSendMsg (9.94s)
PASS
超时与池化管理
上面是一个比较简单的点对点交互,后续其实还可以考虑连接交互超时的情况:
- 虽然连接结果是异步响应,但是我们有必要对响应进行超时判断,防止单个连接持续阻塞
- 我们要考虑复用,即把健康的连接放入连接池进行管理。
超时判断
超时判断业界有许多做法,比较常见的是用一个select{}
块与time.After()
即可。
下面我们来看下常见的实现:
rec3, err := c.Send(context.Background(), msg)
if err == nil {
select {
case resp := <-rec3:
t.Logf("rec3: %+v", resp)
return
case <-time.After(time.Second * 1):
t.Error("Wait for resp timeout!")
return
}
} else {
t.Error(err)
}
超时输出如下:
=== RUN TestSendMsg
TestSendMsg: conn_test.go:56: rec1: : pixelpig!
TestSendMsg: conn_test.go:76: Wait for resp timeout!
--- FAIL: TestSendMsg (17.99s)
FAIL
连接池管理
这里要考虑的情况稍微复杂点,可以先把难点列出来再逐个击破:
- 池子的连接数上限
- 空闲连接数更新
- 连接获取与归还
- 连接关闭
关于池化操作篇幅可能较长,详解在本系列的下一篇《聊一聊Go网络编程–TCP连接管理(二)》叙述。
参考链接
Notes on TCP keepalive in Go
https://thenotexpert.com/golang-tcp-keepalive/
Using TCP keepalive under Linux
https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html