epoll处理,


先看看如何使用的:

int main(int argc, char* argv[])
{
  int numThreads = 0;
  if (argc > 1)
  {
    benchmark = true;
    Logger::setLogLevel(Logger::WARN);
    numThreads = atoi(argv[1]);
  }
  EventLoop loop;
  HttpServer server(&loop, InetAddress(8000), "dummy");
  server.setHttpCallback(onRequest);
  server.setThreadNum(numThreads);
  server.start();
  loop.loop();
}

在EventLoop::EventLoop()中会调用

Poller::newDefaultPoller(this)

该函数来源:Poller.h


#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H


#include <vector>
#include <boost/noncopyable.hpp>


#include <muduo/base/Timestamp.h>
#include <muduo/net/EventLoop.h>


namespace muduo
{
namespace net
{


class Channel;


///
/// Base class for IO Multiplexing  Poller  为抽象类,子类必须实现接口(poll, updateChannel, removeChannel)函数
/// 
/// This class doesn't own the Channel objects.
class Poller : boost::noncopyable
{
 public:
  typedef std::vector<Channel*> ChannelList;


  Poller(EventLoop* loop);
  virtual ~Poller();


  /// Polls the I/O events.
  /// Must be called in the loop thread.
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;


  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  virtual void updateChannel(Channel* channel) = 0;


  /// Remove the channel, when it destructs.
  /// Must be called in the loop thread.
  virtual void removeChannel(Channel* channel) = 0;


  static Poller* newDefaultPoller(EventLoop* loop);    //在哪实现的呢?


  void assertInLoopThread()
  {
    ownerLoop_->assertInLoopThread();
  }


 private:
  EventLoop* ownerLoop_;
};


}
}
#endif  // MUDUO_NET_POLLER_H


Poller.cpp

#include <muduo/net/Poller.h>


using namespace muduo;
using namespace muduo::net;


Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}


Poller::~Poller()
{
}


找到了Poller::newDefaultPoller的实现地方DefaultPoller.cc

#include <muduo/net/Poller.h>
#include <muduo/net/poller/PollPoller.h>
#include <muduo/net/poller/EPollPoller.h>


#include <stdlib.h>


using namespace muduo::net;


Poller* Poller::newDefaultPoller(EventLoop* loop)
{
  if (::getenv("MUDUO_USE_POLL"))
  {
    return new PollPoller(loop);
  }
  else
  {
    return new EPollPoller(loop);
  }
}


无非是新建PollPoller对象或者EPollPoller对象

先来看看PollPoller对象

PollPoller.h

#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H


#include <muduo/net/Poller.h>


#include <map>
#include <vector>


struct pollfd;


namespace muduo
{
namespace net
{


///
/// IO Multiplexing with poll(2). 继承于抽象类Poller并实现其接口函数poll,updateChannel, removeChannel
///
class PollPoller : public Poller
{
 public:


  PollPoller(EventLoop* loop);
  virtual ~PollPoller();


  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
  virtual void updateChannel(Channel* channel);
  virtual void removeChannel(Channel* channel);


 private:
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;


  typedef std::vector<struct pollfd> PollFdList;      //对于poll函数而言,一个struct pollfd其中包含socketfd和事件
  typedef std::map<int, Channel*> ChannelMap; //key=socketfd, value=Channel
  PollFdList pollfds_;
  ChannelMap channels_;
};


}
}
#endif  // MUDUO_NET_POLLER_POLLPOLLER_H


PollPoller.cpp

#include <muduo/net/poller/PollPoller.h>


#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>


#include <assert.h>
#include <poll.h>


using namespace muduo;
using namespace muduo::net;


PollPoller::PollPoller(EventLoop* loop)
  : Poller(loop)
{
}


PollPoller::~PollPoller()
{
}


/*poll

获得发生事件的描述符个数numEvents ,

pollfds_中获得对应的socketfd,再利用key=socketfd在ChannelMap中找到对应的Channel,将其放入activeChannels

*/
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "PollPoller::poll()";
  }
  return now;
}

/*利用key=socketfd在ChannelMap中找到对应的Channel,将其放入activeChannels

*/
void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

/*updateChannel

channel存在于key=socketfd的map为channels_中,利用channel的index()在pollfds_中获得该socketfd对应的

struct pollfd,并将struct pollfd的event修改为channel的events()

*/

void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
      pfd.fd = -channel->fd()-1;
    }
  }
}


/*updateChannel

channel存在于key=socketfd的map为channels_中,利用channel的index()在pollfds_中获得该socketfd对应的

struct pollfd,首先将key=channel->fd()从map=channels_中删除,再将其struct pollfdpollfds_中删除

*/

void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}


再来看看EPollPoller类[LT模式]

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H


#include <muduo/net/Poller.h>


#include <map>
#include <vector>


struct epoll_event;


namespace muduo
{
namespace net
{


///
/// IO Multiplexing with epoll(4). 继承于抽象类Poller,子类必须实现其接口函数poll, updateChannel, removeChannel
///
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  virtual ~EPollPoller();


  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
  virtual void updateChannel(Channel* channel);
  virtual void removeChannel(Channel* channel);


 private:
  static const int kInitEventListSize = 16;


  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  void update(int operation, Channel* channel);


  typedef std::vector<struct epoll_event> EventList;//一个socketfd对应一个struct epoll_event
  typedef std::map<int, Channel*> ChannelMap;    //key=socketfd value=Channel


  int epollfd_;
  EventList events_;
  ChannelMap channels_;
};


}
}
#endif  // MUDUO_NET_POLLER_EPOLLPOLLER_H


EPollPoller.cc

#include <muduo/net/poller/EPollPoller.h>


#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>


#include <boost/static_assert.hpp>


#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>


using namespace muduo;
using namespace muduo::net;


// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);


namespace
{
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;
}


/*
创建epollfd_, 
可以关注的最多事件数目kInitEventListSize

*/
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  if (epollfd_ < 0)
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}


EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}




/*poll
等待事件发生
有numEvents 个描述符发生事件
从events_[i].data.ptr 中获取该描述符对应的Channel
将Channel 的revents_设为events_[i].events,
最后将其放入activeChannels

*/
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "EPollPoller::poll()";
  }
  return now;
}


/*
有numEvents 个描述符发生事件
从events_[i].data.ptr 中获取该描述符对应的Channel
将Channel 的revents_设为events_[i].events,
最后将其放入activeChannels
*/
void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const
{
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel);
  }
}




/*updateChannel
根据channel->index()知道
是新的连接,先看key=socketfd 在channels_ 中是否存在,存在则
让event.data.ptr 保存channel; 并将该socketfd 加入epollfd_集合;
老的连接则将该socketfd 从epollfd_集合修改或者删除

*/
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  const int index = channel->index();
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}


/*updateChannel
根据channel->index()知道是否要删除
先看key=socketfd 在channels_ 中是否存在,存在则先将其删除,
再将该socketfd 从epollfd_集合中删除

*/
void EPollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());
  int index = channel->index();
  assert(index == kAdded || index == kDeleted);
  size_t n = channels_.erase(fd);
  (void)n;
  assert(n == 1);


  if (index == kAdded)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}




/*
将channel 赋给该socketfd的event.data.ptr = channel;
并在epollfd_集合中对socketfd 进行operation 操作
*/
void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event event;
  bzero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;
  int fd = channel->fd();
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
  {
    if (operation == EPOLL_CTL_DEL)
    {
      LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
    }
    else
    {
      LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
    }
  }
}

总结:EPollPoller 类无非是对 ChannelMap channels_;的增删以及epoll集合的增删,还有就是将发生事件的Channel放入传入的参数为activeChannels中。

因为在EventLoop::EventLoop()中对象指针Poller指向了新建的EPollPoller 类对象,因此EventLoop类中会调用EPollPoller 类中实现的函数的,那么在哪里调用的呢?

int main(int argc, char* argv[])
{
  int numThreads = 0;
  if (argc > 1)
  {
    benchmark = true;
    Logger::setLogLevel(Logger::WARN);
    numThreads = atoi(argv[1]);
  }
  EventLoop loop;
  HttpServer server(&loop, InetAddress(8000), "dummy");
  server.setHttpCallback(onRequest);
  server.setThreadNum(numThreads);
  server.start();
  loop.loop(); //该函数会告诉答案的
}

相关内容

    暂无相关文章