dlib C++ Library - pipe_kernel_1.h

// Copyright (C) 2006 Davis E. King (davis@dlib.net)
// License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_PIPE_KERNEl_1_ 
#define DLIB_PIPE_KERNEl_1_ 
#include "../algs.h"
#include "../threads.h"
#include "pipe_kernel_abstract.h"
namespace dlib
{
 template <
 typename T
 >
 class pipe 
 {
 /*!
 INITIAL VALUE
 - pipe_size == 0
 - pipe_max_size == defined by constructor
 - enabled == true
 - data == a pointer to an array of ((pipe_max_size>0)?pipe_max_size:1) T objects.
 - dequeue_waiters == 0
 - enqueue_waiters == 0
 - first == 1
 - last == 1
 - unblock_sig_waiters == 0
 CONVENTION
 - size() == pipe_size
 - max_size() == pipe_max_size
 - is_enabled() == enabled
 - m == the mutex used to lock access to all the members of this class
 - dequeue_waiters == the number of threads blocked on calls to dequeue()
 - enqueue_waiters == the number of threads blocked on calls to enqueue() and 
 wait_until_empty()
 - unblock_sig_waiters == the number of threads blocked on calls to 
 wait_for_num_blocked_dequeues() and the destructor. (i.e. the number of
 blocking calls to unblock_sig.wait())
 - dequeue_sig == the signaler that threads blocked on calls to dequeue() wait on
 - enqueue_sig == the signaler that threads blocked on calls to enqueue() 
 or wait_until_empty() wait on.
 - unblock_sig == the signaler that is signaled when a thread stops blocking on a call
 to enqueue() or dequeue(). It is also signaled when a dequeue that will probably
 block is called. The destructor and wait_for_num_blocked_dequeues are the only 
 things that will wait on this signaler.
 - if (pipe_size > 0) then
 - data[first] == the next item to dequeue
 - data[last] == the item most recently added via enqueue, so the last to dequeue.
 - else if (pipe_max_size == 0)
 - if (first == 0 && last == 0) then
 - data[0] == the next item to dequeue
 - else if (first == 0 && last == 1) then 
 - data[0] has been taken out already by a dequeue
 !*/
 public:
 // this is here for backwards compatibility with older versions of dlib.
 typedef pipe kernel_1a;
 typedef T type;
 explicit pipe ( 
 size_t maximum_size
 );
 virtual ~pipe (
 );
 void empty (
 );
 void wait_until_empty (
 ) const;
 void wait_for_num_blocked_dequeues (
 unsigned long num
 )const;
 void enable (
 );
 void disable (
 );
 bool is_enqueue_enabled (
 ) const;
 void disable_enqueue (
 );
 void enable_enqueue (
 );
 bool is_dequeue_enabled (
 ) const;
 void disable_dequeue (
 );
 void enable_dequeue (
 );
 bool is_enabled (
 ) const;
 size_t max_size (
 ) const;
 size_t size (
 ) const;
 bool enqueue (
 T& item
 );
 bool enqueue (
 T&& item
 ) { return enqueue(item); }
 bool dequeue (
 T& item
 );
 bool enqueue_or_timeout (
 T& item,
 unsigned long timeout
 );
 bool enqueue_or_timeout (
 T&& item,
 unsigned long timeout
 ) { return enqueue_or_timeout(item,timeout); }
 bool dequeue_or_timeout (
 T& item,
 unsigned long timeout
 );
 private:
 size_t pipe_size;
 const size_t pipe_max_size;
 bool enabled;
 T* const data;
 size_t first;
 size_t last;
 mutex m;
 signaler dequeue_sig;
 signaler enqueue_sig;
 signaler unblock_sig;
 unsigned long dequeue_waiters;
 mutable unsigned long enqueue_waiters;
 mutable unsigned long unblock_sig_waiters;
 bool enqueue_enabled;
 bool dequeue_enabled;
 // restricted functions
 pipe(const pipe&); // copy constructor
 pipe& operator=(const pipe&); // assignment operator
 }; 
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
// member function definitions
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 pipe<T>::
 pipe ( 
 size_t maximum_size
 ) : 
 pipe_size(0),
 pipe_max_size(maximum_size),
 enabled(true),
 data(new T[(maximum_size>0) ? maximum_size : 1]),
 first(1),
 last(1),
 dequeue_sig(m),
 enqueue_sig(m),
 unblock_sig(m),
 dequeue_waiters(0),
 enqueue_waiters(0),
 unblock_sig_waiters(0),
 enqueue_enabled(true),
 dequeue_enabled(true)
 {
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 pipe<T>::
 ~pipe (
 )
 {
 auto_mutex M(m);
 ++unblock_sig_waiters;
 // first make sure no one is blocked on any calls to enqueue() or dequeue()
 enabled = false;
 dequeue_sig.broadcast();
 enqueue_sig.broadcast();
 unblock_sig.broadcast();
 // wait for all threads to unblock
 while (dequeue_waiters > 0 || enqueue_waiters > 0 || unblock_sig_waiters > 1)
 unblock_sig.wait();
 delete [] data;
 --unblock_sig_waiters;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 empty (
 )
 {
 auto_mutex M(m);
 pipe_size = 0;
 // let any calls to enqueue() know that the pipe is now empty
 if (enqueue_waiters > 0)
 enqueue_sig.broadcast();
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 wait_until_empty (
 ) const
 {
 auto_mutex M(m);
 // this function is sort of like a call to enqueue so treat it like that
 ++enqueue_waiters;
 while (pipe_size > 0 && enabled && dequeue_enabled )
 enqueue_sig.wait();
 // let the destructor know we are ending if it is blocked waiting
 if (enabled == false)
 unblock_sig.broadcast();
 --enqueue_waiters;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 enable (
 )
 {
 auto_mutex M(m);
 enabled = true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 disable (
 )
 {
 auto_mutex M(m);
 enabled = false;
 dequeue_sig.broadcast();
 enqueue_sig.broadcast();
 unblock_sig.broadcast();
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 is_enabled (
 ) const
 {
 auto_mutex M(m);
 return enabled;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 size_t pipe<T>::
 max_size (
 ) const
 {
 auto_mutex M(m);
 return pipe_max_size;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 size_t pipe<T>::
 size (
 ) const
 {
 auto_mutex M(m);
 return pipe_size;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 enqueue (
 T& item
 )
 {
 auto_mutex M(m);
 ++enqueue_waiters;
 // wait until there is room or we are disabled 
 while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
 !(pipe_max_size == 0 && first == 1) )
 enqueue_sig.wait();
 if (enabled == false || enqueue_enabled == false)
 {
 --enqueue_waiters;
 // let the destructor know we are unblocking
 unblock_sig.broadcast();
 return false;
 }
 // set the appropriate values for first and last
 if (pipe_size == 0)
 {
 first = 0;
 last = 0;
 }
 else
 {
 last = (last+1)%pipe_max_size;
 }
 exchange(item,data[last]);
 // wake up a call to dequeue() if there are any currently blocked
 if (dequeue_waiters > 0)
 dequeue_sig.signal();
 if (pipe_max_size > 0)
 {
 ++pipe_size;
 }
 else
 {
 // wait for a dequeue to take the item out
 while (last == 0 && enabled && enqueue_enabled)
 enqueue_sig.wait();
 if (last == 0 && (enabled == false || enqueue_enabled == false))
 {
 last = 1;
 first = 1;
 // no one dequeued this object to put it back into item
 exchange(item,data[0]);
 --enqueue_waiters;
 // let the destructor know we are unblocking
 if (unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 return false;
 }
 last = 1;
 first = 1;
 // tell any waiting calls to enqueue() that one of them can proceed
 if (enqueue_waiters > 1)
 enqueue_sig.broadcast();
 // let the destructor know we are unblocking
 if (enabled == false && unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 }
 --enqueue_waiters;
 return true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 dequeue (
 T& item
 )
 {
 auto_mutex M(m);
 ++dequeue_waiters;
 if (pipe_size == 0)
 {
 // notify wait_for_num_blocked_dequeues()
 if (unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 // notify any blocked enqueue_or_timeout() calls
 if (enqueue_waiters > 0)
 enqueue_sig.broadcast();
 }
 // wait until there is something in the pipe or we are disabled 
 while (pipe_size == 0 && enabled && dequeue_enabled &&
 !(pipe_max_size == 0 && first == 0 && last == 0) )
 dequeue_sig.wait();
 if (enabled == false || dequeue_enabled == false)
 {
 --dequeue_waiters;
 // let the destructor know we are unblocking
 unblock_sig.broadcast();
 return false;
 }
 exchange(item,data[first]);
 if (pipe_max_size > 0)
 {
 // set the appropriate values for first 
 first = (first+1)%pipe_max_size;
 --pipe_size;
 }
 else
 {
 // let the enqueue waiting on us know that we took the 
 // item out already.
 last = 1;
 }
 // wake up a call to enqueue() if there are any currently blocked
 if (enqueue_waiters > 0)
 enqueue_sig.broadcast();
 --dequeue_waiters;
 return true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 enqueue_or_timeout (
 T& item,
 unsigned long timeout
 )
 {
 auto_mutex M(m);
 ++enqueue_waiters;
 // wait until there is room or we are disabled or 
 // we run out of time.
 bool timed_out = false;
 while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
 !(pipe_max_size == 0 && dequeue_waiters > 0 && first == 1) )
 {
 if (timeout == 0 || enqueue_sig.wait_or_timeout(timeout) == false)
 {
 timed_out = true;
 break;
 }
 }
 if (enabled == false || timed_out || enqueue_enabled == false)
 {
 --enqueue_waiters;
 // let the destructor know we are unblocking
 unblock_sig.broadcast();
 return false;
 }
 // set the appropriate values for first and last
 if (pipe_size == 0)
 {
 first = 0;
 last = 0;
 }
 else
 {
 last = (last+1)%pipe_max_size;
 }
 exchange(item,data[last]);
 // wake up a call to dequeue() if there are any currently blocked
 if (dequeue_waiters > 0)
 dequeue_sig.signal();
 if (pipe_max_size > 0)
 {
 ++pipe_size;
 }
 else
 {
 // wait for a dequeue to take the item out
 while (last == 0 && enabled && enqueue_enabled)
 enqueue_sig.wait();
 if (last == 0 && (enabled == false || enqueue_enabled == false))
 {
 last = 1;
 first = 1;
 // no one dequeued this object to put it back into item
 exchange(item,data[0]);
 --enqueue_waiters;
 // let the destructor know we are unblocking
 if (unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 return false;
 }
 last = 1;
 first = 1;
 // tell any waiting calls to enqueue() that one of them can proceed
 if (enqueue_waiters > 1)
 enqueue_sig.broadcast();
 // let the destructor know we are unblocking
 if (enabled == false && unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 }
 --enqueue_waiters;
 return true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 dequeue_or_timeout (
 T& item,
 unsigned long timeout
 )
 {
 auto_mutex M(m);
 ++dequeue_waiters;
 if (pipe_size == 0)
 {
 // notify wait_for_num_blocked_dequeues()
 if (unblock_sig_waiters > 0)
 unblock_sig.broadcast();
 // notify any blocked enqueue_or_timeout() calls
 if (enqueue_waiters > 0)
 enqueue_sig.broadcast();
 }
 bool timed_out = false;
 // wait until there is something in the pipe or we are disabled or we timeout.
 while (pipe_size == 0 && enabled && dequeue_enabled &&
 !(pipe_max_size == 0 && first == 0 && last == 0) )
 {
 if (timeout == 0 || dequeue_sig.wait_or_timeout(timeout) == false)
 {
 timed_out = true;
 break;
 }
 }
 if (enabled == false || timed_out || dequeue_enabled == false)
 {
 --dequeue_waiters;
 // let the destructor know we are unblocking
 unblock_sig.broadcast();
 return false;
 }
 exchange(item,data[first]);
 if (pipe_max_size > 0)
 {
 // set the appropriate values for first 
 first = (first+1)%pipe_max_size;
 --pipe_size;
 }
 else
 {
 // let the enqueue waiting on us know that we took the 
 // item out already.
 last = 1;
 }
 // wake up a call to enqueue() if there are any currently blocked
 if (enqueue_waiters > 0)
 enqueue_sig.broadcast();
 --dequeue_waiters;
 return true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 wait_for_num_blocked_dequeues (
 unsigned long num
 )const
 {
 auto_mutex M(m);
 ++unblock_sig_waiters;
 while ( (dequeue_waiters < num || pipe_size != 0) && enabled && dequeue_enabled)
 unblock_sig.wait();
 // let the destructor know we are ending if it is blocked waiting
 if (enabled == false)
 unblock_sig.broadcast();
 --unblock_sig_waiters;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 is_enqueue_enabled (
 ) const
 {
 auto_mutex M(m);
 return enqueue_enabled;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 disable_enqueue (
 )
 {
 auto_mutex M(m);
 enqueue_enabled = false;
 enqueue_sig.broadcast();
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 enable_enqueue (
 )
 {
 auto_mutex M(m);
 enqueue_enabled = true;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 bool pipe<T>::
 is_dequeue_enabled (
 ) const
 {
 auto_mutex M(m);
 return dequeue_enabled;
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 disable_dequeue (
 )
 {
 auto_mutex M(m);
 dequeue_enabled = false;
 dequeue_sig.broadcast();
 }
// ----------------------------------------------------------------------------------------
 template <
 typename T
 >
 void pipe<T>::
 enable_dequeue (
 )
 {
 auto_mutex M(m);
 dequeue_enabled = true;
 }
// ----------------------------------------------------------------------------------------
}
#endif // DLIB_PIPE_KERNEl_1_

AltStyle によって変換されたページ (->オリジナル) /