C++ — implement channel from scratch Link to heading

A channel is a high-level communication abstraction that allows threads to send and receive data to each other. Channels are a fundamental part of the synchronization models in both Rust and Go.

In Rust, the standard library supports multi-producer single-consumer channel, i.e., std::sync::mpsc::channel. In Go, channels are a core feature of the language. Channels are used to communicate between goroutines, which are lightweight threads that are managed by the Go runtime.

Channels offer a number of benefits for concurrent programming, including:

  • Safety: Channels can help to prevent data races and other concurrency errors.
  • Efficiency: Channels can be used to implement efficient concurrent algorithms.
  • Modularity: Channels can be used to decouple different components of a concurrent program.

Unfortunately, C++ standard library does not support a channel. Today, let’s implement a minimal single-producer single-consumer channel in C++17. By doing so, we can learn a lot on how a channel is implemented from the basic low-level synchronization building blocks, such as mutex and condition_variable.

API Definition Link to heading

First, let’s define our API. We will make it very similar to how Rust does it.

// channel.h
#pragma once

#include <optional>

template<typename T>
class Channel {
  public:
    // the producer sends data via this call
    // returns whether operation is successful
    bool send(T item);

    // the consumer receives data via this call
    // blocks until connection is closed or an item is ready
    std::optional<T> recv();

    // either party can close the connection
    // returns whether operation is successful
    bool close();
}

// returns a pair of channel ends for producer and consumer, respectively
template<typename T>
std::pair<Channel<T>, Channel<T>> make_channels();

Before we implement, it is probably a good idea to first write a program that uses our library

// main.cc
#include "channel.h"

#include <thread>
#include <iostream>
#include <chrono>

using namespace std::chrono_literals;

int main() {
    auto [tx, rx] = make_channel<std::string>();
    std::thread t{[](Channel<std::string> tx) {
        std::this_thread::sleep_for(100ms);
        for (int i = 0; i < 100; ++i) {
            tx.send(std::to_string(i));
        }
    }, std::move(tx)};

    for (;;) {
        auto result = rx.recv();
        if (!result) break;
        std::cout << *result << "\n";
    }

    t.join();
    return 0;
}

In this code, we create a simple producer-consumer channel. The producer thread will send 100 strings to the channel, and the consumer thread will receive them and print them to the console. The channel abstraction makes the code very easy to read and write, without lower level synchronization constructs.

Now is our time to implement the channel using the lower level synchronization constructs. The idea is simple. A pair of channel will share a data queue. Both send() and recv() methods need to obtain synchronization lock and perform the operation. In particular, we need std::conditional_variable to notify the other end when a change occurs. Since all of these have to be shared between the two, let’s create a shared state

// channel.h
struct State {
    std::queue<T> queue;
    std::mutex lock;
    std::condition_variable cv;
    bool close = false;
};

We have added closed variable as well to indicate whether the channel is closed. Since we need to share this State, let’s add std::shared_ptr<State> variable in Channel. In addition, we need to distinguish each channel whether it is a sender or not, so let’s add bool is_sender variable.

// channel.h
template<typename T>
class Channel {
  public:
    ....
  private:
    std::shared_ptr<State> state;
    bool is_sender;
};

Since we don’t want anyone to create a new Channel instance without invoking make_channel() function, we will declare its constructor as a private method. In addition, make_channels() function shall be able to call our private constructor, so we will also need to declare it as a friend function.

// channel.h
template<typename T>
class Channel {
  public:
    ....
  private:
    ....

    explicit Channel(std::shared_ptr<State> state, bool is_sender)
            : state{std::move(state)}, is_sender{is_sender} {}

    friend std::pair<Channel<T>, Channel<T>> make_channel<T>();
};

With this, we can implement make_channel() function

// channel.h
template<typename T>
std::pair<Channel<T>, Channel<T>> make_channel() {
    auto ptr = std::make_shared<State>();
    Channel<T> sender{ptr, true};
    Channel<T> receiver{std::move(ptr), false};
    return {std::move(sender), std::move(receiver)};
}

Before we implement the main methods, we first must disable copy constructor and enable move constructor, since this is supposed to be a single-producer single-consumer channel, which means Channel instances can be moved but not copied. We enable move constructor, since transfer of ownership is allowed. Likewise, we do the same for copy and move assignment.

// channel.h
template<typename T>
class Channel {
public:
    Channel(Channel const&) = delete;
    Channel(Channel&&) = default;
    Channel& operator=(Channel&) = delete;
    Channel& operator=(Channel&&) = default;
    ...
}

Now, we are ready to implement the main methods. Let’s start with send()

// channel.h
template<typename T>
class Channel {
public:
    bool send(T item) {
        do {
            if (!is_sender || !state) break;
            std::lock_guard lock{state->lock};
            if (state->closed) break;
            state->queue.push(std::move(item));
            state->cv.notify_one();
            return true;
        } while (false);
        return false;
    }

    ....
};

This should be self-explanatory. We make sure this is the sending end of the channel. We then obtain the mutex lock to prevent data race, and makes sure the channel is still open. Then, we insert the item to the FIFO queue and notifies the other end via std::condition_variable. The use of do { ... } while (false) is to easily break out to failure path.

Similarly, the recv() method can be implemented as below

// channel.h
template<typename T>
class Channel {
public:
    std::optional<T> recv() override {
        do {
            if (is_sender || !state) break;
            std::unique_lock lock{state->lock};
            state->cv.wait(lock, [this]{
                return !state->queue.empty() || state->closed;
            });
            if (state->queue.empty()) break;
            auto x = std::move(state->queue.front());
            state->queue.pop();
            return x;
        } while (false);
        return std::nullopt;
    }

    ...
};

We make sure this is the receiving end of the channel, obtains the lock, and blocks the thread until either of the following to be true

  • there is an item in the queue
  • the channel is closed

If there is an item, we pop it and return it. Otherwise, we return std::nullopt.

Finally, the close() method, which should be self-explanatory.

// channel.h
template<typename T>
class Channel {
public:
    bool close() {
        do {
            if (!state) break;
            std::lock_guard lock{state->lock};
            if (state->closed) break;
            state->closed = true;
            state->cv.notify_one();
            return true;
        } while (false);
        return false;
    }

    ...
};

Before we call it done, there is actually one more thing we need to do. That is to close the connection when a channel goes out of scope. For that, we need to add a destructor that simply calls close().

// channel.h
template<typename T>
class Channel {
public:
    ~Channel() {
        close();
    }

    ...
};

This makes sense for single-producer single-consumer scenario, since if any of them goes out of the scope, the channel must be closed.

You can check out this repo for full code.