epoll 处理
先看看它是如何被使用的:
// Entry point of the sample HTTP server.
// When at least one command-line argument is given, the benchmark flag is
// turned on, logging is restricted to WARN and argv[1] is parsed with atoi()
// as the thread count passed to the server; otherwise the count stays 0.
int main(int argc, char* argv[])
{
  int threadCount = 0;
  const bool hasThreadArg = (argc > 1);
  if (hasThreadArg)
  {
    benchmark = true;
    Logger::setLogLevel(Logger::WARN);
    threadCount = atoi(argv[1]);
  }

  // The loop must exist before the server, which keeps a pointer to it.
  EventLoop loop;
  // NOTE(review): 8000 is presumably the listening port - confirm against InetAddress.
  HttpServer server(&loop, InetAddress(8000), "dummy");
  server.setHttpCallback(onRequest);
  server.setThreadNum(threadCount);
  server.start();
  loop.loop();
}
在EventLoop::EventLoop()中会调用:
Poller::newDefaultPoller(this)
该函数来源:Poller.h
#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H
#include <vector>
#include <boost/noncopyable.hpp>
#include <muduo/base/Timestamp.h>
#include <muduo/net/EventLoop.h>
namespace muduo
{
namespace net
{
class Channel;
///
/// Base class for IO Multiplexing.
///
/// Poller is abstract: subclasses must implement poll(), updateChannel()
/// and removeChannel().
///
/// This class doesn't own the Channel objects.
class Poller : boost::noncopyable
{
public:
/// List of non-owning Channel pointers, used to report active channels.
typedef std::vector<Channel*> ChannelList;
/// Remembers @c loop as the owner loop; the pointer is only stored.
Poller(EventLoop* loop);
virtual ~Poller();
/// Polls the I/O events.
/// Must be called in the loop thread.
/// @param timeoutMs       timeout forwarded to the underlying wait call.
/// @param activeChannels  output list that receives the channels with events.
/// @return a Timestamp produced by the implementation.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;
/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;
/// Factory for the default implementation; defined in DefaultPoller.cc.
static Poller* newDefaultPoller(EventLoop* loop);
/// Forwards the thread check to the owner loop.
void assertInLoopThread()
{
ownerLoop_->assertInLoopThread();
}
private:
// Loop this poller belongs to (non-owning).
EventLoop* ownerLoop_;
};
}
}
#endif // MUDUO_NET_POLLER_H
Poller.cpp
#include <muduo/net/Poller.h>
using namespace muduo;
using namespace muduo::net;
/// Stores the owning EventLoop pointer; nothing else is initialised here.
Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}
/// Out-of-line virtual destructor; the base class has nothing to release.
Poller::~Poller()
{
}
找到了Poller::newDefaultPoller的实现地方DefaultPoller.cc
#include <muduo/net/Poller.h>
#include <muduo/net/poller/PollPoller.h>
#include <muduo/net/poller/EPollPoller.h>
#include <stdlib.h>
using namespace muduo::net;
/// Creates the poller implementation used by an EventLoop.
///
/// If the environment variable MUDUO_USE_POLL is set (to any value) a
/// PollPoller is created, otherwise an EPollPoller.
/// @param loop  the loop passed on to the poller's constructor.
/// @return a heap-allocated poller; the caller receives the raw pointer.
Poller* Poller::newDefaultPoller(EventLoop* loop)
{
  const char* const usePollEnv = ::getenv("MUDUO_USE_POLL");
  if (!usePollEnv)
  {
    return new EPollPoller(loop);
  }
  return new PollPoller(loop);
}
无非是根据环境变量 MUDUO_USE_POLL 是否被设置,新建 PollPoller 对象或者 EPollPoller 对象(未设置时使用 EPollPoller)。
先来看看 PollPoller 类:
PollPoller.h
#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H
#include <muduo/net/Poller.h>
#include <map>
#include <vector>
struct pollfd;
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with poll(2).
///
/// Derives from the abstract class Poller and implements its interface
/// functions poll(), updateChannel() and removeChannel().
///
class PollPoller : public Poller
{
public:
PollPoller(EventLoop* loop);
virtual ~PollPoller();
/// Calls ::poll() on pollfds_ and appends the channels with events to activeChannels.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
/// Adds a pollfd for a new channel or refreshes the events of an existing one.
virtual void updateChannel(Channel* channel);
/// Drops the channel from channels_ and its pollfd from pollfds_.
virtual void removeChannel(Channel* channel);
private:
/// Maps the pollfds that reported events back to their Channel objects.
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
// For poll(2), each struct pollfd carries a socket fd and its event masks.
typedef std::vector<struct pollfd> PollFdList;
// key = socket fd, value = the Channel registered for that fd.
typedef std::map<int, Channel*> ChannelMap;
// Array handed to ::poll(); a channel's index() is its position in here.
PollFdList pollfds_;
// Lookup from fd to Channel (non-owning).
ChannelMap channels_;
};
}
}
#endif // MUDUO_NET_POLLER_POLLPOLLER_H
PollPoller.cpp
#include <muduo/net/poller/PollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>
#include <assert.h>
#include <errno.h>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
/// Forwards the owning loop to the Poller base; pollfds_ and channels_
/// start out empty.
PollPoller::PollPoller(EventLoop* loop)
  : Poller(loop)
{
}
/// Nothing to release explicitly: the containers clean up themselves and
/// the Channel objects are not owned by the poller.
PollPoller::~PollPoller()
{
}
/// Waits for I/O events with poll(2) and collects the channels that fired.
///
/// The number of ready descriptors is obtained from ::poll(); for each of
/// them the socket fd found in pollfds_ is used as the key into channels_
/// and the matching Channel is appended to activeChannels.
///
/// @param timeoutMs       timeout passed unchanged to ::poll().
/// @param activeChannels  output list receiving the channels with events.
/// @return the Timestamp taken right after ::poll() returned.
///
/// A failure whose errno is EINTR is not logged, because an interrupted
/// wait is simply retried by the next call; any other failure is reported
/// through LOG_SYSERR.
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  // Capture errno immediately: the calls below may overwrite it before the
  // error branch gets a chance to look at it.
  const int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    if (savedErrno != EINTR)
    {
      // Restore errno so the logged error describes the poll() failure.
      errno = savedErrno;
      LOG_SYSERR << "PollPoller::poll()";
    }
  }
  return now;
}
/*利用key=socketfd在ChannelMap中找到对应的Channel,将其放入activeChannels
*/
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}
/// Registers a channel or refreshes the events of one already registered.
///
/// A channel with a negative index() is new: a pollfd is appended to
/// pollfds_, its position becomes the channel's index and the channel is
/// stored in channels_ under its fd.
///
/// Otherwise index() selects the channel's pollfd, whose events field is
/// replaced by channel->events(). A channel that is interested in no event
/// is parked by storing -fd-1 in the pollfd (a negative fd, even for fd 0),
/// so that poll(2) skips the entry while its slot is kept.
///
/// @param channel  channel to add or update; it is not owned by the poller.
void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    // Restore the real fd first: the entry may have been parked as -fd-1 by
    // an earlier update, and without this a re-enabled channel would stay
    // ignored by poll(2) forever.
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
      pfd.fd = -channel->fd()-1;
    }
  }
}
/// Unregisters a channel from the poller.
///
/// The channel must be known to channels_ and interested in no event (both
/// asserted). Its fd is erased from channels_, then its pollfd is removed
/// from pollfds_: if it is not the last element it is first swapped with
/// the last one, and the channel owning the moved pollfd gets its index
/// updated to the freed slot.
///
/// @param channel  channel to remove; it is not destroyed here.
void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  const int slot = channel->index();
  assert(0 <= slot && slot < static_cast<int>(pollfds_.size()));
  const struct pollfd& doomed = pollfds_[slot];
  (void)doomed;
  assert(doomed.fd == -channel->fd()-1 && doomed.events == channel->events());
  const size_t erased = channels_.erase(channel->fd());
  assert(erased == 1);
  (void)erased;

  const bool isLastSlot = (implicit_cast<size_t>(slot) == pollfds_.size()-1);
  if (!isLastSlot)
  {
    // Move the tail entry into the freed slot so the vector stays dense.
    int movedFd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+slot, pollfds_.end()-1);
    if (movedFd < 0)
    {
      // The tail entry was parked as -fd-1; recover the real fd for lookup.
      movedFd = -movedFd-1;
    }
    channels_[movedFd]->set_index(slot);
  }
  pollfds_.pop_back();
}
再来看看EPollPoller类[LT模式]:
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
#include <muduo/net/Poller.h>
#include <map>
#include <vector>
struct epoll_event;
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with epoll(4).
///
/// Derives from the abstract class Poller and implements its interface
/// functions poll(), updateChannel() and removeChannel().
///
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop* loop);
virtual ~EPollPoller();
/// Calls ::epoll_wait() and appends the channels with events to activeChannels.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
/// Adds, modifies or deletes the channel's fd in the epoll set.
virtual void updateChannel(Channel* channel);
/// Erases the channel from channels_ and, if still added, from the epoll set.
virtual void removeChannel(Channel* channel);
private:
// Initial size of events_; poll() doubles the vector when it fills up.
static const int kInitEventListSize = 16;
/// Copies the reported events into the channels and collects them.
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
/// Issues one epoll_ctl() call for the channel's fd.
void update(int operation, Channel* channel);
// Buffer of struct epoll_event filled by ::epoll_wait(), one per ready fd.
typedef std::vector<struct epoll_event> EventList;
// key = socket fd, value = the Channel registered for that fd.
typedef std::map<int, Channel*> ChannelMap;
// Descriptor returned by epoll_create1().
int epollfd_;
EventList events_;
// Lookup from fd to Channel (non-owning).
ChannelMap channels_;
};
}
}
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cc
#include <muduo/net/poller/EPollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <boost/static_assert.hpp>
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
using namespace muduo;
using namespace muduo::net;
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
// These compile-time checks fail the build if any pair of event flags
// differs. The poller stores epoll event masks into Channel objects with
// the same set_revents() call that PollPoller uses for poll(2) masks.
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);
// Registration states that EPollPoller keeps in Channel::index().
namespace
{
  // Not present in channels_ (initial state, and the state after removeChannel()).
  const int kNew = -1;
  // Present in channels_ and registered in the epoll set.
  const int kAdded = 1;
  // Still present in channels_ but taken out of the epoll set.
  const int kDeleted = 2;
}
/// Creates the epoll instance used by this poller.
///
/// epollfd_ comes from epoll_create1(EPOLL_CLOEXEC). events_ starts with
/// kInitEventListSize entries; this is only the initial capacity, since
/// poll() enlarges the vector when it fills up. A failed epoll_create1()
/// is reported with LOG_SYSFATAL.
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  const bool created = (epollfd_ >= 0);
  if (!created)
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}
/// Closes the epoll descriptor obtained in the constructor.
EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}
/// Waits for I/O events with epoll_wait(2) and collects the active channels.
///
/// For each of the numEvents reported entries, fillActiveChannels() takes
/// the Channel from events_[i].data.ptr, stores events_[i].events as its
/// revents and appends it to activeChannels. When the event buffer was
/// completely filled, it is doubled for the next call.
///
/// @param timeoutMs       timeout passed unchanged to ::epoll_wait().
/// @param activeChannels  output list receiving the channels with events.
/// @return the Timestamp taken right after ::epoll_wait() returned.
///
/// A failure whose errno is EINTR is not logged, because an interrupted
/// wait is simply retried by the next call; any other failure is reported
/// through LOG_SYSERR.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  // Capture errno immediately: the calls below may overwrite it before the
  // error branch gets a chance to look at it.
  const int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      // Buffer was full, more events may be pending: grow it.
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    if (savedErrno != EINTR)
    {
      // Restore errno so the logged error describes the epoll_wait() failure.
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}
/*
有numEvents 个描述符发生事件
从events_[i].data.ptr 中获取该描述符对应的Channel
将Channel 的revents_设为events_[i].events,
最后将其放入activeChannels
*/
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
/// Brings the epoll set in line with the events a channel is interested in.
///
/// The action depends on channel->index():
///  - kNew:     the channel is stored in channels_ and added with EPOLL_CTL_ADD.
///  - kDeleted: the channel is still in channels_; it is added again with
///              EPOLL_CTL_ADD.
///  - otherwise (asserted to be kAdded): a channel with no interested events
///              is taken out with EPOLL_CTL_DEL and marked kDeleted, any
///              other channel is updated with EPOLL_CTL_MOD.
///
/// @param channel  channel to add or update; it is not owned by the poller.
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  const int state = channel->index();
  const int fd = channel->fd();

  const bool registered = (state != kNew && state != kDeleted);
  if (registered)
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(state == kAdded);
    if (!channel->isNoneEvent())
    {
      update(EPOLL_CTL_MOD, channel);
    }
    else
    {
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    return;
  }

  // not in the epoll set yet (or any more): add with EPOLL_CTL_ADD
  if (state == kDeleted)
  {
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
  }
  else // state == kNew
  {
    assert(channels_.find(fd) == channels_.end());
    channels_[fd] = channel;
  }
  channel->set_index(kAdded);
  update(EPOLL_CTL_ADD, channel);
}
/// Unregisters a channel from the poller.
///
/// The channel must be known to channels_, interested in no event, and in
/// state kAdded or kDeleted (all asserted). Its fd is erased from
/// channels_; if it is still in the epoll set (kAdded) it is taken out with
/// EPOLL_CTL_DEL. Finally its index is reset to kNew.
///
/// @param channel  channel to remove; it is not destroyed here.
void EPollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  const int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());
  const int state = channel->index();
  assert(state == kAdded || state == kDeleted);
  const size_t erased = channels_.erase(fd);
  assert(erased == 1);
  (void)erased;

  const bool stillInEpollSet = (state == kAdded);
  if (stillInEpollSet)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}
/// Performs one epoll_ctl() operation for a channel.
///
/// A zeroed epoll_event is filled with the channel's interested events and
/// with the channel pointer in data.ptr (read back in fillActiveChannels()),
/// then applied to the channel's fd on epollfd_.
///
/// @param operation  EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
/// @param channel    channel whose fd is operated on.
///
/// A failing EPOLL_CTL_DEL is logged with LOG_SYSERR; a failure of any
/// other operation is logged with LOG_SYSFATAL.
void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event ev;
  bzero(&ev, sizeof ev);
  ev.events = channel->events();
  ev.data.ptr = channel;
  const int fd = channel->fd();

  const int rc = ::epoll_ctl(epollfd_, operation, fd, &ev);
  if (rc >= 0)
  {
    return;
  }

  if (operation != EPOLL_CTL_DEL)
  {
    LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
  }
  else
  {
    LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
  }
}
总结:EPollPoller 类无非是对 ChannelMap channels_ 的增删,以及对 epoll 集合的增、删、改,还有就是将发生事件的 Channel 放入传入的参数 activeChannels 中。
因为在 EventLoop::EventLoop() 中,Poller 指针指向了新建的 EPollPoller 对象,因此 EventLoop 类会调用 EPollPoller 类中实现的函数,那么是在哪里调用的呢?
// Entry point of the sample HTTP server (shown again to locate the Poller calls).
// When at least one command-line argument is given, the benchmark flag is
// turned on, logging is restricted to WARN and argv[1] is parsed with atoi()
// as the thread count passed to the server; otherwise the count stays 0.
int main(int argc, char* argv[])
{
  int threadCount = 0;
  const bool hasThreadArg = (argc > 1);
  if (hasThreadArg)
  {
    benchmark = true;
    Logger::setLogLevel(Logger::WARN);
    threadCount = atoi(argv[1]);
  }

  EventLoop loop;
  HttpServer server(&loop, InetAddress(8000), "dummy");
  server.setHttpCallback(onRequest);
  server.setThreadNum(threadCount);
  server.start();
  // Per the article, loop() is the call that answers where the poller is used.
  loop.loop();
}
评论暂时关闭