Skip to content

Latest commit

 

History

History
167 lines (143 loc) · 7.13 KB

DOCUMENT_CN.md

File metadata and controls

167 lines (143 loc) · 7.13 KB

goev

框架核心就是OS的ev poll(epoll)机制

每个evpoll 可以认为是一个独立的执行栈, 它是线程安全的, 凡是绑定到evpoll中的链接, 在该执行栈中的I/O操作都是线程安全的

鼓励开发者在evpoll执行栈中进行I/O操作, 解析到具体的业务数据后, 可以push到其他线程或协程异步处理, 但是跟I/O相关的, 最好是在evpoll执行栈中操作(也就是OnRead/OnWrite中)

Reactor 是对evpoll的简单的管理对象, 对外提供统一接口
使用方法:

    reactor, err := goev.NewReactor(
        goev.EvDataArrSize(0),     // evpoll内部数据结构需要, 可以实际需要微调提升性能
        goev.EvPollNum(evPollNum), // 指定有多少个线程/协程来执行evpoll工作,
                                   // 每个evpoll独立绑定一个线程/协程, 所以不存在单个链接不存在并发处理
        // goev.SetIOReadWriter(goev.NewIOReadWriter(32*1024, 1024*1024)), 已弃用
                                   // 框架提供的默认I/O操作接口
    )
    if err != nil {
        panic(err.Error())
    }

Reactor通过对fd % evPollNum 均匀分布到各evpoll中去.

goev 是OOP的设计思想, 将每个链接视为一个对象(实现EvHandler接口), 链接有两种来源

一种是acceptor接收到的
acceptor会指定一个 NewEvHandler的方法, 这样每接受到一个新链接, 就会通过此方法分配一个新的对象
acceptor的使用方法:

    _, err = goev.NewAcceptor(
        forAcceptReactor,  // 第1个参数负责轮询listener fd的I/O事件
        ":8080",           // 通常服务监听的地址格式 ipv4
        func() goev.EvHandler { return new(Http) }, // 负责给新链接分配对象
        goev.ListenBacklog(256), // 后边是可选参数
        goev.ReuseAddr(true), // 一般服务器的标配参数(避免在TIME_WAIT状态下)
        goev.ReusePort(true), // 可以多进程/线程同时监听一个地址(ip:port)
        goev.SockRcvBufSize(16*1024), // 此参数是必须在listen 之前设定的, 对控制socket缓冲区的内存有帮助
    )

一种是connector主动连接的
connector.Connect 方法会指定一个链接对象, 连接成功后此对象就会绑定新的链接
connector的使用方法:

    c, err := NewConnector(
        r, // 非阻塞链接使用到的Reactor
        SockRcvBufSize(8*1024) // 新链接指定的接收缓冲区参数, 必须在connect之前设置
    )
    if err != nil {
        panic(err.Error())
    }
    err = c.Connect("127.0.0.1:9999", &Conn{}, 1000)

通过Reactor/Acceptor/Connector就可以组合出一个完全非阻塞/异步化的网络事件处理框架

具体的网络事件处理逻辑在EvHandler的接口实现
开发者要继承(内嵌)Event对象

type Http struct {
    goev.IOHandle  // 特别注意: 如果想使用sync.pool之类的技术复用Http对象, 要记得调用IOHandle.Init()
}
// 根据自己的需要, 实现具体的I/O事件处理方法
// OnOpen 是当acceptor/connector得到链接后首先调用的方法
func (h *Http) OnOpen(fd int) bool {
    // 新链接要手动指定要轮询的事件类型, 注册到指定的Reactor中
    //
    // 特别注意: 当acceptor/connector使用的Reactor指定了多个EvPollNum时, 这时会出现线程切换,
    // 所以一些针对I/O操作的初始化过程要在AddEvHandler之前完成
    //
    // 比如我们要初始化一个buff, 那么如果该buff是在 AddEvHandler 之后初始化的, 
    // 很有可能OnRead方法在buff还没初始化完成就已经调用了
    if err := xxxReactor().AddEvHandler(h, fd, goev.EvIn); err != nil {
        return false // 返回false 会直接回调OnClose方法
    }
    return true
}
// 处理可读事件(即: 链接有数据可以接收)
func (h *Http) OnRead() bool {
	_, n, _ := h.Read() // 使用poller中的无锁共享缓存
	if n == 0 { // Abnormal connection
		return false
	}

	buf := h.WriteBuff()[:0] // poller中的无锁共享缓存
	buf = append(buf, httpRespHeader...)
	buf = append(buf, []byte(liveDate.Load().(string))...)
	buf = append(buf, httpRespContentLength...)
	h.Write(buf)

    /* 2023-08-04 注: goev.IOReadWriter的方式已经弃用, 会非常影响吞吐能力, 会比使用栈空间的make([]byte)吞吐能力差不少
    // goev.IOReadWriter 是框架内置的I/O操作方法, 使用一个全局的buf, 单个evpoll内所有链接共享,
    // 这样减少了临时内存分配和二次拷贝, 更对cpu cache友好!
    recvedData, err := nio.Read(fd)
    if err == goev.ErrRcvBufOutOfLimit { // Abnormal connection
        return false
    }

    // handle recvedData
    // ...
    // build response

    // 构建响应数据, 同样是使用goev.IOReadWriter 内置的共享buf, 进行数据拼装, 减少临时内存分配和二次拷贝
    nio.InitWrite().Append(httpRespHeader).
        Append([]byte(liveDate.Load().(string))).
        Append(httpRespContentLength).
        Write(fd)
    */
    return true
}
func (h *Http) OnClose() {
    // 释放资源, Http对象也会被gc回收的(前提是开发都没有单独将Http对象另外保存起来)
    h.Destroy(h) // 必须的, 框架内部申请的资源也需要销毁
}

定时器的使用

Golang已经内置Timer了, 为什么框架还要在引入?
因为框架中引入timer能让I/O和timer事件全部绑定在一个线程/协栈执行栈中, 避免竞争 一旦使用golang全局timer, 那么timer中如果有I/O操作, 所有I/O操作必要考虑并发保护, 这是一个非常大的开销

比如我们需要定时向对端发送Ping/Pong

func (h *Http) OnOpen(fd int) bool {
    // 新链接要手动指定要轮询的事件类型, 注册到指定的Reactor中
    if err := xxxReactor().AddEvHandler(h, fd, goev.EvIn); err != nil {
        return false // 返回false 会直接回调OnClose方法
    }

    // 特别注意:
    // 1. 注册定时器一定要在 注册I/O事件之后, 这样才能确保跟I/O事件绑定到同一个evpoll上, 以保证Http对象的
    //    I/O和Timer事件都是并发安全的
    if err := h.SchedueTimer(h, 1000, 20*1000); err != nil { 
        xxxReactor().RemoveEvHandler(eh, fd) // 要将刚才添加成功的操作, 撤销掉
        return false // 返回false 会直接回调OnClose方法
    }


    // 至此 Http对象 已经有两处引用: 1. evpoll中, 2. timer中
    return true
}
// 1秒后触发, 并以20秒为周期循环触发
func (h *Http) OnTimeout(now int64) bool {
    // 特别注意: 此时I/O事件可能已经触发过, 链接可能已经关闭, evpoll中不再持有Http对象
    if h.closed == true { // closed 变量不需要保护
        return false // end, timer removed
    }
    // send ping 这里目前并没有实现共享buf,做为下一步优化点
    return true  // keep interval timer
}
func (h *Http) OnClose() {
    // 释放资源, Http对象也会被gc回收的(前提是开发都没有单独将Http对象另外保存起来)
    if h.Fd() != -1 {
        h.closed = true
    }
    h.CancelTimer(h)
    h.Destroy(h) // 必须的, 框架内部申请的资源也需要销毁
}

至此, 一个完整的基础框架就完成了,