当前位置 博文首页 > 0向往0:剖析虚幻渲染体系(02)- 多线程渲染

    0向往0:剖析虚幻渲染体系(02)- 多线程渲染

    作者:0向往0 时间:2021-01-25 23:03

    目录
    • 2.1 多线程编程基础
      • 2.1.1 多线程概述
      • 2.1.2 多线程概念
      • 2.1.3 C++的多线程
        • 2.1.3.1 C++多线程关键字
        • 2.1.3.2 C++线程
        • 2.1.3.3 C++多线程同步
      • 2.1.4 多线程实现机制
    • 2.2 现代图形API的多线程特性
      • 2.2.1 传统图形API的多线程特性
      • 2.2.2 DirectX12的多线程特性
      • 2.2.3 Vulkan的多线程特性
      • 2.2.4 Metal的多线程特性
    • 2.3 游戏引擎的多线程渲染
      • 2.3.1 Unity
      • **2.3.2 Frostbite **
      • 2.3.3 Naughty Dog Engine
      • 2.3.4 Destiny’s Engine
    • 2.4 UE的多线程机制
      • 2.4.1 UE的多线程基础
      • 2.4.2 UE的多线程实现
        • 2.4.2.1 FRunnable
        • 2.4.2.2 FRunnableThread
        • 2.4.2.3 QueuedWork
        • 2.4.2.4 TaskGraph
    • 2.5 UE的多线程渲染
      • 2.5.1 UE的多线程渲染基础
        • 2.5.1.1 场景和渲染模块主要类型
        • 2.5.1.2 引擎模块和渲染模块代表
        • 2.5.1.3 游戏线程和渲染线程代表
      • 2.5.2 UE的多线程渲染总览
      • 2.5.3 游戏线程和渲染线程的实现
        • 2.5.3.1 游戏线程的实现
        • 2.5.3.2 渲染线程的实现
        • 2.5.3.3 RHI线程的实现
      • 2.5.4 游戏线程和渲染线程的交互
      • 2.5.5 游戏线程和渲染线程的同步
    • 2.6 多线程渲染结语
    • 特别说明
    • 参考文献

     

     

    2.1 多线程编程基础

    为了更平稳地过渡,在真正进入UE的多线程渲染知识之前,先学习或重温一下多线程编程的基础知识。

    2.1.1 多线程概述

    多线程(Multithread)编程的思想早在单核时代就已经出现了,当时的操作系统(如Windows95)就已经支持多任务的功能,其原理就是在单核中切换不同的上下文(Context),以便每个进程中的线程都有时间获得执行指令的机会。

    但到了2005年,当单核主频接近4GHz时,CPU硬件厂商英特尔和AMD发现,速度也会遇到自己的极限:那就是单纯的主频提升,已经无法明显提升系统整体性能。

    随着单核计算频率摩尔定律的缓慢终结,Intel率先于2005年发布了奔腾D和奔腾四至尊版840系列,首次支持了两个物理级别的线程计算单元。此后十多年,多核CPU得到蓬勃发展,由AMD制造的Ryzen 3990X处理器已经拥有64个核心128个逻辑线程。

    锐龙(Ryzen)3990X的宣传海报中赫然凸显的核心与线程数量。

    硬件的多核发展,给软件极大的发挥空间。应用程序可以充分发挥多核多线程的计算资源,各个应用领域由此也产生多线程编程模型和技术。作为游戏的发动机Unreal Engine等商业引擎,同样可以利用多线程技术,以便更加充分地提升效率和效果。

    使用多线程并发带来的作用总结起来主要有两点:

    • 分离关注点。通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。比如,游戏引擎中通常将文件加载、网络传输放入独立的线程中,既可以不阻碍主线程,也可以分离逻辑代码,使得更加清晰可扩展。
    • 提升性能。人多力量大,这样的道理同样用到CPU上(核多力量大)。相同量级的任务,如果能够分散到多个CPU中同时运行,必然会带来效率的提升。

    但是,随着CPU核心数量的提升,计算机获得的效益并非直线提升,而是遵循Amdahl's law(阿姆达尔定律),Amdahl's law的公式定义如下:

    \[S_{latency}(s) = \frac{1}{(1-p) + \frac{p}{s}} \]

    公式的各个分量含义如下:

    • \(S_{latency}\):整个任务在多线程处理中理论上获得的加速比。
    • \(s\):用于执行任务并行部分的硬件资源的线程数量。
    • \(p\):可并行处理的任务占比。

    举个具体的栗子,假设有8核16线程的CPU用于处理某个任务,这个任务有70%的部分是可以并行处理的,那么它的理论加速比为:

    \[S_{latency}(16) = \frac{1}{(1-0.7) + \frac{0.7}{16}} = 2.9 \]

    由此可见,多线程编程带来的效益并非跟核心数呈直线正比,实际上它的曲线如下所示:

    阿姆达尔定律揭示的核心数和加速比图例。由此可见,可并行的任务占比越低,加速比获得的效果越差:当可并行任务占比为50%时,16核已经基本达到加速比天花板,无论后面增加多少核心数量,都无济于事;如果可并行任务占比为95%时,到2048个核心才会达到加速比天花板。

    虽然阿姆达尔定律给我们带来了残酷的现实,但是,如果我们能够提升任务并行占比到接近100%,则加速比天花板可以得到极大提升:

    \[S_{latency}(s) = \frac{1}{(1-p) + \frac{p}{s}} = \frac{1}{(1-1) + \frac{1}{s}} = s \]

    如上公式所示,当\(p=1\)(即可并行的任务占比100%)时,理论上的加速比和核心数量成线性正比!!

    举个具体的例子,在编译Unreal Engine工程源码或Shader时,由于它们基本是100%的并行占比,理论上可以获得接近线性关系的加速比,在多核系统中将极大地缩短编译时间。

    利用多线程并发提高性能的方式有两种:

    • 任务并行(task parallelism)。将一个单个任务分成几部分,且各自并行运行,从而降低总运行时间。这种方式虽然看起来很简单直观,但实际操作中可能会很复杂,因为在各个部分之间可能存在着依赖。
    • 数据并行(data parallelism)。任务并行的是算法(执行指令)部分,即每个线程执行的指令不一样;而数据并行是指令相同,但执行的数据不一样。SIMD也是数据并行的一种方式。

    上面阐述了多线程并发的益处,接下来说说它的副作用。总结起来,副作用如下:

    • 导致数据竞争。多线程访问常常会交叉执行同一段代码,或者操作同一个资源,又或者多核CPU的高度缓存同步问题,由此变化带来各种数据不同步或数据读写错误,由此产生了各种各样的异常结果,这便是数据竞争。
    • 逻辑复杂化,难以调试。由于多线程的并发方式不唯一,不可预知,所以为了避免数据竞争,常常加入复杂多样的同步操作,代码也会变得离散、片段化、繁琐、难以理解,增加代码的辅助,对后续的维护、扩展都带来不可估量的阻碍。也会引发小概率事件难以重现的BUG,给调试和查错增加了数量级的难度。
    • 不一定能够提升效益。多线程技术用得到确实会带来效率的提升,但并非绝对,常和物理核心、同步机制、运行时状态、并发占比等等因素相关,在某些极端情况,或者用得不够妥当,可能反而会降低程序效率。

    2.1.2 多线程概念

    本小节将阐述多线程编程技术中常涉及的基本概念。

    • 进程(Process)

    进程(Process)是操作系统执行应用程序的基本单元和实体,它本身只是个容器,通常包含内核对象、地址空间、统计信息和若干线程。它本身并不真正执行代码指令,而是交由进程内的线程执行。

    对Windows而言,操作系统在创建进程时,同时也会给它创建一个线程,该线程被称为主线程(Primary thread, Main thread)。

    对Unix而言,进程和主线程其实是同一个东西,操作系统并不知道有线程的存在,线程更接近于lightweight processes(轻量级进程)的概念。

    进程有优先级概念,Windows下由低到高为:低(Low)、低于正常(Below normal)、正常(Normal)、高于正常(Above normal)、高(High)、实时(Real time)。(见下图)

    默认情况下,进程的优先级为Normal。优先级高的进程将会优先获得执行机会和时间。

    • 线程(Thread)

    线程(Thread)是可以执行代码的实体,通常不能独立存在,需要依附在某个进程内部。一个进程可以拥有多个线程,这些线程可以共享进程的数据,以便并行或并发地执行多个任务。

    在单核CPU中,操作系统(如Windows)可能会采用轮循(Round robin)的方式进行调度,使得多个线程看起来是同时运行的。(下图)

    在多核CPU中,线程可能会安排在不同的CPU核心同时运行,从而达到并行处理的目的。

    采用SMP的Windows在多核CPU的执行示意图。等待处理的线程被安排到不同的CPU核心。

    每个线程可拥有自己的执行指令上下文(如Windows的IP(指令寄存器地址)和SP(栈起始寄存器地址))、执行栈和TLS(Thread Local Storage,线程局部缓存)。

    Windows线程创建和初始化示意图。

    线程局部存储(Thread Local Storage)是一种存储持续期,对象的生命周期与线程一样,在线程开始时分配,线程结束时回收。每个线程有该对象自己的实例,访问和修改这样的对象不会造成竞争条件(Race Condition)。

    线程也存在优先级概念,优先级越高的将优先获得执行指令的机会。

    线程的状态一般有运行状态、暂停状态等。Windows可用以下接口切换线程状态:

    // 暂停线程
    DWORD SuspendThread(HANDLE hThread);
    // 继续运行线程
    DWORD ResumeThread(HANDLE hThread);
    

    同个线程可被多次暂停,如果要恢复运行状态,则需要调用同等次数的继续运行接口。

    • 协程(Coroutine)

    协程(Coroutine)是一种轻量级(lightweight)的用户态线程,通常跑在同一个线程,利用同一个线程的不同时间片段执行指令,没有线程、进程切换和调度的开销。从使用者角度,可以利用协程机制实现在同个线程模拟异步的任务和编码方式。在同个线程内,它不会造成数据竞争,但也会因线程阻塞而阻塞。

    • 纤程(Fiber)

    纤程(Fiber)如同协程,也是一种轻量级的用户态线程,可以使得应用程序独立决定自己的线程要如何运作。操作系统内核不知道纤程的存在,也不会为它进行调度。

    • 竞争条件(Race Condition)

    同个进程允许有多个线程,这些线程可以共享进程的地址空间、数据结构和上下文。进程内的同一数据块,可能存在多个线程在某个很小的时间片段内同时读写,这就会造成数据异常,从而导致了不可预料的结果。这种不可预期性便造就了竞争条件(Race Condition)

    避免产生竞争条件的技术有很多,诸如原子操作、临界区、读写锁、内核对象、信号量、互斥体、栅栏、屏障、事件等等。

    • 并行(Parallelism)

    至少两个线程同时执行任务的机制。一般有多核多物理线程的CPU同时执行的行为,才可以叫并行,单核的多线程不能称之为并行。

    • 并发(Concurrency)

    至少两个线程利用时间片(Timeslice)执行任务的机制,是并行的更普遍形式。即便单核CPU同时执行的多线程,也可称为并发。

    并发的两种形式——上:双物理核心的同时执行(并行);下:单核的多任务切换(并发)。

    事实上,并发和并行在多核处理器中是可以同时存在的,比如下图所示,存在双核,每个核心又同时切换着多个任务:

    部分参考文献严格区分了并行和并发,但部分文献并不明确指出其中的区别。虚幻引擎的多线程渲染架构和API中,常出现并行和并发的概念,所以虚幻是明显区分两者之间的含义。

    • 线程池(Thread Pool)

    线程池提供了一种新的任务并发的方式,调用者只需要传入一组可并行的任务和分组的策略,便可以使用线程池的若干线程并发地执行任务,使得调用者无需接直接触线程的调用和管理细节,降低了调用者的成本,也提升了线程的调度效率和吞吐量。

    不过,创建一个线程池时,几个关键性的设计问题会影响并发效率,比如:可使用的线程数量,高效的任务分配方式,以及是否需要等待一个任务完成。

    线程池可以自定义实现,也可以直接使用C++、操作系统或第三方库提供的API。

    2.1.3 C++的多线程

    在C++11之前,C++的多线程支持基本为零,仅提供少量鸡肋的volatile等关键字。直到C++11标准,多线程才真正纳入C++标准,并提供了相关关键字、STL标准库,以便使用者实现跨平台的多线程调用。

    当然,对使用者来说,多线程的实现可采用C++11的线程库,也可以根据具体的系统平台提供的多线程API自定义线程库,还可以使用诸如ACE、boost::thread等第三方库。使用C++自带的多线程库,有几个优点,一是使用简单方便,依赖少;二是跨平台,无需关注系统底层。

    2.1.3.1 C++多线程关键字

    • thread_local

    thread_local是C++是实现线程局部存储的关键,添加了此关键字的变量意味着每个线程都有自己的一份数据,不会共享同一份数据,避免数据竞争。

    C11的关键字_Thread_local用于定义线程局部变量。在头文件<threads.h>定义了thread_local为上述关键词的同义。例如:

    #include <threads.h>
    thread_local int foo = 0;
    

    C++11引入的thread_local关键字用于下述情形:

    1、名字空间(全局)变量。

    2、文件静态变量。

    3、函数静态变量。

    4、静态成员变量。

    此外,不同编译器提供了各自的方法声明线程局部变量:

    // Visual C++, Intel C/C++ (Windows systems), C++Builder, Digital Mars C++
    __declspec(thread) int number;
    
    // Solaris Studio C/C++, IBM XL C/C++, GNU C, Clang, Intel C++ Compiler (Linux systems)
    __thread int number;
    
    // C++ Builder
    int __thread number;
    
    • volatile

    使用了volatile修饰符的变量意味着它在内存中的值可能随时发生变化,也告诉编译器不能做任何优化,每次使用到此变量的值都必须从内存中读取,而不应该直接使用寄存器的值。

    举个具体的栗子吧。假设有以下代码段:

    int a = 10;
    volatile int *p = &a;
    int b, c;
    b = *p;
    c = *p;
    

    p没有volatile修饰,则b = *pc = *p只需从内存取一次p的值,那么bc的值必然是10

    若考虑volatile的影响,假设执行完b = *p语句之后,p的值被其它线程修改了,则执行c = *p会再次从内存中读取p的值,此时c的值不再是10,而是新的值。

    但是,volatile并不能解决多线程的同步问题,只适合以下三种情况使用:

    1、和信号处理(signal handler)相关的场合。

    2、和内存映射硬件(memory mapped hardware)相关的场合。

    3、和非本地跳转(setjmplongjmp)相关的场合。

    • std::atomic

    严格来说atomic并不是关键字,而是STL的模板类,可以支持指定类型的原子操作。

    使用原子的类型意味着该类型的实例的读写操作都是原子性的,无法被其它线程切割,从而达到线程安全和同步的目标。

    可能有些读者会好奇,为什么对于基本类型的操作也需要原子操作。比如:

    int cnt = 0;
    auto f = [&]{cnt++;};
    std::thread t1{f}, t2{f}, t3{f};
    

    以上三个线程同时调用函数f,该函数只执行cnt++,在C++维度,似乎只有一条执行语句,理论上不应该存在同步问题。然而,编译成汇编指令后,会有多条指令,这就会在多线程中引起线程上下文切换,引起不可预知的行为。

    为了避免这种情况,就需要加入atomic类型:

    std::atomic<int> cnt{0};	// 给cnt加入原子操作。
    auto f = [&]{cnt++;};
    std::thread t1{f}, t2{f}, t3{f};
    

    加入atomic之后,所有线程执行后的结果是确定的,能够正常给变量计数。atomic的实现机制与临界区类似,但效率上比临界区更快。

    为了更进一步地说明C++的单条语句可能生成多条汇编指令,可借助Compiler Explorer来实时查探C++汇编后的指令:

    Compiler Explorer动态将左侧C++语句编译出的汇编指令。上图所示的c++代码编译后可能存在一对多的汇编指令,由此印证atomic原子操作的必要性。

    充分利用std::atomic的特性和接口,可以实现很多非阻塞无锁的线程安全的数据结构和算法,关于这一点的延伸阅读,强力推荐《C++ Concurrency In Action》。

    2.1.3.2 C++线程

    C++的线程类型是std::thread,它提供的接口如下表:

    接口 解析
    join 加入主线程,使得主线程强制等待该线程执行完。
    detach 从主线程分离,使得主线程无需等待该线程执行完。
    swap 与另外一个线程交换线程对象。
    joinable 查询是否可加入主线程。
    get_id 获取该线程的唯一标识符。
    native_handle 返回实现层的线程句柄。
    hardware_concurrency 静态接口,返回硬件支持的并发线程数量。

    使用范例:

    #include <iostream>
    #include <thread>
    #include <chrono>
    
    void foo()
    {
        // simulate expensive operation
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
     
    int main()
    {
        std::cout << "starting thread...\n";
        std::thread t(foo); // 构造线程对象,且传入被执行的函数。
     
        std::cout << "waiting for thread to finish..." << std::endl;
        t.join(); // 加入主线程,使得主线程必须等待该线程执行完毕。
     
        std::cout << "done!\n";
    }
    

    输出:

    starting thread...
    waiting for thread to finish...
    done!
    

    如果需要在调用线程和新线程之间同步数据,则可以使用C++的std::promisestd::future等机制。示例代码:

    #include <vector>
    #include <thread>
    #include <future>
    #include <numeric>
    #include <iostream>
     
    void accumulate(std::vector<int>::iterator first,
                    std::vector<int>::iterator last,
                    std::promise<int> accumulate_promise)
    {
        int sum = std::accumulate(first, last, 0);
        accumulate_promise.set_value(sum);  // Notify future
    }
     
    int main()
    {
        // Demonstrate using promise<int> to transmit a result between threads.
        std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
        std::promise<int> accumulate_promise;
        std::future<int> accumulate_future = accumulate_promise.get_future();
        std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                                std::move(accumulate_promise));
     
        // future::get() will wait until the future has a valid result and retrieves it.
        // Calling wait() before get() is not needed
        //accumulate_future.wait();  // wait for result
        std::cout << "result = " << accumulate_future.get() << '\n';
        work_thread.join();  // wait for thread completion
    }
    

    输出结果:

    result = 21
    

    但是,std::thread的执行并不能保证是异步的,也可能是在当前线程执行。

    如果需要强制异步,则可使用std::async。它可以指定两种异步方式:std::launch::asyncstd::launch::deferred,前者表示使用新的线程异步地执行任务,后者表示在当前线程执行,且会被延迟执行。使用范例:

    #include <iostream>
    #include <vector>
    #include <algorithm>
    #include <numeric>
    #include <future>
    #include <string>
    #include <mutex>
     
    std::mutex m;
    struct X {
        void foo(int i, const std::string& str) {
            std::lock_guard<std::mutex> lk(m);
            std::cout << str << ' ' << i << '\n';
        }
        void bar(const std::string& str) {
            std::lock_guard<std::mutex> lk(m);
            std::cout << str << '\n';
        }
        int operator()(int i) {
            std::lock_guard<std::mutex> lk(m);
            std::cout << i << '\n';
            return i + 10;
        }
    };
     
    template <typename RandomIt>
    int parallel_sum(RandomIt beg, RandomIt end)
    {
        auto len = end - beg;
        if (len < 1000)
            return std::accumulate(beg, end, 0);
     
        RandomIt mid = beg + len/2;
        auto handle = std::async(std::launch::async,
                                 parallel_sum<RandomIt>, mid, end);
        int sum = parallel_sum(beg, mid);
        return sum + handle.get();
    }
     
    int main()
    {
        std::vector<int> v(10000, 1);
        std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
     
        X x;
        // Calls (&x)->foo(42, "Hello") with default policy:
        // may print "Hello 42" concurrently or defer execution
        auto a1 = std::async(&X::foo, &x, 42, "Hello");
        // Calls x.bar("world!") with deferred policy
        // prints "world!" when a2.get() or a2.wait() is called
        auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
        // Calls X()(43); with async policy
        // prints "43" concurrently
        auto a3 = std::async(std::launch::async, X(), 43);
        a2.wait();                     // prints "world!"
        std::cout << a3.get() << '\n'; // prints "53"
    } // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
    

    执行结果:

    The sum is 10000
    43
    Hello 42
    world!
    53
    

    另外,C++20已经支持轻量级的协程(coroutine)了,相关的关键字:co_awaitco_returnco_yield,跟C#等脚本语言的概念和用法如出一辙,但行为和实现机制可能会稍有不同,此文不展开探讨了。

    2.1.3.3 C++多线程同步

    线程同步的机制有很多,C++支持的有以下几种:

    • std::atomic

    [2.1.3.1 C++多线程关键字](#2.1.3.1 C++多线程关键字)已经对std::atomic做了详细的解析,可以防止多线程之间共享数据的数据竞险问题。此外,它还提供了丰富多样的接口和状态查询,以便更加精细和高效地同步原子数据,常见接口和解析如下:

    接口名 解析
    is_lock_free 检查原子对象是否无锁的。
    store 存储值到原子对象。
    load 从原子对象加载值。
    exchange 获取原子对象的值,并替换成指定值。
    compare_exchange_weak, compare_exchange_strong 将原子对象的值和预期值(expected)对比,如果相同就替换成目标值(desired),并返回true;如果不同,就加载原子对象的值到预期值(expected),并返回false。weak模式不会卡调用线程,strong模式会卡住调用线程,直到原子对象的值和预期值(expected)相同。
    fetch_add, fetch_sub, fetch_and, fetch_or, fetch_xor 获取原子对象的值,并对其相加、相减等操作。
    operator ++, operator --, operator +=, operator -=, ... 对原子对象响应各类操作符,操作符的意义和普通变量一致。

    此外,C++20还支持wait, notify_one, notify_all等同步接口。

    利用compare_exchange_weak接口可以很方便地实现线程安全的非阻塞式的数据结构。示例:

    #include <atomic>
    #include <future>
    #include <iostream>
    
    template<typename T>
    struct node
    {
        T data;
        node* next;
        node(const T& data) : data(data), next(nullptr) {}
    };
     
    template<typename T>
    class stack
    {
     public:
        std::atomic<node<T>*> head;	// 堆栈头, 采用原子操作.
     public:
        // 入栈操作
        void push(const T& data)
        {
            node<T>* new_node = new node<T>(data);
     
            // 将原有的头指针作为新节点的下一节点.
            new_node->next = head.load(std::memory_order_relaxed);
     
            // 将新的节点和老的头部节点做对比测试, 如果new_node->next==head, 说明其它线程没有修改head, 可以将head替换成new_node, 从而完成push操作.
            // 反之, 如果new_node->next!=head, 说明其它线程修改了head, 将其它线程修改的head保存到new_node->next, 继续循环检测.
            while(!head.compare_exchange_weak(new_node->next, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed))
                ; // 空循环体
        }
    };
    
    int main()
    {
        stack<int> s;
        
        auto r1 = std::async(std::launch::async, &stack<int>::push, &s, 1);
        auto r2 = std::async(std::launch::async, &stack<int>::push, &s, 2);
        auto r3 = std::async(std::launch::async, &stack<int>::push, &s, 3);
        
        r1.wait();
        r2.wait();
        r3.wait();
        
        // print the stack's values
        node<int>* node = s.head.load(std::memory_order_relaxed);
        while(node)
        {
            std::cout << node->data << " ";
            node = node->next;
        }
    }
    

    输出:

    2 3 1
    

    由此可见,利用原子及其接口可以很方便地进行多线程同步,而且由于是多线程异步入栈,栈的元素不一定与编码的顺序一致。

    以上代码还涉及内存访问顺序的标记:

    • 排序一致序列(sequentially consistent)。
    • 获取-释放序列(memory_order_consume, memory_order_acquire, memory_order_release和memory_order_acq_rel)。
    • 自由序列(memory_order_relaxed)。

    关于这方面的详情可以参看第一篇的内存屏障或者《C++ concurrency in action》的章节5.3 同步操作和强制排序。

    • std::mutex

    std::mutex即互斥量,它会在作用范围内进入临界区(Critical section),使得该代码片段同时只能由一个线程访问,当其它线程尝试执行该片段时,会被阻塞。std::mutex常与std::lock_guard,示例代码:

    #include <iostream>
    #include <map>
    #include <string>
    #include <chrono>
    #include <thread>
    #include <mutex>
     
    std::map<std::string, std::string> g_pages;
    std::mutex g_pages_mutex;	// 声明互斥量
     
    void save_page(const std::string &url)
    {
        // simulate a long page fetch
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::string result = "fake content";
     	
        // 配合std::lock_guard使用, 可以及时进入和释放互斥量.
        std::lock_guard<std::mutex> guard(g_pages_mutex);
        g_pages[url] = result;
    }
     
    int main() 
    {
        std::thread t1(save_page, "http://foo");
        std::thread t2(save_page, "http://bar");
        t1.join();
        t2.join();
     
        // safe to access g_pages without lock now, as the threads are joined
        for (const auto &pair : g_pages) {
            std::cout << pair.first << " => " << pair.second << '\n';
        }
    }
    

    输出:

    http://bar => fake content
    http://foo => fake content
    

    此外,手动操作std::mutex的锁定和解锁,可以实现一些特殊行为,例如等待某个标记:

    #include <chrono>
    #include <thread>
    #include <mutex>
    
    bool flag;
    std::mutex m;
    
    void wait_for_flag()
    {
        std::unique_lock<std::mutex> lk(m); // 这里采用std::unique_lock而非std::lock_guard. std::unique_lock可以实现尝试获得锁, 如果当前以及被其它线程锁定, 则延迟直到其它线程释放, 然后才获得锁.
        while(!flag)
        {
            lk.unlock(); // 解锁互斥量
            std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 休眠100ms,在此期间,其它线程可以进入互斥量,以便更改flag标记。
            lk.lock();   // 再锁互斥量
        }
    }
    
    • std::condition_variable

    std::condition_variablestd::condition_variable_any都是条件变量,都是C++标准库的实现,它们都需要与互斥量配合使用。由于std::condition_variable_any更加通用,会在性能上产生更多的开销。故而,应当首先考虑使用std::condition_variable

    利用条件变量的接口,结合互斥量的使用,可以很方便地执行线程间的等待、通知等操作。示例:

    #include <iostream>
    #include <string>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
     
    std::mutex m;
    std::condition_variable cv;	// 声明条件变量
    std::string data;
    bool ready = false;
    bool processed = false;
     
    void worker_thread()
    {
        // 等待直到主线程改变ready为true.
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return ready;});
     
        // 获得了互斥量的锁
        std::cout << "Worker thread is processing data\n";
        data += " after processing";
     
        // 发送数据给主线程
        processed = true;
        std::cout << "Worker thread signals data processing completed\n";
     
        // 手动解锁, 以便主线程获得锁.
        lk.unlock();
        cv.notify_one();
    }
     
    int main()
    {
        std::thread worker(worker_thread);
     
        data = "Example data";
        // send data to the worker thread
        {
            std::lock_guard<std::mutex> lk(m);
            ready = true;
            std::cout << "main() signals data ready for processing\n";
        }
        cv.notify_one();
     
        // wait for the worker
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, []{return processed;});
        }
        std::cout << "Back in main(), data = " << data << '\n';
     
        worker.join();
    }
    

    输出:

    main() signals data ready for processing
    Worker thread is processing data
    Worker thread signals data processing completed
    Back in main(), data = Example data after processing
    
    • std::future

    C++的future(期望)是一种可以访问未来的返回值的机制,常用于多线程的同步。可以创建future的类型有: std::async, std::packaged_task, std::promise。

    future对象可以执行wait、wait_for、wait_until,从而实现事件等待和同步,示例代码:

    #include <iostream>
    #include <future>
    #include <thread>
     
    int main()
    {
        // 从packaged_task获取的future
        std::packaged_task<int()> task([]{ return 7; }); // wrap the function
        std::future<int> f1 = task.get_future();  // get a future
        std::thread t(std::move(task)); // launch on a thread
     
        // 从async()获取的future
        std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
     
        // 从promise获取的future
        std::promise<int> p;
        std::future<int> f3 = p.get_future();
        std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
     	
        // 等待所有future
        std::cout << "Waiting..." << std::flush;
        f1.wait();
        f2.wait();
        f3.wait();
        std::cout << "Done!\nResults are: " << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
        t.join();
    }
    

    输出:

    Waiting...Done!
    Results are: 7 8 9
    

    2.1.4 多线程实现机制

    多线程按并行内容可分为数据并行和任务并行两种。其中数据并行是不同的线程携带不同的数据执行相同的逻辑,最经典的数据并行的应用是MMX指令、SIMD技术、Compute着色器等。任务并行是不同的线程执行不同的逻辑,数据可以相同,也可以不同,例如,游戏引擎经常将文件加载、音频处理、网络接收乃至物理模拟都放到单独的线程,以便它们可以并行地执行不同的任务。

    多线程如果按划分粒度和方式,则有线性划分、递归划分、任务类型划分等。

    线性划分法的最简单应用就是将连续数组的元素平均分成若干份,每份数据派发到一个线程中执行,例如并行化的std::for_each和UE里的ParallelFor

    线性划分示意图。连续数据被均分为若干份,接着派发到若干线程中并行地执行。

    在线性划分并行执行结束后,通常需要由调用线程合并和同步并行的结果。

    递归划分法是将连续数据按照某种规则划分成若干份,每一份又可继续划分成更细粒度,直到某种规则停止划分。常用于快速排序。

    快速排序有两个最基本的步骤:将数据划分到中枢(pivot)元素之前或之后,然后对中枢元素之前和之后的两半数组再次进行快速排序。由于只有在一次排序结束后才能知道哪些项在中枢元素之前和之后,所以不能通过对数据的简单(线性)划分达到并行。当要对这种算法进行并行化,很自然的会想到使用递归。每一级的递归都会多次调用quick_sort函数,因为需要知道哪些元素在中枢元素之前和之后。