These workers will take a nullary function (one taking no arguments) and return a future through which the result of the calculation or process can be fetched later on. Tasks are queued and executed in turn.
Boost.Lambda or Boost.Bind can be used to bind arguments to a function in order to get a suitable nullary function.
The sample shows how to use mutexes and condition variables to synchronise threads in a multi-threaded environment. Any shared data structures used by the executed functions must be correctly synchronised.
The implementation of Worker::queue shows how to add items to a queue in a thread-safe manner, and Worker::execute shows how to remove them.
This code is slightly adapted from the version that is used as the bottom layer of the multi-threading support Mahlee is built on.
There is some similar code available in the “async” library at mr-edd.co.uk, although it differs from this recipe in a number of ways.
There is too much code in async to post in a recipe, but it is released under the boost license (making it free to use in pretty much any project). Additional libraries used include Boost.Mpl, Boost.Tuple and Boost.TypeTraits.
#include <boost/shared_ptr.hpp>
#include <boost/type_traits.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
namespace boost {
    namespace detail {
        // Teaches boost::function_traits about the objects returned by
        // boost::bind: they are treated as nullary callables whose result
        // type is the bind expression's declared return type. Worker's
        // operator() relies on this to deduce the Future's value type.
        // NOTE: this reaches into Boost implementation namespaces
        // (detail, _bi), so it may need revisiting when Boost is upgraded.
        template< typename Result, typename Functor, typename BoundArgs >
        struct function_traits_helper< boost::_bi::bind_t< Result, Functor, BoundArgs > * >
        {
            typedef Result result_type;
            BOOST_STATIC_CONSTANT( int, arity = 0 );
        };
    }
}
/**
 * Owns one background thread that runs queued nullary functions in the
 * order they were submitted. Submitting a function returns a Future that
 * can later be used to wait for completion and fetch the result.
 *
 * Any data shared between the submitted functions and other threads must
 * be synchronised by the caller.
 */
class Worker : boost::noncopyable {
private:
    // Thread entry point. Declared ahead of Future< void >, which names it
    // as a friend so it can publish completion and exception state.
    void execute();
public:
    template< typename R > class Future;

    // Starts the worker thread with an empty queue.
    Worker();
    // NOTE(review): no definition of this overload or of terminate() is
    // visible in this section of the file -- confirm they exist elsewhere.
    Worker( boost::function0< void > init );
    // Signals the worker thread to stop and joins it.
    ~Worker();

    /**
     * Queues the callable f for execution. The Future's value type is
     * deduced through boost::function_traits, so F must be a function
     * pointer or a type for which function_traits is usable.
     */
    template< typename F >
    boost::shared_ptr< Future< typename boost::function_traits< typename boost::remove_pointer< F >::type >::result_type > > operator()( F f ) {
        return run< typename boost::function_traits< typename boost::remove_pointer< F >::type >::result_type >( f );
    }

    /**
     * Queues f with an explicitly specified result type R and returns the
     * Future that will receive its result.
     */
    template< typename R >
    boost::shared_ptr< Future< R > > run( boost::function< R ( void ) > f ) {
        boost::shared_ptr< Future< R > > future( new Future< R > );
        // Future< R >::Function is a dependent type name, so it must be
        // introduced with 'typename'; without it conforming compilers
        // parse the expression as a function call and reject it when the
        // template is instantiated.
        queue( future, typename Future< R >::Function( future, f ) );
        return future;
    }

    /**
     * Result-independent part of every future: completion flag, captured
     * exception message and the synchronisation primitives guarding them.
     */
    template<>
    class Future< void > {
    protected:
        Future();
    public:
        virtual ~Future();
        // Blocks until the job has completed; throws the captured
        // boost::shared_ptr< std::string > message if the job threw.
        void wait();
        // Blocks until the job has completed; returns the captured
        // exception message, or a null pointer if the job did not throw.
        boost::shared_ptr< std::string > exception();
    private:
        volatile bool m_completed;                    // set by Worker::execute under m_mutex
        boost::shared_ptr< std::string > m_exception; // null unless the job threw
        boost::mutex m_mutex;
        boost::condition m_control;                   // notified when m_completed becomes true
        friend void Worker::execute();
    };

    /**
     * Future carrying a value of type R produced by the queued function.
     */
    template< typename R >
    class Future : public Future< void > {
    private:
        // Adapter stored in the queue: runs the user function and stores
        // its return value in the associated future.
        struct Function {
            Function( boost::shared_ptr< Future< R > > j, boost::function0< R > f )
            : m_future( j ), m_f( f ) {
            }
            void operator() () {
                m_future->m_result = m_f();
            }
        private:
            boost::shared_ptr< Future< R > > m_future;
            boost::function0< R > m_f;
        };
        // Only Worker (a friend) creates futures.
        Future()
        : m_result() {
        }
    public:
        // Waits for completion (rethrowing as wait() does) and returns a
        // copy of the stored result.
        R result() {
            wait();
            return m_result;
        }
    private:
        R m_result;
        friend class Worker;
    };

    void terminate();
private:
    // Appends a job and its future to the queue and wakes the worker.
    void queue( boost::shared_ptr< Future< void > > j, boost::function0< void > f );
private:
    typedef std::list< std::pair< boost::shared_ptr< Future< void > >, boost::function0< void > > > t_queue;
    t_queue m_queue;            // pending jobs, guarded by m_mutex
    volatile bool m_terminate;  // set by the destructor to stop the thread
    boost::mutex m_mutex;
    boost::condition m_control; // notified on new work or termination
    boost::thread m_thread;     // declared last so every other member exists before the thread starts
};
/*
Worker
*/
// Clears the termination flag and launches the worker thread, which runs
// Worker::execute on this object.
Worker::Worker()
    : m_terminate( false )
    , m_thread( boost::bind( &Worker::execute, this ) )
{
}
// Requests termination while holding the queue mutex, wakes the worker
// thread, then releases the mutex and waits for the thread to finish.
Worker::~Worker() {
    boost::mutex::scoped_lock guard( m_mutex );
    m_terminate = true;
    m_control.notify_all();
    // Release before joining so the worker thread can re-acquire the mutex.
    guard.unlock();
    m_thread.join();
}
// Adds a (future, job) entry to the back of the queue under the queue
// mutex and wakes any thread waiting on the queue's condition.
void Worker::queue( boost::shared_ptr< Future< void > > future, boost::function0< void > f ) {
    boost::mutex::scoped_lock guard( m_mutex );
    const t_queue::value_type entry( future, f );
    m_queue.push_back( entry );
    m_control.notify_all();
}
void Worker::execute() {
do {
t_queue job;
{ // Find a job to perform
boost::mutex::scoped_lock lock( m_mutex );
if ( m_queue.empty() )
m_control.wait( lock );
if ( !m_terminate && !m_queue.empty() )
job.swap( m_queue );
}
for ( t_queue::const_iterator j( job.begin() ); j != job.end() && !m_terminate; ++j ) {
// Execute job
try {
j->second();
} catch ( std::exception &e ) {
boost::mutex::scoped_lock lock( j->first->m_mutex );
j->first->m_exception = boost::shared_ptr< std::string >( new std::string( e.what() ) );
} catch ( ... ) {
boost::mutex::scoped_lock lock( j->first->m_mutex );
j->first->m_exception = boost::shared_ptr< std::string >( new std::string( "An unknown exception was caught" ) );
}
// Notify futures
boost::mutex::scoped_lock lock( j->first->m_mutex );
j->first->m_completed = true;
j->first->m_control.notify_all();
}
} while ( !m_terminate );
}
/*
Worker::Future< void >
*/
// A new future starts out not completed; the exception pointer is
// default-constructed (null).
Worker::Future< void >::Future()
    : m_completed( false )
{
}
// Virtual destructor with nothing to do beyond destroying the members.
Worker::Future< void >::~Future()
{
}
// Blocks until the job has been marked completed, then returns the stored
// exception message (a null pointer when no exception was recorded).
boost::shared_ptr< std::string > Worker::Future< void >::exception() {
    boost::mutex::scoped_lock guard( m_mutex );
    while ( !m_completed ) {
        m_control.wait( guard );
    }
    return m_exception;
}
// Blocks until the job has completed. If an exception message was
// recorded for the job, it is thrown as a boost::shared_ptr< std::string >.
void Worker::Future< void >::wait() {
    if ( boost::shared_ptr< std::string > failure = exception() ) {
        throw failure;
    }
}
/*
Example
*/
#include <iostream>
// Example job that never returns normally: it always throws the int 0.
int throws() {
    const int payload = 0;
    throw payload;
}
// Example job: returns the sum of its two arguments.
int add( int l, int r ) {
    const int sum = l + r;
    return sum;
}
int main() {
Worker worker1, worker2;
boost::shared_ptr< Worker::Future< int > > job1( worker1( throws ) );
int four( 4 ), five( 5 );
boost::shared_ptr< Worker::Future< int > > job2( worker2( boost::bind( add, four, five ) ) );
if ( job1->exception() )
std::cout << "job1 threw " << *job1->exception() << std::endl;
else
std::cout << "job1 should have thrown an exception" << std::endl;
std::cout << "4 + 5 = " << job2->result() << std::endl;
}This is not an official Boost site. For more information on Boost please see Boost.org.