作者:fangshen,腾讯 IEG 客户端开发工程师
C++20带来了coroutine特性, 同时新的execution也在提案过程中, 这两者都给我们在C++中解决异步问题带来了新的思路. 但对比其他语言的实现, C++的协程和后续的execution都存在一定的理解和封装成本, 本系列的分享我们将围绕基本的原理, 相应的封装, 以及剥析优秀的第三方实现, 最终结合笔者framework落地的情况来展开.
之前设计我们游戏用的c++框架的时候, 刚好c++20的coroutine已经发布, 又因为是专门 给game server用的c++ framework, 对多线程的诉求相对有限, 或者本着少并发少奇怪的错误的原则, 除网络和IO和日志等少量模块外, 大部分模块主要还是工作在主线程上的, 所以当时设计的重点也就放在了c++20 coroutine的包装和使用上, 更多的使用coroutine来完善异步的支持. 但如果考虑到framework作为前后端公用框架的话, 原来主要针对主线程使用的包装的coroutine调度器就显得有些不够用, 以此作为基础, 我们开始了尝试结合比较新的c++异步思路, 来重新思考应该如何实现一个尽量利用c++新特性, 业务层简单易用的异步框架了.
本系列的主要内容也是围绕这条主线来铺开, 过程中我们 主要以:
rstudio framework的异步框架由两块比较独立的部分组成:
这一部分的内容因为后续有asio scheduler实现具体的分析篇章, 这个地方主要以业务侧使用进行展开了.
GJobSystem->Post([]() {
//some calculate task here
//...
GJobSystem->Post(
[]() {
//task notify code here
//...
},
rstudio::JobSystemType::kLogicJob);
}, rstudio::JobSystemType::kWorkJob);
相关的时序图:
预定义的枚举值:
enum class JobSystemType : int {
kLogicJob = 0, // logic thread(main thread)
kWorkJob, // work thread
kSlowJob, // slow work thread(run io or other slow job)
kNetworkJob, // add a separate thread for network
kNetworkConnectJob, // extra connect thread for network
kLogJob, // log thread
kNotifyExternalJob, // use external process to report something, 1 thread only~~
kTotalJobTypes,
};
不同Job说明:
kLogicJob
kWorkJob
kSlowJob
kNetworkJob
kNetworkConnectJob
kLogJob
kNotifyExternalJob
相关接口:
//NoIgnore version
uint64_t JobSystemModule::AddAlwaysRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long periodTimeMs);uint64_t JobSystemModule::AddTimesRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long periodTimeMs,
unsigned int runCount);
uint64_t JobSystemModule::AddDelayRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long delayTimeMs);
void JobSystemModule::KillTimerJob(uint64_t tid);
本部分并未直接使用asio原始的basic_waitable_timer实现, 而是自己实现的定时任务.
特定的情况下, 被派发到Work线程池的任务存在依赖关系
需要串联执行的时候, 这个时候我们需要额外的设施 JobStrand
来保证任务是按先后依赖关系来串行执行的
如下图中part1, part2, part3, part4串行执行的情况所示
示例代码:
auto strand = GJobSystem->RequestStrand(rstudio::JobSystemType::kWorkJob);
starnd.Post([](){
//part1~
// ...
});
starnd.Post([](){
//part2~
// ...
});
starnd.Post([](){
//part3~
// ...
});
starnd.Post([](){
//part4~
// ...
});
starnd.Post([](){
GJobSystem->Post([](){
//return code here
// ...
}, rstudio::JobSystemType::kLogicJob);
});
jobs::JobFencePtr JobSystemModule::RequestFence();
示例代码(TcpService的初始化):
job_system_module_->Post(
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return; InitInNetworkThread();
},
JobSystemType::kNetworkJob);
period_task_ptr = job_system_module_->AddAlwaysRunJob(
JobSystemType::kNetworkJob,
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return;
LoopInNetworkThread();
},
10);
fence_->FenceTo((int)JobSystemType::kNetworkJob);
fence_->Wait();
jobs::JobWaiterPtr JobSystemModule::RequestWaiter();
jobs::JobNotifyPtr JobSystemModule::RequestNotify();
jobs::JobTicketPtr JobSystemModule::RequestTicket();
示例代码:
GJobSystem->Post(
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return; InitInNetworkThread();
},
JobSystemType::kNetworkJob);
正好今年的GDC上有一个<<One Frame In Halo Infinite>>的分享, 里面主要讲述的是对Halo Infinite的引擎升级, 提供新的JobSystem和新的动态帧的机制来支撑项目的, 我们直接以它为例子来对比一下framework和Halo的实现, 并且也借用Halo Infinite的例子, 来更好的了解这种lambda post模式的缺陷, 以及可以改进的点. Halo引入新的JobSystem主要是为了将老的Tetris结构的并发模式:
向新的基于Dependency的图状结构迁移:
他使用的JobSystem的业务Api其实很简单, 我们直接来看一下相关的代码:
JobSystem& jobSsytem = JobSystem::Get();
JobGraphHandle graphHandle = jobSystem.CreateJobGraph();JobHandle jobA = jobSystem.AddJob(
graphHandle,
"JobA",
[](){...} );
JobHandle jobB = jobSystem.AddJob(
graphHandle,
"JobB",
[](){...} );
jobSystem.AddJobToJobDependency(jobA, jobB);
jobSystem.SubmitJobGraph(graphHandle);
通过这样的机制, 就很容易形成如:
另外还有一个用于同步的SyncPoint:
JobSystem& jobSystem = JobSystem::Get();
JobGraphHandle graphHandle = jobSystem.CreateJobGraph();SyncPointHandle syncPointX = jobSystem.CreateSyncPoint(graphHandle, "SyncPointX");
JobHandle jobA = jobSystem.AddJob(graphHandle, "JobA", [](){...});
JobHandle jobB = jobSystem.AddJob(graphHandle, "JobB", [](){...});
jobSystem.AddJobToSyncPointDependency(jobA, syncPointX);
jobSystem.AddSyncPointToJobDependency(syncPointX, jobB);
jobSystem.SubmitJobGraph(graphHandle);
大致的作用如下:
这样在workload主动触发SyncPoint后, 整体执行才会继续往下推进, 这样就能方便的加入一些主动的同步点对整个Graph的执行做相关的控制了。
回到asio, 我们前面也介绍了, 使用strand和post(), 我们也能很方便的构造出Graph形的执行情况 , 而SyncPoint其实类型framework中提供的Event, 表达上会略有差异, 但很容易看出两套实现其实是相当类同的. 这样的话, Halo 的JobSystem有的所有优缺点, framework基本也同样存在了, 这里简单搬运一下:
对于复杂并发业务的表达以lambda内嵌为主, 虽然这种方式尽可能保证所有代码上下文是比较集中的, 对比纯粹使用callback的模式有所进步, 但这种自由度过高的方式本身也会存在一些问题, 纯粹靠编码者来维系并发上下文的正确性, 这种情况下状态值在lambda之间的传递也需要特别的小心, 容易出错, 并且难以调试。
coroutine部分之前的帖子里已经写得比较详细了, 这里仅给出链接以及简单的代码示例:
代码示例:
//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
-> rstudio::logic::CoResumingTaskCpp20 {
auto* task = rco_self_task(); printf("step1: task is %llu\n", task->GetId());
co_await rstudio::logic::cotasks::NextFrame{};
printf("step2 after yield!\n");
int c = 0;
while (c < 5) {
printf("in while loop c=%d\n", c);
co_await rstudio::logic::cotasks::Sleep(1000);
c++;
}
for (c = 0; c < 5; c++) {
printf("in for loop c=%d\n", c);
co_await rstudio::logic::cotasks::NextFrame{};
}
printf("step3 %d\n", c);
auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
[]()-> logic::CoResumingTaskCpp20 {
printf("from child coroutine!\n");
co_await rstudio::logic::cotasks::Sleep(2000);
printf("after child coroutine sleep\n");
});
printf("new task create in coroutine: %llu\n", newTaskId);
printf("Begin wait for task!\n");
co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
printf("After wait for task!\n");
rstudio::logic::cotasks::RpcRequest
rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
auto* rpcret = co_await rpcReq;
if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
assert(rpcret->totalRet == 1);
auto retval = rpcret->retValue.to<int>();
assert(retval == 4);
printf("rpc coroutine run suc, val = %d!\n", retval);
}
else {
printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);
}
co_await rstudio::logic::cotasks::Sleep(5000);
printf("step4, after 5s sleep\n");
co_return rstudio::logic::CoNil;
} );
执行结果:
step1: task is 1
step2 after yield!
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc, val = 4!
step4, after 5s sleep
整体来看, 协程的使用还是给异步编程带来了很多便利, 但框架本身的实现其实还是有比较多迭代优化的空间的:
上面也结合halo的实例说到了一些限制, 那么这些问题有没有好的解决办法了, 答案是肯定的, 虽然execution并未完全通过提案, 但整体而言, execution新的sender/reciever模型, 对于解决上面提到的一些缺陷, 应该是提供了非常好的思路, 我们下一章节中继续展开.
最开始的想法其实比较简单, 结合原来的framework, 适当引入提案中的execution一些比较可取的思路, 让framework的异步编程能更多的吸取c++新特性和execution比较高级的框架抽象能力, 提升整个异步库的实现质量. 所以最开始定的主线思路其实是更多的向execution倾斜, 怎么了解掌握execution, 怎么与现在的framework结合成了主线思路.
我们选择的基础参考库是来自冲元宇宙这波改名的Meta公司的libunifex, 客观来说, Meta公司的folly库, 以及libunifex库的实现质量, 肯定都是业界前沿的, 对c++新特性的使用和探索, 也是相当给力的. 这些我们后续在分析libunifex具体实现的篇章中也能实际感受到.
但深入了解libunifex后, 我们会发现, 它的优点有不少:
事情到这个点就有点尴尬了, 原有的asio, 架构层面来说, 跟新的execution是存在落差的. 而项目实践上来说, asio相当稳扎稳打, 而以libunifex当前的状态来说, 离工业化使用其实是有一定距离的. 但asio作者在21年时候的两篇演讲(更像coding show):
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
for (;;)
{
auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
if (e)
break; auto ex = client.get_executor();
co_spawn(ex, proxy(std::move(client), target), detached);
}
}
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e)
{
co_await (
(
transfer(client, server, client_to_server_deadline) ||
watchdog(client_to_server_deadline)
)
&&
(
transfer(server, client, server_to_client_deadline) ||
watchdog(server_to_client_deadline)
)
);
}
对比原来每个async_xxx()函数后接callback的模式, 整个实现可以说是相当的优雅了, 代码的可读性也得到了极大的提高, 这两段代码都来自于上面的演讲中, 想深入了解的可以直接打开相关的链接观看视频, 很推荐大家去看一下. 能够把复杂的事情用更简洁易懂的方法表达, 这肯定是让人振奋的, 当然, 深入了解相关实现后, 也会发现存在一些问题, 但我们的本意是参考学习, 得出最终想要的可以比较好的支撑并发和异步业务的基础框架, 有这些, 其实已经可以理出一条比较清晰的思路了:
本系列涉及的基础知识和相关内容比较多, 先给出一个临时的大纲, 后续可能会有调整. 目前的思路是先介绍大家相对熟悉度不那么高的execution基础知识和libunifex, 后面再介绍asio相关的scheduler以及coroutine实现, 最后再回归笔者正在迭代的framework, 这样一个顺序来展开.
参考