在golang的标准库中没有为tcp直接提供像http那样简单易用的服务框架,我们不妨自己手动实现一个
主体思路
我们的实现的主题思路分为以下四个内容
- 监听服务
- 获取构建新连接对象并设置超时时间及keepalive
- 设置方法退出时连接关闭
- 调用回调接口 TcpHandler
主要结构体和接口
首先是TCPServer的结构体,我们希望用户可以自由构建TcpServer并设置超时时间等自定义选项
type TcpServer struct {
Addr string
Handler TCPHandler <- 对外提供的服务方法接口
err error
BaseCtx context.Context
WriteTimeout time.Duration
ReadTimeout time.Duration
KeepAliveTimeout time.Duration
mu sync.Mutex
inShutdown int32
doneChan chan struct{}
l *onceCloseListener
}
像httpHandler一样,对外提供抽象的ServeTCP方法
type TCPHandler interface {
ServeTCP(ctx context.Context, conn net.Conn)
}
服务启动方法
用户可以通过自行构建TcpServer实例再通过ListenAndServe()调用服务,或通过tcp.ListenAndServe(":8080", handler)
使用默认的TcpServer实例快速启动服务。
在 ListenAndServe()
方法中,进行参数的校验和初始化操作
Serve(l net.Listener)
方法中,通过 l.Accept()
接收信息,包装接收到的conn并另起一个协程处理服务
func ListenAndServe(addr string, handler TCPHandler) error {
server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{})}
return server.ListenAndServe()
}
func (s *TcpServer) ListenAndServe() error {
if s.shuttingDown() {
return ErrServerClosed
}
if s.doneChan == nil {
s.doneChan = make(chan struct{})
}
addr := s.Addr
if addr == "" {
return errors.New("need addr")
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return s.Serve(tcpKeepAliveListener{
ln.(*net.TCPListener)})
}
func (s *TcpServer) Serve(l net.Listener) error {
s.l = &onceCloseListener{Listener: l}
defer s.l.Close() //执行listener关闭
if s.BaseCtx == nil {
s.BaseCtx = context.Background()
}
baseCtx := s.BaseCtx
ctx := context.WithValue(baseCtx, ServerContextKey, s) <- 将TcpServer实例存入context中
for {
rw, e := l.Accept()
if e != nil {
select {
case <-s.getDoneChan():
return ErrServerClosed
default:
}
fmt.Printf("accept fail, err: %v\n", e)
continue
}
c := s.newConn(rw)
go c.serve(ctx)
}
return nil
}
包装 net.Conn
为 tcp.conn
type conn struct {
server *TcpServer // 反引用TcpServer
remoteAddr string // 发送端地址
rwc net.Conn
}
func (s *TcpServer) newConn(rwc net.Conn) *conn {
c := &conn{
server: s,
rwc: rwc,
}
// 设置参数
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if d := c.server.KeepAliveTimeout; d != 0 {
if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(d)
}
}
return c
}
由 tcp.conn.Server(ctx)
调用回调函数进行服务处理
func (c *conn) serve(ctx context.Context) {
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
c.close()
}()
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
if c.server.Handler == nil {
panic("handler empty")
}
c.server.Handler.ServeTCP(ctx, c.rwc)
}
这样,一个简单易用的TCP服务框架就搭建完成了,其中一些close()
等方法在此处没有展示出来,更多详细代码可在我的代码仓库中查看:https://github.com/Kirov7/fayUtils/net/tcp
扩展中间件的实现
扩展中间件功能的实现思路
- 方法构建
- 构建中间件URL路由
- 构建URL的中间件方法数组
- 使用Use方法整合路由与方法数组
- 方法调用
- 构建方法请求逻辑
- 封装TCPHandler接口与TcpServer整合
TcpSliceRouter.Group(path)
方法,初始化路由分组(默认只能全局)
// 创建 Group
func (g *TcpSliceRouter) Group(path string) *TcpSliceGroup {
if path != "/" {
panic("only accept path=/")
}
return &TcpSliceGroup{
TcpSliceRouter: g,
path: path,
}
}
TcpSliceGroup.Use(middlewares ...TcpHandlerFunc)
构造回调方法
调用 Use
方法传入中间件集合,添加到切片 c.handlers
中
// 构造回调方法
func (g *TcpSliceGroup) Use(middlewares ...TcpHandlerFunc) *TcpSliceGroup {
g.handlers = append(g.handlers, middlewares...)
existsFlag := false
for _, oldGroup := range g.TcpSliceRouter.groups {
if oldGroup == g {
existsFlag = true
}
}
if !existsFlag {
g.TcpSliceRouter.groups = append(g.TcpSliceRouter.groups, g)
}
return g
}
通过 NewTcpSliceRouterHandler
方法传入最后调用的逻辑方法coreFunc
并传入已经 Use
了中间件的, TcpSliceRouter
func NewTcpSliceRouterHandler(coreFunc func(*TcpSliceRouterContext) tcp_server.TCPHandler, router *TcpSliceRouter) *TcpSliceRouterHandler {
return &TcpSliceRouterHandler{
coreFunc: coreFunc,
router: router,
}
}
最终的回调函数 ServeTCP((ctx context.Context, conn net.Conn)
,初始化 context
之后将 coreFunc
追加到 c.handlers
,重置执行光标,从第一个 c.handlers
开始执行中间件
func (w *TcpSliceRouterHandler) ServeTCP(ctx context.Context, conn net.Conn) {
c := newTcpSliceRouterContext(conn, w.router, ctx)
c.handlers = append(c.handlers, func(c *TcpSliceRouterContext) {
w.coreFunc(c).ServeTCP(ctx, conn)
})
c.Reset()
c.Next()
}
在中间件中自行调用Next()
、Abort()
等中间件逻辑,最后所有中间件执行完毕之后执行 coreFunc
(已经被追加到c.handlers
的最后位置)
// 从最先加入中间件开始回调
func (c *TcpSliceRouterContext) Next() {
c.index++
for c.index < int8(len(c.handlers)) {
c.handlers[c.index](c)
c.index++
}
}
// 跳出中间件方法
func (c *TcpSliceRouterContext) Abort() {
c.index = abortIndex
}