当前位置 博文首页 > 0向往0:剖析虚幻渲染体系(02)- 多线程渲染
为了更平稳地过渡,在真正进入UE的多线程渲染知识之前,先学习或重温一下多线程编程的基础知识。
多线程(Multithread)编程的思想早在单核时代就已经出现了,当时的操作系统(如Windows95)就已经支持多任务的功能,其原理就是在单核中切换不同的上下文(Context),以便每个进程中的线程都有时间获得执行指令的机会。
但到了2005年,当单核主频接近4GHz时,CPU硬件厂商英特尔和AMD发现,速度也会遇到自己的极限:那就是单纯的主频提升,已经无法明显提升系统整体性能。
随着单核计算频率摩尔定律的缓慢终结,Intel率先于2005年发布了奔腾D和奔腾四至尊版840系列,首次支持了两个物理级别的线程计算单元。此后十多年,多核CPU得到蓬勃发展,由AMD制造的Ryzen 3990X处理器已经拥有64个核心128个逻辑线程。
锐龙(Ryzen)3990X的宣传海报中赫然凸显的核心与线程数量。
硬件的多核发展,给软件极大的发挥空间。应用程序可以充分发挥多核多线程的计算资源,各个应用领域由此也产生多线程编程模型和技术。作为游戏的发动机Unreal Engine等商业引擎,同样可以利用多线程技术,以便更加充分地提升效率和效果。
使用多线程并发带来的作用总结起来主要有两点:
但是,随着CPU核心数量的提升,计算机获得的效益并非直线提升,而是遵循Amdahl's law(阿姆达尔定律),Amdahl's law的公式定义如下:
公式的各个分量含义如下:
举个具体的栗子,假设有8核16线程的CPU用于处理某个任务,这个任务有70%的部分是可以并行处理的,那么它的理论加速比为:
由此可见,多线程编程带来的效益并非跟核心数呈直线正比,实际上它的曲线如下所示:
阿姆达尔定律揭示的核心数和加速比图例。由此可见,可并行的任务占比越低,加速比获得的效果越差:当可并行任务占比为50%时,16核已经基本达到加速比天花板,无论后面增加多少核心数量,都无济于事;如果可并行任务占比为95%时,到2048个核心才会达到加速比天花板。
虽然阿姆达尔定律给我们带来了残酷的现实,但是,如果我们能够提升任务并行占比到接近100%,则加速比天花板可以得到极大提升:
如上公式所示,当\(p=1\)(即可并行的任务占比100%)时,理论上的加速比和核心数量成线性正比!!
举个具体的例子,在编译Unreal Engine工程源码或Shader时,由于它们基本是100%的并行占比,理论上可以获得接近线性关系的加速比,在多核系统中将极大地缩短编译时间。
利用多线程并发提高性能的方式有两种:
上面阐述了多线程并发的益处,接下来说说它的副作用。总结起来,副作用如下:
本小节将阐述多线程编程技术中常涉及的基本概念。
进程(Process)是操作系统执行应用程序的基本单元和实体,它本身只是个容器,通常包含内核对象、地址空间、统计信息和若干线程。它本身并不真正执行代码指令,而是交由进程内的线程执行。
对Windows而言,操作系统在创建进程时,同时也会给它创建一个线程,该线程被称为主线程(Primary thread, Main thread)。
对Unix而言,进程和主线程其实是同一个东西,操作系统并不知道有线程的存在,线程更接近于lightweight processes(轻量级进程)的概念。
进程有优先级概念,Windows下由低到高为:低(Low)、低于正常(Below normal)、正常(Normal)、高于正常(Above normal)、高(High)、实时(Real time)。(见下图)
默认情况下,进程的优先级为Normal。优先级高的进程将会优先获得执行机会和时间。
线程(Thread)是可以执行代码的实体,通常不能独立存在,需要依附在某个进程内部。一个进程可以拥有多个线程,这些线程可以共享进程的数据,以便并行或并发地执行多个任务。
在单核CPU中,操作系统(如Windows)可能会采用轮循(Round robin)的方式进行调度,使得多个线程看起来是同时运行的。(下图)
在多核CPU中,线程可能会安排在不同的CPU核心同时运行,从而达到并行处理的目的。
采用SMP的Windows在多核CPU的执行示意图。等待处理的线程被安排到不同的CPU核心。
每个线程可拥有自己的执行指令上下文(如Windows的IP(指令寄存器地址)和SP(栈起始寄存器地址))、执行栈和TLS(Thread Local Storage,线程局部缓存)。
Windows线程创建和初始化示意图。
线程局部存储(Thread Local Storage)是一种存储持续期,对象的生命周期与线程一样,在线程开始时分配,线程结束时回收。每个线程有该对象自己的实例,访问和修改这样的对象不会造成竞争条件(Race Condition)。
线程也存在优先级概念,优先级越高的将优先获得执行指令的机会。
线程的状态一般有运行状态、暂停状态等。Windows可用以下接口切换线程状态:
// 暂停线程
DWORD SuspendThread(HANDLE hThread);
// 继续运行线程
DWORD ResumeThread(HANDLE hThread);
同个线程可被多次暂停,如果要恢复运行状态,则需要调用同等次数的继续运行接口。
协程(Coroutine)是一种轻量级(lightweight)的用户态线程,通常跑在同一个线程,利用同一个线程的不同时间片段执行指令,没有线程、进程切换和调度的开销。从使用者角度,可以利用协程机制实现在同个线程模拟异步的任务和编码方式。在同个线程内,它不会造成数据竞争,但也会因线程阻塞而阻塞。
纤程(Fiber)如同协程,也是一种轻量级的用户态线程,可以使得应用程序独立决定自己的线程要如何运作。操作系统内核不知道纤程的存在,也不会为它进行调度。
同个进程允许有多个线程,这些线程可以共享进程的地址空间、数据结构和上下文。进程内的同一数据块,可能存在多个线程在某个很小的时间片段内同时读写,这就会造成数据异常,从而导致了不可预料的结果。这种不可预期性便造就了竞争条件(Race Condition)。
避免产生竞争条件的技术有很多,诸如原子操作、临界区、读写锁、内核对象、信号量、互斥体、栅栏、屏障、事件等等。
至少两个线程同时执行任务的机制。一般有多核多物理线程的CPU同时执行的行为,才可以叫并行,单核的多线程不能称之为并行。
至少两个线程利用时间片(Timeslice)执行任务的机制,是并行的更普遍形式。即便单核CPU同时执行的多线程,也可称为并发。
并发的两种形式——上:双物理核心的同时执行(并行);下:单核的多任务切换(并发)。
事实上,并发和并行在多核处理器中是可以同时存在的,比如下图所示,存在双核,每个核心又同时切换着多个任务:
部分参考文献严格区分了并行和并发,但部分文献并不明确指出其中的区别。虚幻引擎的多线程渲染架构和API中,常出现并行和并发的概念,所以虚幻是明显区分两者之间的含义。
线程池提供了一种新的任务并发的方式,调用者只需要传入一组可并行的任务和分组的策略,便可以使用线程池的若干线程并发地执行任务,使得调用者无需接直接触线程的调用和管理细节,降低了调用者的成本,也提升了线程的调度效率和吞吐量。
不过,创建一个线程池时,几个关键性的设计问题会影响并发效率,比如:可使用的线程数量,高效的任务分配方式,以及是否需要等待一个任务完成。
线程池可以自定义实现,也可以直接使用C++、操作系统或第三方库提供的API。
在C++11之前,C++的多线程支持基本为零,仅提供少量鸡肋的volatile
等关键字。直到C++11标准,多线程才真正纳入C++标准,并提供了相关关键字、STL标准库,以便使用者实现跨平台的多线程调用。
当然,对使用者来说,多线程的实现可采用C++11的线程库,也可以根据具体的系统平台提供的多线程API自定义线程库,还可以使用诸如ACE、boost::thread等第三方库。使用C++自带的多线程库,有几个优点,一是使用简单方便,依赖少;二是跨平台,无需关注系统底层。
thread_local是C++是实现线程局部存储的关键,添加了此关键字的变量意味着每个线程都有自己的一份数据,不会共享同一份数据,避免数据竞争。
C11的关键字_Thread_local
用于定义线程局部变量。在头文件<threads.h>
定义了thread_local
为上述关键词的同义。例如:
#include <threads.h>
thread_local int foo = 0;
C++11引入的thread_local
关键字用于下述情形:
1、名字空间(全局)变量。
2、文件静态变量。
3、函数静态变量。
4、静态成员变量。
此外,不同编译器提供了各自的方法声明线程局部变量:
// Visual C++, Intel C/C++ (Windows systems), C++Builder, Digital Mars C++
__declspec(thread) int number;
// Solaris Studio C/C++, IBM XL C/C++, GNU C, Clang, Intel C++ Compiler (Linux systems)
__thread int number;
// C++ Builder
int __thread number;
使用了volatile修饰符的变量意味着它在内存中的值可能随时发生变化,也告诉编译器不能做任何优化,每次使用到此变量的值都必须从内存中读取,而不应该直接使用寄存器的值。
举个具体的栗子吧。假设有以下代码段:
int a = 10;
volatile int *p = &a;
int b, c;
b = *p;
c = *p;
若p
没有volatile
修饰,则b = *p
和c = *p
只需从内存取一次p
的值,那么b
和c
的值必然是10
。
若考虑volatile
的影响,假设执行完b = *p
语句之后,p
的值被其它线程修改了,则执行c = *p
会再次从内存中读取p
的值,此时c
的值不再是10,而是新的值。
但是,volatile并不能解决多线程的同步问题,只适合以下三种情况使用:
1、和信号处理(signal handler)相关的场合。
2、和内存映射硬件(memory mapped hardware)相关的场合。
3、和非本地跳转(setjmp
和 longjmp
)相关的场合。
严格来说atomic
并不是关键字,而是STL的模板类,可以支持指定类型的原子操作。
使用原子的类型意味着该类型的实例的读写操作都是原子性的,无法被其它线程切割,从而达到线程安全和同步的目标。
可能有些读者会好奇,为什么对于基本类型的操作也需要原子操作。比如:
int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f};
以上三个线程同时调用函数f
,该函数只执行cnt++
,在C++维度,似乎只有一条执行语句,理论上不应该存在同步问题。然而,编译成汇编指令后,会有多条指令,这就会在多线程中引起线程上下文切换,引起不可预知的行为。
为了避免这种情况,就需要加入atomic
类型:
std::atomic<int> cnt{0}; // 给cnt加入原子操作。
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f};
加入atomic
之后,所有线程执行后的结果是确定的,能够正常给变量计数。atomic
的实现机制与临界区类似,但效率上比临界区更快。
为了更进一步地说明C++的单条语句可能生成多条汇编指令,可借助Compiler Explorer来实时查探C++汇编后的指令:
Compiler Explorer动态将左侧C++语句编译出的汇编指令。上图所示的c++代码编译后可能存在一对多的汇编指令,由此印证atomic原子操作的必要性。
充分利用std::atomic
的特性和接口,可以实现很多非阻塞无锁的线程安全的数据结构和算法,关于这一点的延伸阅读,强力推荐《C++ Concurrency In Action》。
C++的线程类型是std::thread
,它提供的接口如下表:
接口 | 解析 |
---|---|
join | 加入主线程,使得主线程强制等待该线程执行完。 |
detach | 从主线程分离,使得主线程无需等待该线程执行完。 |
swap | 与另外一个线程交换线程对象。 |
joinable | 查询是否可加入主线程。 |
get_id | 获取该线程的唯一标识符。 |
native_handle | 返回实现层的线程句柄。 |
hardware_concurrency | 静态接口,返回硬件支持的并发线程数量。 |
使用范例:
#include <iostream>
#include <thread>
#include <chrono>
void foo()
{
// simulate expensive operation
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::cout << "starting thread...\n";
std::thread t(foo); // 构造线程对象,且传入被执行的函数。
std::cout << "waiting for thread to finish..." << std::endl;
t.join(); // 加入主线程,使得主线程必须等待该线程执行完毕。
std::cout << "done!\n";
}
输出:
starting thread...
waiting for thread to finish...
done!
如果需要在调用线程和新线程之间同步数据,则可以使用C++的std::promise
和std::future
等机制。示例代码:
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
int main()
{
// Demonstrate using promise<int> to transmit a result between threads.
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
// future::get() will wait until the future has a valid result and retrieves it.
// Calling wait() before get() is not needed
//accumulate_future.wait(); // wait for result
std::cout << "result = " << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
}
输出结果:
result = 21
但是,std::thread
的执行并不能保证是异步的,也可能是在当前线程执行。
如果需要强制异步,则可使用std::async
。它可以指定两种异步方式:std::launch::async
和std::launch::deferred
,前者表示使用新的线程异步地执行任务,后者表示在当前线程执行,且会被延迟执行。使用范例:
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
执行结果:
The sum is 10000
43
Hello 42
world!
53
另外,C++20已经支持轻量级的协程(coroutine)了,相关的关键字:co_await
,co_return
,co_yield
,跟C#等脚本语言的概念和用法如出一辙,但行为和实现机制可能会稍有不同,此文不展开探讨了。
线程同步的机制有很多,C++支持的有以下几种:
[2.1.3.1 C++多线程关键字](#2.1.3.1 C++多线程关键字)已经对std::atomic
做了详细的解析,可以防止多线程之间共享数据的数据竞险问题。此外,它还提供了丰富多样的接口和状态查询,以便更加精细和高效地同步原子数据,常见接口和解析如下:
接口名 | 解析 |
---|---|
is_lock_free | 检查原子对象是否无锁的。 |
store | 存储值到原子对象。 |
load | 从原子对象加载值。 |
exchange | 获取原子对象的值,并替换成指定值。 |
compare_exchange_weak, compare_exchange_strong | 将原子对象的值和预期值(expected)对比,如果相同就替换成目标值(desired),并返回true ;如果不同,就加载原子对象的值到预期值(expected),并返回false 。weak模式不会卡调用线程,strong模式会卡住调用线程,直到原子对象的值和预期值(expected)相同。 |
fetch_add, fetch_sub, fetch_and, fetch_or, fetch_xor | 获取原子对象的值,并对其相加、相减等操作。 |
operator ++, operator --, operator +=, operator -=, ... | 对原子对象响应各类操作符,操作符的意义和普通变量一致。 |
此外,C++20还支持wait, notify_one, notify_all等同步接口。
利用compare_exchange_weak
接口可以很方便地实现线程安全的非阻塞式的数据结构。示例:
#include <atomic>
#include <future>
#include <iostream>
template<typename T>
struct node
{
T data;
node* next;
node(const T& data) : data(data), next(nullptr) {}
};
template<typename T>
class stack
{
public:
std::atomic<node<T>*> head; // 堆栈头, 采用原子操作.
public:
// 入栈操作
void push(const T& data)
{
node<T>* new_node = new node<T>(data);
// 将原有的头指针作为新节点的下一节点.
new_node->next = head.load(std::memory_order_relaxed);
// 将新的节点和老的头部节点做对比测试, 如果new_node->next==head, 说明其它线程没有修改head, 可以将head替换成new_node, 从而完成push操作.
// 反之, 如果new_node->next!=head, 说明其它线程修改了head, 将其它线程修改的head保存到new_node->next, 继续循环检测.
while(!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
; // 空循环体
}
};
int main()
{
stack<int> s;
auto r1 = std::async(std::launch::async, &stack<int>::push, &s, 1);
auto r2 = std::async(std::launch::async, &stack<int>::push, &s, 2);
auto r3 = std::async(std::launch::async, &stack<int>::push, &s, 3);
r1.wait();
r2.wait();
r3.wait();
// print the stack's values
node<int>* node = s.head.load(std::memory_order_relaxed);
while(node)
{
std::cout << node->data << " ";
node = node->next;
}
}
输出:
2 3 1
由此可见,利用原子及其接口可以很方便地进行多线程同步,而且由于是多线程异步入栈,栈的元素不一定与编码的顺序一致。
以上代码还涉及内存访问顺序的标记:
关于这方面的详情可以参看第一篇的内存屏障或者《C++ concurrency in action》的章节5.3 同步操作和强制排序。
std::mutex即互斥量,它会在作用范围内进入临界区(Critical section),使得该代码片段同时只能由一个线程访问,当其它线程尝试执行该片段时,会被阻塞。std::mutex常与std::lock_guard
,示例代码:
#include <iostream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex; // 声明互斥量
void save_page(const std::string &url)
{
// simulate a long page fetch
std::this_thread::sleep_for(std::chrono::seconds(2));
std::string result = "fake content";
// 配合std::lock_guard使用, 可以及时进入和释放互斥量.
std::lock_guard<std::mutex> guard(g_pages_mutex);
g_pages[url] = result;
}
int main()
{
std::thread t1(save_page, "http://foo");
std::thread t2(save_page, "http://bar");
t1.join();
t2.join();
// safe to access g_pages without lock now, as the threads are joined
for (const auto &pair : g_pages) {
std::cout << pair.first << " => " << pair.second << '\n';
}
}
输出:
http://bar => fake content
http://foo => fake content
此外,手动操作std::mutex
的锁定和解锁,可以实现一些特殊行为,例如等待某个标记:
#include <chrono>
#include <thread>
#include <mutex>
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m); // 这里采用std::unique_lock而非std::lock_guard. std::unique_lock可以实现尝试获得锁, 如果当前以及被其它线程锁定, 则延迟直到其它线程释放, 然后才获得锁.
while(!flag)
{
lk.unlock(); // 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms,在此期间,其它线程可以进入互斥量,以便更改flag标记。
lk.lock(); // 再锁互斥量
}
}
std::condition_variable
和std::condition_variable_any
都是条件变量,都是C++标准库的实现,它们都需要与互斥量配合使用。由于std::condition_variable_any
更加通用,会在性能上产生更多的开销。故而,应当首先考虑使用std::condition_variable
。
利用条件变量的接口,结合互斥量的使用,可以很方便地执行线程间的等待、通知等操作。示例:
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv; // 声明条件变量
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// 等待直到主线程改变ready为true.
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});
// 获得了互斥量的锁
std::cout << "Worker thread is processing data\n";
data += " after processing";
// 发送数据给主线程
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// 手动解锁, 以便主线程获得锁.
lk.unlock();
cv.notify_one();
}
int main()
{
std::thread worker(worker_thread);
data = "Example data";
// send data to the worker thread
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();
// wait for the worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
输出:
main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing
C++的future(期望)是一种可以访问未来的返回值的机制,常用于多线程的同步。可以创建future的类型有: std::async, std::packaged_task, std::promise。
future对象可以执行wait、wait_for、wait_until,从而实现事件等待和同步,示例代码:
#include <iostream>
#include <future>
#include <thread>
int main()
{
// 从packaged_task获取的future
std::packaged_task<int()> task([]{ return 7; }); // wrap the function
std::future<int> f1 = task.get_future(); // get a future
std::thread t(std::move(task)); // launch on a thread
// 从async()获取的future
std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
// 从promise获取的future
std::promise<int> p;
std::future<int> f3 = p.get_future();
std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
// 等待所有future
std::cout << "Waiting..." << std::flush;
f1.wait();
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: " << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
t.join();
}
输出:
Waiting...Done!
Results are: 7 8 9
多线程按并行内容可分为数据并行和任务并行两种。其中数据并行是不同的线程携带不同的数据执行相同的逻辑,最经典的数据并行的应用是MMX指令、SIMD技术、Compute着色器等。任务并行是不同的线程执行不同的逻辑,数据可以相同,也可以不同,例如,游戏引擎经常将文件加载、音频处理、网络接收乃至物理模拟都放到单独的线程,以便它们可以并行地执行不同的任务。
多线程如果按划分粒度和方式,则有线性划分、递归划分、任务类型划分等。
线性划分法的最简单应用就是将连续数组的元素平均分成若干份,每份数据派发到一个线程中执行,例如并行化的std::for_each
和UE里的ParallelFor
。
线性划分示意图。连续数据被均分为若干份,接着派发到若干线程中并行地执行。
在线性划分并行执行结束后,通常需要由调用线程合并和同步并行的结果。
递归划分法是将连续数据按照某种规则划分成若干份,每一份又可继续划分成更细粒度,直到某种规则停止划分。常用于快速排序。
快速排序有两个最基本的步骤:将数据划分到中枢(pivot)元素之前或之后,然后对中枢元素之前和之后的两半数组再次进行快速排序。由于只有在一次排序结束后才能知道哪些项在中枢元素之前和之后,所以不能通过对数据的简单(线性)划分达到并行。当要对这种算法进行并行化,很自然的会想到使用递归。每一级的递归都会多次调用quick_sort函数,因为需要知道哪些元素在中枢元素之前和之后。