概述
以下是 cobalt 中相關功能的列表
一個及早求值並返回單一結果的協程 - 將其視為預設值 |
|
一個可以產生多個值的及早求值協程。 |
|
promise 的惰性版本,可以生成到其他執行器上。 |
|
類似於 promise 的協程,但沒有控制代碼 |
一個以偽隨機方式等待一組協程中其中一個準備就緒的函式,以避免資源匱乏。 |
|
一個等待一組協程的函式,並將所有協程作為值返回,如果任何可等待對象拋出異常,則拋出異常。 |
|
一個等待一組協程的函式,並將所有協程作為 |
|
一個從左到右評估的確定性 |
一個線程本地的工具程式,用於在協程之間發送值。 |
|
一個非同步 RAII 輔助工具,允許在發生異常時進行非同步銷毀 |
C++ 協程的簡短介紹 |
如果您以前從未使用過協程,請閱讀 |
|
功能和概念的簡要高階視圖 |
如果您熟悉 asio 和協程,並且想知道這個函式庫提供了什麼,請閱讀 |
|
低階用法視圖 |
如果您想快速開始編碼,請閱讀 |
|
API 參考 |
在編碼時查找詳細資訊 |
|
一些實現細節 |
如果您還不夠困惑,請閱讀 |
動機
許多程式語言,例如 node.js 和 python,都提供了易於使用的單線程併發框架。雖然比同步程式碼更複雜,但單線程非同步避免了多線程的許多陷阱和開銷。
也就是說,一個協程可以工作,而其他協程則等待事件(例如來自伺服器的響應)。這允許編寫在**單線程**上**同時執行多項操作**的應用程式。
這個函式庫旨在為 C++ 提供此功能:**簡單的單線程非同步**,類似於 node.js 和 python 中的 asyncio,可與現有函式庫(例如 boost.beast
、boost.mysql
或 boost.redis
)一起使用。它基於 boost.asio
。
它採用了其他語言中的一系列概念,並基於 C++20 協程提供它們。
與 asio::awaitable
和 asio::experimental::coro
不同,cobalt
協程是開放的。也就是說,一個 asio::awaitable
只能等待其他 asio::awaitable
並被其等待,並且不提供協程特定的同步機制。
另一方面,cobalt
提供了一個協程特定的 channel
和不同的等待類型(race
、gather
等),這些類型經過最佳化,可與協程和可等待對象一起使用。
協程入門
非同步程式設計
非同步程式設計通常是指允許在後台運行任務的程式設計風格,而其他工作則在執行。
想像一下,如果您有一個 get-request 函式,它執行完整的 http 請求,包括連接和 ssl 握手等。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
auto res = http_get("https://boost.dev.org.tw");
printf("%s", res.c_str());
return 0;
}
上面的程式碼是傳統的同步程式設計。如果我們想要並行執行兩個請求,我們需要創建另一個線程來使用同步程式碼運行另一個線程。
std::string http_get(std:string_view url);
int main(int argc, char * argv[])
{
std::string other_res;
std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
auto res = http_get("https://boost.dev.org.tw");
thr.join();
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
這是可行的,但我們的程式將花費大部分時間等待輸入。作業系統提供允許非同步執行 IO 的 API,而諸如 boost.asio 之類的函式庫則提供了管理非同步操作的可攜式方法。Asio 本身並沒有規定處理完成的方式。這個函式庫 (boost.cobalt) 提供了一種通過協程/可等待對象管理所有這些的方法。
cobalt::promise<std::string> http_cobalt_get(std:string_view url);
cobalt::main co_main(int argc, char * argv[])
{
auto [res, other_res] =
cobalt::join(
http_cobalt_get(("https://boost.dev.org.tw"),
http_cobalt_get(("https://cppalliance.org")
);
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
在上面的程式碼中,執行請求的非同步函式利用了作業系統 API,因此實際的 IO 不會阻塞。這意味著,當我們等待兩個函式完成時,操作是交錯的且非阻塞的。同時,cobalt 提供了協程原語,使我們免於回調地獄。
協程
協程是可恢復的函式。可恢復意味著函式可以暫停,也就是將控制權多次交還給呼叫者。
一般的函式使用 return
函式將控制權交還給呼叫者,同時也返回數值。
另一方面,協程可以將控制權交還給呼叫者,並多次恢復執行。
協程具有三個類似於 co_return 的控制關鍵字(其中只有 co_return
是必須支援的)。
-
co_return
-
co_yield
-
co_await
co_return
這類似於 return
,但將函式標記為協程。
co_await
co_await
運算式會暫停等待一個可等待物件 (Awaitable),也就是停止執行,直到該 awaitable
恢復它的執行。
例如:
cobalt::promise<void> delay(std::chrono::milliseconds);
cobalt::task<void> example()
{
co_await delay(std::chrono::milliseconds(50));
}
co_await
運算式可以產生一個值,取決於它正在等待的物件。
cobalt::promise<std::string> read_some();
cobalt::task<void> example()
{
std::string res = co_await read_some();
}
在 cobalt 中,大多數協程原語也是可等待物件 (Awaitables)。 |
co_yield
co_yield
運算式類似於 co_await
,但它會將控制權交還給呼叫者,並帶有一個值。
例如:
cobalt::generator<int> iota(int max)
{
int i = 0;
while (i < max)
co_yield i++;
co_return i;
}
co_yield
運算式也可以產生一個值,允許使用 yielding 協程的使用者將值推入其中。
cobalt::generator<int> iota()
{
int i = 0;
bool more = false;
do
{
more = co_yield i++;
}
while(more);
co_return -1;
}
可等待物件 (Awaitables)
可等待物件 (Awaitables) 是可以在 co_await
運算式中使用的類型。
struct awaitable_prototype
{
bool await_ready();
template<typename T>
see_below await_suspend(std::coroutine_handle<T>);
return_type await_resume();
};
如果可以使用 operator co_await 呼叫,則類型將被隱式轉換為可等待物件。本文檔將使用 awaitable 來包含這些類型,並使用「actual_awaitable」來指符合上述原型的類型。 |
data:image/s3,"s3://crabby-images/a71fd/a71fd2b73df2b135387cccea4e11937d292a9555" alt="awaitables"
在 co_await
運算式中,等待的協程會先呼叫 await_ready
來檢查協程是否需要暫停。如果已準備好,它會直接跳到 await_resume
以取得值,因為不需要暫停。否則,它會暫停自身並使用指向自身 promise 的 std::coroutine_handle
呼叫 await_suspend
。
std::coroutine_handle<void> 可用於類型擦除。 |
return_type 是 co_await
運算式的結果類型,例如 int
。
int i = co_await awaitable_with_int_result();
await_suspend
的返回類型可以是三種:
-
void
-
bool
-
std::coroutine_handle<U>
如果是 void,等待的協程將保持暫停狀態。如果是 bool
,則會檢查該值,如果為 false,等待的協程將立即恢復。
如果返回 std::coroutine_handle
,則會恢復該協程。後者允許 await_suspend
返回傳入的 handle,實際上與返回 false
相同。
如果等待的協程立即重新恢復,也就是在呼叫 await_resume 之後,則在此程式庫中稱為「立即完成」。這與非暫停可等待物件(即從 await_ready
返回 true
的物件)不同。
事件迴圈
由於 cobalt
中的協程可以 co_await
事件,因此它們需要在事件迴圈上運行。也就是說,另一段程式碼負責追蹤未完成的事件,並恢復正在等待這些事件的協程。這種模式非常常見,node.js 或 Python 的 asyncio
也以類似的方式使用它。
cobalt
使用asio::io_context
作為其預設事件迴圈。也就是說,thread、main 類和 run 函式在內部使用它。
您可以使用任何可以產生asio::any_io_executor
的事件迴圈與程式庫一起使用。最簡單的方法是使用 spawn。
事件迴圈可透過執行器(依據 asio 術語)存取,並且可以使用 set_executor 手動設定。
導覽
進入 cobalt 環境
為了使用 可等待物件 (awaitables),我們需要能夠 co_await
它們,也就是說,必須在一個協程 (coroutine) 內。
我們有四種方法可以達成此目的
- cobalt/main.hpp
-
將
int main
替換為協程
cobalt::main co_main(int argc, char* argv[])
{
// co_await things here
co_return 0;
}
- cobalt/thread.hpp
-
為非同步環境建立一個執行緒
cobalt::thread my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
auto t = my_thread();
t.join();
return 0;
}
- cobalt/task.hpp
-
建立一個任務並執行或生成它
cobalt::task<void> my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char ** argv[])
{
cobalt::run(my_task()); // sync
asio::io_context ctx;
cobalt::spawn(ctx, my_task(), asio::detached);
ctx.run();
return 0;
}
Promise
Promise 是推薦的預設協程類型。它們是積極的 (eager),因此易於用於臨時的並行處理。
cobalt::promise<int> my_promise()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// start the promise here
auto p = my_promise();
// do something else here
co_await do_the_other_thing();
// wait for the promise to complete
auto res = co_await p;
co_return res;
}
Task
Task 是惰性的 (lazy),這表示它們在被等待或生成之前不會執行任何動作。
cobalt::task<int> my_task()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the task here
auto t = my_task();
// do something else here first
co_await do_the_other_thing();
// start and wait for the task to complete
auto res = co_await t;
co_return res;
}
產生器 (Generator)
Generator 是 cobalt 中唯一可以 co_yield
值的類型。
Generator 預設是積極的。與 std::generator 不同,cobalt::generator
可以 co_await
,因此是非同步的。
cobalt::generator<int> my_generator()
{
for (int i = 0; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator();
while (g)
printf("Generator %d\n", co_await g);
co_return 0;
}
可以將值推送到 generator 中,這些值將會從 co_yield
返回。
cobalt::generator<double, int> my_eager_push_generator(int value)
{
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(5);
assert(0.5 == co_await g(4)); // result of 5
assert(0.4 == co_await g(3)); // result of 4
assert(0.3 == co_await g(2)); // result of 3
assert(0.2 == co_await g(1)); // result of 2
assert(0.1 == co_await g(0)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
也可以使用 this_coro::initial
使協程變為惰性的。
cobalt::generator<double, int> my_eager_push_generator()
{
auto value = co_await this_coro::initial;
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator(); // lazy, so the generator waits for the first pushed value
assert(0.5 == co_await g(5)); // result of 5
assert(0.4 == co_await g(4)); // result of 4
assert(0.3 == co_await g(3)); // result of 3
assert(0.2 == co_await g(2)); // result of 2
assert(0.1 == co_await g(1)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
join
cobalt::promise<int> some_work();
cobalt::promise<double> more_work();
cobalt::main co_main(int argc, char * argv[])
{
std::tuple<int, double> res = cobalt::join(some_work(), more_work());
co_return 0;
}
race
cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();
cobalt::main co_main(int argc, char * argv[])
{
auto g1 = some_data_source();
auto g2 = another_data_source();
int res1 = co_await g1;
double res2 = co_await g2;
printf("Result: %f", res1 * res2);
while (g1 && g2)
{
switch(variant2::variant<int, double> nx = co_await cobalt::race(g1, g2))
{
case 0:
res1 = variant2::get<0>(nx);
break;
case 1:
res2 = variant2::get<1>(nx);
break;
}
printf("New result: %f", res1 * res2);
}
co_return 0;
}
在此情況下,race 不會造成任何資料遺失。 |
教學
延遲
讓我們從最簡單的例子開始:一個簡單的延遲。
cobalt::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(cobalt::use_op); (4)
co_return 0; (5)
}
1 | co_main 函式在使用時會定義一個隱式 main 函式,並且是設定用於執行非同步程式碼的環境的最簡單方法。 |
2 | 從目前的協程 promise 中取得執行器。 |
3 | 使用參數設定逾時。 |
4 | 使用 cobalt::use_op 執行等待。 |
5 | 返回一個從隱式 main 函式返回的值。 |
在此範例中,我們使用 cobalt/main.hpp 標頭檔,如果定義了如上的 co_main
,它會提供一個主要的協程。這有一些優點
-
正確設定環境 (
executor
和memory
) -
向 asio 發出訊號,表示上下文是單執行緒的
-
具有
SIGINT
和SIGTERM
的asio::signal_set
會自動連接到取消動作(例如,Ctrl+C
會導致取消)
然後,這個協程在其 promise(promise 是 C++ 中協程狀態的名稱,不要與 cobalt/promise.hpp 混淆)中有一個執行器,我們可以透過 this_coro 命名空間中的虛擬 可等待物件 取得它。
Echo 伺服器
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;
cobalt::promise<void> echo(tcp_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
co_await async_write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
1 | 使用 use_op 完成記號時,I/O 錯誤會轉換為 C++ 例外狀況。此外,如果協程被取消(例如,因為使用者按下 Ctrl-C),也會引發例外狀況。在這些情況下,我們會印出錯誤並結束迴圈。 |
2 | 我們會持續執行迴圈,直到收到取消指令(例外)或使用者關閉連線。 |
3 | 讀取所有可用的資料。 |
4 | 寫入所有已讀取的位元組。 |
請注意,promise 是積極的。呼叫 echo
會立即執行程式碼,直到 async_read_some
,然後將控制權返回給呼叫者。
接下來,我們也需要一個 acceptor 函式。這裡,我們使用產生器來管理 acceptor 狀態。這是一個可以被多次 co_await
的協程,直到遇到 co_return
表達式。
cobalt::generator<tcp_socket> listen()
{
tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
for (;;) (1)
{
tcp_socket sock = co_await acceptor.async_accept(); (2)
co_yield std::move(sock); (3)
}
co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 | 取消也會導致在此處從 co_await 拋出例外。 |
2 | 非同步接受連線 |
3 | 將其產出給等待的協程 |
4 | 為了符合 C++ 規範,使用 co_return 返回一個值。 |
有了這兩個函式,我們現在可以編寫伺服器了
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
1 | 建構 listener 產生器協程。當物件被銷毀時,協程將被取消,執行所有必要的清理工作。 |
2 | 當我們有超過 10 個 worker 時,我們會等待其中一個完成 |
3 | 接受新的連線並啟動它。 |
wait_group 用於管理正在執行的 echo 函式。這個類別將取消並等待正在執行的 echo
協程。
我們不需要對 listener
做同樣的事情,因為當 l
被銷毀時,它會自行停止。產生器的解構函式會取消它。
由於 promise
是積極的,只需呼叫它就足以啟動。然後我們將這些 promise 放入 wait_group 中,這將允許我們在範圍結束時關閉所有 worker。
cobalt::main co_main(int argc, char ** argv)
{
co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
co_return 0u;
}
1 | 使用非同步作用域執行 run_server 。 |
上面顯示的 with 函式,將使用資源(例如 wait_group)來執行函式。在作用域結束時,with
將呼叫並 co_await
一個非同步的 teardown 函式。這將導致所有連線在 co_main
結束之前被正確關閉。
價格行情
為了示範 channels
和其他工具,我們需要一定的複雜度。為此,我們的專案是一個價格報價器,它連接到 https://blockchain.info。使用者可以連接到 localhost 來查詢指定的貨幣對,如下所示
wscat -c localhost:8080/btc/usd
首先,我們進行與 echo-server 相同的宣告。
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
下一步是編寫一個函式來連接 ssl-stream,以連接到上游
cobalt::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)
asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
1 | 查詢主機 |
2 | 連接到端點 |
3 | 執行 ssl 握手 |
4 | 將 socket 返回給呼叫者 |
接下來,我們需要一個函式在現有的 ssl-stream 上執行 websocket 升級。
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 | blockchain.info 要求設定此標頭。 |
2 | 執行 websocket 握手。 |
一旦 websocket 連接,我們想要持續接收 json 訊息,產生器是一個不錯的選擇。
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要 socket 開啟就持續執行 |
2 | 從 websocket 讀取一個 frame |
3 | 解析並以物件形式 co_yield 它。 |
接著需要將其連接到訂閱者,我們將利用通道來傳遞原始 JSON 資料。為了簡化生命週期管理,訂閱者將持有 shared_ptr
,而生產者則持有 weak_ptr
。
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
運行區塊鏈連接器的主要函式,操作兩個輸入:來自 WebSocket 的數據和一個處理新訂閱的通道。
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel however subbed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k ,c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
1 | 初始化連線 |
2 | 實例化 json_reader |
3 | 只要 WebSocket 開啟就持續運行 |
4 | 選擇,也就是等待新的 JSON 訊息或訂閱 |
5 | 如果是 JSON,則處理更新或拒絕 |
6 | 處理新的訂閱訊息 |
handle_*
函式的內容對於 cobalt
的功能來說並不重要,因此在本教學中將其省略。
handle_new_subscription
函式會向 blockchain.info
發送訊息,後者會發送確認或拒絕訊息回來。handle_rejection
和 handle_update
將取得 JSON 值並將其轉發到訂閱通道。
在消費者端,我們的伺服器只會將數據轉發給客戶端。如果客戶端輸入數據,我們將立即關閉 WebSocket。我們使用 as_tuple
來忽略潛在的錯誤。
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
st.next_layer().close(ec);
}
接下來,我們運行使用者發送的連線階段
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
cobalt::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(cobalt::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 | 讀取 HTTP 請求,因為我們需要路徑 |
2 | 檢查路徑,例如 /btc/usd 。 |
3 | 接受 WebSocket |
4 | 開始讀取;如果消費者發送訊息,則關閉 |
5 | 建立接收更新的通道 |
6 | 向 run_blockchain_info 發送訂閱請求 |
7 | 只要通道和 WebSocket 開啟,我們就會轉發數據。 |
8 | 關閉通訊端並忽略錯誤 |
9 | 由於 WebSocket 現在肯定已關閉,請等待 read_and_close 關閉。 |
撰寫完 run_session
和 run_blockchain_info
後,我們現在可以繼續進行 main 函式
cobalt::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await cobalt::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
cobalt::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
cobalt::with( (3)
cobalt::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](cobalt::wait_group & sessions) -> cobalt::promise<void>
{
while (!co_await cobalt::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
1 | 建立用於管理訂閱的通道 |
2 | 使用 join 並行運行兩個任務。 |
3 | 使用 cobalt 作用域來提供 wait_group 。 |
4 | 運行直到取消。 |
5 | 當我們達到 limit 時,我們會等待一個任務完成。 |
6 | 等待新的連線。 |
7 | 將連線階段插入 wait_group 。 |
Main 函式使用 join
,因為一個任務失敗應該取消另一個任務。
延遲操作
到目前為止,我們使用了 use_op
,它基於 asio 的完成權杖機制來使用隱式操作。
然而,我們可以實現自己的操作,這些操作也可以利用 await_ready
優化。與立即完成不同,當 await_ready
返回 true 時,協程將永遠不會暫停。
為了利用這個協程功能,cobalt
提供了一種簡單的方法來建立可跳過的操作
struct wait_op final : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(cobalt::handler<system::error_code> h ) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
cobalt::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0; //
}
1 | 宣告操作。我們繼承 op 使其可等待。 |
2 | 這裡實作了預先暫停檢查 |
3 | 如果需要,執行等待 |
4 | 像使用任何其他可等待對象一樣使用操作。 |
透過這種方式,我們可以最大限度地減少協程暫停的次數。
雖然以上是與 asio 一起使用,但您也可以將這些處理程式與任何其他基於回呼的程式碼一起使用。
具有推播值的產生器
帶有推送值的協程並不常見,但可以顯著簡化某些問題。
由於在前一個例子中我們已經有一個 json_reader,以下是如何編寫一個用於推送值的 json_writer。
使用生成器的優點是內部狀態管理。
cobalt::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.cobalt_write({sv.data(), sv.size()}); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | 只要 socket 開啟就持續執行 |
2 | 以co_yield 傳回目前的錯誤並取得新的值。 |
3 | 將框架寫入 websocket |
現在我們可以像這樣使用生成器
auto g = json_writer(my_ws);
extern std::vector<json::value> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
return ec; // yield error
}
進階範例
儲存庫中提供了更多僅為程式碼的範例。所有範例如下所示。
執行單個 http get 請求的 http 用戶端。 |
|
使用 |
|
使用 nanobind 將 cobalt 與 python 整合。它使用 python 的 asyncio 作為執行器,並允許 C++ 對 python 函式進行 co_await,反之亦然。 |
|
將 |
|
建立基於 |
|
使用具有 |
|
使用 |
|
延遲 章節使用的範例 |
|
延遲操作 章節使用的範例 |
|
迴響伺服器 章節使用的範例 |
|
價格行情 章節使用的範例 |
|
通道 參考使用的範例 |
設計
概念
此函式庫有兩個基本概念
-
協程
**可等待物件**( awaitable) 是一個可以在協程內使用 co_await
的表達式,例如:
co_await delay(50ms);
然而,協程 promise 可以定義一個 await_transform
,也就是說,實際上可以用 co_await
表達式使用的內容取決於協程。
因此,我們應該重新定義可等待物件的定義:**可等待物件**是一種可以在協程內被 co_await
的類型,其 promise 並未定義 await_transform
。
一個**偽關鍵字**(pseudo-keyword
) 是一種可以在協程中使用的類型,由於其 promise 的 await_transform
,它為協程添加了特殊功能。
this_coro 命名空間中的所有動詞都是這樣的偽關鍵字。
auto exec = co_await this_coro::executor;
此函式庫公開了一組 promise 的 enable_* 基礎類別,以便輕鬆建立自訂協程。這包括 enable_awaitables,它提供了一個只轉發可等待物件的 await_transform 。 |
本文檔中提到的協程是指非同步協程,也就是說,像 std::generator 這樣的同步協程不被視為協程。
執行器 (Executors)
由於所有操作都是非同步的,因此函式庫需要使用事件迴圈。因為所有操作都是單執行緒的,可以假設每個執行緒只有一個執行器,這足以應付 97% 的使用情況。因此,有一個 thread_local
執行器會被協程物件用作預設值(儘管在協程 promise 中以拷貝方式儲存)。
同樣地,程式庫使用一種 executor
類型,預設為 asio::any_io_executor
。
如果您編寫自己的協程,它應該持有一個 executor 的副本,並具備一個 get_executor 函式,以 const 參考方式返回它。 |
使用 Strands
雖然可以使用 strands,但它們與 thread_local
executor 不相容。這是因為它們可能會切換執行緒,因此它們不能是 thread_local
的。
對於 channel 來說,這是一個建構函式參數,但對於其他協程類型,需要使用 asio::executor_arg
。這可以透過在協程的參數列表中直接放置 asio::executor_arg_t
(在某處),後面緊跟著要使用的 executor 來完成,例如:
cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);
這樣,協程-promise 可以從第三個參數取得 executor,而不是預設使用 thread_local 的 executor。
當然,可以將參數設為預設值,以便在有時使用 thread_local executor 時減少不便。
cobalt::promise<void> example_with_executor(int some_arg,
asio::executor_arg_t = asio::executor_arg,
cobalt::executor = cobalt::this_thread::get_executor());
如果在 strand 上省略這個參數,會丟擲類型為 asio::bad_allocator
的例外,或者更糟的情況是,使用了錯誤的 executor。
多型記憶體資源
類似地,程式庫使用 thread_local 的 pmr::memory_resource
來配置協程框架,並在非同步操作中用作分配器。
原因是,使用者可能想要自訂配置,例如避免鎖定、限制記憶體使用或監控使用情況。pmr
允許我們在不引入不必要的模板參數的情況下實現這一點,也就是說,沒有 promise<T, Allocator>
的複雜性。然而,使用 pmr
會引入一些最小的開銷,因此使用者可以選擇透過定義 BOOST_COBALT_NO_PMR
來禁用它。
op 使用針對 asio 的分配器使用情況進行最佳化的內部資源,而 gather、race 和 join 使用單調資源來最小化分配。這兩者仍然可以在定義 BOOST_COBALT_NO_PMR
的情況下運作,在這種情況下,它們將使用 new/delete
作為上游配置。
如果您編寫自己的協程,它應該具備一個 get_allocator 函式,返回一個 pmr::polymorphic_allocator<void> 。 |
取消
cobalt 使用基於 asio::cancellation_signal
的隱式取消。這主要以隱式方式使用(例如與 race 一起使用),因此在範例中很少有明確的使用。
如果您編寫自訂協程,它必須從 get_cancellation_slot 函式返回一個 cancellation_slot ,以便能夠取消其他操作。 |
如果您編寫自訂 awaitable,它可以在 await_suspend 中使用該函式來接收取消訊號。 |
Promise
主要的協程類型是 promise
,它是 eager 的。預設使用這個的原因是,編譯器可以最佳化掉那些沒有暫停的 promise,就像這樣:
cobalt::promise<void> noop()
{
co_return;
}
理論上,等待上述操作是一個空操作,但實際上,截至 2023 年,編譯器還沒有做到這一點。
Race
最重要的同步機制是 race
函式。
它以偽隨機順序等待多個 awaitable 物件,並在捨棄其餘物件之前返回第一個完成物件的結果。
也就是說,它以偽隨機順序啟動 co_await
,並在發現一個 awaitable 物件已準備好或立即完成後停止。
cobalt::generator<int> gen1();
cobalt::generator<double> gen2();
cobalt::promise<void> p()
{
auto g1 = gen1();
auto g2 = gen2();
while (!co_await cobalt::this_coro::cancelled)
{
switch(auto v = co_await race(g1, g2); v.index())
{
case 0:
printf("Got int %d\n", get<0>(v));
break;
case 1:
printf("Got double %f\n", get<1>(v));
break;
}
}
}
race
是觸發取消的首選方法,例如:
cobalt::promise<void> timeout();
cobalt::promise<void> work();
race(timeout(), work());
interrupt_await
然而,如果它單純地取消,就會遺失數據。因此,引入了 interrupt_await
的概念,它告訴可等待對象(如果它支持的話)立即恢復等待器並返回或拋出一個被忽略的值。
struct awaitable
{
bool await_ready() const;
template<typename Promise>
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
T await_resume();
void interrupt_await() &;
};
如果 interrupt_await
沒有導致立即恢復(h
),race
將發送取消信號。
race
使用正確的引用限定符來應用這些
auto g = gen1();
race(g, gen2());
如果有的話,以上將會為 g1
調用 interrupt_await() &
函數,為 g2
調用 interrupt_await() &&
函數。
一般來說,cobalt 中的協程支持左值中斷,即 interrupt_await() & 。通道操作是非限定的,即在兩種情況下都能工作。 |
關聯器 (Associators)
cobalt
使用 asio 的 associator
概念,但簡化了它。也就是說,它有三個作為等待承諾成員函數的關聯器。
-
const executor_type & get_executor()
(始終為executor
,必須通過 const ref 返回) -
allocator_type get_allocator()
(始終為pmr::polymorphic_allocator<void>
) -
cancellation_slot_type get_cancellation_slot()
(必須與asio::cancellation_slot
具有相同的介面)
cobalt
使用概念來檢查這些是否存在於其 await_suspend
函數中。
這樣,自定義協程就可以支持取消、執行器和分配器。
在自定義的可等待對象中,您可以像這樣獲取它們
struct my_awaitable
{
bool await_ready();
template<typename T>
void await_suspend(std::corutine_handle<P> h)
{
if constexpr (requires (Promise p) {p.get_executor();})
handle_executor(h.promise().get_executor();
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cl = h.promise().get_cancellation_slot()).is_connected())
cl.emplace<my_cancellation>();
}
void await_resume();
};
取消會在 co_await
表達式中連接(如果協程和可等待對象支持的話),包括像 race 這樣的同步機制。
執行緒
主要的技術原因是切換協程最有效的方法是從 await_suspend
返回新協程的句柄,如下所示
struct my_awaitable
{
bool await_ready();
std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
void await_resume();
};
在這種情況下,等待的協程將在調用 await_suspend 之前被掛起,並恢復返回的協程。如果我們需要通過執行器,這當然行不通。
這不僅適用於等待的協程,也適用於通道。此函式庫中的通道使用可等待對象的侵入式列表,並且可能從寫入操作的 await_suspend
返回讀取(以及因此掛起)協程的句柄。
參考
cobalt/main.hpp
開始使用 cobalt 應用程序的最簡單方法是使用具有以下簽名的 co_main
函數
cobalt::main co_main(int argc, char *argv[]);
聲明 co_main
將添加一個 main
函數,該函數執行在事件循環上運行協程所需的所有步驟。這使我們能夠編寫非常簡單的異步程序。
cobalt::main co_main(int argc, char *argv[])
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 獲取 main 運行的執行器 |
2 | 將其與 asio 對象一起使用 |
3 | co_await 一個 cobalt 操作 |
主要的 promise 會建立一個 asio::signal_set
並用於取消。SIGINT
代表完全取消,而 SIGTERM
代表終止取消。
取消動作不會轉發到已分離的協程。使用者需要自行處理取消時的協程結束,否則程式無法正常終止。 |
執行器
它也會建立一個 asio::io_context
來執行,您可以透過 this_coro::executor
取得。它會被指派給 cobalt::this_thread::get_executor()
。
記憶體資源
它還會建立一個記憶體資源,將作為內部記憶體分配的預設值。它會被指派給 thread_local
的 cobalt::this_thread::get_default_resource()
。
Promise
每個協程都有一個內部狀態,稱為 promise
(不要與 cobalt::promise
混淆)。根據協程的屬性,可以 co_await
不同的東西,就像我們在上面的例子中使用的那樣。
它們是透過繼承實現的,並在不同的 promise 類型之間共享。
主要的 promise 具有以下屬性。
規格
-
宣告
co_main
會隱式宣告一個main
函式。 -
只有在定義
co_main
時才會出現main
函式。 -
SIGINT
和SIGTERM
會導致內部任務取消。
cobalt/promise.hpp
Promise 是一個積極的協程,可以 co_await
和 co_return
值。也就是說,它不能使用 co_yield
。
cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
Promise 預設是附加的。這意味著當 promise
控制代碼超出範圍時,會發送取消指令。
可以透過呼叫 detach
或使用前綴 +
運算子來分離 promise。這是使用分離的執行時期替代方案。分離的 promise 在銷毀時不會發送取消指令。
cobalt::promise<void> my_task();
cobalt::main co_main(int argc, char *argv[])
{
+my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 透過使用 + ,任務會被分離。如果沒有它,編譯器會產生 nodiscard 警告。 |
執行器
執行器會從 thread_local
的 get_executor 函式取得,除非在任何位置使用了 asio::executor_arg
後面接著執行器參數。
cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
記憶體資源
記憶體資源會從 thread_local
的 get_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg
後面接著 polymorphic_allocator
參數。
cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概要
template<typename Return>
struct [[nodiscard]] promise
{
promise(promise &&lhs) noexcept;
promise& operator=(promise && lhs) noexcept;
// enable `co_await`. (1)
auto operator co_await ();
// Ignore the return value, i.e. detach it. (2)
void operator +() &&;
// Cancel the promise.
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if the result is ready
bool ready() const;
// Check if the promise can be awaited.
explicit operator bool () const; (3)
// Detach or attach
bool attached() const;
void detach();
void attach();
// Create an already completed promimse
static promise
// Get the return value. If !ready() this function has undefined behaviour.
Return get();
};
1 | 支援中斷等待 |
2 | 這允許使用簡單的 +my_task() 表達式建立並行執行的 promise。 |
3 | 這允許像 while (p) co_await p; 這樣的程式碼。 |
Promise
協程 promise (promise::promise_type
) 具有以下屬性。
產生器是一個積極的協程,可以 co_await
並將值 co_yield
給呼叫者。
cobalt::generator<int> example()
{
printf("In coro 1\n");
co_yield 2;
printf("In coro 3\n");
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n");
printf("In main %d\n", co_await f);
printf("In main %d\n", co_await f);
return 0;
}
這將產生以下輸出
In main 0 In coro 1 In main 1 In main 2 In coro 3 In main 4
data:image/s3,"s3://crabby-images/b51e0/b51e008710edcdfc1810f992dae532e9a1cad061" alt="generators1"
當 Push
(第二個模板參數)設定為非 void 時,可以將值推送到產生器中。
cobalt::generator<int, int> example()
{
printf("In coro 1\n");
int i = co_yield 2;
printf("In coro %d\n", i);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main %d\n", co_await f(3)); (1)
co_return 0;
}
1 | 推送的值會透過 operator() 傳遞給 co_yield 的結果。 |
這將產生以下輸出
In main 0 In coro 1 In main 2 In coro 3
惰性 (Lazy)
可以透過等待 initial 將產生器變為延遲的。這個 co_await
表達式將產生 Push
值。這表示產生器將會等待,直到第一次被等待,然後處理新推送的值並在下次 co_yield 恢復。
cobalt::generator<int, int> example()
{
int v = co_await cobalt::this_coro::initial;
printf("In coro %d\n", v);
co_yield 2;
printf("In coro %d\n", v);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n"); // < this is now before the co_await initial
printf("In main %d\n", co_await f(1));
printf("In main %d\n", co_await f(3));
return 0;
}
這將產生以下輸出
In main 0 In main 1 In coro 1 In main 2 In coro 3 In main 4
data:image/s3,"s3://crabby-images/734d3/734d3b45c10dae46a29cd6796de0704af07bd1be" alt="generators2"
執行器
執行器會從 thread_local
的 get_executor 函式取得,除非在任何位置使用了 asio::executor_arg
後面接著執行器參數。
cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
記憶體資源
記憶體資源會從 thread_local
的 get_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg
後面接著 polymorphic_allocator
參數。
cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概要
template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
// Movable
generator(generator &&lhs) noexcept = default;
generator& operator=(generator &&) noexcept;
// True until it co_returns & is co_awaited after (1)
explicit operator bool() const;
// Cancel the generator. (3)
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if a value is available
bool ready() const;
// Get the returned value. If !ready() this function has undefined behaviour.
Yield get();
// Cancel & detach the generator.
~generator();
// an awaitable that results in value of Yield
.
using generator_awaitable = unspecified;
// Present when Push
!= void
generator_awaitable operator()( Push && push);
generator_awaitable operator()(const Push & push);
// Present when Push
== void
, i.e. can co_await
the generator directly.
generator_awaitable operator co_await (); (2)
};
1 | 這允許像 while (gen) co_await gen; 這樣的程式碼。 |
2 | 支援中斷等待 |
3 | 已取消的產生器可能可以恢復。 |
cobalt/task.hpp
任務是一個延遲的協程,可以 co_await
和 co_return
值。也就是說,它不能使用 co_yield
。
cobalt::task<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
與promise不同,任務可以在建立它的執行器以外的執行器上被等待或產生。
執行器
由於 task
是延遲的,因此在建構時不需要執行器。它會嘗試從呼叫者或等待者取得執行器(如果有的話)。否則,它將預設使用 thread_local 執行器。
記憶體資源
記憶體資源**不會**取自 thread_local
的 get_default_resource 函式,而是取自 pmr::get_default_resource()
,除非在任何位置使用了 std::allocator_arg
後面接著 polymorphic_allocator
參數。
cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
概要
template<typename Return>
struct [[nodiscard]] task
{
task(task &&lhs) noexcept = default;
task& operator=(task &&) noexcept = default;
// enable `co_await`
auto operator co_await ();
};
可以透過呼叫 run(my_task()) 從同步函式同步使用任務。 |
cobalt/detached.hpp
一個 detached 是一個積極的協程,它可以 co_await
但不能 co_return
值。也就是說,它不能被恢復,通常也不會被等待。
cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
printf("Hello world\n");
}
cobalt::main co_main(int argc, char *argv[])
{
delayed_print();
co_return 0;
}
Detached 用於輕鬆地在背景執行協程。
cobalt::detached my_task();
cobalt::main co_main(int argc, char *argv[])
{
my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | 衍生 detached 協程。 |
Detached 可以像這樣為自己分配一個新的取消來源
cobalt::detached my_task(asio::cancellation_slot sl)
{
co_await this_coro::reset_cancellation_source(sl);
// do somework
}
cobalt::main co_main(int argc, char *argv[])
{
asio::cancellation_signal sig;
my_task(sig.slot()); (1)
co_await delay(std::chrono::milliseconds(50));
sig.emit(asio::cancellation_type::all);
co_return 0;
}
執行器
執行器會從 thread_local
的 get_executor 函式取得,除非在任何位置使用了 asio::executor_arg
後面接著執行器參數。
cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
記憶體資源
記憶體資源會從 thread_local
的 get_default_resource 函式取得,除非在任何位置使用了 std::allocator_arg
後面接著 polymorphic_allocator
參數。
cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
use_op
use_op
記號可以直接創建一個 op,也就是說,使用 cobalt::use_op
作為完成記號將創建所需的 awaitable 物件。
auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();
根據完成簽名,co_await
表達式可能會拋出異常。
簽名 | 返回類型 | 異常 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
任何異常 |
|
|
任何異常 |
use_op 永遠不會立即完成,即 await_ready 永遠會返回 false,但永遠會暫停協程。 |
手動編碼的操作
操作是 [cobalt_operation] 功能的更進階實現。
這個函式庫可以輕鬆地創建具有提前完成條件的非同步操作,即一個完全避免暫停協程的條件。
例如,我們可以創建一個在計時器已經過期時什麼都不做的 wait_op
。
struct wait_op : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
bool ready(cobalt::handler<system::error_code> ) (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) (3)
{
tim.async_wait(std::move(complete));
}
};
1 | 繼承具有匹配簽名的 op ,await_transform 會擷取它 |
2 | 檢查操作是否已準備好 — 從 await_ready 呼叫 |
3 | 如果操作尚未準備好,則啟動操作。 |
cobalt/concepts.hpp
可等待物件
可等待物件是一個可以用於 co_await
的表達式。
template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
{aw.await_ready()} -> std::convertible_to<bool>;
{aw.await_suspend(h)};
{aw.await_resume()};
};
template<typename Awaitable, typename Promise = void>
concept awaitable =
awaitable_type<Awaitable, Promise>
|| requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
|| requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
這個函式庫中的 可等待物件 要求,如果協程承諾提供執行器,則必須透過 const 參考返回執行器。否則,它將使用 this_thread::get_executor() 。 |
啟用可等待物件
繼承 enable_awaitables
將使協程能夠透過 await_transform
共同等待任何在沒有任何 await_transform
的情況下可以 co_await
的物件。
cobalt/this_coro.hpp
this_coro
命名空間提供了一些工具來訪問協程承諾的內部狀態。
虛擬可等待物件 (Pseudo-awaitables)
// Awaitable type that returns the executor of the current coroutine.
struct executor_t {}
constexpr executor_t executor;
// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;
// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
InFilter && in_filter,
OutFilter && out_filter);
// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);
// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);
// get the allocator the promise
struct allocator_t {};
constexpr allocator_t allocator;
// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;
// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;
等待配置器
可以透過以下方式取得支援 enable_await_allocator
的協程的分配器
co_await cobalt::this_coro::allocator;
為了在您自己的協程中啟用此功能,您可以使用 CRTP 模式繼承 enable_await_allocator
struct my_promise : cobalt::enable_await_allocator<my_promise>
{
using allocator_type = __your_allocator_type__;
allocator_type get_allocator();
};
如果可用,分配器將被 use_op 使用 |
等待執行器
可以透過以下方式取得支援 enable_await_executor
的協程的執行器
co_await cobalt::this_coro::executor;
為了在您自己的協程中啟用此功能,您可以使用 CRTP 模式繼承 enable_await_executor
struct my_promise : cobalt::enable_await_executor<my_promise>
{
using executor_type = __your_executor_type__;
executor_type get_executor();
};
如果可用,執行器將被 use_op 使用 |
等待延遲
您的協程承諾可以繼承 enable_await_deferred
,以便在 co_await
表達式中使用單個簽名 asio::deferred
。
由於 asio::deferred
現在是預設的完成記號,因此允許以下程式碼無需指定任何完成記號或其他特殊化。
asio::steady_timer t{co_await cobalt::this_coro::executor};
co_await t.async_wait();
記憶體資源基底
promise 的 promise_memory_resource_base
基礎類別會提供一個 get_allocator
方法,該方法會從預設資源或跟在 std::allocator_arg
參數之後傳入的資源中獲取分配器。同樣地,它也會新增 operator new
多載,以便協程使用相同的記憶體資源來分配其框架。
若已取消則拋出例外
promise_throw_if_cancelled_base
提供了基本選項,允許操作啟用協程在等待另一個實際的可等待物件時拋出異常。
co_await cobalt::this_coro::throw_if_cancelled;
取消狀態
promise_cancellation_base
提供了基本選項,允許操作啟用協程擁有一個可由 reset_cancellation_state
重設的 cancellation_state。
co_await cobalt::this_coro::reset_cancellation_state();
為了方便起見,還提供了一個捷徑來檢查目前的取消狀態。
asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above
cobalt/this_thread.hpp
由於所有操作都是單執行緒的,因此這個函式庫為每個執行緒提供了一個執行器和預設記憶體資源。
namespace boost::cobalt::this_thread
{
pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)
typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)
}
1 | 取得預設資源 - 除非設定,否則將會是 pmr::get_default_resource。 |
2 | 設定預設資源 - 返回先前設定的資源。 |
3 | 取得包裝 (1) 的分配器。 |
4 | 取得執行緒的執行器 - 如果未設定則拋出異常。 |
5 | 設定目前執行緒的執行器。 |
協程將使用這些作為預設值,但會保留一份副本以防萬一。
唯一的例外是 cobalt 操作的初始化,它將使用 this_thread::executor 來重新拋出異常。 |
cobalt/channel.hpp
通道可用於在單個執行緒上的不同協程之間交換資料。
概要
template<typename T>
struct channel
{
// create a channel with a buffer limit, executor & resource.
explicit
channel(std::size_t limit = 0u,
executor executor = this_thread::get_executor(),
pmr::memory_resource * resource = this_thread::get_default_resource());
// not movable.
channel(channel && rhs) noexcept = delete;
channel & operator=(channel && lhs) noexcept = delete;
using executor_type = executor;
const executor_type & get_executor();
// Closes the channel
~channel();
bool is_open() const;
// close the operation, will cancel all pending ops, too
void close();
// an awaitable that yields T
using read_op = unspecified;
// an awaitable that yields void
using write_op = unspecified;
// read a value to a channel
read_op read();
// write a value to the channel
write_op write(const T && value);
write_op write(const T & value);
write_op write( T && value);
write_op write( T & value);
// write a value to the channel if T is void
};
說明
通道是兩個協程進行通訊和同步的工具。
const std::size_t buffer_size = 2;
channel<int> ch{exec, buffer_size};
// in coroutine (1)
co_await ch.write(42);
// in coroutine (2)
auto val = co_await ch.read();
1 | 將值傳送到通道 - 將會阻塞直到可以傳送為止。 |
2 | 從通道讀取值 - 將會阻塞直到值可等待為止。 |
兩個操作都可能會阻塞,具體取決於通道緩衝區大小。
如果緩衝區大小為零,則需要同時進行 read
和 write
操作,即充當會合點。
如果緩衝區未滿,寫入操作不會暫停協程;同樣地,如果緩衝區不為空,讀取操作也不會暫停。
如果兩個操作同時完成(在空緩衝區的情況下總是如此),則第二個操作會發佈到執行器以供稍後完成。
通道類型可以是 void ,在這種情況下,write 不需要參數。 |
通道操作可以被取消而不會丟失資料。這使得它們可以與 競爭 一起使用。
generator<variant2::variant<int, double>> merge(
channel<int> & c1,
channel<double> & c2)
{
while (c1 && c2)
co_yield co_await race(c1, c2);
}
範例
cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
for (int i = 0; i < 4; i++)
co_await chan.write(i);
chan.close();
}
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
while (c.is_open())
std::cout << co_await c.read() << std::endl;
co_await p;
co_return 0;
}
此外,還提供了一個 channel_reader
,使讀取通道更加方便,並且可以與 BOOST_COBALT_FOR 一起使用。
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
std::cout << value << std::endl;
co_await p;
co_return 0;
}
cobalt/with.hpp
with
工具提供了一種執行協程非同步清除的方法。這就像一個非同步的解構函式呼叫。
struct my_resource
{
cobalt::promise<void> await_exit(std::exception_ptr e);
};
cobalt::promise<void> work(my_resource & res);
cobalt::promise<void> outer()
{
co_await cobalt::with(my_resource(), &work);
}
清除可以通過提供 await_exit
成員函式或返回可等待物件的 tag_invoke
函式,或者通過將清除作為 with
的第三個參數來完成。
using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void> disconnect(ws_stream &ws); (2)
auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
return disconnect(ws);
}
cobalt::promise<void> run_session(ws_stream & ws);
cobalt::main co_main(int argc, char * argv[])
{
co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
co_return 0;
}
1 | 實作 WebSocket 連線和 WebSocket 初始化。 |
2 | 實作有序關閉。 |
如果在沒有異常的情況下退出作用域,則 std::exception_ptr 為空。注意:exit 函式可以通過引用獲取 exception_ptr 並修改它,這是合法的。 |
cobalt/race.hpp
race
函式可以用來從一組可等待物件 (awaitable)中,co_await
其中一個。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_wait()
{
co_await cobalt::race(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::race(aws); (2)
}
race
的第一個參數可以是一個均勻隨機位元產生器 (uniform random bit generator)。
race
的簽章 (Signatures)extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;
std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);
// variant if not everything is void. void become monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);
// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);
中斷等待
當參數以右值參考傳遞時,race
會嘗試在可等待物件上使用 .interrupt_await
來通知該可等待物件立即完成,並且其結果將被忽略。如果支援的話,可等待物件必須在 interrupt_await
返回之前恢復等待的協程 (coroutine)。如果 race
沒有偵測到該函式,它會發送一個取消請求。
這表示您可以像這樣重複使用 race
cobalt::promise<void> do_wait()
{
auto t1 = task1();
auto t2 = task2();
co_await cobalt::race(t1, t2); (1)
co_await cobalt::race(t1, t2); (2)
}
1 | 等待第一個任務完成 |
2 | 等待另一個任務完成 |
race
會像在 co_await
表達式中使用一樣呼叫可等待物件的函式,或者根本不執行它們。
left_race
left_race
函式類似於 race
,但遵循嚴格的從左到右掃描。這可能會導致資源餓死問題,因此這不是推薦的預設行為,但在仔細處理的情況下,可用於設定優先順序。
概要
// Concept for the random number generator.
template<typename G>
concept uniform_random_bit_generator =
requires ( G & g)
{
{typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
// T Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
{std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
// T Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
{std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
{g()} -> std::same_as<typename std::decay_t<G>::result_type>;
} && (std::decay_t<G>::max() > std::decay_t<G>::min());
// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);
// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);
// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);
// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable race(PromiseRange && p);
// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);
// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable left_race(PromiseRange && p);
選擇空範圍將會拋出異常。 |
cobalt/gather.hpp
gather
函式可以用來一次 co_await
多個可等待物件,並將取消請求傳遞下去。
該函式將收集所有完成的結果,並將它們作為 system::result
返回,即將異常作為值捕獲。一個可等待物件拋出異常不會取消其他物件。
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_gather()
{
co_await cobalt::gather(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::gather(aws); (2)
}
gather
會像在 co_await
表達式中使用一樣呼叫可等待物件的函式。
gather
的簽章extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);
std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 = co_await gather(pvv);
extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
system::result<monostate>,
system::result<int>,
system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);
概要
// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);
// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable gather(PromiseRange && p);
cobalt/join.hpp
join
函式可以用來一次 co_await
多個可等待物件,並正確地連接取消請求。
該函式將收集所有完成的結果,並將它們作為值返回,除非拋出異常。如果拋出異常,所有未完成的操作將被取消(如果可能,則會被中斷),並且第一個異常會被重新拋出。
void 將在元組 (tuple) 中作為 variant2::monostate 返回,除非所有可等待物件都產生 void 。 |
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_join()
{
co_await cobalt::join(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::join(aws); (2)
}
join
會像在 co_await
表達式中使用一樣呼叫可等待物件的函式。
join
的簽章extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);
std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);
extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);
概要
// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);
// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable>>
awaitable join(PromiseRange && p);
選擇空範圍將會拋出異常。 |
cobalt/wait_group.hpp
wait_group
函式可以用來管理多個 promise<void>
類型的協程。通過匹配的 await_exit
成員,它可以與 cobalt/with.hpp 直接搭配使用。
本質上,wait_group
是一個動態的 promise 列表,它具有 race
函式 (wait_one
)、gather
函式 (wait_all
),並且會在作用域結束時進行清理。
struct wait_group
{
// create a wait_group
explicit
wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
asio::cancellation_type exception_cancel = asio::cancellation_type::all);
// insert a task into the group
void push_back(promise<void> p);
// the number of tasks in the group
std::size_t size() const;
// remove completed tasks without waiting (i.e. zombie tasks)
std::size_t reap();
// cancel all tasks
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// wait for one task to complete.
wait_one_op wait_one();
// wait for all tasks to complete
wait_op wait();
// wait for all tasks to complete
wait_op operator co_await ();
// when used with with
, this will receive the exception
// and wait for the completion
// if ep
is set, this will use the exception_cancel
level,
// otherwise the normal_cancel
to cancel all promises.
wait_op await_exit(std::exception_ptr ep);
};
cobalt/spawn.hpp
spawn
函式允許在 asio 的 executor
/execution_context
上運行任務,並使用完成記號 (completion token) 來取用結果。
auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);
Spawn 會分派其初始化並發佈完成。這使得在另一個 executor 上運行任務並使用 use_op 在當前 executor 上取用結果變得安全。也就是說,spawn
可以用於跨執行緒。
範例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
auto f = spawn(ctx, work(), asio::use_future);
ctx.run();
return f.get();
}
呼叫者需要確保 executor 沒有在多個執行緒上同時運行,例如,通過使用單執行緒的 asio::io_context 或 strand 。 |
cobalt/run.hpp
run
函式類似於 spawn,但同步運行。它會在內部設置執行環境和記憶體資源。
這在將一段 cobalt 程式碼整合到同步應用程式時非常有用。
概要
// Run the task and return it's value or rethrow any exception.
T run(task<T> t);
範例
cobalt::task<int> work();
int main(int argc, char *argv[])
{
return run(work());
}
cobalt/thread.hpp
執行緒類型是另一種建立類似於 main
的環境的方法,但不使用 signal_set
。
cobalt::thread my_thread()
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | 取得 thread 正在運行的 executor |
2 | 將其與 asio 對象一起使用 |
3 | co_await 一個 cobalt 操作 |
要使用執行緒,您可以像使用 std::thread
一樣使用它
int main(int argc, char * argv[])
{
auto thr = my_thread();
thr.join();
return 0;
}
執行緒也是一個 awaitable
(包括取消)。
cobalt::main co_main(int argc, char * argv[])
{
auto thr = my_thread();
co_await thr;
co_return 0;
}
解構一個分離的執行緒將導致硬停止 (io_context::stop ) 並加入執行緒。 |
除了等待 cobalt/thread.hpp 和 cobalt/spawn.hpp 之外,這個函式庫中沒有任何東西是執行緒安全的。如果您需要跨執行緒傳輸資料,您需要一個執行緒安全的工具,例如 asio::concurrent_channel 。您不能在執行緒之間共用任何 cobalt 基元,唯一的例外是能夠將 spawn 一個 任務 到另一個執行緒的 executor 上。 |
執行器
它也會建立一個 asio::io_context
來執行,您可以透過 this_coro::executor
取得。它會被指派給 cobalt::this_thread::get_executor()
。
記憶體資源
它還會建立一個記憶體資源,將作為內部記憶體分配的預設值。它會被指派給 thread_local
的 cobalt::this_thread::get_default_resource()
。
概要
struct thread
{
// Send a cancellation signal
void cancel(asio::cancellation_type type = asio::cancellation_type::all);
// Allow the thread to be awaited. NOOP if the thread is invalid.
auto operator co_await() &-> detail::thread_awaitable; (1)
auto operator co_await() && -> detail::thread_awaitable; (2)
// Stops the io_context & joins the executor
~thread();
/// Move constructible
thread(thread &&) noexcept = default;
using executor_type = executor;
using id = std::thread::id;
id get_id() const noexcept;
// Add the functions similar to `std::thread`
void join();
bool joinable() const;
void detach();
executor_type get_executor() const;
};
1 | 支援中斷等待 |
2 | 永遠轉發取消 |
cobalt/result.hpp
Awaitables 可以修改為返回 system::result
或 std::tuple
而不是使用例外。
// value only
T res = co_await foo();
// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());
// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());
Awaitables 也可以通過使用 cobalt::as_result_tag
和 cobalt::as_tuple_tag
提供 await_resume
過載,來提供處理結果和 tuples 的自訂方法。
your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type await_resume(cobalt::as_tuple_tag);
// example of an op with result system::error_code, std::size_t
system::result<std::size_t> await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitables 仍然允許拋出例外,例如,對於像是 OOM 等嚴重例外。 |
cobalt/async_for.hpp
對於 generators 之類的類型,提供了 BOOST_COBALT_FOR
巨集,以模擬 for co_await
迴圈。
cobalt::generator<int> gen();
cobalt::main co_main(int argc, char * argv[])
{
BOOST_COBALT_FOR(auto i, gen())
printf("Generated value %d\n", i);
co_return 0;
}
cobalt/error.hpp
為了更容易管理錯誤,cobalt 提供了一個 error_category
與 boost::system::error_code
一起使用。
enum class error
{
moved_from,
detached,
completed_unexpected,
wait_not_ready,
already_awaited,
allocation_failed
};
system::error_category & cobalt_category();
system::error_code make_error_code(error e);
cobalt/config.hpp
config adder 允許設定 boost.cobalt 的一些實作細節。
executor_type
executor 類型預設為 boost::asio::any_io_executor
。
您可以通過定義 BOOST_COBALT_CUSTOM_EXECUTOR
並自行添加 boost::cobalt::executor
類型,將其設置為 boost::asio::any_io_executor
。
或者,可以定義 BOOST_COBALT_USE_IO_CONTEXT
將 executor 設置為 boost::asio::io_context::executor_type
。
pmr
Boost.cobalt 可以與不同的 pmr 實作一起使用,預設為 std::pmr
。
可以使用以下巨集進行配置
-
BOOST_COBALT_USE_STD_PMR
-
BOOST_COBALT_USE_BOOST_CONTAINER_PMR
-
BOOST_COBALT_USE_CUSTOM_PMR
如果您定義 BOOST_COBALT_USE_CUSTOM_PMR
,您將需要提供一個 boost::cobalt::pmr
命名空間,它是 std::pmr
的直接替代品。
或者,可以使用以下方法停用 pmr
的使用
-
BOOST_COBALT_NO_PMR
.
use_op
使用一個針對小緩衝區最佳化的資源,其大小可以通過定義 BOOST_COBALT_SBO_BUFFER_SIZE
來設定,預設值為 4096 位元組。
cobalt/leaf.hpp
template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );
詳情請參閱 leaf 文件。
cobalt/experimental/context.hpp
這(很可能)是未定義的行為,因為它違反了標準中的前置條件。可以在這裡找到一篇解決此問題的文章(https://isocpp.org/files/papers/P3203R0.html)。 |
此標頭提供 experimental
支援,可以使用基於 boost.fiber
的堆疊式協程,就如同它們是 C20 協程一樣。也就是說,它們可以通過被放入 coroutine_handle
中來使用 `可等待物件`。同樣地,實作使用 C20 協程 promise 並像執行 C++20 協程一樣執行它。
//
void delay(experimental::context<promise<void>> h, std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
h.await(tim.async_wait(cobalt::use_op)); // instead of co_await.
}
cobalt::main co_main(int argc, char *argv[])
{
cobalt::promise<void> dl = cobalt::experimental::make_context(&delay, 50);
co_await dl;
co_return 0;
}
參考
// The internal coroutine context.
/// Args are the function arguments after the handle.
template<typename Return, typename ... Args>
struct context
{
// Get a handle to the promise
promise_type & promise();
const promise_type & promise() const;
// Convert it to any context if the underlying promise is the same
template<typename Return_, typename ... Args_>
constexpr operator context<Return_, Args_...>() const;
// Await something. Uses await_transform automatically.
template<typename Awaitable>
auto await(Awaitable && aw);
// Yield a value, if supported by the promise.
template<typename Yield>
auto yield(Yield && value);
};
// Create a fiber with a custom stack allocator (see boost.fiber for details) and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default allocator and explicit result (e.g. `promise<void>`)
template<typename Return, typename ... Args, std::invocable<context<Return, Args...>, Args...> Func>
auto make_context(Func && func, Args && ... args);
// Create a fiber with a custom stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func, typename StackAlloc>
auto make_context(Func && func, std::allocator_arg_t, StackAlloc && salloc, Args && ... args);
// Create a fiber with the default stack allocator and implicit result (deduced from the first argument to func).
template<typename ... Args, typename Func>
auto make_context(Func && func, Args && ... args);
深入探討
自訂執行器
cobalt 預設使用 asio::any_io_executor
的原因之一是它是一個類型擦除執行器,也就是說,您可以提供自己的事件迴圈,而無需重新編譯 cobalt
。
然而,在 Executor TS 的開發過程中,執行器的概念變得有點不直觀,委婉地說。
Ruben Perez 寫了一篇很棒的部落格文章,我將厚顏無恥地參考它。
定義
執行器是一種指向實際事件迴圈的類型,它可以(廉價地)複製,支援屬性(見下文),可以進行相等性比較,並且具有 execute
函數。
execute
struct example_executor
{
template<typename Fn>
void execute(Fn && fn) const;
};
上述函數根據其屬性執行 fn
。
屬性
屬性可以被查詢、偏好或要求,例如:
struct example_executor
{
// get a property by querying it.
asio::execution::relationship_t &query(asio::execution::relationship_t) const
{
return asio::execution::relationship.fork;
}
// require an executor with a new property
never_blocking_executor require(const execution::blocking_t::never_t);
// prefer an executor with a new property. the executor may or may not support it.
never_blocking_executor prefer(const execution::blocking_t::never_t);
// not supported
example_executor prefer(const execution::blocking_t::always_t);
};
asio::any_io_executor
的屬性
為了將執行器包裝在 asio::any_io_executor
中,需要兩個屬性
-
`execution::context_t
-
execution::blocking_t::never_t
這表示我們需要使它們成為可要求的(這對於 context 沒有意義)或從 query
返回預期值。
execution::context_t
查詢應該像這樣返回 asio::execution_context&
struct example_executor
{
asio::execution_context &query(asio::execution::context_t) const;
};
執行上下文用於管理管理 io 物件生命週期的服務的生命週期,例如 asio 的計時器和 sockets。也就是說,通過提供此上下文,所有 asio 的 io 都可以使用它。
在執行器被銷毀後,execution_context 必須保持活動狀態。 |
以下可能是偏好的
-
execution::blocking_t::possibly_t
-
execution::outstanding_work_t::tracked_t
-
execution::outstanding_work_t::untracked_t
-
execution::relationship_t::fork_t
-
execution::relationship_t::continuation_
這表示您可能希望在您的執行器中支援它們以進行最佳化。
blocking
屬性
正如我們之前看到的,此屬性控制傳遞給 execute()
的函數是否可以立即作為 execute()
的一部分運行,或者必須排隊等待稍後執行。可能的值為
-
asio::execution::blocking.never
:永遠不要將函數作為execute()
的一部分運行。這就是asio::post()
的作用。 -
asio::execution::blocking.possibly
:函數可能會也可能不會作為execute()
的一部分運行。這是預設值(呼叫io_context::get_executor
時獲得的值)。 -
asio::execution::blocking.always
:函數始終作為execute()
的一部分運行。io_context::executor
不支援此功能。
relationship
屬性
relationship
可以採用兩個值
-
asio::execution::relationship.continuation
:表示傳遞給execute()
的函數是呼叫execute()
的函數的延續。 -
asio::execution::relationship.fork
:與上述相反。這是預設值(呼叫io_context::get_executor()
時獲得的值)。
將此屬性設定為 continuation
可以啟用一些函數排程的優化。它僅在函數排入佇列時(而不是立即執行)才有效。對於 io_context
,設定後,函數會排程在更快的執行緒區域佇列中執行,而不是上下文全域佇列。
outstanding_work_t
屬性
outstanding_work
可以採用兩個值
-
asio::execution::outstanding_work.tracked
:表示當執行器處於活動狀態時,仍有工作要做。 -
asio::execution::outstanding_work.untracked
:與上述相反。這是預設值(呼叫io_context::get_executor()
時獲得的值)。
將此屬性設定為 tracked
表示只要執行器處於活動狀態,事件迴圈就不會返回。
一個最小的執行器
有了這些,讓我們看一下最小執行器的介面。
struct minimal_executor
{
minimal_executor() noexcept;
asio::execution_context &query(asio::execution::context_t) const;
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
template<class F>
void execute(F && f) const;
bool operator==(minimal_executor const &other) const noexcept;
bool operator!=(minimal_executor const &other) const noexcept;
};
請參閱 example/python.cpp 以了解使用 Python 的 asyncio 事件迴圈的實作。 |
新增一個工作守衛。
現在,讓我們為 outstanding_work
屬性添加一個 require
函數,該函數使用多種類型。
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const {return *this; }
};
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work:: tracked_t) const {return *this;}
constexpr untracked_executor require(asio::execution::outstanding_work::untracked_t) const;
};
請注意,不必從 require
函數返回不同的類型,也可以像這樣完成
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor require(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor require(asio::execution::outstanding_work::untracked_t) const;
};
如果我們想使用 prefer
,它看起來如下所示
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor prefer(asio::execution::outstanding_work:: tracked_t) const;
constexpr trackable_executor prefer(asio::execution::outstanding_work::untracked_t) const;
};
總結
如您所見,屬性系統並非簡單易懂,但功能強大。實作自訂執行器本身就是一個問題類別,這就是本文檔不這樣做的原因。相反,有一個關於如何將 Python 事件迴圈包裝在執行器中的範例。
以下是一些閱讀建議。
無堆疊
C++20 協程是無堆疊的,這意味著它們沒有自己的堆疊。
C++ 中的堆疊描述了呼叫堆疊,即所有堆疊的函數框架。函數框架是函數運作所需的記憶體,即用於儲存其變數和資訊(例如返回位址)的記憶體片段。
函數框架的大小在編譯時已知,但在包含其定義的編譯單元之外則未知。 |
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}
int main()
{
return foo();
}
上例中的呼叫堆疊為
main()
foo()
bar()
data:image/s3,"s3://crabby-images/0e831/0e831623a47f49c8d1a78d0daf3f822b6151eaa2" alt="stackless1"
協程可以實作為有堆疊的,這意味著它會分配一個固定的記憶體區塊,並將函數框架堆疊起來,類似於執行緒。C++20 協程是無堆疊的,即它們只分配自己的框架,並在恢復時使用呼叫者的堆疊。使用我們之前的範例
fictional_eager_coro_type<int> example()
{
co_yield 0;
co_yield 1;
}
void nested_resume(fictional_eager_coro_type<int>& f)
{
f.resume();
}
int main()
{
auto f = example();
nested_resume(f);
f.reenter();
return 0;
}
這將產生類似於此的呼叫堆疊
main()
f$example()
nested_resume()
f$example()
f$example()
data:image/s3,"s3://crabby-images/c66ec/c66ec5a8d7f64707e57b3ff81b9cc89531febab5" alt="stackless2"
如果協程在執行緒之間移動,則同樣適用。
惰性與及早求值
如果協程僅在其恢復後才開始執行其程式碼,則它們是惰性的,而積極的協程將立即執行,直到其第一個暫停點(即 co_await
、co_yield
或 co_return
表達式)。
lazy_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
lazy.resume();
printf("resumed twice\n");
return 0;
}
這將產生如下輸出
enter main
constructed coro
Entered coro
resumed once
Coro Done
resumed twice
data:image/s3,"s3://crabby-images/d2046/d20462f78db5dc4bc8dea7423d844ddc0b1f6cb9" alt="lazy eager1"
而積極的協程看起來像這樣
eager_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
return 0;
}
這將產生如下輸出
enter main
Entered coro
constructed coro
resume once
Coro Done
data:image/s3,"s3://crabby-images/e1aae/e1aae5c30f57aed72fecb58d6ea5d8ebaa27616e" alt="lazy eager2"
基準測試
在第 11 代 Intel® Core™ i7-1185G7 @ 3.00GHz 上執行
發佈到執行器
基準測試正在執行以下程式碼,使用 cobalt 的任務、asio::awaitable
和基於 `asio' 的有堆疊協程 (boost.context)。
cobalt::task<void> atest()
{
for (std::size_t i = 0u; i < n; i++)
co_await asio::post(cobalt::use_op);
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
2472 |
2098 |
可等待 |
2432 |
2253 |
stackful (有堆疊) |
3655 |
3725 |
平行執行 noop 協程
此基準測試使用大小為零的 asio::experimental::channel
,以便並行讀寫。它使用 cobalt 的 gather 和 asio::awaitable
中的 awaitable_operator
。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
for (std::size_t i = 0u; i < n; i++)
co_await cobalt::gather(
chan.async_send(system::error_code{}, cobalt::use_task),
chan.async_receive(cobalt::use_task));
}
asio::awaitable<void> awtest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
using boost::asio::experimental::awaitable_operators::operator&&;
for (std::size_t i = 0u; i < n; i++)
co_await (
chan.async_send(system::error_code{}, asio::use_awaitable)
&&
chan.async_receive(asio::use_awaitable));
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
1563 |
1468 |
可等待 |
2800 |
2805 |
立即
此基準測試利用立即完成,使用大小為 1 的通道,以便每個操作都是立即的。
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
for (std::size_t i = 0u; i < n; i++)
{
co_await chan.async_send(system::error_code{}, cobalt::use_op);
co_await chan.async_receive(cobalt::use_op);
}
}
gcc 12 | clang 16 | |
---|---|---|
cobalt |
1810 |
1864 |
可等待 |
3109 |
4110 |
stackful (有堆疊) |
3922 |
4705 |
通道
在此基準測試中,比較了 asio::experimental::channel
和 cobalt::channel
。
這與並行測試類似,但使用的是 cobalt::channel
。
gcc | clang | |
---|---|---|
cobalt |
500 |
350 |
可等待 |
790 |
770 |
stackful (有堆疊) |
867 |
907 |
操作配置
此基準測試比較了非同步操作關聯配置器的不同解決方案。
gcc | clang | |
---|---|---|
std::allocator |
1136 |
1139 |
cobalt::monotonic |
1149 |
1270 |
pmr::monotonic |
1164 |
1173 |
cobalt::sbo |
1021 |
1060 後者方法為 cobalt 內部使用。 |
需求
函式庫
Boost.cobalt 需要 C++20 編譯器,並直接依賴於以下 boost 函式庫:
-
boost.asio
-
boost.system
-
boost.circular_buffer
-
boost.intrusive
-
boost.smart_ptr
-
boost.container(適用於 clang < 16)
編譯器
此函式庫自 Clang 14、Gcc 10 和 MSVC 19.30(Visual Studio 2022)起開始支援。
Gcc 版本 12.1 和 12.2 似乎在使用無堆疊變數的協程方面存在錯誤,如[此處](https://godbolt.org/z/6adGcqP1z)所示,應避免在協程中使用這些版本。 |
Clang 在 16 版才加入 std::pmr
支援,因此較舊的 Clang 版本使用 boost::container::pmr
作為替代方案。
部分(如果不是全部)MSVC 版本的協程實作都有問題,這個函式庫需要解決這些問題。這可能會導致非確定性行為和額外負擔。 |
協程的繼續執行可以在從 final_suspend
返回的 awaitable 中完成,如下所示:
// in promise
auto final_suspend() noexcept
{
struct final_awaitable
{
std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
bool await_ready() const noexcept;
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
{
auto cc = continuation;
h.destroy(); (2)
return cc;
}
void await_resume() noexcept {}
};
return final_awaitable{my_continuation};
};
1 | 繼續執行 |
2 | 在繼續執行之前自行銷毀協程 |
在 MSVC 上,final_suspend 並未正確暫停協程,因此 h.destroy()
將導致協程框架上的元素被重複銷毀。因此,MSVC 需要延後銷毀操作,使其在行外執行。這將導致額外負擔,並使實際的記憶體釋放變得不確定。
致謝
如果没有 CppAlliance 及其創辦人 Vinnie Falco,這個函式庫就不可能存在。Vinnie 非常信任我,讓我參與這個專案,儘管他本人對此類函式庫的設計方式抱持截然不同的看法。
也要感謝 Ruben Perez 和 Richard Hodges 傾聽我的設計問題,並提供建議和使用案例。此外,如果没有 Chris Kohlhoff 出色的 boost.asio,這個函式庫也不可能存在。