dlib C++ Library - pipe.cpp

// Copyright (C) 2006 Davis E. King (davis@dlib.net)
// License: Boost Software License See LICENSE.txt for the full license.
#include <sstream>
#include <string>
#include <cstdlib>
#include <ctime>
#include <dlib/misc_api.h>
#include <dlib/pipe.h>
#include "tester.h"
namespace 
{
 using namespace test;
 using namespace dlib;
 using namespace std;
 logger dlog("test.pipe");
 namespace pipe_kernel_test_helpers
 {
 const unsigned long proc1_count = 10000;
 dlib::mutex m;
 signaler s(m);
 unsigned long threads_running = 0;
 bool found_error;
 inline void add_running_thread (
 )
 {
 auto_mutex M(m);
 ++threads_running;
 }
 inline void remove_running_thread (
 )
 {
 auto_mutex M(m);
 --threads_running;
 s.broadcast();
 }
 inline void wait_for_threads (
 )
 {
 auto_mutex M(m);
 while (threads_running > 0)
 s.wait();
 }
 template <
 typename pipe
 >
 void threadproc1 (
 void* param
 )
 {
 add_running_thread();
 pipe& p = *static_cast<pipe*>(param);
 try
 {
 int last = -1;
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 int cur=0;
 DLIB_TEST(p.dequeue(cur) == true);
 DLIB_TEST(last + 1 == cur);
 last = cur;
 }
 DLIB_TEST(p.size() == 0);
 }
 catch(exception& e)
 {
 auto_mutex M(m);
 found_error = true;
 cout << "\n\nERRORS FOUND" << endl;
 cout << e.what() << endl;
 dlog << LWARN << "ERRORS FOUND";
 dlog << LWARN << e.what();
 p.disable();
 } 
 remove_running_thread();
 }
 template <
 typename pipe
 >
 void threadproc2 (
 void* param
 )
 {
 add_running_thread();
 pipe& p = *static_cast<pipe*>(param);
 try
 {
 int last = -1;
 int cur;
 while (p.dequeue(cur))
 {
 DLIB_TEST(last < cur);
 last = cur;
 }
 auto_mutex M(m);
 }
 catch(exception& e)
 {
 auto_mutex M(m);
 found_error = true;
 cout << "\n\nERRORS FOUND" << endl;
 cout << e.what() << endl;
 dlog << LWARN << "ERRORS FOUND";
 dlog << LWARN << e.what();
 p.disable();
 } 
 remove_running_thread();
 }
 template <
 typename pipe
 >
 void threadproc3 (
 void* param
 )
 {
 add_running_thread();
 pipe& p = *static_cast<pipe*>(param);
 try
 {
 int last = -1;
 int cur;
 while (p.dequeue_or_timeout(cur,100000))
 {
 DLIB_TEST(last < cur);
 last = cur;
 }
 auto_mutex M(m);
 }
 catch(exception& e)
 {
 auto_mutex M(m);
 found_error = true;
 cout << "\n\nERRORS FOUND" << endl;
 cout << e.what() << endl;
 dlog << LWARN << "ERRORS FOUND";
 dlog << LWARN << e.what();
 p.disable();
 } 
 remove_running_thread();
 }
 }
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
 template<typename in_type, typename out_type>
 class PipelineProcessor : private dlib::threaded_object
 {
 public:
 PipelineProcessor(
 dlib::pipe<in_type> & in,
 dlib::pipe<out_type> & out) :
 InPipe(in),
 OutPipe(out),
 InMsg(),
 OutMsg() {
 start();
 }
 ~PipelineProcessor() {
 // signal the thread to stop
 stop();
 wait();
 }
 private:
 dlib::pipe<in_type> & InPipe;
 dlib::pipe<out_type> & OutPipe;
 in_type InMsg;
 out_type OutMsg;
 void thread() 
 {
 while (!should_stop()) {
 if(InPipe.dequeue_or_timeout(InMsg, 100)) 
 {
 // if function signals ready to send OutMsg
 while (!OutPipe.enqueue_or_timeout(OutMsg, 100)) 
 {
 // try to send until should stop
 if (should_stop()) 
 {
 return;
 }
 }
 }
 }
 };
 };
 void do_zero_size_test_with_timeouts()
 {
 dlog << LINFO << "in do_zero_size_test_with_timeouts()";
 // make sure we can get though this without deadlocking
 for (int k = 0; k < 10; ++k)
 {
 dlib::pipe<int> in_pipe(10);
 dlib::pipe<float> out_pipe(0);
 {
 PipelineProcessor<int, float> pp(in_pipe, out_pipe);
 int in = 1;
 in_pipe.enqueue(in);
 in = 2;
 in_pipe.enqueue(in);
 in = 3;
 in_pipe.enqueue(in);
 // sleep to make sure thread enqueued
 dlib::sleep(100);
 float out = 1.0f;
 out_pipe.dequeue(out);
 dlib::sleep(100);
 }
 print_spinner();
 }
 }
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
 template <
 typename pipe
 >
 void pipe_kernel_test (
 )
 /*!
 requires
 - pipe is an implementation of pipe/pipe_kernel_abstract.h and
 is instantiated with int
 ensures
 - runs tests on pipe for compliance with the specs 
 !*/
 { 
 using namespace pipe_kernel_test_helpers;
 found_error = false;
 print_spinner();
 pipe test(10), test2(100);
 pipe test_0(0), test2_0(0);
 pipe test_1(1), test2_1(1);
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test2.size() == 0);
 DLIB_TEST(test_0.size() == 0);
 DLIB_TEST(test2_0.size() == 0);
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(test2_1.size() == 0);
 DLIB_TEST(test.is_enqueue_enabled() == true);
 DLIB_TEST(test.is_dequeue_enabled() == true);
 DLIB_TEST(test.is_enabled() == true);
 test.empty();
 test2.empty();
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test2.size() == 0);
 test_0.empty();
 test2_0.empty();
 DLIB_TEST(test_0.size() == 0);
 DLIB_TEST(test2_0.size() == 0);
 test_1.empty();
 test2_1.empty();
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(test2_1.size() == 0);
 int a;
 a = 3;
 test.enqueue(a);
 DLIB_TEST(test.size() == 1);
 a = 5;
 test.enqueue(a);
 DLIB_TEST(test.size() == 2);
 a = 0;
 test.dequeue(a);
 DLIB_TEST(a == 3);
 DLIB_TEST(test.size() == 1);
 a = 0;
 test.dequeue(a);
 DLIB_TEST(a == 5);
 DLIB_TEST(test.size() == 0);
 print_spinner();
 {
 dlog << LINFO << "starting normal length pipe tests";
 create_new_thread(&threadproc1<pipe>,&test);
 create_new_thread(&threadproc2<pipe>,&test2);
 create_new_thread(&threadproc2<pipe>,&test2);
 create_new_thread(&threadproc2<pipe>,&test2);
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test.enqueue(a);
 }
 DLIB_TEST(test.is_enqueue_enabled() == true);
 test.disable_enqueue();
 DLIB_TEST(test.is_enqueue_enabled() == false);
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test.enqueue(a);
 }
 for (unsigned long i = 0; i < 100000; ++i)
 {
 a = i;
 if (i%2 == 0)
 test2.enqueue(a);
 else
 test2.enqueue_or_timeout(a,100000);
 }
 test2.wait_for_num_blocked_dequeues(3);
 DLIB_TEST(test2.size() == 0);
 test2.disable();
 wait_for_threads();
 DLIB_TEST(test2.size() == 0);
 test2.enable();
 print_spinner();
 create_new_thread(&threadproc3<pipe>,&test2);
 create_new_thread(&threadproc3<pipe>,&test2);
 for (unsigned long i = 0; i < 100000; ++i)
 {
 a = i;
 if (i%2 == 0)
 test2.enqueue(a);
 else
 test2.enqueue_or_timeout(a,100000);
 }
 test2.wait_for_num_blocked_dequeues(2);
 DLIB_TEST(test2.size() == 0);
 test2.disable();
 wait_for_threads();
 DLIB_TEST(test2.size() == 0);
 }
 print_spinner();
 {
 dlog << LINFO << "starting 0 length pipe tests";
 create_new_thread(&threadproc1<pipe>,&test_0);
 create_new_thread(&threadproc2<pipe>,&test2_0);
 create_new_thread(&threadproc2<pipe>,&test2_0);
 create_new_thread(&threadproc2<pipe>,&test2_0);
 dlog << LTRACE << "0: 1";
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test_0.enqueue(a);
 }
 dlog << LTRACE << "0: 2";
 DLIB_TEST(test_0.is_enqueue_enabled() == true);
 test_0.disable_enqueue();
 DLIB_TEST(test_0.is_enqueue_enabled() == false);
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test_0.enqueue(a);
 }
 dlog << LTRACE << "0: 3";
 for (unsigned long i = 0; i < 100000; ++i)
 {
 a = i;
 if (i%2 == 0)
 test2_0.enqueue(a);
 else
 test2_0.enqueue_or_timeout(a,100000);
 }
 print_spinner();
 dlog << LTRACE << "0: 4";
 test2_0.wait_for_num_blocked_dequeues(3);
 DLIB_TEST(test2_0.size() == 0);
 test2_0.disable();
 wait_for_threads();
 DLIB_TEST(test2_0.size() == 0);
 dlog << LTRACE << "0: 5";
 test2_0.enable();
 create_new_thread(&threadproc3<pipe>,&test2_0);
 create_new_thread(&threadproc3<pipe>,&test2_0);
 for (unsigned long i = 0; i < 20000; ++i)
 {
 if ((i%100) == 0)
 print_spinner();
 a = i;
 if (i%2 == 0)
 test2_0.enqueue(a);
 else
 test2_0.enqueue_or_timeout(a,100000);
 }
 dlog << LTRACE << "0: 6";
 test2_0.wait_for_num_blocked_dequeues(2);
 DLIB_TEST(test2_0.size() == 0);
 test2_0.disable();
 wait_for_threads();
 DLIB_TEST(test2_0.size() == 0);
 dlog << LTRACE << "0: 7";
 }
 print_spinner();
 {
 dlog << LINFO << "starting 1 length pipe tests";
 create_new_thread(&threadproc1<pipe>,&test_1);
 create_new_thread(&threadproc2<pipe>,&test2_1);
 create_new_thread(&threadproc2<pipe>,&test2_1);
 create_new_thread(&threadproc2<pipe>,&test2_1);
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test_1.enqueue(a);
 }
 DLIB_TEST(test_1.is_enqueue_enabled() == true);
 test_1.disable_enqueue();
 DLIB_TEST(test_1.is_enqueue_enabled() == false);
 for (unsigned long i = 0; i < proc1_count; ++i)
 {
 a = i;
 test_1.enqueue(a);
 }
 print_spinner();
 for (unsigned long i = 0; i < 100000; ++i)
 {
 a = i;
 if (i%2 == 0)
 test2_1.enqueue(a);
 else
 test2_1.enqueue_or_timeout(a,100000);
 }
 test2_1.wait_for_num_blocked_dequeues(3);
 DLIB_TEST(test2_1.size() == 0);
 test2_1.disable();
 wait_for_threads();
 DLIB_TEST(test2_1.size() == 0);
 test2_1.enable();
 create_new_thread(&threadproc3<pipe>,&test2_1);
 create_new_thread(&threadproc3<pipe>,&test2_1);
 for (unsigned long i = 0; i < 100000; ++i)
 {
 a = i;
 if (i%2 == 0)
 test2_1.enqueue(a);
 else
 test2_1.enqueue_or_timeout(a,100000);
 }
 test2_1.wait_for_num_blocked_dequeues(2);
 DLIB_TEST(test2_1.size() == 0);
 test2_1.disable();
 wait_for_threads();
 DLIB_TEST(test2_1.size() == 0);
 }
 test.enable_enqueue();
 test_0.enable_enqueue();
 test_1.enable_enqueue();
 DLIB_TEST(test.is_enabled());
 DLIB_TEST(test.is_enqueue_enabled());
 DLIB_TEST(test_0.is_enabled());
 DLIB_TEST(test_0.is_enqueue_enabled());
 DLIB_TEST(test_1.is_enabled());
 DLIB_TEST(test_1.is_enqueue_enabled());
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test_0.size() == 0);
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(test.max_size() == 10);
 DLIB_TEST(test_0.max_size() == 0);
 DLIB_TEST(test_1.max_size() == 1);
 for (int i = 0; i < 100; ++i)
 {
 a = 1;
 test.enqueue_or_timeout(a,0);
 a = 1;
 test_0.enqueue_or_timeout(a,0);
 a = 1;
 test_1.enqueue_or_timeout(a,0);
 }
 DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
 DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
 DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
 for (int i = 0; i < 10; ++i)
 {
 a = 0;
 DLIB_TEST(test.enqueue_or_timeout(a,10) == false);
 a = 0;
 DLIB_TEST(test_0.enqueue_or_timeout(a,10) == false);
 a = 0;
 DLIB_TEST(test_1.enqueue_or_timeout(a,10) == false);
 }
 DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
 DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
 DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
 for (int i = 0; i < 10; ++i)
 {
 a = 0;
 DLIB_TEST(test.dequeue_or_timeout(a,0) == true);
 DLIB_TEST(a == 1);
 }
 DLIB_TEST(test.max_size() == 10);
 DLIB_TEST(test_0.max_size() == 0);
 DLIB_TEST(test_1.max_size() == 1);
 a = 0;
 DLIB_TEST(test_1.dequeue_or_timeout(a,0) == true);
 DLIB_TEST(test.max_size() == 10);
 DLIB_TEST(test_0.max_size() == 0);
 DLIB_TEST(test_1.max_size() == 1);
 DLIB_TEST_MSG(a == 1,"a: " << a);
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test_0.size() == 0);
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(test.dequeue_or_timeout(a,0) == false);
 DLIB_TEST(test_0.dequeue_or_timeout(a,0) == false);
 DLIB_TEST(test_1.dequeue_or_timeout(a,0) == false);
 DLIB_TEST(test.dequeue_or_timeout(a,10) == false);
 DLIB_TEST(test_0.dequeue_or_timeout(a,10) == false);
 DLIB_TEST(test_1.dequeue_or_timeout(a,10) == false);
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test_0.size() == 0);
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(found_error == false);
 {
 test.enable();
 test.enable_enqueue();
 test.empty();
 DLIB_TEST(test.size() == 0);
 DLIB_TEST(test.is_enabled() == true);
 DLIB_TEST(test.is_enqueue_enabled() == true);
 DLIB_TEST(test.is_dequeue_enabled() == true);
 test.disable_dequeue();
 dlog << LINFO << "Make sure disable_dequeue() works right...";
 DLIB_TEST(test.is_dequeue_enabled() == false);
 DLIB_TEST(test.dequeue(a) == false);
 test.wait_until_empty();
 a = 4;
 test.enqueue(a);
 test.wait_until_empty();
 test.wait_for_num_blocked_dequeues(4);
 DLIB_TEST(test.size() == 1);
 DLIB_TEST(test.dequeue(a) == false);
 DLIB_TEST(test.dequeue_or_timeout(a,10000) == false);
 DLIB_TEST(test.size() == 1);
 a = 0;
 test.enable_dequeue();
 DLIB_TEST(test.is_dequeue_enabled() == true);
 DLIB_TEST(test.dequeue(a) == true);
 DLIB_TEST(a == 4);
 test_1.wait_until_empty();
 }
 {
 test_1.enable();
 test_1.enable_enqueue();
 test_1.empty();
 DLIB_TEST(test_1.size() == 0);
 DLIB_TEST(test_1.is_enabled() == true);
 DLIB_TEST(test_1.is_enqueue_enabled() == true);
 DLIB_TEST(test_1.is_dequeue_enabled() == true);
 test_1.disable_dequeue();
 dlog << LINFO << "Make sure disable_dequeue() works right...";
 DLIB_TEST(test_1.is_dequeue_enabled() == false);
 DLIB_TEST(test_1.dequeue(a) == false);
 a = 4;
 test_1.wait_for_num_blocked_dequeues(4);
 test_1.wait_for_num_blocked_dequeues(0);
 test_1.enqueue(a);
 test_1.wait_until_empty();
 DLIB_TEST(test_1.size() == 1);
 DLIB_TEST(test_1.dequeue(a) == false);
 DLIB_TEST(test_1.dequeue_or_timeout(a,10000) == false);
 DLIB_TEST(test_1.size() == 1);
 a = 0;
 test_1.enable_dequeue();
 DLIB_TEST(test_1.is_dequeue_enabled() == true);
 DLIB_TEST(test_1.dequeue(a) == true);
 DLIB_TEST(a == 4);
 test_1.wait_until_empty();
 }
 }
 class pipe_tester : public tester
 {
 public:
 pipe_tester (
 ) :
 tester ("test_pipe",
 "Runs tests on the pipe component.")
 {}
 void perform_test (
 )
 {
 pipe_kernel_test<dlib::pipe<int> >();
 do_zero_size_test_with_timeouts();
 }
 } a;
}

AltStyle によって変換されたページ (->オリジナル) /