libev 中IO事件循环解析,libevio事件解析


1、IO事件基本数据结构ev_io

struct ev_io这个结构体是IO监视器。libev中所有的事件均有自己的一个结构体来表示,如时间事件是ev_time、ev_io等。

基类ev_watcher定义如下:

typedef struct ev_watcher
{
    int active; 
    int pending;
    int priority;
    void *data; 
    void (*cb)(struct ev_loop *loop, struct ev_watcher *w, int revents);
} 

基类中 “active"表示是否激活该watcher,“pending”该监控器是否处于pending状态,“priority"其优先级以及触发后执行的动作的回调函数。

与基类配套的还有个装监控器的List:

typedef struct ev_watcher_list
{
    int active; 
    int pending;
    int priority;
    void *data; 
    void (*cb)(struct ev_loop *loop, struct ev_watcher_list *w, int revents);
    struct ev_watcher_list *next;
} ev_watcher_list;

ev_io是对一个IO事件监视的基础结构体。定义如下:

typedef struct ev_io
{
    int active; 
    int pending;
    int priority;
    void *data; 
    void (*cb)(struct ev_loop *loop, struct ev_io *w, int revents);
    struct ev_watcher_list *next;
 
    int fd;     /* 这里的fd,events就是派生类的私有成员,分别表示监听的文件fd和触发的事件(可读还是可写) */
    int events; 
} ev_io;

源代码里ev_io定义在ev.h中。原文定义中嵌套了一些基类和其他一些宏定义,这里直接写出来,方便理解。可以看到将派生类的私有变量放在了共有部分的后面。这样,当使用C的指针强制转换后,一个指向 struct ev_io对象的基类 ev_watcher 的指针p就可以通过 p->active 访问到派生类中同样表示active的成员了。

2、IO事件的初始化和设置

初始化和设置比较简单,如下:

#define ev_io_init(ev,cb,fd,events)          do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0)
#define ev_io_set(ev,fd_,events_)            do { (ev)->fd = (fd_); (ev)->events = (events_) | EV__IOFDSET; } while (0)

初始化一个IO事件,只需要调用ev_io_init()函数,参数ev表示ev_io指针,cb表示触发事件的回调函数,fd表示要监视的文件描述符,events表示监视的事件。

3、IO事件的注册

先了解 struct ANFD,ANFD表示事件循环中对一个文件描述符fd的监视的基本信息结构体,定义如下:

typedef struct
{
  WL head;//watch_list结构体
  unsigned char events; /* 所监视的事件 */
  unsigned char reify;  /* 标志位,用来标记ANFD需要被重新实例化(EV_ANFD_REIFY, EV__IOFDSET) */
  unsigned char emask;  /* the epoll backend stores the actual kernel mask in here */
  unsigned char unused;
  unsigned int egen;    /* generation counter to counter epoll bugs */
} ANFD;  /* 这里去掉了对epoll的判断和windows的IOCP*/

首先是WL head 这个基类监视器链表,这里首先只用关注一个 “head” ,他是之前说过的wather的基类链表。这里一个ANFD就表示对一个文件描述符的监控,那么对该文件描述的可读还是可写监控,监控的动作是如何定义的,就是通过这个链表,(这个链表的长度一般不会超过3,文件的监控条件无非是可读、可写等)把对该文件描述法的监控器都挂上去,这样就可以通过文件描述符找到了。而前面的说的anfds就是这个对象的数组,下标通过文件描述符fd进行索引。anfds是一个ANFD型动态数组。这样anfds数组就是全部的IO监控,最后可以通过epoll_wait()来监测事件。

每当有新的IO监视器fd加入,调用wlist_add()添加到anfds[fd]的链表head中。如果一个anfds的元素监控条件发生改变,如何修改这个元素的监控条件呢。anfds的下标可以用fd来表示,这里有一个新的数组,数组元素内容是新添加的要监视的IO事件的fd或者修改监视内容的fd,数组名是fdchanges,也是动态数组。这个数组记录了新加入fd或者修改的fd的值,具体实现函数为“fd_change”

inline_size void
fd_change (EV_P_ int fd, int flags)
{
  unsigned char reify = anfds [fd].reify;
  anfds [fd].reify |= flags;//标志,表示fd监视条件被修改了

  if (expect_true (!reify))//如果fd最初的监视条件为空,表示新加入的fd
    {
      ++fdchangecnt;//fd计数器加一
      array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);//添加到fdchanges数组中
      fdchanges [fdchangecnt - 1] = fd;
    }
  //如果不是新加入的fd,则fdchanges数组中已经有fd了。表示以前添加过对fd的IO监视
}

这时所有的要被监视的fd都存放在fdchanges数组中,当我们运行ev_run时,会调用“fd_reify”,它遍历fdchanges数组,如果发现fd的监视条件发生变化了,就会调用epoll_ctl()函数来改变fd的监视状态。这个fdchanges数组的作用就在于此,他记录了anfds数组中的watcher监控条件可能被修改的文件描述符,并在适当的时候将调用系统的epoll_ctl或则其他文件复用机制修改系统监控的条件。注意,假如我们在某个fd 上已经有个 watch 注册 了 read 事件,这时我们又再添加一个watch,还是read 事件,但是不同的回调函数,在此种情况下,我们不应该调用epoll_ctrl 之类的系统调用(减少系统开销),因为我们的events 集合是没有改变的(表示监视的事件没有发生改变),所以为了达到这个目,anfd[fd] 结构体中还有一个events事件,它是原先的所有watcher 的事件的 ”|“ 操作,向系统的epoll 重新添加描述符的操作 是在下次事件迭代开始前进行的,当我们依次扫描fdchangs,找到对应的anfd 结构,如果发现先前的events 与 当前所有的watcher 的”|“ 操作结果不等,则表示我们需要调用epoll_ctrl 之类的函数来进行更改,反之不做操作即,作为一条原则,在调用系统调用前,我们已经做了充分的检查,确保不进行多余的系统调用!fd_reify()中定义如下:

inline_size void
fd_reify (EV_P)
{
  int i;
  for (i = 0; i < fdchangecnt; ++i)
    {
      int fd = fdchanges [i];//取出可能改变监控条件的fd
      ANFD *anfd = anfds + fd;//得到anfds中下标
      ev_io *w;//顶一个ev_io指针

      unsigned char o_events = anfd->events;
      unsigned char o_reify  = anfd->reify;

      anfd->reify  = 0;

      /*if (expect_true (o_reify & EV_ANFD_REIFY)) probably a deoptimisation */
        {
          anfd->events = 0;

          for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)//这里用到了强制转换,for循环的作用就是
          //获得fd全部的新的监控事件集合,存放在events成员变量中
            anfd->events |= (unsigned char)w->events;

          if (o_events != anfd->events)//如果新监控事件和旧监控事件不同,
            o_reify = EV__IOFDSET; /* actually |= *///修改标志位,表示fd监控条件改变
        }

      if (o_reify & EV__IOFDSET)//fd监控条件改变,调用backend_modify也就是epoll_ctl()修改fd的监控条件
        backend_modify (EV_A_ fd, o_events, anfd->events);
    }

  fdchangecnt = 0;//一次遍历完成,fdchanges数组个数清零
}

所以,总结一下注册过程就是通过之前设置了监控条件IO watcher (ev_io的一个实例)获得监控的文件描述符fd,找到其在anfds中对应的ANFD结构anfds[fd],将该watcher挂到该结构的head链上wlist_add()。由于对应该fd的监控条件有改动了,因此在fdchanges数组中记录下该fd,在后续的步骤中调用系统的接口修改对该fd监控的条件。整个注册示意图如下:

eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);

成功的话,返回了响应事件的个数,然后执行了fd_event()

inline_speed void
fd_event (EV_P_ int fd, int revents)
{
/* do not submit kernel events for fds that have reify set */  
/* because that means they changed while we were polling for new events */
ANFD *anfd = anfds + fd;
 if (expect_true (!anfd->reify))//reify是0
    /*如果reify不是0,则表示我们添加了新的事件在fd上,不是很懂*/
    fd_event_nocheck (EV_A_ fd, revents);
}
fd_event_nocheck 如下
inline_speed void
fd_event_nocheck (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  ev_io *w;

  for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)//对fd上的监视器依次做检测,
    {
      int ev = w->events & revents;//相应的事件被触发了

      if (ev)//pending条件满足,监控器加入到pendings数组中pendings[pri]上的pendings[pri][old_lenght+1]的位置上
          ev_feed_event (EV_A_ (W)w, ev);
    }
}
void noinline
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
  W w_ = (W)w;
  int pri = ABSPRI (w_);

  if (expect_false (w_->pending))
    pendings [pri][w_->pending - 1].events |= revents;
  else
    {
      w_->pending = ++pendingcnt [pri];
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
      pendings [pri][w_->pending - 1].w      = w_;
      pendings [pri][w_->pending - 1].events = revents;
    }

  pendingpri = NUMPRI - 1;
}

以epoll 为例,当epoll_wait 返回一个fd_event 时 ,我们就可以直接定位到对应fd 的 watch list ,这个watch list 的长度一般不会超过3 ,fd_event 会有一个导致触发的事件 ,我们用这个事件依次和各个watch 注册的 event 做 “&” 操作, 如果不为0 ,则把对应的watch 加入到 待处理队列pendings中(当我们启用watcher 优先级模式时,pendings 是个2维数组,此时仅考虑普通模式)

这里要介绍一个新的数据结构,他表示pending中的wather也就是监控条件满足了,但是还没有触发动作的状态。

typedef struct
{
  W w;
  int events; /* the pending event set for the given watcher */
} ANPENDING;

这里 W w应该知道是之前说的基类指针。pendings就是这个类型的一个二维数组数组。其以watcher的优先级(libev可以对watcher优先级进行设置,这里用一维数组下标来表示)为一级下标。再以该优先级上pengding的监控器数目为二级下标(例如在这个fd上的监控数目,加入有读和写,则二维数组的下标就是0和1),对应的监控器中的pending值就是该下标加一的结果。其定义为ANPENDING *pendings [NUMPRI]。同anfds一样,二维数组的第二维 ANPENDING *是一个动态调整大小的数组。这样操作之后。这个一系列的操作可以认为是fd_feed的后续操作,xxx_reify目的最后都是将pending的watcher加入到这个pengdings二维数组中。后续的几个xxx_reify也是一样,等分析到那个类型的监控器类型时在作展开。这里用个图梳理下结构。

215034_LAfF_917596

最后在循环中执行宏EV_INVOKE_PENDING,其实是调用loop->invoke_cb,如果没有自定义修改的话(一般不会修改)就是调用ev_invoke_pending。该函数会依次遍历二维数组pendings,执行pending的每一个watcher上的触发动作回调函数。

至此一次IO触发过程就完成了。

5、总结下

在Libev中watcher要算最关键的数据结构了,整个逻辑都是围绕着watcher做操作。Libev内部维护一个基类ev_wathcer和若干个特定监控器的派生类ev_xxx。在使用的时候首先生成一个特定watcher的实例。并通过该派生对象私有的成员设置其触发条件。然后用anfds或者最小堆管理这些watchers。然后Libev通过backend_poll以及时间堆管理运算出pending的watcher。然后将他们加入到一个以优先级为一维下标的二维数组。在合适的时间依次调用这些pengding的watcher上注册的触发动作回调函数,这样便可以按优先级先后顺序实现“only-for-ordering”的优先级模型。

215211_W3Cy_917596

写这篇博客主要是为了做一个学习记录,里边肯定会有很多错误。学习IO事件时,查阅了不少博文,这几篇的帮组很大,多向大牛学习,文中也大量引用了他们博文中的图片和例子,如有不妥,请告之

http://my.oschina.net/u/917596/blog/177030

https://cnodejs.org/topic/4f16442ccae1f4aa270010a3


几种经典的网络服务器架构模型的分析与比较

前言 事件驱动为广大的程序员所熟悉,其最为人津津乐道的是在图形化界面编程中的应用;事实上,在网络编程中事件驱动也被广泛使用,并大规模部署在高连接数高吞吐量的服务器程序中,如 http 服务器程序、ftp 服务器程序等。相比于传统的网络编程方式,事件驱动能够极大的降低资源占用,增大服务接待能力,并提高网络传输效率。 关于本文提及的服务器模型,搜索网络可以查阅到很多的实现代码,所以,本文将不拘泥于源代码的陈列与分析,而侧重模型的介绍和比较。使用 libev 事件驱动库的服务器模型将给出实现代码。 本文涉及到线程 / 时间图例,只为表明线程在各个 IO 上确实存在阻塞时延,但并不保证时延比例的正确性和 IO 执行先后的正确性;另外,本文所提及到的接口也只是笔者熟悉的 Unix/Linux 接口,并未推荐 Windows 接口,读者可以自行查阅对应的 Windows 接口。阻塞型的网络编程接口几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv()等接口开始的。使用这些接口可以很方便的构建服务器 /客户机的模型。我们假设希望建立一个简单的服务器程序,实现向单个客户机提供类似于“一问一答”的内容服务。图1. 简单的一问一答的服务器 /客户机模型我们注意到,大部分的 socket接口都是阻塞型的。所谓阻塞型接口是指系统调用(一般是 IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。实际上,除非特别指定,几乎所有的 IO接口 (包括 socket 接口 )都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用 send()的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。这给多客户机、多业务逻辑的网络编程带来了挑战。这时,很多程序员可能会选择多线程的方式来解决这个问题。多线程服务器程序 应对多客户机的网络应用,最简单的解决方式是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。 具体使用多进程还是多线程,并没有一个特定的模式。传统意义上,进程的开销要远远大于线程,所以,如果需要同时为较多的客户机提供服务,则不推荐使用多进程;如果单个服务执行体需要消耗较多的 CPU 资源,譬如需要进行大规模或长时间的数据运算或文件访问,则进程较为安全。通常,使用 pthread_create () 创建新线程,fork() 创建新进程。 我们假设对上述的服务器 / 客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。于是有了如下的模型。图2. 多线程服务器模型 在上述的线程 / 时间图例中,主线程持续等待客户端的连接请求,如果有连接,则创建新线程,并在新线程中提供为前例同样的问答服务。 很多初学者可能不明白为何一个 socket 可以 accept 多次。实际上,socket 的设计者可能特意为多客户机的情况留下了伏笔,让 accept() 能够返回一个新的 socket。下面是 accept 接口的原型: int accept(int s, struct sockaddr *addr, socklen_t *addrlen); 输入参数 s 是从 socket(),bind() 和 listen() 中沿用下来的 socket 句柄值。执行完 bind() 和 listen() 后,操作系统已经开始在指定的端口处......余下全文>>
 

几种经典的网络服务器架构模型的分析与比较

相比于传统的网络编程方式,事件驱动能够极大的降低资源占用,增大服务接待能力,并提高网络传输效率。 关于本文提及的服务器模型,搜索网络可以查阅到很多的实现代码,所以,本文将不拘泥于源代码的陈列与分析,而侧重模型的介绍和比较。使用 libev 事件驱动库的服务器模型将给出实现代码。 本文涉及到线程/时间图例,只为表明线程在各个 IO 上确实存在阻塞时延,但并不保证时延比例的正确性和 IO 执行先后的正确性;另外,本文所提及到的接口也只是笔者熟悉的 Unix/Linux 接口,并未推荐 Windows 接口,读者可以自行查阅对应的 Windows 接口。 阻塞型的网络编程接口 几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv() 等接口开始的。使用这些接口可以很方便的构建服务器/客户机的模型。 我们假设希望建立一个简单的服务器程序,实现向单个客户机提供类似于“一问一答”的内容服务。 图1. 简单的一问一答的服务器/客户机模型 我们注意到,大部分的 socket 接口都是阻塞型的。所谓阻塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。 实际上,除非特别指定,几乎所有的 IO 接口(包括 socket 接口)都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用 send() 的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。这给多客户机、多业务逻辑的网络编程带来了挑战。这时,很多程序员可能会 选择多线程的方式来解决这个问题。 多线程服务器程序 应对多客户机的网络应用,最简单的解决方式是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。 具体使用多进程还是多线程,并没有一个特定的模式。传统意义上,进程的开销要远远大于线程,所以,如果需要同时为较多的客户机提供服务,则不推 荐使用多进程;如果单个服务执行体需要消耗较多的 CPU 资源,譬如需要进行大规模或长时间的数据运算或文件访问,则进程较为安全。通常,使用 pthread_create () 创建新线程,fork() 创建新进程。 我们假设对上述的服务器/客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。于是有了如下的模型。 图2. 多线程服务器模型 在上述的线程 / 时间图例中,主线程持续等待客户端的连接请求,如果有连接,则创建新线程,并在新线程中提供为前例同样的问答服务。 很多初学者可能不明白为何一个 socket 可以 accept 多次。实际上,socket 的设计者可能特意为多客户机的情况留下了伏笔,让 accept() 能够返回一个新的 socket。下面是 accept 接口的原型: int accept(int s, struct sockaddr *addr, socklen_t *addrlen); 输入参数 s 是从 socket(),bind() 和 listen() 中沿用下来的 socket 句柄值。执行完 bind() 和 listen() 后,操作系统已经开始在指定的端口处监听所有的连接请求,如果有请求,则将该连接请求加入请求队列。调用 accept() 接口正是从 socket s 的请求队列抽取第一个连接信息,创建一个与 s 同类的新的 socket 返回句柄。新的 so......余下全文>>
 

相关内容