Skip to content
Menu
CFC Studio
  • 实验室主页
  • CFC 招新简章
  • 友情链接
  • RSS订阅
CFC Studio

Golang实现支持中间件的简易TCP框架

Posted on 2022年8月16日2023年12月14日 by 枫阿雨

在golang的标准库中没有为tcp直接提供像http那样简单易用的服务框架,我们不妨自己手动实现一个

主体思路

我们的实现的主题思路分为以下四个内容

  1. 监听服务
  2. 获取构建新连接对象并设置超时时间及keepalive
  3. 设置方法退出时连接关闭
  4. 调用回调接口 TcpHandler

主要结构体和接口

首先是TCPServer的结构体,我们希望用户可以自由构建TcpServer并设置超时时间等自定义选项

type TcpServer struct {
   Addr    string
   Handler TCPHandler  <- 对外提供的服务方法接口
   err     error
   BaseCtx context.Context

   WriteTimeout     time.Duration
   ReadTimeout      time.Duration
   KeepAliveTimeout time.Duration

   mu         sync.Mutex
   inShutdown int32
   doneChan   chan struct{}
   l          *onceCloseListener
}

像httpHandler一样,对外提供抽象的ServeTCP方法

type TCPHandler interface {
   ServeTCP(ctx context.Context, conn net.Conn)
}

服务启动方法

用户可以通过自行构建TcpServer实例再通过ListenAndServe()调用服务,或通过tcp.ListenAndServe(":8080", handler) 使用默认的TcpServer实例快速启动服务。

在 ListenAndServe() 方法中,进行参数的校验和初始化操作

Serve(l net.Listener) 方法中,通过 l.Accept() 接收信息,包装接收到的conn并另起一个协程处理服务

func ListenAndServe(addr string, handler TCPHandler) error {
    server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{})}
    return server.ListenAndServe()
}

func (s *TcpServer) ListenAndServe() error {
   if s.shuttingDown() {
      return ErrServerClosed
   }
   if s.doneChan == nil {
      s.doneChan = make(chan struct{})
   }
   addr := s.Addr
   if addr == "" {
      return errors.New("need addr")
   }
   ln, err := net.Listen("tcp", addr)
   if err != nil {
      return err
   }
   return s.Serve(tcpKeepAliveListener{
      ln.(*net.TCPListener)})
}

func (s *TcpServer) Serve(l net.Listener) error {
    s.l = &onceCloseListener{Listener: l}
    defer s.l.Close() //执行listener关闭
    if s.BaseCtx == nil {
        s.BaseCtx = context.Background()
    }
    baseCtx := s.BaseCtx
    ctx := context.WithValue(baseCtx, ServerContextKey, s) <- 将TcpServer实例存入context中
    for {
        rw, e := l.Accept()
        if e != nil {
            select {
            case <-s.getDoneChan():
                return ErrServerClosed
            default:
            }
            fmt.Printf("accept fail, err: %v\n", e)
            continue
        }
        c := s.newConn(rw)
        go c.serve(ctx)
    }
    return nil
}

包装 net.Conn 为 tcp.conn

type conn struct {
    server     *TcpServer   // 反引用TcpServer
    remoteAddr string       // 发送端地址
    rwc        net.Conn
}

func (s *TcpServer) newConn(rwc net.Conn) *conn {
   c := &conn{
      server: s,
      rwc:    rwc,
   }
   // 设置参数
   if d := c.server.ReadTimeout; d != 0 {
      c.rwc.SetReadDeadline(time.Now().Add(d))
   }
   if d := c.server.WriteTimeout; d != 0 {
      c.rwc.SetWriteDeadline(time.Now().Add(d))
   }
   if d := c.server.KeepAliveTimeout; d != 0 {
      if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
         tcpConn.SetKeepAlive(true)
         tcpConn.SetKeepAlivePeriod(d)
      }
   }
   return c
}

由 tcp.conn.Server(ctx) 调用回调函数进行服务处理

func (c *conn) serve(ctx context.Context) {
   defer func() {
      if err := recover(); err != nil && err != ErrAbortHandler {
         const size = 64 << 10
         buf := make([]byte, size)
         buf = buf[:runtime.Stack(buf, false)]
         fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
      }
      c.close()
   }()
   c.remoteAddr = c.rwc.RemoteAddr().String()
   ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
   if c.server.Handler == nil {
      panic("handler empty")
   }
   c.server.Handler.ServeTCP(ctx, c.rwc)
}

这样,一个简单易用的TCP服务框架就搭建完成了,其中一些close() 等方法在此处没有展示出来,更多详细代码可在我的代码仓库中查看:https://github.com/Kirov7/fayUtils/net/tcp

扩展中间件的实现

扩展中间件功能的实现思路

  • 方法构建
    • 构建中间件URL路由
    • 构建URL的中间件方法数组
    • 使用Use方法整合路由与方法数组
  • 方法调用
    • 构建方法请求逻辑
    • 封装TCPHandler接口与TcpServer整合

TcpSliceRouter.Group(path) 方法,初始化路由分组(默认只能全局)

// 创建 Group
func (g *TcpSliceRouter) Group(path string) *TcpSliceGroup {
   if path != "/" {
      panic("only accept path=/")
   }
   return &TcpSliceGroup{
      TcpSliceRouter: g,
      path:           path,
   }
}

TcpSliceGroup.Use(middlewares ...TcpHandlerFunc) 构造回调方法

调用 Use 方法传入中间件集合,添加到切片 c.handlers 中

// 构造回调方法
func (g *TcpSliceGroup) Use(middlewares ...TcpHandlerFunc) *TcpSliceGroup {
   g.handlers = append(g.handlers, middlewares...)
   existsFlag := false
   for _, oldGroup := range g.TcpSliceRouter.groups {
      if oldGroup == g {
         existsFlag = true
      }
   }
   if !existsFlag {
      g.TcpSliceRouter.groups = append(g.TcpSliceRouter.groups, g)
   }
   return g
}

通过 NewTcpSliceRouterHandler 方法传入最后调用的逻辑方法coreFunc并传入已经 Use 了中间件的, TcpSliceRouter

func NewTcpSliceRouterHandler(coreFunc func(*TcpSliceRouterContext) tcp_server.TCPHandler, router *TcpSliceRouter) *TcpSliceRouterHandler {
   return &TcpSliceRouterHandler{
      coreFunc: coreFunc,
      router:   router,
   }
}

最终的回调函数 ServeTCP((ctx context.Context, conn net.Conn),初始化 context 之后将 coreFunc 追加到 c.handlers,重置执行光标,从第一个 c.handlers 开始执行中间件

func (w *TcpSliceRouterHandler) ServeTCP(ctx context.Context, conn net.Conn) {
   c := newTcpSliceRouterContext(conn, w.router, ctx)
   c.handlers = append(c.handlers, func(c *TcpSliceRouterContext) {
      w.coreFunc(c).ServeTCP(ctx, conn)
   })
   c.Reset()
   c.Next()
}

在中间件中自行调用Next()、Abort()等中间件逻辑,最后所有中间件执行完毕之后执行 coreFunc(已经被追加到c.handlers的最后位置)

// 从最先加入中间件开始回调
func (c *TcpSliceRouterContext) Next() {
   c.index++
   for c.index < int8(len(c.handlers)) {
      c.handlers[c.index](c)
      c.index++
   }
}

// 跳出中间件方法
func (c *TcpSliceRouterContext) Abort() {
    c.index = abortIndex
}

发表评论 取消回复

您的电子邮箱地址不会被公开。 必填项已用*标注

分类

  • CFC 周刊 (4)
  • CFC 技术 (44)
  • CFC 日常 (3)
  • 未分类 (15)
  • 活动通知 (3)

标签

ACM Android anime animeloop animeloop-cli APP Apple aria2 Array Blog CFC数据结构与算法训练指南 CoreData CQUT Don't Starve Hexo iBooks JavaScript macOS Matlab moeoverflow OpenCV Programming README RxJS SQLite SQLite3 Steam Swift Theme Web Xcode 主题模板 动漫 博客 反编译 妹子 循环 教程 数据库 游戏 算法 装逼 视频 重庆理工大学 饥荒

登录
©2025 CFC Studio | Powered by WordPress & Superb Themes