Converting repeated callbacks into C++20 coroutines

C++20 introduced coroutines, which drastically simplify writing asynchronous code. However, converting existing callback-based code to coroutines is not always easy. The simple case, where the callback is guaranteed to be called exactly once, is covered by the following question on StackOverflow.

=> StackOverflow: Turning a function call which takes a callback into a coroutine.

I've cleaned up the code and modernized it a bit to make it easier to read.

auto api_call_async(const std::string& some_parameter)
{
    struct awaiter : public std::suspend_always
    {
        explicit awaiter(const std::string& parameter)
            : parameter_(parameter) {}

        // Always suspend; the callback below resumes the coroutine.
        bool await_ready() const noexcept { return false; }

        void await_suspend(std::coroutine_handle<> handle)
        {
            // use your third party lib call directly here.
            api_call(parameter_, [handle](std::error_code ec)
            {
                // call the handle to resume the coroutine
                // (error handling for ec is omitted here)
                handle();
            });
        }

        std::string parameter_;
    };
    return awaiter(some_parameter);
}

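I won't get into the details; C++ coroutines are complicated AF. In short, the awaiter does the following:

* co_await on the returned awaiter suspends the calling coroutine.
* await_suspend() receives the handle of the suspended coroutine, starts the API call, and hands the handle to the callback.
* When the callback fires, it calls the handle, which resumes the coroutine right after the co_await.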

Let's try a practical example. In GNUnet++, a library I wrote, the DHT::put(key, value, callback) function stores a value in the DHT and calls the callback when the operation completes. To wrap it into a coroutine, we can do the following:

struct PutAwaiter
{
    // store the parameters into member variables
    PutAwaiter(DHT* dht, const std::string& key, const std::string& value)
        : key_(key), value_(value), dht_(dht) {}

    // Always suspend; the callback resumes the coroutine later.
    bool await_ready() const noexcept { return false; }

    void await_suspend(std::coroutine_handle<> handle)
    {
        // use the stored parameters to call the function
        // and awake the coroutine when the callback is called.
        // `this` is captured so the callback can record the result.
        dht_->put(key_, value_, [this, handle](bool success) {
            success_ = success;
            handle.resume();
        });
    }

    void await_resume() {
        if(!success_)
            throw std::runtime_error("put failed");
    }

    bool success_ = false;

    std::string key_;
    std::string value_;
    DHT* dht_;
};

// Now we can use it like this:
// PutAwaiter putCoro(DHT* dht, const std::string& key, const std::string& value)
// {
//     return PutAwaiter(dht, key, value);
// }

// or to make the signature a bit nicer:
cppcoro::task<> putCoro(DHT* dht, const std::string& key, const std::string& value)
{
    co_await PutAwaiter(dht, key, value);
}

Dealing with callbacks that are called multiple times

That's nice and all. But what should we do when the callback can be called multiple times? For example, when we read from the DHT, where there could be multiple values for a given key, or when iterating over all replies from a TCP stream. It took me a lot of head scratching to figure out how. It turns out the key is that you can resume a coroutine multiple times, as long as you make damn sure you never resume it while it is already running or after it has finished.

Here's my solution: the QueuedAwaiter. It keeps track of all values generated by the callback and dispenses them one by one to the awaiting coroutine. The main API is addValue and addException, which queue a value or an exception respectively.

template <typename T>
struct QueuedAwaiter
{
    using ElementType = std::variant<T, std::exception_ptr>;
    std::queue<ElementType> queue_;
    std::coroutine_handle<> handle_;
    // A mutex is needed to protect the queue, since on a multi-threaded
    // system the coroutine and the callback can run at the same time.
    mutable std::mutex mtx_;

    void addValue(T&& value)
    {
        std::coroutine_handle<> handle = nullptr;
        {
            std::lock_guard lock(mtx_);
            queue_.emplace(std::move(value));
            handle = handle_;
            handle_ = nullptr;
        }
        if(handle)
            handle.resume();
    }

    void addException(std::exception_ptr&& exception)
    {
        std::coroutine_handle<> handle = nullptr;
        {
            std::lock_guard lock(mtx_);
            queue_.emplace(std::move(exception));
            handle = handle_;
            handle_ = nullptr;
        }
        if(handle)
            handle.resume();
    }

    bool await_ready() const noexcept
    {
        std::lock_guard lock(mtx_);
        return !queue_.empty();
    }

    T await_resume() noexcept(false)
    {
        // Take the front element out of the queue while holding the lock,
        // so a concurrent addValue() cannot race with the pop.
        auto try_pop = [this]() -> std::optional<ElementType> {
            std::lock_guard lock(mtx_);
            if(queue_.empty())
                return std::nullopt;
            auto item = std::move(queue_.front());
            queue_.pop();
            return item;
        };
        auto front = try_pop();
        assert(front.has_value());

        if(front->index() == 1)
            std::rethrow_exception(std::get<1>(*front));
        return std::move(std::get<0>(*front));
    }

    bool await_suspend(std::coroutine_handle<> handle) noexcept
    {
        // Check and store under a single lock. If a value arrived after
        // await_ready() ran, return false to resume immediately instead
        // of parking the handle and missing the wakeup.
        std::lock_guard lock(mtx_);
        if(!queue_.empty())
            return false;
        handle_ = handle;
        return true;
    }
};

To convert callbacks getting called over and over, like the DHT::get function:

dht->get(key, [](const std::optional<std::string>& value) {
    // value == std::nullopt if the DHT search is complete.
    // otherwise, value contains the data found

    if (!value.has_value())
        std::cout << "Search complete" << std::endl;
    else {
        // do something with the value
    }
});

We do the following horrible and unreadable thing:

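cppcoro::async_generator<std::string> getCoro(DHT* dht, const std::string& key)
{
    QueuedAwaiter<std::optional<std::string>> awaiter;
    dht->get(key, [&awaiter](const std::optional<std::string>& value) {
        // addValue takes an rvalue, so pass it a copy.
        awaiter.addValue(std::optional<std::string>(value));
    });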

    // Try to await for the values and exit the loop when we get the notification
    // that the search is complete.
    while(true) {
        // wait for the next value
        auto res = co_await awaiter;

        // Note! You must provide a way to notify the generator to exit the loop
        // or it'll hang there and cause all sorts of problems.
        if (!res.has_value())
            break;
        // Nice! We got a value. Yield it to the caller.
        co_yield *res;
    }
}

// using it
auto lookup = getCoro(dht, "key");
for(auto it = co_await lookup.begin(); it != lookup.end(); co_await ++it) {
    std::cout << "Found on DHT: " << *it << std::endl;
}

// Unfortunately, the C++ standard committee decided to remove the for co_await syntax
// for co_await(auto val : lookup) {
//     std::cout << "Found on DHT: " << val << std::endl;
// }

=> ======

It's much harder than I thought.
