Boost C++ 函式庫

...世界上最受推崇且設計精良的 C++ 函式庫專案之一。 Herb SutterAndrei Alexandrescu,《C++ 程式碼規範

概述

以下是 cobalt 中相關功能的列表

表 1. 協程類型

promise

一個及早求值並返回單一結果的協程 - 將其視為預設值

generator

一個可以產生多個值的及早求值協程。

task

promise 的惰性版本,可以生成到其他執行器上。

detached

類似於 promise 的協程,但沒有控制代碼

表 2. 同步函式

race

一個以偽隨機方式等待一組協程中其中一個準備就緒的函式,以避免資源匱乏。

join

一個等待一組協程的函式,並將所有協程作為值返回,如果任何可等待對象拋出異常,則拋出異常。

gather

一個等待一組協程的函式,並將所有協程作為 result 返回,單獨捕獲所有異常。

left_race

一個從左到右評估的確定性 race

表 3. 工具程式

channel

一個線程本地的工具程式,用於在協程之間發送值。

with

一個非同步 RAII 輔助工具,允許在發生異常時進行非同步銷毀

表 4. 閱讀指南

協程入門

C++ 協程的簡短介紹

如果您以前從未使用過協程,請閱讀

導覽

功能和概念的簡要高階視圖

如果您熟悉 asio 和協程,並且想知道這個函式庫提供了什麼,請閱讀

教學

低階用法視圖

如果您想快速開始編碼,請閱讀

參考

API 參考

在編碼時查找詳細資訊

深入探討

一些實現細節

如果您還不夠困惑,請閱讀

動機

許多程式語言,例如 node.js 和 python,都提供了易於使用的單線程併發框架。雖然比同步程式碼更複雜,但單線程非同步避免了多線程的許多陷阱和開銷。

也就是說,一個協程可以工作,而其他協程則等待事件(例如來自伺服器的響應)。這允許編寫在**單線程**上**同時執行多項操作**的應用程式。

這個函式庫旨在為 C++ 提供此功能:**簡單的單線程非同步**,類似於 node.js 和 python 中的 asyncio,可與現有函式庫(例如 boost.beastboost.mysqlboost.redis)一起使用。它基於 boost.asio

它採用了其他語言中的一系列概念,並基於 C++20 協程提供它們。

asio::awaitableasio::experimental::coro 不同,cobalt 協程是開放的。也就是說,一個 asio::awaitable 只能等待其他 asio::awaitable 並被其等待,並且不提供協程特定的同步機制。

另一方面,cobalt 提供了一個協程特定的 channel 和不同的等待類型(racegather 等),這些類型經過最佳化,可與協程和可等待對象一起使用。

協程入門

非同步程式設計

非同步程式設計通常是指允許在後台運行任務的程式設計風格,而其他工作則在執行。

想像一下,如果您有一個 get-request 函式,它執行完整的 http 請求,包括連接和 ssl 握手等。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    auto res = http_get("https://boost.dev.org.tw");
    printf("%s", res.c_str());
    return 0;
}

上面的程式碼是傳統的同步程式設計。如果我們想要並行執行兩個請求,我們需要創建另一個線程來使用同步程式碼運行另一個線程。

std::string http_get(std:string_view url);

int main(int argc, char * argv[])
{
    std::string other_res;

    std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
    auto res = http_get("https://boost.dev.org.tw");
    thr.join();

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

這是可行的,但我們的程式將花費大部分時間等待輸入。作業系統提供允許非同步執行 IO 的 API,而諸如 boost.asio 之類的函式庫則提供了管理非同步操作的可攜式方法。Asio 本身並沒有規定處理完成的方式。這個函式庫 (boost.cobalt) 提供了一種通過協程/可等待對象管理所有這些的方法。

cobalt::promise<std::string> http_cobalt_get(std:string_view url);

cobalt::main co_main(int argc, char * argv[])
{
    auto [res, other_res] =
            cobalt::join(
                http_cobalt_get(("https://boost.dev.org.tw"),
                http_cobalt_get(("https://cppalliance.org")
            );

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

在上面的程式碼中,執行請求的非同步函式利用了作業系統 API,因此實際的 IO 不會阻塞。這意味著,當我們等待兩個函式完成時,操作是交錯的且非阻塞的。同時,cobalt 提供了協程原語,使我們免於回調地獄。

協程

協程是可恢復的函式。可恢復意味著函式可以暫停,也就是將控制權多次交還給呼叫者。

一般的函式使用 return 函式將控制權交還給呼叫者,同時也返回數值。

另一方面,協程可以將控制權交還給呼叫者,並多次恢復執行。

協程具有三個類似於 co_return 的控制關鍵字(其中只有 co_return 是必須支援的)。

  • co_return

  • co_yield

  • co_await

co_return

這類似於 return,但將函式標記為協程。

co_await

co_await 運算式會暫停等待一個可等待物件 (Awaitable),也就是停止執行,直到該 awaitable 恢復它的執行。

例如:

cobalt::promise<void> delay(std::chrono::milliseconds);

cobalt::task<void> example()
{
  co_await delay(std::chrono::milliseconds(50));
}

co_await 運算式可以產生一個值,取決於它正在等待的物件。

cobalt::promise<std::string> read_some();

cobalt::task<void> example()
{
  std::string res = co_await read_some();
}
cobalt 中,大多數協程原語也是可等待物件 (Awaitables)

co_yield

co_yield 運算式類似於 co_await,但它會將控制權交還給呼叫者,並帶有一個值。

例如:

cobalt::generator<int> iota(int max)
{
  int i = 0;
  while (i < max)
    co_yield i++;

  co_return i;
}

co_yield 運算式也可以產生一個值,允許使用 yielding 協程的使用者將值推入其中。

cobalt::generator<int> iota()
{
  int i = 0;
  bool more = false;
  do
  {
    more = co_yield i++;
  }
  while(more);
  co_return -1;
}
無堆疊

C++ 協程是無堆疊的,這意味著它們只分配自己的函式框架。

詳情請見無堆疊 (Stackless)

可等待物件 (Awaitables)

可等待物件 (Awaitables) 是可以在 co_await 運算式中使用的類型。

struct awaitable_prototype
{
    bool await_ready();

    template<typename T>
    see_below await_suspend(std::coroutine_handle<T>);

    return_type  await_resume();
};
如果可以使用 operator co_await 呼叫,則類型將被隱式轉換為可等待物件。本文檔將使用 awaitable 來包含這些類型,並使用「actual_awaitable」來指符合上述原型的類型。
awaitables

co_await 運算式中,等待的協程會先呼叫 await_ready 來檢查協程是否需要暫停。如果已準備好,它會直接跳到 await_resume 以取得值,因為不需要暫停。否則,它會暫停自身並使用指向自身 promise 的 std::coroutine_handle 呼叫 await_suspend

std::coroutine_handle<void> 可用於類型擦除。

return_typeco_await 運算式的結果類型,例如 int

int i = co_await awaitable_with_int_result();

await_suspend 的返回類型可以是三種:

  • void

  • bool

  • std::coroutine_handle<U>

如果是 void,等待的協程將保持暫停狀態。如果是 bool,則會檢查該值,如果為 false,等待的協程將立即恢復。

如果返回 std::coroutine_handle,則會恢復該協程。後者允許 await_suspend 返回傳入的 handle,實際上與返回 false 相同。

如果等待的協程立即重新恢復,也就是在呼叫 await_resume 之後,則在此程式庫中稱為「立即完成」。這與非暫停可等待物件(即從 await_ready 返回 true 的物件)不同。

事件迴圈

由於 cobalt 中的協程可以 co_await 事件,因此它們需要在事件迴圈上運行。也就是說,另一段程式碼負責追蹤未完成的事件,並恢復正在等待這些事件的協程。這種模式非常常見,node.js 或 Python 的 asyncio 也以類似的方式使用它。

cobalt 使用asio::io_context 作為其預設事件迴圈。也就是說,threadmain 類和 run 函式在內部使用它。

您可以使用任何可以產生asio::any_io_executor 的事件迴圈與程式庫一起使用。最簡單的方法是使用 spawn

事件迴圈可透過執行器(依據 asio 術語)存取,並且可以使用 set_executor 手動設定。

導覽

進入 cobalt 環境

為了使用 可等待物件 (awaitables),我們需要能夠 co_await 它們,也就是說,必須在一個協程 (coroutine) 內。

我們有四種方法可以達成此目的

cobalt/main.hpp

int main 替換為協程

cobalt::main co_main(int argc, char* argv[])
{
    // co_await things here
    co_return 0;
}
cobalt/thread.hpp

為非同步環境建立一個執行緒

cobalt::thread my_thread()
{
    // co_await things here
    co_return;
}

int main(int argc, char ** argv[])
{
    auto t = my_thread();
    t.join();
    return 0;
}
cobalt/task.hpp

建立一個任務並執行或生成它

cobalt::task<void> my_thread()
{
   // co_await things here
   co_return;
}

int main(int argc, char ** argv[])
{
    cobalt::run(my_task()); // sync
    asio::io_context ctx;
    cobalt::spawn(ctx, my_task(), asio::detached);
    ctx.run();
    return 0;
}

Promise

Promise 是推薦的預設協程類型。它們是積極的 (eager),因此易於用於臨時的並行處理。

cobalt::promise<int> my_promise()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // start the promise here
    auto p = my_promise();
    // do something else here
    co_await do_the_other_thing();
    // wait for the promise to complete
    auto res = co_await p;

    co_return res;
}

Task

Task 是惰性的 (lazy),這表示它們在被等待或生成之前不會執行任何動作。

cobalt::task<int> my_task()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the task here
    auto t = my_task();
    // do something else here first
    co_await do_the_other_thing();
    // start and wait for the task to complete
    auto res = co_await t;
    co_return res;
}

產生器 (Generator)

Generator 是 cobalt 中唯一可以 co_yield 值的類型。

Generator 預設是積極的。與 std::generator 不同,cobalt::generator 可以 co_await,因此是非同步的。

cobalt::generator<int> my_generator()
{
   for (int i = 0; i < 10; i++)
    co_yield i;
   co_return 10;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator();
    while (g)
        printf("Generator %d\n", co_await g);
    co_return 0;
}

可以將值推送到 generator 中,這些值將會從 co_yield 返回。

cobalt::generator<double, int> my_eager_push_generator(int value)
{
   while (value != 0)
       value = co_yield value * 0.1;
   co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(5);

    assert(0.5 == co_await g(4)); // result of 5
    assert(0.4 == co_await g(3)); // result of 4
    assert(0.3 == co_await g(2)); // result of 3
    assert(0.2 == co_await g(1)); // result of 2
    assert(0.1 == co_await g(0)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

也可以使用 this_coro::initial 使協程變為惰性的。

cobalt::generator<double, int> my_eager_push_generator()
{
    auto value = co_await this_coro::initial;
    while (value != 0)
        value = co_yield value * 0.1;
    co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator(); // lazy, so the generator waits for the first pushed value
    assert(0.5 == co_await g(5)); // result of 5
    assert(0.4 == co_await g(4)); // result of 4
    assert(0.3 == co_await g(3)); // result of 3
    assert(0.2 == co_await g(2)); // result of 2
    assert(0.1 == co_await g(1)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

join

如果多個 可等待物件 並行工作,則可以使用 join 同時等待它們。

cobalt::promise<int> some_work();
cobalt::promise<double> more_work();

cobalt::main co_main(int argc, char * argv[])
{
    std::tuple<int, double> res = cobalt::join(some_work(), more_work());
    co_return 0;
}

race

如果多個 可等待物件 並行工作,但我們希望在其中任何一個完成時收到通知,我們應該使用 race

cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();

cobalt::main co_main(int argc, char * argv[])
{
    auto g1 = some_data_source();
    auto g2 = another_data_source();

    int res1    = co_await g1;
    double res2 = co_await g2;

    printf("Result: %f", res1 * res2);

    while (g1 && g2)
    {
        switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
        {
            case 0:
                res1 = variant2::get<0>(nx);
                break;
            case 1:
                res2 = variant2::get<1>(nx);
                break;
        }
        printf("New result: %f", res1 * res2);
    }

    co_return 0;
}
在此情況下,race 不會造成任何資料遺失。

教學

延遲

讓我們從最簡單的例子開始:一個簡單的延遲。

example/delay.cpp
cobalt::main co_main(int argc, char * argv[]) (1)
{
  asio::steady_timer tim{co_await asio::this_coro::executor, (2)
                         std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
  co_await tim.async_wait(cobalt::use_op); (4)
  co_return 0; (5)
}
1 co_main 函式在使用時會定義一個隱式 main 函式,並且是設定用於執行非同步程式碼的環境的最簡單方法。
2 從目前的協程 promise 中取得執行器。
3 使用參數設定逾時。
4 使用 cobalt::use_op 執行等待。
5 返回一個從隱式 main 函式返回的值。

在此範例中,我們使用 cobalt/main.hpp 標頭檔,如果定義了如上的 co_main,它會提供一個主要的協程。這有一些優點

  • 正確設定環境 (executormemory)

  • 向 asio 發出訊號,表示上下文是單執行緒的

  • 具有 SIGINTSIGTERMasio::signal_set 會自動連接到取消動作(例如,Ctrl+C 會導致取消)

然後,這個協程在其 promise(promise 是 C++ 中協程狀態的名稱,不要與 cobalt/promise.hpp 混淆)中有一個執行器,我們可以透過 this_coro 命名空間中的虛擬 可等待物件 取得它。

然後,我們可以建構一個計時器,並使用 use_op 啟動 async_waitcobalt 提供了多種與 asio 互動的 co_await 方法,其中 use_op 是最簡單的。

Echo 伺服器

我們將在各處使用 use_opasio 完成記號),因此我們使用 預設完成記號,這樣我們就可以省略最後的參數。

example/echo_server.cpp 宣告
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket   = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;

我們將 echo 函式寫成 promise 協程。它是一個積極的協程,建議作為預設值;如果需要惰性協程,可以使用 task

example/echo_server.cpp echo 函式
cobalt::promise<void> echo(tcp_socket socket)
{
  try (1)
  {
    char data[4096];
    while (socket.is_open()) (2)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
      co_await async_write(socket, boost::asio::buffer(data, n)); (4)
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo: exception: %s\n", e.what());
  }
}
1 使用 use_op 完成記號時,I/O 錯誤會轉換為 C++ 例外狀況。此外,如果協程被取消(例如,因為使用者按下 Ctrl-C),也會引發例外狀況。在這些情況下,我們會印出錯誤並結束迴圈。
2 我們會持續執行迴圈,直到收到取消指令(例外)或使用者關閉連線。
3 讀取所有可用的資料。
4 寫入所有已讀取的位元組。

請注意,promise 是積極的。呼叫 echo 會立即執行程式碼,直到 async_read_some,然後將控制權返回給呼叫者。

接下來,我們也需要一個 acceptor 函式。這裡,我們使用產生器來管理 acceptor 狀態。這是一個可以被多次 co_await 的協程,直到遇到 co_return 表達式。

example/echo_server.cpp listen 函式
cobalt::generator<tcp_socket> listen()
{
  tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
  for (;;) (1)
  {
    tcp_socket sock = co_await acceptor.async_accept(); (2)
    co_yield std::move(sock); (3)
  }
  co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 取消也會導致在此處從 co_await 拋出例外。
2 非同步接受連線
3 將其產出給等待的協程
4 為了符合 C++ 規範,使用 co_return 返回一個值。

有了這兩個函式,我們現在可以編寫伺服器了

example/echo_server.cpp run_server 函式
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
  auto l = listen(); (1)
  while (true)
  {
    if (workers.size() == 10u)
      co_await workers.wait_one();  (2)
    else
      workers.push_back(echo(co_await l)); (3)
  }
}
1 建構 listener 產生器協程。當物件被銷毀時,協程將被取消,執行所有必要的清理工作。
2 當我們有超過 10 個 worker 時,我們會等待其中一個完成
3 接受新的連線並啟動它。

wait_group 用於管理正在執行的 echo 函式。這個類別將取消並等待正在執行的 echo 協程。

我們不需要對 listener 做同樣的事情,因為當 l 被銷毀時,它會自行停止。產生器的解構函式會取消它。

由於 promise 是積極的,只需呼叫它就足以啟動。然後我們將這些 promise 放入 wait_group 中,這將允許我們在範圍結束時關閉所有 worker。

example/echo_server.cpp co_main 函式
cobalt::main co_main(int argc, char ** argv)
{
  co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
  co_return 0u;
}
1 使用非同步作用域執行 run_server

上面顯示的 with 函式,將使用資源(例如 wait_group)來執行函式。在作用域結束時,with 將呼叫並 co_await 一個非同步的 teardown 函式。這將導致所有連線在 co_main 結束之前被正確關閉。

價格行情

為了示範 channels 和其他工具,我們需要一定的複雜度。為此,我們的專案是一個價格報價器,它連接到 https://blockchain.info。使用者可以連接到 localhost 來查詢指定的貨幣對,如下所示

wscat -c localhost:8080/btc/usd

首先,我們進行與 echo-server 相同的宣告。

example/ticker.cpp 宣告
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type   = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;

下一步是編寫一個函式來連接 ssl-stream,以連接到上游

example/ticker.cpp connect
cobalt::promise<asio::ssl::stream<socket_type>> connect(
        std::string host, boost::asio::ssl::context & ctx)
{
    asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
    auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)

    asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
    co_await sock.next_layer().async_connect(*ep.begin()); (2)
    co_await sock.async_handshake(asio::ssl::stream_base::client); (3)

    co_return sock; (4)
}
1 查詢主機
2 連接到端點
3 執行 ssl 握手
4 將 socket 返回給呼叫者

接下來,我們需要一個函式在現有的 ssl-stream 上執行 websocket 升級。

example/ticker.cpp connect_to_blockchain_info
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
 ws.set_option(beast::websocket::stream_base::decorator(
     [](beast::websocket::request_type& req)
     {
       req.set(http::field::user_agent,
               std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
       req.set(http::field::origin,
               "https://exchange.blockchain.com"); (1)
     }));

 co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 blockchain.info 要求設定此標頭。
2 執行 websocket 握手。

一旦 websocket 連接,我們想要持續接收 json 訊息,產生器是一個不錯的選擇。

example/ticker.cpp json_read
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
    beast::flat_buffer buf;
    while (ws.is_open()) (1)
    {
        auto sz = co_await ws.async_read(buf); (2)
        json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
        auto obj = json::parse(data);
        co_yield obj.as_object(); (3)
        buf.consume(sz);
    }
    co_return {};
}
catch (std::exception & e)
{
  std::cerr << "Error reading: " << e.what() << std::endl;
  throw;
}
1 只要 socket 開啟就持續執行
2 從 websocket 讀取一個 frame
3 解析並以物件形式 co_yield 它。

接著需要將其連接到訂閱者,我們將利用通道來傳遞原始 JSON 資料。為了簡化生命週期管理,訂閱者將持有 shared_ptr,而生產者則持有 weak_ptr

範例/ticker.cpp 訂閱類型
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;

運行區塊鏈連接器的主要函式,操作兩個輸入:來自 WebSocket 的數據和一個處理新訂閱的通道。

範例/ticker.cpp 運行 blockchain_info
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
    asio::ssl::context ctx{asio::ssl::context_base::tls_client};
    websocket_type ws{co_await connect("blockchain.info", ctx)};
    co_await connect_to_blockchain_info(ws); (1)

    subscription_map subs;
    std::list<std::string> unconfirmed;

    auto rd = json_reader(ws); (2)
    while (ws.is_open()) (3)
    {
      switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
      {
        case 0: (5)
          if (auto ms = get<0>(msg);
              ms.at("event") == "rejected") // invalid sub, cancel however subbed
            co_await handle_rejections(unconfirmed, subs, ms);
          else
            co_await handle_update(unconfirmed, subs, ms, ws);
        break;
        case 1: // (6)
            co_await handle_new_subscription(
                unconfirmed, subs,
                std::move(get<1>(msg)), ws);
        break;
      }
    }

    for (auto & [k ,c] : subs)
    {
        if (auto ptr = c.lock())
            ptr->close();
    }
}
catch(std::exception & e)
{
  std::cerr << "Exception: " << e.what() << std::endl;
  throw;
}
1 初始化連線
2 實例化 json_reader
3 只要 WebSocket 開啟就持續運行
4 選擇,也就是等待新的 JSON 訊息或訂閱
5 如果是 JSON,則處理更新或拒絕
6 處理新的訂閱訊息

handle_* 函式的內容對於 cobalt 的功能來說並不重要,因此在本教學中將其省略。

handle_new_subscription 函式會向 blockchain.info 發送訊息,後者會發送確認或拒絕訊息回來。handle_rejectionhandle_update 將取得 JSON 值並將其轉發到訂閱通道。

在消費者端,我們的伺服器只會將數據轉發給客戶端。如果客戶端輸入數據,我們將立即關閉 WebSocket。我們使用 as_tuple 來忽略潛在的錯誤。

範例/ticker.cpp 讀取並關閉
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
    system::error_code ec;
    co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
    co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
    st.next_layer().close(ec);
}

接下來,我們運行使用者發送的連線階段

範例/ticker.cpp run_session
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
                                 cobalt::channel<subscription> & subc)
try
{
    http::request<http::empty_body> req;
    beast::flat_buffer buf;
    co_await http::async_read(st.next_layer(), buf, req); (1)
    // check the target
    auto r = urls::parse_uri_reference(req.target());
    if (r.has_error() || (r->segments().size() != 2u)) (2)
    {
        http::response<http::string_body> res{http::status::bad_request, 11};
        res.body() = r.has_error() ? r.error().message() :
                    "url needs two segments, e.g. /btc/usd";
        co_await http::async_write(st.next_layer(), res);
        st.next_layer().close();
        co_return ;
    }

    co_await st.async_accept(req); (3)

    auto sym = std::string(r->segments().front()) + "-" +
               std::string(r->segments().back());
    boost::algorithm::to_upper(sym);
    // close when data gets sent
    auto p = read_and_close(st, std::move(buf)); (4)

    auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
    co_await subc.write(subscription{sym, ptr}); (6)

    while (ptr->is_open() && st.is_open()) (7)
    {
      auto bb = json::serialize(co_await ptr->read());
      co_await st.async_write(asio::buffer(bb));
    }

    co_await st.async_close(beast::websocket::close_code::going_away,
                            asio::as_tuple(cobalt::use_op)); (8)
    st.next_layer().close();
    co_await p; (9)

}
catch(std::exception & e)
{
    std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 讀取 HTTP 請求,因為我們需要路徑
2 檢查路徑,例如 /btc/usd
3 接受 WebSocket
4 開始讀取;如果消費者發送訊息,則關閉
5 建立接收更新的通道
6 run_blockchain_info 發送訂閱請求
7 只要通道和 WebSocket 開啟,我們就會轉發數據。
8 關閉通訊端並忽略錯誤
9 由於 WebSocket 現在肯定已關閉,請等待 read_and_close 關閉。

撰寫完 run_sessionrun_blockchain_info 後,我們現在可以繼續進行 main 函式

範例/ticker.cpp main
cobalt::main co_main(int argc, char * argv[])
{
    acceptor_type acc{co_await cobalt::this_coro::executor,
                      asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
    std::cout << "Listening on localhost:8080" << std::endl;

    constexpr int limit = 10; // allow 10 ongoing sessions
    cobalt::channel<subscription> sub_manager; (1)

    co_await join( (2)
      run_blockchain_info(sub_manager),
      cobalt::with( (3)
        cobalt::wait_group(
            asio::cancellation_type::all,
            asio::cancellation_type::all),
        [&](cobalt::wait_group & sessions) -> cobalt::promise<void>
        {
          while (!co_await cobalt::this_coro::cancelled) (4)
          {
            if (sessions.size() >= limit) (5)
              co_await sessions.wait_one();

            auto conn = co_await acc.async_accept(); (6)
            sessions.push_back( (7)
                run_session(
                    beast::websocket::stream<socket_type>{std::move(conn)},
                    sub_manager));
          }
        })
      );

    co_return 0;
}
1 建立用於管理訂閱的通道
2 使用 join 並行運行兩個任務。
3 使用 cobalt 作用域來提供 wait_group
4 運行直到取消。
5 當我們達到 limit 時,我們會等待一個任務完成。
6 等待新的連線。
7 將連線階段插入 wait_group

Main 函式使用 join,因為一個任務失敗應該取消另一個任務。

延遲操作

到目前為止,我們使用了 use_op,它基於 asio 的完成權杖機制來使用隱式操作。

然而,我們可以實現自己的操作,這些操作也可以利用 await_ready 優化。與立即完成不同,當 await_ready 返回 true 時,協程將永遠不會暫停。

為了利用這個協程功能,cobalt 提供了一種簡單的方法來建立可跳過的操作

範例/delay_op.cpp
struct wait_op final : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;
  wait_op(asio::steady_timer & tim) : tim(tim) {}
  void ready(cobalt::handler<system::error_code> h ) override (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
      h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
  {
    tim.async_wait(std::move(complete));
  }
};


cobalt::main co_main(int argc, char * argv[])
{
  asio::steady_timer tim{co_await asio::this_coro::executor,
                         std::chrono::milliseconds(std::stoi(argv[1]))};
  co_await wait_op(tim); (4)
  co_return 0; //
}
1 宣告操作。我們繼承 op 使其可等待。
2 這裡實作了預先暫停檢查
3 如果需要,執行等待
4 像使用任何其他可等待對象一樣使用操作

透過這種方式,我們可以最大限度地減少協程暫停的次數。

雖然以上是與 asio 一起使用,但您也可以將這些處理程式與任何其他基於回呼的程式碼一起使用。

具有推播值的產生器

帶有推送值的協程並不常見,但可以顯著簡化某些問題。

由於在前一個例子中我們已經有一個 json_reader,以下是如何編寫一個用於推送值的 json_writer。

使用生成器的優點是內部狀態管理。

cobalt::generator<system::error_code, json::object>
    json_writer(websocket_type & ws)
try
{
    char buffer[4096];
    json::serializer ser;

    while (ws.is_open()) (1)
    {
        auto val = co_yield system::error_code{}; (2)

        while (!ser.done())
        {
            auto sv = ser.read(buffer);
            co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
        }

    }
    co_return {};
}
catch (system::system_error& e)
{
    co_return e.code();
}
catch (std::exception & e)
{
    std::cerr << "Error reading: " << e.what() << std::endl;
    throw;
}
1 只要 socket 開啟就持續執行
2 co_yield 傳回目前的錯誤並取得新的值。
3 將框架寫入 websocket

現在我們可以像這樣使用生成器

auto g = json_writer(my_ws);

extern std::vector<json::value> to_write;

for (auto && tw : std::move(to_write))
{
    if (auto ec = co_await g(std::move(tw)))
        return ec; // yield error
}

進階範例

儲存庫中提供了更多僅為程式碼的範例。所有範例如下所示。

表 5. 所有範例

example/http.cpp

執行單個 http get 請求的 http 用戶端。

example/outcome.cpp

使用 boost.outcome 協程類型。

example/python.cpp & example/python.py

使用 nanobind 將 cobalt 與 python 整合。它使用 python 的 asyncio 作為執行器,並允許 C++ 對 python 函式進行 co_await,反之亦然。

example/signals.cpp

boost.signals2 採用到可等待類型中(單執行緒)。

example/spsc.cpp

建立基於 boost.lockfree 且可等待的 spsc_queue(多執行緒)。

example/thread.cpp

使用具有 asio 的 `concurrent_channel` 的工作執行緒。

example/thread_pool.cpp

使用 asio::thread_pool 並將任務產生到它們上。

example/delay.cpp

延遲 章節使用的範例

範例/delay_op.cpp

延遲操作 章節使用的範例

example/echo_server.cpp

迴響伺服器 章節使用的範例

example/ticker.cpp

價格行情 章節使用的範例

example/channel.cpp

通道 參考使用的範例

設計

概念

此函式庫有兩個基本概念

**可等待物件**( awaitable) 是一個可以在協程內使用 co_await 的表達式,例如:

co_await delay(50ms);

然而,協程 promise 可以定義一個 await_transform,也就是說,實際上可以用 co_await 表達式使用的內容取決於協程。

因此,我們應該重新定義可等待物件的定義:**可等待物件**是一種可以在協程內被 co_await 的類型,其 promise 並未定義 await_transform

一個**偽關鍵字**(pseudo-keyword) 是一種可以在協程中使用的類型,由於其 promise 的 await_transform,它為協程添加了特殊功能。

this_coro 命名空間中的所有動詞都是這樣的偽關鍵字。

auto exec = co_await this_coro::executor;
此函式庫公開了一組 promise 的 enable_* 基礎類別,以便輕鬆建立自訂協程。這包括 enable_awaitables,它提供了一個只轉發可等待物件await_transform

本文檔中提到的協程是指非同步協程,也就是說,像 std::generator 這樣的同步協程不被視為協程。

除了 main 之外,所有協程實際上也是可等待物件

執行器 (Executors)

由於所有操作都是非同步的,因此函式庫需要使用事件迴圈。因為所有操作都是單執行緒的,可以假設每個執行緒只有一個執行器,這足以應付 97% 的使用情況。因此,有一個 thread_local 執行器會被協程物件用作預設值(儘管在協程 promise 中以拷貝方式儲存)。

同樣地,程式庫使用一種 executor 類型,預設為 asio::any_io_executor

如果您編寫自己的協程,它應該持有一個 executor 的副本,並具備一個 get_executor 函式,以 const 參考方式返回它。

使用 Strands

雖然可以使用 strands,但它們與 thread_local executor 不相容。這是因為它們可能會切換執行緒,因此它們不能是 thread_local 的。

如果您希望使用 strands(例如透過 spawn),則任何 promisegeneratorchannel 的 executor 必須手動指定。

對於 channel 來說,這是一個建構函式參數,但對於其他協程類型,需要使用 asio::executor_arg。這可以透過在協程的參數列表中直接放置 asio::executor_arg_t(在某處),後面緊跟著要使用的 executor 來完成,例如:

cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);

這樣,協程-promise 可以從第三個參數取得 executor,而不是預設使用 thread_local 的 executor。

當然,可以將參數設為預設值,以便在有時使用 thread_local executor 時減少不便。

cobalt::promise<void> example_with_executor(int some_arg,
                                           asio::executor_arg_t = asio::executor_arg,
                                           cobalt::executor = cobalt::this_thread::get_executor());

如果在 strand 上省略這個參數,會丟擲類型為 asio::bad_allocator 的例外,或者更糟的情況是,使用了錯誤的 executor。

多型記憶體資源

類似地,程式庫使用 thread_local 的 pmr::memory_resource 來配置協程框架,並在非同步操作中用作分配器。

原因是,使用者可能想要自訂配置,例如避免鎖定、限制記憶體使用或監控使用情況。pmr 允許我們在不引入不必要的模板參數的情況下實現這一點,也就是說,沒有 promise<T, Allocator> 的複雜性。然而,使用 pmr 會引入一些最小的開銷,因此使用者可以選擇透過定義 BOOST_COBALT_NO_PMR 來禁用它。

op 使用針對 asio 的分配器使用情況進行最佳化的內部資源,而 gatherracejoin 使用單調資源來最小化分配。這兩者仍然可以在定義 BOOST_COBALT_NO_PMR 的情況下運作,在這種情況下,它們將使用 new/delete 作為上游配置。

在啟用 PMR 的情況下,mainthread 每個執行緒使用單個 pmr::unsynchronized_pool_resource

如果您編寫自己的協程,它應該具備一個 get_allocator 函式,返回一個 pmr::polymorphic_allocator<void>

取消

cobalt 使用基於 asio::cancellation_signal 的隱式取消。這主要以隱式方式使用(例如與 race 一起使用),因此在範例中很少有明確的使用。

如果您編寫自訂協程,它必須從 get_cancellation_slot 函式返回一個 cancellation_slot,以便能夠取消其他操作。
如果您編寫自訂 awaitable,它可以在 await_suspend 中使用該函式來接收取消訊號。

Promise

主要的協程類型是 promise,它是 eager 的。預設使用這個的原因是,編譯器可以最佳化掉那些沒有暫停的 promise,就像這樣:

cobalt::promise<void> noop()
{
  co_return;
}

理論上,等待上述操作是一個空操作,但實際上,截至 2023 年,編譯器還沒有做到這一點。

Race

最重要的同步機制是 race 函式。

它以偽隨機順序等待多個 awaitable 物件,並在捨棄其餘物件之前返回第一個完成物件的結果。

也就是說,它以偽隨機順序啟動 co_await,並在發現一個 awaitable 物件已準備好或立即完成後停止。

cobalt::generator<int> gen1();
cobalt::generator<double> gen2();

cobalt::promise<void> p()
{
  auto g1 = gen1();
  auto g2 = gen2();
  while (!co_await cobalt::this_coro::cancelled)
  {
    switch(auto v = co_await race(g1, g2); v.index())
    {
    case 0:
      printf("Got int %d\n", get<0>(v));
      break;
    case 1:
      printf("Got double %f\n", get<1>(v));
      break;
    }
  }
}

然而,race 一旦開始 co_await,就必須在內部等待所有可等待對象完成。因此,一旦第一個可等待對象完成,它就會嘗試中斷其餘的,如果失敗則取消它們。

race 是觸發取消的首選方法,例如:

cobalt::promise<void> timeout();
cobalt::promise<void> work();

race(timeout(), work());

interrupt_await

然而,如果它單純地取消,就會遺失數據。因此,引入了 interrupt_await 的概念,它告訴可等待對象(如果它支持的話)立即恢復等待器並返回或拋出一個被忽略的值。

可中斷等待對象的示例
struct awaitable
{
   bool await_ready() const;

   template<typename Promise>
   std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);

   T await_resume();

   void interrupt_await() &;
};

如果 interrupt_await 沒有導致立即恢復(h),race 將發送取消信號。

race 使用正確的引用限定符來應用這些

auto g = gen1();
race(g, gen2());

如果有的話,以上將會為 g1 調用 interrupt_await() & 函數,為 g2 調用 interrupt_await() && 函數。

一般來說,cobalt 中的協程支持左值中斷,即 interrupt_await() &通道操作是非限定的,即在兩種情況下都能工作。

joingather 將會轉發中斷,即如果 gen2() 先完成,這只會中斷 g1g2

關聯器 (Associators)

cobalt 使用 asio 的 associator 概念,但簡化了它。也就是說,它有三個作為等待承諾成員函數的關聯器。

  • const executor_type & get_executor() (始終為 executor,必須通過 const ref 返回)

  • allocator_type get_allocator() (始終為 pmr::polymorphic_allocator<void>

  • cancellation_slot_type get_cancellation_slot() (必須與 asio::cancellation_slot 具有相同的介面)

cobalt 使用概念來檢查這些是否存在於其 await_suspend 函數中。

這樣,自定義協程就可以支持取消、執行器和分配器。

在自定義的可等待對象中,您可以像這樣獲取它們

struct my_awaitable
{
    bool await_ready();
    template<typename T>
    void await_suspend(std::corutine_handle<P> h)
    {
        if constexpr (requires  (Promise p) {p.get_executor();})
            handle_executor(h.promise().get_executor();

        if constexpr (requires (Promise p) {p.get_cancellation_slot();})
            if ((cl = h.promise().get_cancellation_slot()).is_connected())
                cl.emplace<my_cancellation>();
    }

    void await_resume();
};

取消會在 co_await 表達式中連接(如果協程和可等待對象支持的話),包括像 race 這樣的同步機制。

執行緒

這個函式庫的設計是單線程的,因為這樣可以簡化恢復,從而更有效地處理像 race 這樣的同步。 race 需要鎖定每個競爭的可等待對象以避免數據丟失,這需要阻塞,並且隨著元素的增加而變得更糟。

除了任務(例如使用spawn)之外,您不能讓任何協程在與創建時不同的線程上恢復。

主要的技術原因是切換協程最有效的方法是從 await_suspend 返回新協程的句柄,如下所示

struct my_awaitable
{
    bool await_ready();
    std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
    void await_resume();
};

在這種情況下,等待的協程將在調用 await_suspend 之前被掛起,並恢復返回的協程。如果我們需要通過執行器,這當然行不通。

這不僅適用於等待的協程,也適用於通道。此函式庫中的通道使用可等待對象的侵入式列表,並且可能從寫入操作的 await_suspend 返回讀取(以及因此掛起)協程的句柄。

參考

cobalt/main.hpp

開始使用 cobalt 應用程序的最簡單方法是使用具有以下簽名的 co_main 函數

cobalt::main co_main(int argc, char *argv[]);

聲明 co_main 將添加一個 main 函數,該函數執行在事件循環上運行協程所需的所有步驟。這使我們能夠編寫非常簡單的異步程序。

cobalt::main co_main(int argc, char *argv[])
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 獲取 main 運行的執行器
2 將其與 asio 對象一起使用
3 co_await 一個 cobalt 操作

主要的 promise 會建立一個 asio::signal_set 並用於取消。SIGINT 代表完全取消,而 SIGTERM 代表終止取消。

取消動作不會轉發到已分離的協程。使用者需要自行處理取消時的協程結束,否則程式無法正常終止。

執行器

它也會建立一個 asio::io_context 來執行,您可以透過 this_coro::executor 取得。它會被指派給 cobalt::this_thread::get_executor()

記憶體資源

它還會建立一個記憶體資源,將作為內部記憶體分配的預設值。它會被指派給 thread_localcobalt::this_thread::get_default_resource()

Promise

每個協程都有一個內部狀態,稱為 promise(不要與 cobalt::promise 混淆)。根據協程的屬性,可以 co_await 不同的東西,就像我們在上面的例子中使用的那樣。

它們是透過繼承實現的,並在不同的 promise 類型之間共享。

主要的 promise 具有以下屬性。

規格

  1. 宣告 co_main 會隱式宣告一個 main 函式。

  2. 只有在定義 co_main 時才會出現 main 函式。

  3. SIGINTSIGTERM 會導致內部任務取消。

cobalt/promise.hpp

Promise 是一個積極的協程,可以 co_awaitco_return 值。也就是說,它不能使用 co_yield

cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

Promise 預設是附加的。這意味著當 promise 控制代碼超出範圍時,會發送取消指令。

可以透過呼叫 detach 或使用前綴 + 運算子來分離 promise。這是使用分離的執行時期替代方案。分離的 promise 在銷毀時不會發送取消指令。

cobalt::promise<void> my_task();

cobalt::main co_main(int argc, char *argv[])
{
  +my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 透過使用 +,任務會被分離。如果沒有它,編譯器會產生 nodiscard 警告。

執行器

執行器會從 thread_localget_executor 函式取得,除非在任何位置使用了 asio::executor_arg 後面接著執行器參數。

cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

記憶體資源

記憶體資源會從 thread_localget_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg 後面接著 polymorphic_allocator 參數。

cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概要

template<typename Return>
struct [[nodiscard]] promise
{
    promise(promise &&lhs) noexcept;
    promise& operator=(promise && lhs) noexcept;

    // enable `co_await`. (1)
    auto operator co_await ();

    // Ignore the return value, i.e. detach it. (2)
    void operator +() &&;

    // Cancel the promise.
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

    // Check if the result is ready
    bool ready() const;
    // Check if the promise can be awaited.
    explicit operator bool () const; (3)

    // Detach or attach
    bool attached() const;
    void detach();
    void attach();
    // Create an already completed promimse

    static promise

    // Get the return value. If !ready() this function has undefined behaviour.
    Return get();
};
1 支援中斷等待
2 這允許使用簡單的 +my_task() 表達式建立並行執行的 promise。
3 這允許像 while (p) co_await p; 這樣的程式碼。

Promise

協程 promise (promise::promise_type) 具有以下屬性。

產生器是一個積極的協程,可以 co_await 並將值 co_yield 給呼叫者。

cobalt::generator<int> example()
{
  printf("In coro 1\n");
  co_yield 2;
  printf("In coro 3\n");
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n");
  printf("In main %d\n", co_await f);
  printf("In main %d\n", co_await f);
  return 0;
}

這將產生以下輸出

In main 0
In coro 1
In main 1
In main 2
In coro 3
In main 4
generators1

Push(第二個模板參數)設定為非 void 時,可以將值推送到產生器中。

cobalt::generator<int, int> example()
{
  printf("In coro 1\n");
  int i =  co_yield 2;
  printf("In coro %d\n", i);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main %d\n", co_await f(3)); (1)
  co_return 0;
}
1 推送的值會透過 operator() 傳遞給 co_yield 的結果。

這將產生以下輸出

In main 0
In coro 1
In main 2
In coro 3

惰性 (Lazy)

可以透過等待 initial 將產生器變為延遲的。這個 co_await 表達式將產生 Push 值。這表示產生器將會等待,直到第一次被等待,然後處理新推送的值並在下次 co_yield 恢復。

cobalt::generator<int, int> example()
{
  int v = co_await cobalt::this_coro::initial;
  printf("In coro %d\n", v);
  co_yield 2;
  printf("In coro %d\n", v);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n"); // < this is now before the co_await initial
  printf("In main %d\n", co_await f(1));
  printf("In main %d\n", co_await f(3));
  return 0;
}

這將產生以下輸出

In main 0
In main 1
In coro 1
In main 2
In coro 3
In main 4
generators2

執行器

執行器會從 thread_localget_executor 函式取得,除非在任何位置使用了 asio::executor_arg 後面接著執行器參數。

cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

記憶體資源

記憶體資源會從 thread_localget_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg 後面接著 polymorphic_allocator 參數。

cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概要

template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
  // Movable

  generator(generator &&lhs) noexcept = default;
  generator& operator=(generator &&) noexcept;

  // True until it co_returns & is co_awaited after (1)
  explicit operator bool() const;

  // Cancel the generator. (3)
  void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

  // Check if a value is available
  bool ready() const;

  // Get the returned value. If !ready() this function has undefined behaviour.
  Yield get();

  // Cancel & detach the generator.
  ~generator();

  // an awaitable that results in value of Yield.
  using generator_awaitable = unspecified;

  // Present when Push != void
  generator_awaitable operator()(      Push && push);
  generator_awaitable operator()(const Push &  push);

  // Present when Push == void, i.e. can co_await the generator directly.
  generator_awaitable operator co_await (); (2)

};
1 這允許像 while (gen) co_await gen; 這樣的程式碼。
2 支援中斷等待
3 已取消的產生器可能可以恢復。

cobalt/task.hpp

任務是一個延遲的協程,可以 co_awaitco_return 值。也就是說,它不能使用 co_yield

cobalt::task<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

promise不同,任務可以在建立它的執行器以外的執行器上被等待或產生。

執行器

由於 task 是延遲的,因此在建構時不需要執行器。它會嘗試從呼叫者或等待者取得執行器(如果有的話)。否則,它將預設使用 thread_local 執行器。

記憶體資源

記憶體資源**不會**取自 thread_localget_default_resource 函式,而是取自 pmr::get_default_resource(),除非在任何位置使用了 std::allocator_arg 後面接著 polymorphic_allocator 參數。

cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概要

template<typename Return>
struct [[nodiscard]] task
{
    task(task &&lhs) noexcept = default;
    task& operator=(task &&) noexcept = default;

    // enable `co_await`
    auto operator co_await ();

};
可以透過呼叫 run(my_task()) 從同步函式同步使用任務。

use_task

use_task 完成記號可用於從 cobalt_ 函式建立任務。這比 use_op 的效率低,因為它需要分配一個協程框架,但具有更簡單的返回類型並支援 中斷等待

cobalt/detached.hpp

一個 detached 是一個積極的協程,它可以 co_await 但不能 co_return 值。也就是說,它不能被恢復,通常也不會被等待。

cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
  printf("Hello world\n");
}

cobalt::main co_main(int argc, char *argv[])
{
  delayed_print();
  co_return 0;
}

Detached 用於輕鬆地在背景執行協程。

cobalt::detached my_task();

cobalt::main co_main(int argc, char *argv[])
{
  my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 衍生 detached 協程。

Detached 可以像這樣為自己分配一個新的取消來源

cobalt::detached my_task(asio::cancellation_slot sl)
{
   co_await this_coro::reset_cancellation_source(sl);
   // do somework
}

cobalt::main co_main(int argc, char *argv[])
{
  asio::cancellation_signal sig;
  my_task(sig.slot()); (1)
  co_await delay(std::chrono::milliseconds(50));
  sig.emit(asio::cancellation_type::all);
  co_return 0;
}

執行器

執行器會從 thread_localget_executor 函式取得,除非在任何位置使用了 asio::executor_arg 後面接著執行器參數。

cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

記憶體資源

記憶體資源會從 thread_localget_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg 後面接著 polymorphic_allocator 參數。

cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

概要

struct detached {};
1 支援中斷等待

Promise

執行緒 detached 具有以下屬性。

cobalt 中的操作是一個 可等待 物件,包裝了一個 asio 操作。

use_op

use_op 記號可以直接創建一個 op,也就是說,使用 cobalt::use_op 作為完成記號將創建所需的 awaitable 物件。

auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();

根據完成簽名,co_await 表達式可能會拋出異常。

簽名 返回類型 異常

void()

void

noexcept

void(T)

T

noexcept

void(T…​)

std::tuple<T…​>

noexcept

void(system::error_code, T)

T

system::system_error

void(system::error_code, T…​)

std::tuple<T…​>

system::system_error

void(std::exception_ptr, T)

T

任何異常

void(std::exception_ptr, T…​)

std::tuple<T…​>

任何異常

use_op 永遠不會立即完成,即 await_ready 永遠會返回 false,但永遠會暫停協程。

手動編碼的操作

操作是 [cobalt_operation] 功能的更進階實現。

這個函式庫可以輕鬆地創建具有提前完成條件的非同步操作,即一個完全避免暫停協程的條件。

例如,我們可以創建一個在計時器已經過期時什麼都不做的 wait_op

struct wait_op : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;

  wait_op(asio::steady_timer & tim) : tim(tim) {}

  bool ready(cobalt::handler<system::error_code> ) (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
        h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) (3)
  {
    tim.async_wait(std::move(complete));
  }
};
1 繼承具有匹配簽名的 opawait_transform 會擷取它
2 檢查操作是否已準備好 — 從 await_ready 呼叫
3 如果操作尚未準備好,則啟動操作。

cobalt/concepts.hpp

可等待物件

可等待物件是一個可以用於 co_await 的表達式。

template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
    {aw.await_ready()} -> std::convertible_to<bool>;
    {aw.await_suspend(h)};
    {aw.await_resume()};
};

template<typename Awaitable, typename Promise = void>
concept awaitable =
        awaitable_type<Awaitable, Promise>
    || requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
    || requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
這個函式庫中的 可等待物件 要求,如果協程承諾提供執行器,則必須透過 const 參考返回執行器。否則,它將使用 this_thread::get_executor()

啟用可等待物件

繼承 enable_awaitables 將使協程能夠透過 await_transform 共同等待任何在沒有任何 await_transform 的情況下可以 co_await 的物件。

cobalt/this_coro.hpp

this_coro 命名空間提供了一些工具來訪問協程承諾的內部狀態。

虛擬可等待物件 (Pseudo-awaitables)

// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;

// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;

// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
    Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
    InFilter && in_filter,
    OutFilter && out_filter);

// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);

// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);


// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;

// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;

// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;

等待配置器

可以透過以下方式取得支援 enable_await_allocator 的協程的分配器

co_await cobalt::this_coro::allocator;

為了在您自己的協程中啟用此功能,您可以使用 CRTP 模式繼承 enable_await_allocator

struct my_promise : cobalt::enable_await_allocator<my_promise>
{
  using allocator_type = __your_allocator_type__;
  allocator_type get_allocator();
};
如果可用,分配器將被 use_op 使用

等待執行器

可以透過以下方式取得支援 enable_await_executor 的協程的執行器

co_await cobalt::this_coro::executor;

為了在您自己的協程中啟用此功能,您可以使用 CRTP 模式繼承 enable_await_executor

struct my_promise : cobalt::enable_await_executor<my_promise>
{
  using executor_type = __your_executor_type__;
  executor_type get_executor();
};
如果可用,執行器將被 use_op 使用

等待延遲

您的協程承諾可以繼承 enable_await_deferred,以便在 co_await 表達式中使用單個簽名 asio::deferred

由於 asio::deferred 現在是預設的完成記號,因此允許以下程式碼無需指定任何完成記號或其他特殊化。

asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();

記憶體資源基底

promise 的 promise_memory_resource_base 基礎類別會提供一個 get_allocator 方法,該方法會從預設資源或跟在 std::allocator_arg 參數之後傳入的資源中獲取分配器。同樣地,它也會新增 operator new 多載,以便協程使用相同的記憶體資源來分配其框架。

若已取消則拋出例外

promise_throw_if_cancelled_base 提供了基本選項,允許操作啟用協程在等待另一個實際的可等待物件時拋出異常。

co_await cobalt::this_coro::throw_if_cancelled;

取消狀態

promise_cancellation_base 提供了基本選項,允許操作啟用協程擁有一個可由 reset_cancellation_state 重設的 cancellation_state。

co_await cobalt::this_coro::reset_cancellation_state();

為了方便起見,還提供了一個捷徑來檢查目前的取消狀態。

asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above

cobalt/this_thread.hpp

由於所有操作都是單執行緒的,因此這個函式庫為每個執行緒提供了一個執行器和預設記憶體資源。

namespace boost::cobalt::this_thread
{

pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)

typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)

}
1 取得預設資源 - 除非設定,否則將會是 pmr::get_default_resource。
2 設定預設資源 - 返回先前設定的資源。
3 取得包裝 (1) 的分配器。
4 取得執行緒的執行器 - 如果未設定則拋出異常。
5 設定目前執行緒的執行器。

協程將使用這些作為預設值,但會保留一份副本以防萬一。

唯一的例外是 cobalt 操作的初始化,它將使用 this_thread::executor 來重新拋出異常。

cobalt/channel.hpp

通道可用於在單個執行緒上的不同協程之間交換資料。

概要

通道概要
template<typename T>
struct channel
{
  // create a channel with a buffer limit, executor & resource.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor(),
          pmr::memory_resource * resource = this_thread::get_default_resource());
  // not movable.
  channel(channel && rhs) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor();

  // Closes the channel
  ~channel();
  bool is_open() const;
  // close the operation, will cancel all pending ops, too
  void close();

  // an awaitable that yields T
  using read_op = unspecified;

  // an awaitable that yields void
  using write_op = unspecified;

  // read a value to a channel
  read_op  read();

  // write a value to the channel
  write_op write(const T  && value);
  write_op write(const T  &  value);
  write_op write(      T &&  value);
  write_op write(      T  &  value);

  // write a value to the channel if T is void

};

說明

通道是兩個協程進行通訊和同步的工具。

const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};

// in coroutine (1)
co_await ch.write(42);

// in coroutine (2)
auto val = co_await ch.read();
1 將值傳送到通道 - 將會阻塞直到可以傳送為止。
2 從通道讀取值 - 將會阻塞直到值可等待為止。

兩個操作都可能會阻塞,具體取決於通道緩衝區大小。

如果緩衝區大小為零,則需要同時進行 readwrite 操作,即充當會合點。

如果緩衝區未滿,寫入操作不會暫停協程;同樣地,如果緩衝區不為空,讀取操作也不會暫停。

如果兩個操作同時完成(在空緩衝區的情況下總是如此),則第二個操作會發佈到執行器以供稍後完成。

通道類型可以是 void,在這種情況下,write 不需要參數。

通道操作可以被取消而不會丟失資料。這使得它們可以與 競爭 一起使用。

generator<variant2::variant<int, double>> merge(
    channel<int> & c1,
    channel<double> & c2)
{
    while (c1 && c2)
       co_yield co_await race(c1, c2);
}

範例

cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
  for (int i = 0; i < 4; i++)
    co_await chan.write(i);

  chan.close();
}

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  while (c.is_open())
    std::cout << co_await c.read() << std::endl;

  co_await p;
  co_return 0;
}

此外,還提供了一個 channel_reader,使讀取通道更加方便,並且可以與 BOOST_COBALT_FOR 一起使用。

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
    std::cout << value << std::endl;

  co_await p;
  co_return 0;
}

cobalt/with.hpp

with 工具提供了一種執行協程非同步清除的方法。這就像一個非同步的解構函式呼叫。

struct my_resource
{
  cobalt::promise<void> await_exit(std::exception_ptr e);
};

cobalt::promise<void> work(my_resource & res);

cobalt::promise<void> outer()
{
  co_await cobalt::with(my_resource(), &work);
}

清除可以通過提供 await_exit 成員函式或返回可等待物件tag_invoke 函式,或者通過將清除作為 with 的第三個參數來完成。

using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void>   disconnect(ws_stream &ws); (2)

auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
  return disconnect(ws);
}

cobalt::promise<void> run_session(ws_stream & ws);

cobalt::main co_main(int argc, char * argv[])
{
  co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
  co_return 0;
}
1 實作 WebSocket 連線和 WebSocket 初始化。
2 實作有序關閉。
如果在沒有異常的情況下退出作用域,則 std::exception_ptr 為空。注意:exit 函式可以通過引用獲取 exception_ptr 並修改它,這是合法的。

cobalt/race.hpp

race 函式可以用來從一組可等待物件 (awaitable)中,co_await 其中一個。

它可以作為可變參數函式被呼叫,接受多個可等待物件,或者作用於一個可等待物件的範圍。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_wait()
{
  co_await cobalt::race(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::race(aws); (2)
}
1 等待一組可變參數的可等待物件
2 等待一個可等待物件的向量 (vector)

race 的第一個參數可以是一個均勻隨機位元產生器 (uniform random bit generator)

race 的簽章 (Signatures)
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;

std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);

// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);

// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);

中斷等待

當參數以右值參考傳遞時,race 會嘗試在可等待物件上使用 .interrupt_await 來通知該可等待物件立即完成,並且其結果將被忽略。如果支援的話,可等待物件必須在 interrupt_await 返回之前恢復等待的協程 (coroutine)。如果 race 沒有偵測到該函式,它會發送一個取消請求。

這表示您可以像這樣重複使用 race

cobalt::promise<void> do_wait()
{
  auto t1 = task1();
  auto t2 = task2();
  co_await cobalt::race(t1, t2); (1)
  co_await cobalt::race(t1, t2); (2)
}
1 等待第一個任務完成
2 等待另一個任務完成

這由 promisegeneratorgather 支援。

race 會像在 co_await 表達式中使用一樣呼叫可等待物件的函式,或者根本不執行它們。

left_race

left_race 函式類似於 race,但遵循嚴格的從左到右掃描。這可能會導致資源餓死問題,因此這不是推薦的預設行為,但在仔細處理的情況下,可用於設定優先順序。

概要

// Concept for the random number generator.
template<typename G>
  concept uniform_random_bit_generator =
    requires ( G & g)
    {
      {typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
      // T	Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
      {std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
      // T	Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
      {std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
      {g()} -> std::same_as<typename std::decay_t<G>::result_type>;
    } && (std::decay_t<G>::max() > std::decay_t<G>::min());


// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);

// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);

// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);

// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);

// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);

// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
選擇空範圍將會拋出異常。

cobalt/gather.hpp

gather 函式可以用來一次 co_await 多個可等待物件,並將取消請求傳遞下去。

該函式將收集所有完成的結果,並將它們作為 system::result 返回,即將異常作為值捕獲。一個可等待物件拋出異常不會取消其他物件。

它可以作為可變參數函式被呼叫,接受多個可等待物件,或者作用於一個可等待物件的範圍。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_gather()
{
  co_await cobalt::gather(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::gather(aws); (2)
}
1 等待一組可變參數的可等待物件
2 等待一個可等待物件的向量

gather 會像在 co_await 表達式中使用一樣呼叫可等待物件的函式。

gather 的簽章
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);

std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 =  co_await gather(pvv);

extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
           system::result<monostate>,
           system::result<int>,
           system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);

概要

// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);

// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);

cobalt/join.hpp

join 函式可以用來一次 co_await 多個可等待物件,並正確地連接取消請求。

該函式將收集所有完成的結果,並將它們作為值返回,除非拋出異常。如果拋出異常,所有未完成的操作將被取消(如果可能,則會被中斷),並且第一個異常會被重新拋出。

void 將在元組 (tuple) 中作為 variant2::monostate 返回,除非所有可等待物件都產生 void

它可以作為可變參數函式被呼叫,接受多個可等待物件,或者作用於一個可等待物件的範圍。

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_join()
{
  co_await cobalt::join(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::join(aws); (2)
}
1 等待一組可變參數的可等待物件
2 等待一個可等待物件的向量

join 會像在 co_await 表達式中使用一樣呼叫可等待物件的函式。

join 的簽章
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);

std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);

extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);

概要

// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);

// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
選擇空範圍將會拋出異常。

cobalt/wait_group.hpp

wait_group 函式可以用來管理多個 promise<void> 類型的協程。通過匹配的 await_exit 成員,它可以與 cobalt/with.hpp 直接搭配使用。

本質上,wait_group 是一個動態的 promise 列表,它具有 race 函式 (wait_one)、gather 函式 (wait_all),並且會在作用域結束時進行清理。

struct wait_group
{
    // create a wait_group
    explicit
    wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
               asio::cancellation_type exception_cancel = asio::cancellation_type::all);

    // insert a task into the group
    void push_back(promise<void> p);

    // the number of tasks in the group
    std::size_t size() const;
    // remove completed tasks without waiting (i.e. zombie tasks)
    std::size_t reap();
    // cancel all tasks
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
    // wait for one task to complete.
    wait_one_op wait_one();
    // wait for all tasks to complete
    wait_op wait();
    // wait for all tasks to complete
    wait_op operator co_await ();
    // when used with with , this will receive the exception
    // and wait for the completion
    // if ep is set, this will use the exception_cancel level,
    // otherwise the normal_cancel to cancel all promises.
    wait_op await_exit(std::exception_ptr ep);
};

cobalt/spawn.hpp

spawn 函式允許在 asio 的 executor/execution_context 上運行任務,並使用完成記號 (completion token) 來取用結果。

auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);

Spawn 會分派其初始化並發佈完成。這使得在另一個 executor 上運行任務並使用 use_op 在當前 executor 上取用結果變得安全。也就是說,spawn 可以用於跨執行緒。

範例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
  auto f = spawn(ctx, work(), asio::use_future);
  ctx.run();

  return f.get();
}
呼叫者需要確保 executor 沒有在多個執行緒上同時運行,例如,通過使用單執行緒的 asio::io_contextstrand

cobalt/run.hpp

run 函式類似於 spawn,但同步運行。它會在內部設置執行環境和記憶體資源。

這在將一段 cobalt 程式碼整合到同步應用程式時非常有用。

概要

// Run the task and return it's value or rethrow any exception.
T run(task<T> t);

範例

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  return run(work());
}

cobalt/thread.hpp

執行緒類型是另一種建立類似於 main 的環境的方法,但不使用 signal_set

cobalt::thread my_thread()
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 取得 thread 正在運行的 executor
2 將其與 asio 對象一起使用
3 co_await 一個 cobalt 操作

要使用執行緒,您可以像使用 std::thread 一樣使用它

int main(int argc, char * argv[])
{
  auto thr = my_thread();
  thr.join();
  return 0;
}

執行緒也是一個 awaitable(包括取消)。

cobalt::main co_main(int argc, char * argv[])
{
  auto thr = my_thread();
  co_await thr;
  co_return 0;
}
解構一個分離的執行緒將導致硬停止 (io_context::stop) 並加入執行緒。
除了等待 cobalt/thread.hppcobalt/spawn.hpp 之外,這個函式庫中沒有任何東西是執行緒安全的。如果您需要跨執行緒傳輸資料,您需要一個執行緒安全的工具,例如 asio::concurrent_channel。您不能在執行緒之間共用任何 cobalt 基元,唯一的例外是能夠將 spawn 一個 任務 到另一個執行緒的 executor 上。

執行器

它也會建立一個 asio::io_context 來執行,您可以透過 this_coro::executor 取得。它會被指派給 cobalt::this_thread::get_executor()

記憶體資源

它還會建立一個記憶體資源,將作為內部記憶體分配的預設值。它會被指派給 thread_localcobalt::this_thread::get_default_resource()

概要

struct thread
{
  // Send a cancellation signal
  void cancel(asio::cancellation_type type = asio::cancellation_type::all);


  // Allow the thread to be awaited. NOOP if the thread is invalid.
  auto operator co_await() &-> detail::thread_awaitable; (1)
  auto operator co_await() && -> detail::thread_awaitable; (2)

  // Stops the io_context & joins the executor
  ~thread();
  /// Move constructible
  thread(thread &&) noexcept = default;

  using executor_type = executor;

  using id = std::thread::id;
  id get_id() const noexcept;

  // Add the functions similar to `std::thread`
  void join();
  bool joinable() const;
  void detach();

  executor_type get_executor() const;
};
1 支援中斷等待
2 永遠轉發取消

cobalt/result.hpp

Awaitables 可以修改為返回 system::resultstd::tuple 而不是使用例外。

// value only
T res = co_await foo();

// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());

// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());

Awaitables 也可以通過使用 cobalt::as_result_tagcobalt::as_tuple_tag 提供 await_resume 過載,來提供處理結果和 tuples 的自訂方法。

your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type  await_resume(cobalt::as_tuple_tag);

這允許 awaitable 提供除 std::exception_ptr 之外的其他錯誤類型,例如 system::error_code。這是由 opchannel 完成的。

// example of an op with result system::error_code, std::size_t
system::result<std::size_t>                 await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitables 仍然允許拋出例外,例如,對於像是 OOM 等嚴重例外。

cobalt/async_for.hpp

對於 generators 之類的類型,提供了 BOOST_COBALT_FOR 巨集,以模擬 for co_await 迴圈。

cobalt::generator<int> gen();

cobalt::main co_main(int argc, char * argv[])
{
    BOOST_COBALT_FOR(auto i, gen())
        printf("Generated value %d\n", i);

    co_return 0;
}

要求是在 for 迴圈中使用的 awaitable 具有 operator bool 來檢查它是否可以再次被 awaited。 generatorpromise 就是這種情況。

cobalt/error.hpp

為了更容易管理錯誤,cobalt 提供了一個 error_categoryboost::system::error_code 一起使用。

enum class error
{
  moved_from,
  detached,
  completed_unexpected,
  wait_not_ready,
  already_awaited,
  allocation_failed
};

system::error_category & cobalt_category();
system::error_code make_error_code(error e);

cobalt/config.hpp

config adder 允許設定 boost.cobalt 的一些實作細節。

executor_type

executor 類型預設為 boost::asio::any_io_executor

您可以通過定義 BOOST_COBALT_CUSTOM_EXECUTOR 並自行添加 boost::cobalt::executor 類型,將其設置為 boost::asio::any_io_executor

或者,可以定義 BOOST_COBALT_USE_IO_CONTEXT 將 executor 設置為 boost::asio::io_context::executor_type

pmr

Boost.cobalt 可以與不同的 pmr 實作一起使用,預設為 std::pmr

可以使用以下巨集進行配置

  • BOOST_COBALT_USE_STD_PMR

  • BOOST_COBALT_USE_BOOST_CONTAINER_PMR

  • BOOST_COBALT_USE_CUSTOM_PMR

如果您定義 BOOST_COBALT_USE_CUSTOM_PMR,您將需要提供一個 boost::cobalt::pmr 命名空間,它是 std::pmr 的直接替代品。

或者,可以使用以下方法停用 pmr 的使用

  • BOOST_COBALT_NO_PMR.

在這種情況下,cobalt 將使用非 pmr 單調資源來執行同步函數(競爭收集加入)。

use_op 使用一個針對小緩衝區最佳化的資源,其大小可以通過定義 BOOST_COBALT_SBO_BUFFER_SIZE 來設定,預設值為 4096 位元組。

cobalt/leaf.hpp

Async 提供與 boost.leaf 的整合。它提供了類似於 leaf 的函數,這些函數接受可等待物件而不是函數物件,並返回一個可等待物件

template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );

詳情請參閱 leaf 文件。

cobalt/experimental/context.hpp

這(很可能)是未定義的行為,因為它違反了標準中的前置條件。可以在這裡找到一篇解決此問題的文章(https://isocpp.org/files/papers/P3203R0.html)。

此標頭提供 experimental 支援,可以使用基於 boost.fiber 的堆疊式協程,就如同它們是 C20 協程一樣。也就是說,它們可以通過被放入 coroutine_handle 中來使用 `可等待物件`。同樣地,實作使用 C20 協程 promise 並像執行 C++20 協程一樣執行它。

//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}

cobalt::main co_main(int argc, char *argv[])
{
  cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
  co_await dl;
  co_return 0;
}

參考

// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
  // Get a handle to the promise
        promise_type & promise();
  const promise_type & promise() const;

  // Convert it to any context if the underlying promise is the same
  template<typename Return_, typename ... Args_>
  constexpr operator context<Return_, Args_...>() const;

  // Await something. Uses await_transform automatically.
  template<typename Awaitable>
  auto await(Awaitable && aw);
  // Yield a value, if supported by the promise.
  template<typename Yield>
  auto yield(Yield && value);
};


// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);

// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc  && salloc, Args && ... args);

// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);

深入探討

自訂執行器

cobalt 預設使用 asio::any_io_executor 的原因之一是它是一個類型擦除執行器,也就是說,您可以提供自己的事件迴圈,而無需重新編譯 cobalt

然而,在 Executor TS 的開發過程中,執行器的概念變得有點不直觀,委婉地說。

Ruben Perez 寫了一篇很棒的部落格文章,我將厚顏無恥地參考它。

定義

執行器是一種指向實際事件迴圈的類型,它可以(廉價地)複製,支援屬性(見下文),可以進行相等性比較,並且具有 execute 函數。

execute
struct example_executor
{
  template<typename Fn>
  void execute(Fn && fn) const;
};

上述函數根據其屬性執行 fn

屬性

屬性可以被查詢、偏好或要求,例如:

struct example_executor
{
  // get a property by querying it.
  asio::execution::relationship_t &query(asio::execution::relationship_t) const
  {
    return asio::execution::relationship.fork;
  }

  // require an executor with a new property
  never_blocking_executor require(const execution::blocking_t::never_t);

  // prefer an executor with a new property. the executor may or may not support it.
  never_blocking_executor prefer(const execution::blocking_t::never_t);
  // not supported
  example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor 的屬性

為了將執行器包裝在 asio::any_io_executor 中,需要兩個屬性

  • `execution::context_t

  • execution::blocking_t::never_t

這表示我們需要使它們成為可要求的(這對於 context 沒有意義)或從 query 返回預期值。

execution::context_t 查詢應該像這樣返回 asio::execution_context&

struct example_executor
{
  asio::execution_context &query(asio::execution::context_t) const;
};

執行上下文用於管理管理 io 物件生命週期的服務的生命週期,例如 asio 的計時器和 sockets。也就是說,通過提供此上下文,所有 asio 的 io 都可以使用它。

在執行器被銷毀後,execution_context 必須保持活動狀態。

以下可能是偏好的

  • execution::blocking_t::possibly_t

  • execution::outstanding_work_t::tracked_t

  • execution::outstanding_work_t::untracked_t

  • execution::relationship_t::fork_t

  • execution::relationship_t::continuation_

這表示您可能希望在您的執行器中支援它們以進行最佳化。

blocking 屬性

正如我們之前看到的,此屬性控制傳遞給 execute() 的函數是否可以立即作為 execute() 的一部分運行,或者必須排隊等待稍後執行。可能的值為

  • asio::execution::blocking.never:永遠不要將函數作為 execute() 的一部分運行。這就是 asio::post() 的作用。

  • asio::execution::blocking.possibly:函數可能會也可能不會作為 execute() 的一部分運行。這是預設值(呼叫 io_context::get_executor 時獲得的值)。

  • asio::execution::blocking.always:函數始終作為 execute() 的一部分運行。io_context::executor 不支援此功能。

relationship 屬性

relationship 可以採用兩個值

  • asio::execution::relationship.continuation:表示傳遞給 execute() 的函數是呼叫 execute() 的函數的延續。

  • asio::execution::relationship.fork:與上述相反。這是預設值(呼叫 io_context::get_executor() 時獲得的值)。

將此屬性設定為 continuation 可以啟用一些函數排程的優化。它僅在函數排入佇列時(而不是立即執行)才有效。對於 io_context,設定後,函數會排程在更快的執行緒區域佇列中執行,而不是上下文全域佇列。

outstanding_work_t 屬性

outstanding_work 可以採用兩個值

  • asio::execution::outstanding_work.tracked:表示當執行器處於活動狀態時,仍有工作要做。

  • asio::execution::outstanding_work.untracked:與上述相反。這是預設值(呼叫 io_context::get_executor() 時獲得的值)。

將此屬性設定為 tracked 表示只要執行器處於活動狀態,事件迴圈就不會返回。

一個最小的執行器

有了這些,讓我們看一下最小執行器的介面。

struct minimal_executor
{
  minimal_executor() noexcept;

  asio::execution_context &query(asio::execution::context_t) const;

  static constexpr asio::execution::blocking_t
  query(asio::execution::blocking_t) noexcept
  {
    return asio::execution::blocking.never;
  }

  template<class F>
  void execute(F && f) const;

  bool operator==(minimal_executor const &other) const noexcept;
  bool operator!=(minimal_executor const &other) const noexcept;
};
請參閱 example/python.cpp 以了解使用 Python 的 asyncio 事件迴圈的實作。

新增一個工作守衛。

現在,讓我們為 outstanding_work 屬性添加一個 require 函數,該函數使用多種類型。

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work::  tracked_t) const {return *this;}
  constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};

請注意,不必從 require 函數返回不同的類型,也可以像這樣完成

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor require(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};

如果我們想使用 prefer,它看起來如下所示

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor prefer(asio::execution::outstanding_work::  tracked_t) const;
  constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};

總結

如您所見,屬性系統並非簡單易懂,但功能強大。實作自訂執行器本身就是一個問題類別,這就是本文檔不這樣做的原因。相反,有一個關於如何將 Python 事件迴圈包裝在執行器中的範例。

以下是一些閱讀建議。

無堆疊

C++20 協程是無堆疊的,這意味著它們沒有自己的堆疊。

C++ 中的堆疊描述了呼叫堆疊,即所有堆疊的函數框架。函數框架是函數運作所需的記憶體,即用於儲存其變數和資訊(例如返回位址)的記憶體片段。

函數框架的大小在編譯時已知,但在包含其定義的編譯單元之外則未知。
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}

int main()
{
    return foo();
}

上例中的呼叫堆疊為

main()
  foo()
    bar()
stackless1

協程可以實作為有堆疊的,這意味著它會分配一個固定的記憶體區塊,並將函數框架堆疊起來,類似於執行緒。C++20 協程是無堆疊的,即它們只分配自己的框架,並在恢復時使用呼叫者的堆疊。使用我們之前的範例

fictional_eager_coro_type<int> example()
{
    co_yield 0;
    co_yield 1;
}

void nested_resume(fictional_eager_coro_type<int>& f)
{
    f.resume();
}

int main()
{
    auto f = example();
    nested_resume(f);
    f.reenter();
    return 0;
}

這將產生類似於此的呼叫堆疊

main()
  f$example()
  nested_resume()
    f$example()
  f$example()
stackless2

如果協程在執行緒之間移動,則同樣適用。

惰性與及早求值

如果協程僅在其恢復後才開始執行其程式碼,則它們是惰性的,而積極的協程將立即執行,直到其第一個暫停點(即 co_awaitco_yieldco_return 表達式)。

lazy_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    lazy.resume();
    printf("resumed twice\n");
    return 0;
}

這將產生如下輸出

enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
lazy eager1

而積極的協程看起來像這樣

eager_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    return 0;
}

這將產生如下輸出

enter main
Entered coro
constructed coro
resume once
Coro Done
lazy eager2

基準測試

在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上執行

發佈到執行器

基準測試正在執行以下程式碼,使用 cobalt 的任務、asio::awaitable 和基於 `asio' 的有堆疊協程 (boost.context)。

cobalt::task<void> atest()
{
  for (std::size_t i = 0u; i < n; i++)
    co_await asio::post(cobalt::use_op);
}
表 6. 5000 萬次結果,單位為毫秒
gcc 12 clang 16

cobalt

2472

2098

可等待

2432

2253

stackful (有堆疊)

3655

3725

平行執行 noop 協程

此基準測試使用大小為零的 asio::experimental::channel,以便並行讀寫。它使用 cobalt 的 gatherasio::awaitable 中的 awaitable_operator

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  for (std::size_t i = 0u; i < n; i++)
    co_await cobalt::gather(
              chan.async_send(system::error_code{}, cobalt::use_task),
              chan.async_receive(cobalt::use_task));
}

asio::awaitable<void> awtest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  using boost::asio::experimental::awaitable_operators::operator&&;
  for (std::size_t i = 0u; i < n; i++)
    co_await (
        chan.async_send(system::error_code{}, asio::use_awaitable)
        &&
        chan.async_receive(asio::use_awaitable));
}
表 7. 300 萬次結果,單位為毫秒
gcc 12 clang 16

cobalt

1563

1468

可等待

2800

2805

立即

此基準測試利用立即完成,使用大小為 1 的通道,以便每個操作都是立即的。

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
  for (std::size_t i = 0u; i < n; i++)
  {
    co_await chan.async_send(system::error_code{}, cobalt::use_op);
    co_await chan.async_receive(cobalt::use_op);
  }
}
表 8. 1000 萬次結果,單位為毫秒
gcc 12 clang 16

cobalt

1810

1864

可等待

3109

4110

stackful (有堆疊)

3922

4705

通道

在此基準測試中,比較了 asio::experimental::channelcobalt::channel

這與並行測試類似,但使用的是 cobalt::channel

表 9. 執行測試 300 萬次的結果,單位為毫秒
gcc clang

cobalt

500

350

可等待

790

770

stackful (有堆疊)

867

907

操作配置

此基準測試比較了非同步操作關聯配置器的不同解決方案。

表 10. 測試執行 200 萬次的結果,單位為毫秒。
gcc clang

std::allocator

1136

1139

cobalt::monotonic

1149

1270

pmr::monotonic

1164

1173

cobalt::sbo

1021

1060

後者方法為 cobalt 內部使用。

需求

函式庫

Boost.cobalt 需要 C++20 編譯器,並直接依賴於以下 boost 函式庫:

  • boost.asio

  • boost.system

  • boost.circular_buffer

  • boost.intrusive

  • boost.smart_ptr

  • boost.container(適用於 clang < 16)

編譯器

此函式庫自 Clang 14、Gcc 10 和 MSVC 19.30(Visual Studio 2022)起開始支援。

Gcc 版本 12.1 和 12.2 似乎在使用無堆疊變數的協程方面存在錯誤,如[此處](https://godbolt.org/z/6adGcqP1z)所示,應避免在協程中使用這些版本。

Clang 在 16 版才加入 std::pmr 支援,因此較舊的 Clang 版本使用 boost::container::pmr 作為替代方案。

部分(如果不是全部)MSVC 版本的協程實作都有問題,這個函式庫需要解決這些問題。這可能會導致非確定性行為和額外負擔。

協程的繼續執行可以在從 final_suspend 返回的 awaitable 中完成,如下所示:

// in promise
auto final_suspend() noexcept
{
    struct final_awaitable
    {
      std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
      bool await_ready() const noexcept;
      std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
      {
        auto cc = continuation;
        h.destroy(); (2)
        return cc;
      }

      void await_resume() noexcept {}
    };
    return final_awaitable{my_continuation};
};
1 繼續執行
2 在繼續執行之前自行銷毀協程

在 MSVC 上,final_suspend 並未正確暫停協程,因此 h.destroy() 將導致協程框架上的元素被重複銷毀。因此,MSVC 需要延後銷毀操作,使其在行外執行。這將導致額外負擔,並使實際的記憶體釋放變得不確定。

致謝

如果没有 CppAlliance 及其創辦人 Vinnie Falco,這個函式庫就不可能存在。Vinnie 非常信任我,讓我參與這個專案,儘管他本人對此類函式庫的設計方式抱持截然不同的看法。

也要感謝 Ruben Perez 和 Richard Hodges 傾聽我的設計問題,並提供建議和使用案例。此外,如果没有 Chris Kohlhoff 出色的 boost.asio,這個函式庫也不可能存在。