深圳幻海软件技术有限公司 欢迎您!

C++ 20 协程 Coroutine(2,等待体)

2023-02-28

co_await awaiter的用途?明确说C++20的协程大部分概念还算清晰,就是yeild,然后外部利用句柄resume。对协程这个概念有了解的不应该有什么特别难以理解的地方。但co_awaitawaiter比较让人疑惑。上次我已经讲过,awaiter其实是是一个对象,一个接口实现,

co_await  awaiter的用途?

明确说C++20的协程大部分概念还算清晰,就是yeild,然后外部利用句柄resume。对协程这个概念有了解的不应该有什么特别难以理解的地方。

但co_await awaiter比较让人疑惑。

上次我已经讲过,awaiter其实是是一个对象,一个接口实现,其3个接口函数是(详细解释请翻阅第一章):

  • await_ready​:等待体是否准备好了,没准备好(return false​)就调用await_suspend
  • await_suspend​:等待体挂起如何操作。参数为调用其的协程句柄。return true​ ,或者 return void 就会挂起协程。
  • await_resume​:协程挂起后恢复时,调用的接口,同时返回其结果,作为co_await的返回值。

不少代码的例子都是在await_suspend 函数中,直接把handle.resume(),就是说这些例子都是在挂起时就理解恢复了协程运行,这样的例子貌似什么异步的感觉都没有,没有体现任何异步操作的效果和优势。

这样co_await awaiter​能用来干啥就有点让我好奇了。我的直觉是等待体awaiter在await_suspend应该就是记录协程句柄,同时发起一个异步操作(比如用一个线程完成文件读写),然后在异步操作完成后,恢复协程的运行,告知协程读写的结果。

co_await awaiter的在未来应该会有很多种等待体,比如AIO,异步网络,异步读写数据库等。这也应该是未来C++协程重点反正发展地方。

await_suspend的参数

这个问题先提前说一下,我曾经疑惑过。await_suspend接口的参数,其是调用其的外部协程的句柄。

void await_suspend(std::coroutine_handle<result::promise_type> awaiting)
  • 1.

但让我疑惑的是 std::coroutine_handle<>​ 里面模板参数理论应该是协程promise_type​承诺对象。不知道您理解这儿的麻烦没有,如果你要写一个通用的awaiter,那么难道都要使用模板?让使用者填写其协程对应的promise_type。这样开发者,使用者都麻烦。

后面我发现,如果只要你不使用对应的承诺对象,std::coroutine_handle<promise_type>::promise()​ 。参数类型写成std::coroutine_handle<>也没有问题(<>中为空,默认为void)。这样也可以适配各种协程。

co_await  的呈现形式

co_await  可以呈现出不少形式,如果你才开始学你会比较疑惑。

co_ret = co_await  awaiter;
  • 1.

co_await 调用 awaiter的接口。co_ret 是从awaiter 里面的await_resume 接口的返回值。

co_ret = co_await  fun();
  • 1.

fun() 函数返回值是awaiter 对象,co_ret 是从awaiter 里面的await_resume 接口的返回值。

例子:尝试异步IO(有缺陷)

我们尝试一些一个异步的读取文件的操作,封装在awaiter对象await_read_file​里面,在其await_suspend​接口中,我们尝试使用std::async发起了一个异步操作。然后等待返回结果。

协程返回值仍然是 coro_ret<T>​, 承诺对象还是coro_ret<T>::promise_type​,这个地方和前面的例子几乎没有差别,只是initial_suspend​返回的std::suspend_never{},表示协程在初始化后(刚刚进入时)不进行挂起操作。源代码地址请点击。

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <future>
#include <chrono>
#include <thread>

//!coro_ret 协程函数的返回值,内部定义promise_type,承诺对象
template <typename T>
struct coro_ret
{
   struct promise_type;
   using handle_type = std::coroutine_handle<promise_type>;
   //! 协程句柄
   handle_type coro_handle_;

   coro_ret(handle_type h)
      : coro_handle_(h)
  {
  }
   coro_ret(const coro_ret&) = delete;
   coro_ret(coro_ret&& s)
      : coro_handle_(s.coro_)
  {
       s.coro_handle_ = nullptr;
  }
   ~coro_ret()
  {
       //!自行销毁
       if (coro_handle_)
           coro_handle_.destroy();
  }
   coro_ret& operator=(const coro_ret&) = delete;
   coro_ret& operator=(coro_ret&& s)
  {
       coro_handle_ = s.coro_handle_;
       s.coro_handle_ = nullptr;
       return *this;
  }

   //!恢复协程,返回是否结束
   bool move_next()
  {
       coro_handle_.resume();
       return coro_handle_.done();
  }
   //!通过promise获取数据,返回值
   T get()
  {
       return coro_handle_.promise().return_data_;
  }
   //!promise_type就是承诺对象,承诺对象用于协程内外交流
   struct promise_type
  {
       promise_type() = default;
       ~promise_type() = default;

       //!生成协程返回值
       auto get_return_object()
      {
           return coro_ret<T>{handle_type::from_promise(*this)};
      }

       //! 注意这个函数,返回的就是awaiter
       //! 如果返回std::suspend_never{},就不挂起,
       //! 返回std::suspend_always{} 挂起
       //! 当然你也可以返回其他awaiter
       auto initial_suspend()
      {
           return std::suspend_never{};
           //return std::suspend_always{};
      }
       //!co_return 后这个函数会被调用
       void return_value(T v)
      {
           return_data_ = v;
           return;
      }
       //!
       auto yield_value(T v)
      {
           std::cout << "yield_value invoked." << std::endl;
           return_data_ = v;
           return std::suspend_always{};
      }
       //! 在协程最后退出后调用的接口。
       //! 若 final_suspend 返回 std::suspend_always 则需要用户自行调用
       //! handle.destroy() 进行销毁,但注意final_suspend被调用时协程已经结束
       //! 返回std::suspend_always并不会挂起协程(实测 VSC++ 2022)
       auto final_suspend() noexcept
      {
           std::cout << "final_suspend invoked." << std::endl;
           return std::suspend_always{};
      }
       //
       void unhandled_exception()
      {
           std::exit(1);
      }
       //返回值
       T return_data_;
  };
};

int read_file(const char* filename,
             char* buffer,
             size_t buf_len,
             size_t* read_len,
             std::coroutine_handle<> coro_hdl)
{
   int result = 0;
   size_t len = 0;
   *read_len = 0;
   //打开文件
   FILE* fd = ::fopen(filename, "r+");
   if (nullptr == fd)
  {
       result = -1;
       goto READ_FILE_END;
  }
   //读取内容
   len = ::fread(buffer, 1, buf_len, fd);
  ::fclose(fd);
   if (len <= 0)
  {
       result = -1;
       goto READ_FILE_END;
  }

   *read_len = len;
   result = 0;

   //到了最后一步,这儿用goto只是方便写代码。
READ_FILE_END:

   return result;
}


struct await_read_file
{
   await_read_file(const char* filename,
                   char* buffer,
                   size_t buf_len,
                   size_t* read_len)
  {
       filename_ = filename;
       buffer_ = buffer;
       buf_len_ = buf_len;
       read_len_ = read_len;
  };
   ~await_read_file() = default;

   bool await_ready()
  {
       return false;
  }
   //挂起的操作,发起异步读文件操作,然后等待返回
   void await_suspend(std::coroutine_handle<> awaiting)
  {
       fur_ = std::async(std::launch::async,
                         &read_file,
                         filename_,
                         buffer_,
                         buf_len_,
                         read_len_,
                         awaiting);
       result_ = fur_.get();
       awaiting.resume();
  }
   //返回结果
   int await_resume()
  {
       return result_;
  }

   //读文件的参数,返回值
   int result_ = -1;
   const char* filename_ = nullptr;
   char* buffer_ = nullptr;
   size_t buf_len_ = 0;
   size_t* read_len_ = nullptr;

   std::future<int> fur_;

   //!协程的句柄
   std::coroutine_handle<> awaiting_;
};



//这就是一个协程函数
coro_ret<int> coroutine_await(const char* filename,
                             char* buffer,
                             size_t buf_len,
                             size_t* read_len)
{
   int ret = co_await await_read_file(filename,
                                      buffer,
                                      buf_len,
                                      read_len);
   //这行其实没有执行到。
   std::cout << "await_read_file ret= " << ret << std::endl;
   if (ret == 0)
  {
       std::cout << "await_read_file read_len= " << *read_len << std::endl;
  }
   co_return 0;
}

int main(int argc, char* argv[])
{
   using namespace std::chrono_literals;
   //调用协程
   char buffer[1024];
   size_t read_len = 0;
   std::cout << "Start coroutine_await coroutine\n";
   auto c_r = coroutine_await("E:/TEST001/aio_test_001.txt",
                              buffer,
                              1024,
                              &read_len);
   std::cout << "End coroutine_await coroutine\n";
   return 0;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.

最后输出的信息记录是:

Start coroutine_await coroutine
await_read_file ret= 0
await_read_file read_len= 20
final_suspend invoked.
End coroutine_await coroutine
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

其实您可以已经发现了。这个实现虽然可以正常运行,但没有起到任何异步操作效果,因为await_suspend的接口虽然发起了异步操作std::async。但后面又进行了等待操作 result_ = fur_.get();

void await_suspend(std::coroutine_handle<> awaiting)
  {
       fur_ = std::async(std::launch::async,
                         &read_file,
                         filename_,
                         buffer_,
                         buf_len_,
                         read_len_,
                         awaiting);
       result_ = fur_.get();
       awaiting.resume();
  }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

你可以认为虽然他发起了异步操作,整个主线程还是阻塞的,没有任何异步效果。

例子:再次尝试异步IO(有bug,多线程的危险)

在部分文章例子代码中,他们会提出一些异步思路。

比如在异步执行的函数read_file 中去调用 coro_hdl.resume(); 在await_resume中执行result_ = fur_.get();效果如何呢?我们先贴出作出改进代码。

int read_file(const char* filename,
             char* buffer,
             size_t buf_len,
             size_t* read_len,
             std::coroutine_handle<> coro_hdl)
{
//…………
   //到了最后一步
READ_FILE_END:
   //变化点:在AIO的线程里面恢复协程。
   coro_hdl.resume();
   return result;
}

struct await_read_file
{
//其他代码没改变
   //…………
   //挂起的操作,发起异步读文件操作,然后等待返回
   void await_suspend(std::coroutine_handle<> awaiting)
  {
       fur_ = std::async(std::launch::async,
                         &read_file,
                         filename_,
                         buffer_,
                         buf_len_,
                         read_len_,
                         awaiting);
       //不再在这个地方进行等待了
  }
   //返回结果
   int await_resume()
  {
       result_ = fur_.get();
       return result_;
  }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.

但这无疑是一个错误的改进。最后的输出结果要不就是崩溃,要不就是无法真正完成协程。

Start coroutine_await ()
Start coroutine_await ()
End coroutine_await ()
# 协程并没有执行完成
  • 1.
  • 2.
  • 3.
  • 4.

为什么???这儿又是因为可恶的多线程陷阱了。我们贴个时序图,您就会更加理解。

您不能在另外一个线程中去恢复协程的运行。,切记,切记。

吐槽一下C++ 11的异步操作设计

那应该如何修正,能异步操作,有能唤醒协程呢?方法还是有的,在我们发起std::aysnc 操作,得到一个std::future时,我们可以在主循环里面去等待std::future​,因为future可以等待很短的时间,也可以反复尝试。这样我们的代码主循环就一边等待(反复尝试),一边干点别的事情。

不过我也懒得把这个很丑的模型实现出来了。

这儿我们可以讨论一个问题,C++的异步模式,promise/future,async/future,都需要future在后面等待事情的完成。特别是在服务器类型的开发,这种方式并不好用。(我注明了服务器类型呀)

首先看,每一个异步操作都(可能)需要启动一个线程,这个消耗过大,其次每一个future都需要等待,其实在设计上也很讨厌。如果你设计一个队列保存future,那么还需要将future和需要回调的操作绑定起来。

个人用不太惯,有高人指点一下?在服务器里面怎么

异步协程co_await awaiter接口设计

上面那个例子很初步,真正用起来很不爽,那么怎么设计能更加好的设计协程的异步IO。

首先我们回顾一下传统的libuv这类传统的AIO设计。

这类AIO都是通过一个请求消息队列传递请求给线程池,让线程池去真正干活。线程池干完活后,再将结果返回给一个应答消息队列。请求消息中有一个请求者的回调函数指针,随后又会回填给应答消息中。主循环会不断检查应答消息队列里面有没有消息,如果有应答消息,就从消息中取出回调函数调用之。

这种模型才是比较通用的服务器异步模型设计。这种模型也很容易结合到协程co_await awaiter设计中来。你只需要在回调函数里面激活挂起的协程就可以了。

做一个简单的时序图给大家。

而如果你想用libuv封装,我估计还是改造一下libuv的代码。毕竟如果寄希望协程句柄透传回填回来。也需要消息结构进行改变。

至于代码,我自己的代码库zcelib/dev分支,aio目录下的代码有一个测试实现。因为涉及的面有不少(因为功能,代码写在好多CPP里面),只贴出部分说明一下吧。

//AIO 文件处理相关的awaiter等待体
struct await_aiofs
{
    await_aiofs(zce::aio::Worker* worker,
                zce::aio::FS_Handle* fs_hdl)
         worker_(worker),
         fs_hdl_(fs_hdl)
    {
    }
    ~await_aiofs() = default;

    //是否准备好
    bool await_ready()
    {
        return false;
    }
    //挂起操作
    void await_suspend(std::coroutine_handle<> awaiting);
    {
        //回调函数
        fs_hdl_->call_back_ = std::bind(&await_aiofs::resume,
                                        this,
                                        std::placeholders::_1);
        //将一个文件操作句柄放入请求队列
        bool succ_req = worker_->request(fs_hdl_);
        if (succ_req)
        {
            return false;
        }
        else
        {
            return true;
        }
    }
    //!恢复后返回结果
    FS_Handle await_resume()
    {
        return return_hdl_;
    }
    //!回调函数
    void resume(AIO_Handle* return_hdl)
    {
        FS_Handle* fs_hdl = (FS_Handle*)return_hdl;
        return_hdl_ = *fs_hdl;
        awaiting_.resume();
        return;
    }

    //!工作者,具有请求,应答管道,处理IO多线程的管理者
    zce::aio::Worker* worker_ = nullptr;
    //!请求的文件操作句柄
    zce::aio::FS_Handle* fs_hdl_ = nullptr;
    //!完成后返回的句柄
    zce::aio::FS_Handle return_hdl_;
    //!协程的句柄(调用者)
    std::coroutine_handle<> awaiting_;
};

//AIO 协程的co_await 函数
await_aiofs co_read_file(zce::aio::Worker* worker,
                         const char* path,
                         char* read_bufs,
                         size_t nbufs,
                         ssize_t offset)
{
    //从对象池分配一个FS_Handle
    zce::aio::FS_Handle* aio_hdl = (FS_Handle*)
        worker->alloc_handle(AIO_TYPE::FS_READFILE);
    aio_hdl->path_ = path;
    aio_hdl->read_bufs_ = read_bufs;
    aio_hdl->bufs_count_ = nbufs;
    aio_hdl->offset_ = offset;

    return await_aiofs(worker, aio_hdl);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.

本章总结

本章讲解了一下 co_wait awaiter​;也讲了一下如何设计一个异步的awaiter。

参考文档

  • 初探 C++20 协程
  • 再探 C++20 协程,这两篇文字都不错。
  • Coroutines (C++20)
  • 协程(coroutine)简介
  • The Coroutine in C++ 20 协程之诺
  • C++ Coroutines: Understanding operator co_await