Skip to main content
Code Review

Return to Question

deleted 10 characters in body; edited title
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238

Asynchronous Object Destruction Serviceobject destruction service

Inspired by the recent Stack OverflowStack Overflow question "Safely release a resource on a different thread", I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxxdestruction_service.hxx

syncio.hxxsyncio.hxx

main.cxxmain.cxx

Asynchronous Object Destruction Service

Inspired by the recent Stack Overflow question "Safely release a resource on a different thread", I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

syncio.hxx

main.cxx

Asynchronous object destruction service

Inspired by the recent Stack Overflow question "Safely release a resource on a different thread", I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

syncio.hxx

main.cxx

Source Link
5gon12eder
  • 4.3k
  • 14
  • 29

Asynchronous Object Destruction Service

Inspired by the recent Stack Overflow question "Safely release a resource on a different thread", I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

/**
 * @file destruction_service.hxx
 *
 * @brief
 * Provides the `my::destruction_service` class template for
 * asynchronous object destruction.
 *
 */
#ifndef DESTRUCTION_SERVICE_HXX
#define DESTRUCTION_SERVICE_HXX
#include <cassert> // assert
#include <condition_variable> // std::condition_variable
#include <mutex> // std::mutex, std::unique_lock
#include <thread> // std::thread
#include <type_traits> // std::enable_if_t, std::is_nothrow_{move_constructible,destructible}
#include <utility> // std::move
#include <vector> // std::vector
#include "syncio.hxx" // syncio::print_log
namespace my
{
 /**
 * @brief
 * An asynchronous object destruction service.
 *
 * An instance of this class owns a worker thread that asynchronously
 * destroys objects. This might be useful for threads that have high
 * responsiveness requirements and objects with (potentially) long-running
 * destructors.
 *
 * To-be-destroyed objects can be scheduled for destruction by means of the
 * `schedule_destruction` member function. It takes its argument by rvalue
 * reference assuming that destructing a moved-away-from object is
 * considerably cheaper. If you are using this service to destroy smart
 * pointers, be aware that only scheduling the *last* instance of a shared
 * pointer will cause the managed object to be destroyed asynchronously. `T`
 * is required to have a non-throwing move constructor and its destructor
 * must not throw either.
 *
 * If you have to destruct objects of different types, consider using a
 * polymorphic wrapper to perform type erasure. The default is
 * non-polymorphic, however, because type erasure requires additional dynamic
 * memory allocation which is obnoxious for destruction. Also, smart
 * pointers already provide some degree of polymorphism, provided that all
 * types you need to destruct derive from a common base class that has a
 * `virtual` destructor.
 *
 * @tparam T
 * type of object to be destroyed
 *
 */
 template
 <
 typename T,
 typename = std::enable_if_t<std::is_nothrow_move_constructible<T>::value>,
 typename = std::enable_if_t<std::is_nothrow_destructible<T>::value>
 >
 class destruction_service final
 {
 public:
 /** @brief Type that is destructed by this service. */
 using object_type = T;
 private:
 /** @brief Worker thread destroying objects. */
 std::thread worker_ {};
 /** @brief Mutex to protect the object queue. */
 mutable std::mutex mutex_ {};
 /** @brief Condition variable to signal changes to the object queue. */
 mutable std::condition_variable condvar_ {};
 /** @brief Object queue of to-be-destructed items. */
 std::vector<object_type> queue_ {};
 /** @brief Signal that no more objects will be scheduled for destruction. */
 bool done_ {};
 public:
 /**
 * @brief
 * Creates a new destruction service with its own worker thread.
 *
 */
 destruction_service()
 {
 this->worker_ = std::thread {&destruction_service::do_work_, this};
 }
 /**
 * @brief
 * Destroys all remaining objects and then shuts down the service.
 *
 */
 ~destruction_service() noexcept
 {
 syncio::print_log(__PRETTY_FUNCTION__);
 auto lck = this->lock_queue_();
 this->done_ = true;
 lck.unlock();
 this->condvar_.notify_all();
 if (this->worker_.joinable())
 this->worker_.join();
 assert(this->queue_.empty());
 }
 /**
 * @brief
 * Schedules an object for asynchronous destruction.
 *
 * This function may safely be called concurrently.
 *
 * @param object
 * object to be destructed
 *
 */
 void
 schedule_destruction(object_type&& object)
 {
 auto lck = this->lock_queue_();
 this->queue_.push_back(std::move(object));
 lck.unlock();
 this->condvar_.notify_all();
 }
 private:
 /**
 * @brief
 * Worker thread main loop.
 *
 * Until the `done_` flag is not set, this function waits on the object
 * queue and removes and destructs any items in it. It is guaranteeds that
 * the queue be empty after this function returns unless items were added
 * after the `done_` flag has been set.
 *
 */
 void
 do_work_()
 {
 syncio::print_log("destruction thread");
 auto things = std::vector<object_type> {};
 for (auto stop = false; true; stop = this->is_done_())
 {
 auto lck = this->lock_queue_();
 while (this->queue_.empty() && !this->done_)
 this->condvar_.wait(lck);
 this->queue_.swap(things);
 lck.unlock();
 syncio::print_log("about to destroy ", things.size(), " objects...");
 things.clear();
 if (stop)
 break;
 }
 assert(this->queue_.empty());
 assert(things.empty());
 syncio::print_log("good bye");
 }
 /**
 * @brief
 * Fetches the value of the `done_` member in a synchronized
 * manner.
 *
 * @returns
 * value of the `done_` member variable
 *
 */
 bool
 is_done_() const noexcept
 {
 auto lck = this->lock_queue_();
 return this->done_;
 }
 /**
 * @brief
 * Returns a lock for the object queue.
 *
 * @returns
 * lock for the object queue
 *
 */
 std::unique_lock<std::mutex>
 lock_queue_() const noexcept
 {
 return std::unique_lock<std::mutex> {this->mutex_};
 }
 }; // struct destruction_service
} // namespace my
#endif // #ifndef DESTRUCTION_SERVICE_HXX

A few design decisions (please critique if you disagree):

  • The type of objects that can be destructed is not erased for the reasons mentioned in the DocString. This also means that I cannot use queue poisoning in the destructor but instead have to use the done_ flag. I'm somewhat uncomfortable with the resulting control logic of the main loop in the do_work_ function.
  • The schedule_destruction function might have to resize a std::vector while holding to a lock. I normally wouldn't do this but in this case, it seemed preferable to have the main thread execute as fast as possible. I don't care if the destruction (worker) thread is blocked for a short while. Its work isn't very urgent anyway. It is also assumed that reallocations will be infrequent (see next item).
  • The object queue is implemented by a pair of std::vectors that grow dynamically as needed but never shrink. I thought about using a std::deque instead to avoid potentially keeping large amounts of memory that are no longer needed after a burst of submissions but the extra capacity of the std::vectors reducing the number of memory allocations in the critical section convinced me that this is the lesser evil. On the other hand, resizing a std::deque is a constant-time operation, while resizing a std::vector is linear.

For completeness, I'll also show the syncio.hxx header that provides the thread-safe syncio::print_log function and a small example usage in main.cxx. Feel free to comment about these files as well if you want to but please focus on the destruction_service.hxx file.

syncio.hxx

/**
 * @file syncio.hxx
 *
 * @brief
 * Simple synchronized (thread-safe) I/O functions.
 *
 * The functions provided by this header are only thread-safe if they are not
 * mixed with other I/O functions. Synchronization happens via an internal
 * mutex. Locking is pessimistic, using the same mutex for all streams,
 * assuming that all streams might interleave.
 *
 */
#ifndef SYNCIO_HXX
#define SYNCIO_HXX
#include <iostream> // std::ostream, std::clog, std::endl
#include <mutex> // std::mutex, std::lock_guard
#include <thread> // std::this_thread::get_id
namespace syncio
{
 /** @brief Internal private `namespace`. */
 namespace syncio_hxx
 {
 /**
 * @brief
 * Returns a reference to the singleton instance of the I/O mutex.
 *
 * @returns
 * reference to the mutex protecting I/O operations
 *
 */
 std::mutex&
 get_iomutex() noexcept
 {
 static std::mutex iomutex {};
 return iomutex;
 }
 /**
 * @brief
 * Helper struct to print a variadic number of arguments.
 *
 * @tparam ...
 * ignored
 *
 */
 template <typename...>
 struct printer_helper;
 /**
 * @brief
 * Specialization for one or more arguments.
 *
 * @tparam HeadT
 * type of the first argument
 *
 * @tparam TailTs...
 * types of the remaining arguments (if any)
 *
 */
 template <typename HeadT, typename... TailTs>
 struct printer_helper<HeadT, TailTs...>
 {
 /**
 * @brief
 * Inserts all arguments followed by a newline into the given
 * stream and flushes it.
 *
 * @param os
 * stream to print to
 *
 * @param head
 * first argument to print
 *
 * @param tail...
 * remaining arguments to print
 *
 */
 static void
 print(std::ostream& os, const HeadT& head, const TailTs&... tail)
 {
 os << head;
 printer_helper<TailTs...>::print(os, tail...);
 }
 };
 /**
 * @brief
 * Specialization for zero arguments.
 *
 */
 template <>
 struct printer_helper<>
 {
 /**
 * @brief
 * Inserts a newline into the given stream and flushes it.
 *
 * @param os
 * stream to print to
 *
 */
 static void
 print(std::ostream& os)
 {
 os << std::endl;
 }
 };
 } // namespace syncio_hxx
 /**
 * @brief
 * Prints a message to the given stream in a synchronized
 * (thread-safe) manner.
 *
 * The arguments are inserted as if by successively streaming them with
 * `operator<<`. After the last item, a newline is inserted and the stream
 * flushed.
 *
 * @param os
 * stream to print to
 *
 * @param items...
 * items to make up the message
 *
 */
 template <typename... Ts>
 void
 print(std::ostream& os, const Ts&... items)
 {
 std::lock_guard<std::mutex> guard {syncio_hxx::get_iomutex()};
 syncio_hxx::printer_helper<Ts...>::print(os, items...);
 }
 /**
 * @brief
 * Prints a log message to `std::log` in a synchronized (thread-safe)
 * manner.
 *
 * The arguments are prefixed with the ID of the current thread and
 * terminated by a newline. The stream is flushed after printing the
 * message.
 *
 * @param items...
 * items to make up the log message
 *
 */
 template <typename... Ts>
 void
 print_log(const Ts&... items)
 {
 print(std::clog, "[", std::this_thread::get_id(), "] ", items...);
 }
} // namespace syncio
#endif // #ifndef SYNCIO_HXX

main.cxx

/**
 * @file main.cxx
 *
 * @brief
 * Example usage of the `my::destruction_service`.
 *
 */
#include <atomic> // std::atomic_int
#include <thread> // std::this_thread::{get_id,yield}
#include <utility> // std::exchange
#include "destruction_service.hxx" // my::destruction_service
#include "syncio.hxx" // syncio::print_log
namespace /* anonymous */
{
 /** @brief Number of `example` objects created thus far. */
 std::atomic_int example_count {};
 /** @brief ID of the main thread. */
 std::thread::id main_thread_id {};
 /**
 * @brief
 * A dummy class.
 *
 * The destructor if this class `assert()`s that it is invoked on a thread
 * different from the main thread. This is not the case for destructing
 * moved-away-from objects.
 *
 */
 class example
 {
 private:
 /** @brief ID of this object. */
 int id_ {-1};
 public:
 /**
 * @brief
 * Creates a new object with a new ID.
 *
 */
 example() : id_ {example_count.fetch_add(1)}
 {
 syncio::print_log("creating object ", this->id_);
 std::this_thread::yield();
 }
 /**
 * @brief
 * Creates a copy of another object with the same ID.
 *
 * @param other
 * object to copy
 *
 */
 example(const example& other) : id_ {other.id_}
 {
 }
 /**
 * @brief
 * Creates a copy of another object, stealing its ID.
 *
 * The moved-away-from object will get a negative ID.
 *
 * @param other
 * object to move away from
 *
 */
 example(example&& other) noexcept : id_ {std::exchange(other.id_, -1)}
 {
 }
 /**
 * @brief
 * Asserts that the object is either in a moved-away-from state or
 * the current thread is different from the main thread.
 *
 */
 ~example() noexcept
 {
 syncio::print_log("destroying object ", this->id_);
 assert(this->id_ < 0 || std::this_thread::get_id() != main_thread_id);
 std::this_thread::yield();
 }
 }; // struct example
} // namespace /* anonymous */
/**
 * @brief
 * Launches a `my::destruction_service` and submits a dozen `example`
 * objects to it.
 *
 */
int
main()
{
 main_thread_id = std::this_thread::get_id();
 syncio::print_log("main thread");
 my::destruction_service<example> destructor {};
 for (int i = 0; i < 12; ++i)
 {
 auto thing = example {};
 destructor.schedule_destruction(std::move(thing));
 }
}

I'm interested in feedback of any kind but in particular:

  • Correctness, especially concurrency bugs. It took me a while to get this into what I believe is now race and dead-lock free.
  • Compliance with the C++14 standard and making good use of its features.
  • Design of the interface.
  • Performance issues.
lang-cpp

AltStyle によって変換されたページ (->オリジナル) /