Home Blog/Rants Downloads C++ Papers GitHub LinkedIn
Michael-Maniscalco.com
C++
Work Contracts - A simple and efficient system for handling async tasks in C++
October 17, 2019

Introduction:
TODO:

Core concepts:
  • Thread Pool
  • Work Contract Group
  • Work Contract
  • System is thread pool agnostic. In the following example the thread pool is

A really simple thread pool:

Work Contract Groups and Work Contracts are intentional thread pool agnostic. Here, for the purposes of clarity a thread pool is defined which is intentionally written to be as minimalistic as is possible so that it does not distract from the main concepts of 'work contracts' and 'work contract groups'. This simple thread pool works by constructing a predetermined number of worker threads and registering a single lambda function which each of these threads will invoke repeatedly until the thread is terminated. All custom work is managed within that single lambda function.

Once the concept of work contracts is understood it should be apparent that this simple and minimilistic thread pool design is actually sufficient for professional grade code (with a few more bells and whistles perhaps but these are left out in this instance for the sake of clarity).

Here is the thread pool class:

thread_pool.h

#pragma once

#include <thread>
#include <vector>
#include <cstdint>
#include <functional>
#include <memory>


namespace maniscalco
{
    
    class thread_pool
    {
    public:
    
        using thread_pool_ready_handler = std::function<void()>;
        using worker_thread_function = std::function<void()>;
        
        struct configuration_type
        {
            std::size_t                 threadCount_;
            worker_thread_function      workerThreadFunction_;
        };

        thread_pool
        (
            configuration_type const &
        );
        
        ~thread_pool();

        void stop();

    private:
    
        std::vector<std::thread>    threads_;

        bool volatile               terminateFlag_;
    };
    
} // namespace maniscalco

thread_pool.cpp

#include "./thread_pool.h"


maniscalco::system::thread_pool::thread_pool
(
    configuration_type const & configuration
):
    threads_(),
    terminateFlag_(false)
{
    threads_.resize(configuration.threadCount_);
    for (auto & thread : threads_)
    {
        thread = std::thread([threadFunction = configuration.workerThreadFunction_]
                (
                    bool volatile const & terminateFlag
                )
                {
                    while (!terminateFlag)
                        threadFunction();
                },
                std::cref(terminateFlag_));
    }
}
    

maniscalco::system::thread_pool::~thread_pool
(
)
{
    terminateFlag_ = true;
    for (auto & thread : threads_)
        thread.join();
}


void maniscalco::system::thread_pool::stop
(
    // issue terminate to all worker threads
)
{
    terminateFlag_ = true;
}

main.cpp

#include <iostream>
#include "thread_pool.h"


int main
(
    int, 
    char const **
)
{
    static auto constexpr numWorkerThreads = 4;
    maniscalco::system::thread_pool::configuration_type threadPoolConfiguration;
    threadPoolConfiguration.threadCount_ = num_worker_threads;
    threadPoolConfiguration.workerThreadFunction_ = []
	(
            // all worker threads repeatedly invoke this function.
    	    // for demonstration purposes it dumps astrisks to console to demonstate activity.
	)
	{
    	    std::cout << "*" << std::endl;
	};

    maniscalco::system::thread_pool workerThreadPool(threadPoolConfiguration);
    return 0;
}

Work Contracts - how work gets done:

In this system all work is done by taking out a 'contract' for work. A work contract is a simple class defining a function to invoke when the work is to be done and a function to invoke when the contract expires. A work contract can be exercised as many times as desired. They are non copyable and are guaranteed to be processed by only one worker thread at a time (this guarantee is made by the parent work_contract_group to which the specific work_contract belongs - more on this later).

Here is the work_contract class:

work_contract.h

#pragma once

#include <cstdint>
#include <functional>


namespace maniscalco::system
{

    class work_contract final
    {
    public:
    
        using contract_handler = std::function<void()>;
        using end_contract_handler = std::function<void()>;
        
        struct configuration_type
        {
            contract_handler        contractHandler_;
            end_contract_handler    endContractHandler_;
        };
        
        work_contract
        (
            work_contract &&
        );
      
        work_contract & operator = 
        (
            work_contract &&
        );
                  
        ~work_contract();
        
        void exercise();
        
    protected:
    
    private:
    
        friend class work_contract_group;
    
        work_contract
        (
            configuration_type
        );
    
        work_contract(work_contract const &) = delete;
        work_contract & operator = (work_contract const &) = delete;
    
        end_contract_handler    endContractHandler_;
        
        contract_handler        contractHandler_;
    };
    
} // namespace maniscalco::system
work_contract.cpp

#include "./work_contract.h"


maniscalco::system::work_contract::work_contract
(
    configuration_type configuration
):
    endContractHandler_(configuration.endContractHandler_),
    contractHandler_(configuration.contractHandler_)
{
}


maniscalco::system::work_contract::work_contract
(
    work_contract && other
):
    endContractHandler_(std::move(other.endContractHandler_)),
    contractHandler_(std::move(other.contractHandler_))
{
    other.endContractHandler_ = nullptr;
    other.contractHandler_ = nullptr;
}


auto maniscalco::system::work_contract::operator = 
(
    work_contract && other
) -> work_contract &
{
    if (endContractHandler_)
        endContractHandler_();
    endContractHandler_ = std::move(other.endContractHandler_);
    other.endContractHandler_ = nullptr;
    contractHandler_ = std::move(other.contractHandler_);
    other.contractHandler_ = nullptr;
    return *this;
}
        

maniscalco::system::work_contract::~work_contract
(
)
{
    if (endContractHandler_)
        endContractHandler_();
}


void maniscalco::system::work_contract::exercise
(
)
{
    if (contractHandler_)
        contractHandler_();
}

Work Contract Groups - Managing work contracts:

Work contracts are intended to be managed by a parent Work Contract Group. They can not be constructed independently and must, instead, be created by a Work Contract Group. A work contract group can manage a finite number of work contracts and is designed to be lock free and to ensure that any work contract which it manages will be exercised by only one thread at a time. This is a powerful guarantee which, with clever design, can be used to significantly simplify code while simultaneously improve performance - more on this later.

A work_contract_group constructor requires a single function which is invoked whenever one of its work_contracts has been exercised. This function hook is required to completely decouple the work_contract_group from the specific implementation of the worker thread pool. This decoupling is important both for architectural reasons but also it allows this system to be used either in low latency systems where 'core burning' is desirable (threads spin rather than yielding while there is no work to be done) or in more traditional systems where threads are expected to sleep while there is no work to be done. More on this later.

NOTE: The work_contract_group represents the only significant complexity within this system. This complexity is required to achieve lockless behavior and will be described in greater detail further down.

Here is the work_contract_group class:

work_contract_group.h

#pragma once

#include <array>
#include <atomic>
#include <chrono>
#include <optional>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <atomic>
#include <utility>

#include "./work_contract.h"


namespace maniscalco::system 
{

    class work_contract_group final
        : public std::enable_shared_from_this<work_contract_group> 
    {
    public:
        
        using contract_service_handler = std::function<void()>;
    
        using contract_handler = std::function<void()>;
        using end_contract_handler = std::function<void()>;
        
        struct contract_configuration_type
        {
            contract_handler        contractHandler_;
            end_contract_handler    endContractHandler_;
        };
        
        static std::shared_ptr<work_contract_group> create
        (
            contract_service_handler
        );

        ~work_contract_group();

        std::optional<work_contract> create_contract
        (
            contract_configuration_type
        );

        void service_contracts();

        bool get_service_requested() const;
        
    protected:

    private:

        static std::uint64_t constexpr capacity = 256;
        using contract_state_flags = std::array<std::atomic<std::uint64_t>, 2 * ((capacity + 63) / 64)>;

        work_contract_group
        (
            contract_service_handler
        );

        void on_end_contract
        (
            std::uint64_t
        );

        struct contract_info
        {
            enum class contract_status : std::uint32_t
            {
                none = 0,
                subscribed = 1,
                unsubscribed = 2
            };
            contract_info():contractStatus_(contract_status::none), contractHandler_(), errorHandler_(), endContractHandler_(){}
            std::atomic<contract_status>                contractStatus_;
            work_contract::contract_handler      contractHandler_;
            std::function<void()>        errorHandler_;
            work_contract::end_contract_handler  endContractHandler_;
        };

        struct shared_state
        {
            std::atomic<std::int64_t>               serviceRequested_;
            contract_state_flags                    contractStateFlags_;
            contract_service_handler                contractRequiresServiceHandler_;
            std::array<contract_info, capacity>     contracts_;
        };

        std::shared_ptr<shared_state>           sharedState_;

    }; // class work_contract_group

} // namespace maniscalco::system
work_contract_group.cpp

#include "./work_contract_group.h"
#include <include/invoke_from_weak_ptr.h>


auto maniscalco::system::work_contract_group::create
(
    contract_service_handler contractRequiresServiceHandler
) -> std::shared_ptr<work_contract_group> 
{
    return std::shared_ptr<work_contract_group>(new work_contract_group(contractRequiresServiceHandler));
}


maniscalco::system::work_contract_group::work_contract_group
(
    contract_service_handler contractRequiresServiceHandler
):
    sharedState_(std::make_shared<shared_state>())
{
    sharedState_->contractRequiresServiceHandler_ = [sharedState = sharedState_, contractRequiresServiceHandler]
            (
            )
            {
                sharedState->serviceRequested_++;
                contractRequiresServiceHandler();
            };
}


maniscalco::system::work_contract_group::~work_contract_group
(
) 
{
}


bool maniscalco::system::work_contract_group::get_service_requested
(
) const
{
    return (sharedState_->serviceRequested_ > 0);
}


void maniscalco::system::work_contract_group::service_contracts
(
) 
{
    using contract_status = contract_info::contract_status; 
        
    std::uint64_t contractIndex = 0;
    for (auto & flags : sharedState_->contractStateFlags_) 
    {
        if (flags != 0)
        {
            std::uint64_t need_service_bit = 0x01;
            std::uint64_t is_being_serviced_bit = 0x02;
            for (auto k = 0; ((need_service_bit <= flags) && (k < 32)); ++k, ++contractIndex) 
            {
                auto current = (flags.load() & ~(need_service_bit | is_being_serviced_bit));
                auto expected = (current | need_service_bit);
                auto desired = (current | is_being_serviced_bit);
                if (flags.compare_exchange_strong(expected, desired)) 
                {
                    --sharedState_->serviceRequested_;
                    auto & contract = sharedState_->contracts_[contractIndex];
                    if (contract.contractStatus_ == contract_status::unsubscribed)
                    {
                        // contract has ended
                        if (contract.endContractHandler_)
                        {
                            try
                            {
                                contract.endContractHandler_();
                            }
                            catch(...)
                            {
                            }
                        }
                        contract.contractStatus_ = contract_status::none;
                    }
                    else
                    {
                        if (contract.contractHandler_)
                        {
                            try
                            {
                                contract.contractHandler_();
                            }
                            catch (...)
                            {
                            }
                        }
                    }
                    flags.fetch_and(~is_being_serviced_bit);
                }
                need_service_bit <<= 2;
                is_being_serviced_bit <<= 2;
            }
        }
    }
}
    

auto maniscalco::system::work_contract_group::create_contract
(
    contract_configuration_type contractConfiguration
) -> std::optional<work_contract>
{
    std::uint64_t contractIndex = 0;
    for (; contractIndex < capacity; ++contractIndex)
    {
        using status = contract_info::contract_status; 
        auto expected = status::none;
        if (sharedState_->contracts_[contractIndex].contractStatus_.compare_exchange_strong(expected, status::subscribed))
            break;
    }
    if (contractIndex == capacity)
        return std::nullopt; // at capacity

    sharedState_->contracts_[contractIndex].contractHandler_ = contractConfiguration.contractHandler_;
    sharedState_->contracts_[contractIndex].endContractHandler_ = contractConfiguration.endContractHandler_;
    
    return work_contract({            
            [contractIndex, sharedState = sharedState_] // contract handler
            (
            )
            {
                auto const contractServiceFlag =  (1ull << ((contractIndex & 0x1f) << 1));
                auto & contractFlags = sharedState->contractStateFlags_[contractIndex >> 5];
                if ((contractFlags.fetch_or(contractServiceFlag) & contractServiceFlag) == 0)
                    sharedState->contractRequiresServiceHandler_();
            },
            [contractIndex, sharedState = sharedState_] // unsubscribe handler
            (
            ) mutable
            {
                auto const contractServiceFlag =  (1ull << ((contractIndex & 0x1f) << 1));
                auto & contractFlags = sharedState->contractStateFlags_[contractIndex >> 5];
                sharedState->contracts_[contractIndex].contractStatus_ = contract_info::contract_status::unsubscribed;
                if ((contractFlags.fetch_or(contractServiceFlag) & contractServiceFlag) == 0)
                    sharedState->contractRequiresServiceHandler_();
            }});
}
main.cpp

int main
(
    int, 
    char const **
)
{
    // demonstrate construction of a work contract group
    auto workContractGroup = maniscalco::system::work_contract_group::create
    (
        []()
        {
          // this hook invoked whenever a contract within this group is exercised.  
          // more on the motivation for this hook further down.
        }
    );
    return 0;
}
Putting the pieces together:

In order to actually process the work associated with a work_contract we will need create a thread pool to do the work and direct those threads to query the work_contract_group to see if any work_contracts require servicing. The following code demonstrates this:

main.cpp

int main
(
    int, 
    char const **
)
{
    // create a work_contract_group - very simple
    auto workContractGroup = maniscalco::system::work_contract_group::create([](){});

    // create a worker thread pool and direct the threads to service the work contract group - also very simple
    static auto constexpr numWorkerThreads = 4;
    maniscalco::system::thread_pool::configuration_type threadPoolConfiguration;
    threadPoolConfiguration.threadCount_ = num_worker_threads;
    threadPoolConfiguration.workerThreadFunction_ = [&](){workContractGroup->service_contracts();};

    maniscalco::system::thread_pool workerThreadPool(threadPoolConfiguration);

    return 0;
}

We now have a collection of threads which will do the work for any work_contract associated with our work_contract_group. You will notice, however, that this results in a constant 100% CPU usage for the cores which the worker threads are running on. This might be exactly what you want in situations where ultra-low latency is required (I'm looking at you, finanace). However, for the typical 'good citizen' application where playing well with the other applications is desirable we will need to use as little CPU as possible when there is no work to be done. This is the motivation for the single function hook provided in the work_contract_group constructor. Let's add a little bit of code to our example an make this system a more pleasant application to be around.

main.cpp

int main
(
    int, 
    char const **
)
{
    
    std::condition_variable     conditionVariable;
    std::mutex                  mutex;
    
    // create a work_contract_group - very simple
    auto workContractGroup = maniscalco::system::work_contract_group::create(
            [&]()
            {
                // whenever a contract is excercised we use our condition variable to 'wake' a thread.
                conditionVariable.notify_one();
            });

    // create a worker thread pool and direct the threads to service the work contract group - also very simple
    static auto constexpr numWorkerThreads = 4;
    maniscalco::system::thread_pool::configuration_type threadPoolConfiguration;
    threadPoolConfiguration.threadCount_ = num_worker_threads;
    threadPoolConfiguration.workerThreadFunction_ = [&]
	    (
            )
            {
                // wait until the there is work to do rather than spin.  
                // here we wait only for up to 10 microseconds.  this is just to simplify the example and eliminate the
		// need to write code which would prevent the worker threads from waiting for ever and preventing the demo
		// from exiting.  real world code would likely have a mechanism for signaling thread pool shutdown instead.
                std::unique_lock uniqueLock(mutex);
                std::chrono::microseconds waitTime(10);
                if (conditionVariable.wait_for(uniqueLock, waitTime, [](){return workContractGroup->get_service_requested();}))
                    workContractGroup->service_contracts();
            };
    maniscalco::system::thread_pool workerThreadPool(threadPoolConfiguration);

    return 0;
}

Now we have two simple methods for processing work for a work_contract_group. One which is more suitable for low latency applications and one which is intended for normal application usage. Both are trivial to set up. Now let's actually create a work contract and do something with this system.

main.cpp

int main
(
    int, 
    char const **
)
{
    std::condition_variable     conditionVariable;
    std::mutex                  mutex;
    
    // create a work_contract_group - very simple
    auto workContractGroup = maniscalco::system::work_contract_group::create(
            [&]()
            {
                // whenever a contract is excercised we use our condition variable to 'wake' a thread.
                conditionVariable.notify_one();
            });

    // create a worker thread pool and direct the threads to service the work contract group - also very simple
    static auto constexpr numWorkerThreads = 4;
    maniscalco::system::thread_pool::configuration_type threadPoolConfiguration;
    threadPoolConfiguration.threadCount_ = num_worker_threads;
    threadPoolConfiguration.workerThreadFunction_ = [&]
            (
            )
            {
                // wait until the there is work to do rather than spin.  
                // here we wait only for up to 10 microseconds.  this is just to simplify the example and eliminate the
		// need to write code which would prevent the worker threads from waiting for ever and preventing the demo
		// from exiting.  real world code would likely have a mechanism for signaling thread pool shutdown instead.
                std::unique_lock uniqueLock(mutex);
                std::chrono::microseconds waitTime(10);
                if (conditionVariable.wait_for(uniqueLock, waitTime, [](){return workContractGroup->get_service_requested();}))
                    workContractGroup->service_contracts();
            }
    maniscalco::system::thread_pool workerThreadPool(threadPoolConfiguration);

    // create a work_contract
    maniscalco::system::work_contract_group::contract_configuration_type workContractConfiguration;
    workContractConfiguration.contractHandler_ = [&](){std::cout << "Work contract exercised" << std::endl;};
    workContractConfiguration.endContractHandler_ = [&](){std::cout << "Work contract expired" << std::endl;};
    auto workContract = workContractGroup->create_contract(workContractConfiguration);

    // invoke the contract
    if (workContract)
        workContract->exercise();
    
    return 0;
}
output:

$ ./build/release/bin/basic_test 
Work contract exercised
Work contract expired
Performance:

To measure the performance of this system I wrote a test which created a thread_pool with a configurable number of worker threads (consumer threads) and a configurable number of threads which create work contracts (producer threads). These producer threads create work by creating a work contract which simply increments a counter for the number of times which the contract has been executed. These producer threads spin in a loop for some configurable amount of time continually exercising their work contract and waiting for the counter to increment (indicating that a worker thread has executed the work contract once). After the the test time elapses the number of times which the contract has been executed per second per thread is calculated which effectively measures the overhead of the system for queing and executing an asynchronous task - our work contract which simply increments a counter.

The basic 'producer' thread loop:

void producer_thread_function
(
    std::atomic const & stopFlag,
    std::size_t volatile & taskCount
)
{
    maniscalco::system::work_contract_group::contract_configuration_type workContract;
    workContract.contractHandler_ = [&](){++taskCount;};
    workContract.endContractHandler_ = nullptr;

    auto contract = _workContractGroup->create_contract(workContract);

    while (!stopFlag)
    {
        auto nextTaskCount = (taskCount + 1);
        contract->exercise();
        while ((taskCount != nextTaskCount) && (!stopFlag))
            ;
    }
}
The thread_pool threads act as the consumers - using the basic set up described earlier:

int main
(
    int,
    char const **
)
{
    // create really simple thread pool and basic worker thread function
    // which services are work contract group.
    maniscalco::system::thread_pool::configuration_type threadPoolConfiguration;
    threadPoolConfiguration.threadCount_ = num_worker_threads;
    threadPoolConfiguration.workerThreadFunction_ = []()
            {
                #ifdef SLEEP_WHILE_NO_WORK
                    std::unique_lock uniqueLock(_mutex);
                    if (_conditionVariable.wait_for(uniqueLock, std::chrono::microseconds(10), 
                        [](){return _workContractGroup->get_service_requested();}))
                    {
                        _workContractGroup->service_contracts();
                    }
                #else
                    _workContractGroup->service_contracts();
                #endif
            };
    maniscalco::system::thread_pool workerThreadPool(threadPoolConfiguration);

    // remaining code just creates the producer threads above
}
output - two producer threads and thread pool with two worker (consumer) threads:

$ ./build/release/bin/performance_test 
test iteration 1 of 16: Average tasks per thread/sec: 2070099
test iteration 2 of 16: Average tasks per thread/sec: 2011617
test iteration 3 of 16: Average tasks per thread/sec: 2148769
test iteration 4 of 16: Average tasks per thread/sec: 2149436
test iteration 5 of 16: Average tasks per thread/sec: 2016004
test iteration 6 of 16: Average tasks per thread/sec: 1810209
test iteration 7 of 16: Average tasks per thread/sec: 1859610
test iteration 8 of 16: Average tasks per thread/sec: 1808259
test iteration 9 of 16: Average tasks per thread/sec: 2047596
test iteration 10 of 16: Average tasks per thread/sec: 2037487
test iteration 11 of 16: Average tasks per thread/sec: 1996469
test iteration 12 of 16: Average tasks per thread/sec: 2054800
test iteration 13 of 16: Average tasks per thread/sec: 1987179
test iteration 14 of 16: Average tasks per thread/sec: 2130017
test iteration 15 of 16: Average tasks per thread/sec: 1946058
test iteration 16 of 16: Average tasks per thread/sec: 2061830