ZLMediaKit 服务器源码解读---事件循环
2022/2/11 14:13:14
本文主要是介绍ZLMediaKit 服务器源码解读---事件循环,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一:事件循环池类
事件循环池是一个单例类,管理着EventPoller
1、EventPollerPool构造函数
EventPollerPool::EventPollerPool() { auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true); InfoL << "创建EventPoller个数:" << size; }
2:循环池的添加EventPoller
根据s_pool_size的大小,实例化s_pool_size个EventPoller,在调用runLoop时创建不同的线程
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) { auto cpus = thread::hardware_concurrency(); size = size > 0 ? size : cpus; for (size_t i = 0; i < size; ++i) { EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority)); poller->runLoop(false, register_thread);//这里创建不同的线程 auto full_name = name + " " + to_string(i); poller->async([i, cpus, full_name]() { setThreadName(full_name.data()); setThreadAffinity(i % cpus); }); _threads.emplace_back(std::move(poller)); } return size; }
在调用runLoop时blocked传递的为false,因此是在函数else分支创建的线程,然后将runLoop函数传入线程,并将参数blocked 修改为true,
这样创建了不同的线程执行runLoop,事件循环以多路复用epoll的模型建立轮巡过程,epoll原理不在多描述
void EventPoller::runLoop(bool blocked,bool regist_self) { if (blocked) { ...... } else { _loop_thread = new thread(&EventPoller::runLoop, this, true, regist_self); _sem_run_started.wait(); } }
二:EventPoller(事件轮巡)
通过epoll监听2种类型文件描述符
1、socket的描述符,主要是监听不同客户端发来的数据
通过addEvent将需要监听的fd加入epoll管理,
如果是当前线程,则将事件加入_event_map去管理当epoll事件触发时,又会将事件epoll触发事件回调出去,也就是传入的PollEventCB
如果不是当前线程,异步处理,其实就是利用管道将事件在当前线程处理,管道处理过程可以操控2
int EventPoller::addEvent(int fd, int event, PollEventCB cb) { TimeTicker(); if (!cb) { WarnL << "PollEventCB 为空!"; return -1; } if (isCurrentThread()) { struct epoll_event ev = {0}; ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (ret == 0) { _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb))); } return ret; } async([this, fd, event, cb]() { addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb))); }); return 0; }
2、内部的自定义事件,利用linux管道模拟文件,从而进行监听,
程序对管道进行了包装,同时在构造函数中构建了管道
PipeWrap _pipe; PipeWrap::PipeWrap(){ //创建管道 if (pipe(_pipe_fd) == -1) { throw runtime_error(StrPrinter << "create posix pipe failed:" << get_uv_errmsg());\ } SockUtil::setNoBlocked(_pipe_fd[0],true); SockUtil::setNoBlocked(_pipe_fd[1],false); SockUtil::setCloExec(_pipe_fd[0]); SockUtil::setCloExec(_pipe_fd[1]); }
在EventPoller构造的时候,就已经将管道加入了epoll管理,并且回调为onPipeEvent
EventPoller::EventPoller(ThreadPool::Priority priority ) { ...... //添加内部管道事件 if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) { throw std::runtime_error("epoll添加管道失败"); } }
onPipeEvent定义如下,当触发时,会将_list_task交换到一个临时变量中,然后执行其中的任务,本质是处理异步任务的:在其他线程将任务添加进任务列表,同时写管道,epoll会在本线程监听到该管道的读事件,再从任务列表中取出任务去执行。从而达到异步处理任务的目的
inline void EventPoller::onPipeEvent() { TimeTicker(); char buf[1024]; int err = 0; do { if (_pipe.read(buf, sizeof(buf)) > 0) { continue; } err = get_uv_error(true); } while (err != UV_EAGAIN); decltype(_list_task) _list_swap; { lock_guard<mutex> lck(_mtx_task); _list_swap.swap(_list_task); } _list_swap.for_each([&](const Task::Ptr &task) { try { (*task)(); } catch (ExitException &) { _exit_flag = true; } catch (std::exception &ex) { ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what(); } }); }
三:总结
程序启动会创建一个事件循环池,大小可以自定义,默认根据硬件性能自动分配,每个事件循环类拥有自己的独立的线程,事件循环是通过多路复用epoll模式去实现,监听sock的事件接收网络数据,监听管道的事件执行异步任务
这篇关于ZLMediaKit 服务器源码解读---事件循环的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-01后台管理开发学习:新手入门指南
- 2024-11-01后台管理系统开发学习:新手入门教程
- 2024-11-01后台开发学习:从入门到实践的简单教程
- 2024-11-01后台综合解决方案学习:从入门到初级实战教程
- 2024-11-01接口模块封装学习入门教程
- 2024-11-01请求动作封装学习:新手入门教程
- 2024-11-01登录鉴权入门:新手必读指南
- 2024-11-01动态面包屑入门:轻松掌握导航设计技巧
- 2024-11-01动态权限入门:新手必读指南
- 2024-11-01动态主题处理入门:新手必读指南