I'm currently trying to develop an easy to use TCP networking library based on boost::asio
.
This is my first attempt to work with boost::asio
and therefore I've got some questions:
- Firstly, is my code okay and easy to understand?
- Are there any misleading conventions?
- Are there any memory leaks?
- Is there any improvement for my written code?
- What about the style-guide?
Additional information about the three different classes:
TCPSession
handles the read/write operations of the socket.TCPClient
contains an object of theTCPSession
and handles the connection to a server.TCPServer
holds an acceptor andstd::unordererd_map<std::uint64_t, TCPSessionPtr>
with all the active connections.
Base.hpp
#pragma once
#include <cassert>
#include <string>
#include <boost/asio.hpp>
#include <boost/asio/strand.hpp>
#include <boost/bind.hpp>
#include <boost/asio/signal_set.hpp>
#define SPDLOG_ACTIVE_LEVEL 1
#include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/async.h>
#include "Utils.hpp"
namespace nelw
{
typedef std::uint64_t id_type;
typedef boost::asio::ip::tcp::socket socket_t;
typedef boost::asio::io_context& io_context_ref;
typedef boost::asio::ip::tcp::acceptor tcp_acceptor;
class TCPSession;
typedef std::shared_ptr<TCPSession> TConnectionPtr;
typedef std::function<void(const TConnectionPtr&)> TConnectionCallback;
typedef std::function<void(const TConnectionPtr&, const boost::system::error_code&)> TErrorCallback;
class Buffer;
typedef std::unique_ptr<Buffer> TBufferPtr;
typedef std::function<void(const TConnectionPtr&, const TBufferPtr&)> TRecvCallback;
}
Utils.hpp
#pragma once
namespace nelw
{
template<class F, class...Args>
void for_each_arg(F f, Args&&...args) {
(f(std::forward<Args>(args)), ...);
}
}
TCPSession.hpp
#pragma once
#include "Buffer.hpp"
namespace nelw
{
class TCPSession : public std::enable_shared_from_this<TCPSession>
{
public:
TCPSession(io_context_ref context, std::uint64_t buffer_size = 4096);
virtual ~TCPSession();
friend class TCPServer;
friend class TCPClient;
public:
auto socket() -> socket_t& { return socket_; }
auto write_buffer() ->TBufferPtr& { return write_buffer_; }
auto id() -> id_type { return id_; }
auto set_id(id_type id) { id_ = id; }
auto read_buffer() ->TBufferPtr& { return read_buffer_; }
bool is_connected() { return is_connected_; }
template <class T> void set_dc_cb(T&& cb) { dc_cb_ = cb; }
template <class T> void set_connection_cb(T&& cb) { connection_cb_ = cb; }
template <class T> void set_recv_cb(T&& cb) { recv_cb_ = cb; }
template <class T> void set_error_cb(T&& cb) { error_cb_ = cb; }
public:
void Disconnect();
void Send(void* pData, size_t length);
void Read();
template <typename... Args> void Send(Args&&... args)
{
//constexpr std::size_t n = sizeof...(Args);
std::uint32_t size = 0;
for_each_arg([&size](auto& arg) { size += sizeof(arg); }, args...);
if (size <= 0)
return;
size_t offset = 0;
std::vector<std::byte> buffer{ size };
for_each_arg([&buffer, &offset](auto& arg)
{
std::memcpy(&buffer[offset], &arg, sizeof(arg));
offset += sizeof(arg);
}
, args...);
Send((void*)buffer.data(), buffer.size());
}
void WriteProcess();
void RemoteDisconnect();
protected:
virtual void OnError(const boost::system::error_code& ec);
virtual void OnConnection();
virtual void OnRecv();
protected:
io_context_ref context_;
socket_t socket_;
id_type id_;
std::function<void()> dc_cb_;
TBufferPtr write_buffer_; // write buffer
TBufferPtr read_buffer_;
bool is_connected_;
bool is_writing_;
TConnectionCallback connection_cb_;
TRecvCallback recv_cb_;
TErrorCallback error_cb_;
};
}
TCPSession.cpp
#include "nelw/Base.hpp"
#include "nelw/TcpSession.hpp"
#include <iostream>
namespace nelw
{
TCPSession::TCPSession(io_context_ref context, std::uint64_t buffer_size) : context_(context), socket_(context),
id_ { 0 }, dc_cb_{ nullptr },
is_connected_ { true }, connection_cb_ { nullptr },
recv_cb_ { nullptr }, error_cb_ { nullptr },
is_writing_{ false }
{
if (buffer_size)
{
read_buffer_ = std::make_unique<nelw::Buffer>(buffer_size);
write_buffer_ = std::make_unique<nelw::Buffer>(buffer_size);
}
}
TCPSession::~TCPSession()
{
if (read_buffer_)
read_buffer_.reset();
if (write_buffer_)
write_buffer_.reset();
if (dc_cb_)
{
dc_cb_();
dc_cb_ = nullptr;
}
spdlog::info("Remove from session! {}", id_);
}
void TCPSession::Disconnect()
{
if (!is_connected_)
return;
auto self{ shared_from_this() };
context_.post([self]() {
if (!self->socket().is_open())
return;
self->is_connected_ = false;
self->socket_.cancel();
self->socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
self->socket_.close();
self->OnConnection();
if (self->dc_cb_)
{
self->dc_cb_();
self->dc_cb_ = nullptr;
}
});
}
void TCPSession::Read()
{
if (!is_connected_)
return;
assert(read_buffer_);
if (!socket_.is_open())
return;
//spdlog::debug("Start reading from {}", e.address().to_string(), e.port());
auto self{ shared_from_this() };
auto write_able = read_buffer_->WritableBytes();
if (write_able <= 0)
{
//spdlog::debug("Waiting for bytes...");
try
{
socket_.async_wait(boost::asio::ip::tcp::socket::wait_read, [self = shared_from_this()](const boost::system::error_code& ec) {
if (!ec)
{
auto s = self->socket_.available();
if (s > 0)
self->read_buffer_->EnsureWritableBytes(s);
spdlog::debug("Ensure bytes size: {}", s);
self->Read();
}
else
{
self->OnError(ec);
if (boost::asio::error::eof == ec || boost::asio::error::connection_reset == ec)
{
spdlog::debug("Disconnect from async_wait!");
self->RemoteDisconnect();
}
}
});
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;//spdlog::error("socket_.async_wait -> {}", e.what());
}
}
else if (write_able > 0)
{
socket_.async_read_some(boost::asio::buffer(read_buffer_->WriteBegin(), read_buffer_->WritableBytes()),
[self = shared_from_this()](const boost::system::error_code& ec, std::size_t length)
{
if (!ec)
{
if (length)
{
self->read_buffer_->WriteBytes(length);
self->OnRecv();
}
self->Read();
}
else
{
self->OnError(ec);
if (boost::asio::error::eof == ec || boost::asio::error::connection_reset == ec)
{
spdlog::debug("Disconnect from read!");
self->RemoteDisconnect();
}
}
});
}
}
void TCPSession::Send(void* pData, size_t length)
{
if (!is_connected_)
return;
write_buffer_->Append(pData, length);
if (!is_writing_)
WriteProcess();
}
void TCPSession::WriteProcess()
{
if (!is_connected_)
return;
auto length = write_buffer_->length();
if (length <= 0)
return;
auto self{ shared_from_this() };
boost::asio::async_write(
socket_,
boost::asio::buffer(write_buffer_->data(), write_buffer_->length()),
[self](const boost::system::error_code& ec, size_t bytes_transfered)
{
if (ec)
{
self->OnError(ec);
if (boost::asio::error::eof == ec || boost::asio::error::connection_reset == ec)
self->RemoteDisconnect();
return;
}
else
{
if (bytes_transfered)
{
self->write_buffer_->Skip(bytes_transfered);
if (self->write_buffer_->length() > 0)
self->WriteProcess();
else
self->is_writing_ = false;
}
}
}
);
is_writing_ = true;
}
void TCPSession::RemoteDisconnect()
{
if (is_connected_)
{
is_connected_ = false;
OnConnection();
if (dc_cb_)
{
dc_cb_();
dc_cb_ = nullptr;
}
}
}
void TCPSession::OnError(const boost::system::error_code& ec)
{
if (error_cb_)
error_cb_(shared_from_this(), ec);
}
void TCPSession::OnConnection()
{
if (connection_cb_)
connection_cb_(shared_from_this());
}
void TCPSession::OnRecv()
{
if (recv_cb_)
recv_cb_(shared_from_this(), read_buffer_);
}
}
TCPClient.hpp
#pragma once
#include "TcpSession.hpp"
namespace nelw
{
class TCPClient
{
public:
enum flags : std::uint8_t
{
kTimerRunning = (1 << 1), // for the connection_timer_
kReconnecting = (1 << 2)
};
TCPClient(io_context_ref context);
~TCPClient();
virtual TConnectionPtr CreateSession();
void Connect(const std::string& host, std::uint16_t port);
void Disconnect();
// getters and setters
public:
void set_connection_timeout(boost::asio::deadline_timer::duration_type connection_timeout);
auto connection_timeout() -> boost::asio::deadline_timer::duration_type;
TConnectionPtr& connection();
// end of getters and setters
void Send(void* pData, size_t length);
template <typename... Args> void Send(Args&&... args)
{
if (connection_ && connection_->is_connected())
connection_->Send(args...);
}
template <class T> void set_connection_cb(T&& cb) { connection_cb_ = cb; }
template <class T> void set_recv_cb(T&& cb) { recv_cb_ = cb; }
template <class T> void set_error_cb(T&& cb) { error_cb_ = cb; }
void set_buffer_size(std::uint64_t buffer_size);
void set_reconnect(bool b);
bool is_connected();
protected:
io_context_ref context_;
TConnectionPtr connection_;
std::uint8_t flags_;
boost::asio::deadline_timer connection_timer_;
boost::asio::deadline_timer::duration_type connection_timeout_;
TConnectionCallback connection_cb_;
TErrorCallback error_cb_;
TRecvCallback recv_cb_;
std::uint64_t buffer_size_;
};
}
TCPClient.cpp
#include "nelw/Base.hpp"
#include "nelw/TcpClient.hpp"
#include "nelw/TcpSession.hpp"
namespace nelw
{
TCPClient::TCPClient(io_context_ref context) : context_{ context }, flags_{ 0 }, connection_timer_{ context }, connection_timeout_(boost::posix_time::seconds(5)),
connection_cb_{ nullptr }, error_cb_{ nullptr }, recv_cb_{ nullptr }, buffer_size_{ 4096 }
{
}
TCPClient::~TCPClient()
{
Disconnect(); // be sure
if (flags_ & flags::kTimerRunning)
{
connection_timer_.cancel();
flags_ &= ~(flags::kTimerRunning);
}
}
TConnectionPtr TCPClient::CreateSession()
{
return std::make_shared<TCPSession>(context_, buffer_size_);
}
void TCPClient::Connect(const std::string& host, std::uint16_t port)
{
if (connection_)
connection_.reset();
spdlog::debug("Starting connection to {}, {}", host, port);
connection_ = CreateSession();
boost::asio::ip::tcp::resolver resolver(context_);
auto endpoints = resolver.resolve(boost::asio::ip::tcp::resolver::query(host, std::to_string(port)));
boost::asio::async_connect(connection_->socket(), endpoints,
[this, h = host, p = port](boost::system::error_code ec, boost::asio::ip::tcp::endpoint)
{
if (!ec)
{
flags_ &= ~(flags::kTimerRunning);
connection_->set_dc_cb([this]() {
connection_ = nullptr;
});
if (error_cb_)
connection_->set_error_cb(error_cb_);
if (recv_cb_)
connection_->set_recv_cb(recv_cb_);
if (connection_cb_)
connection_->set_connection_cb(connection_cb_);
connection_->OnConnection();
connection_->Read();
}
else
{
if (connection_)
connection_->OnError(ec);
if (flags_ & flags::kReconnecting)
{
flags_ |= flags::kTimerRunning;
connection_timer_.expires_from_now(connection_timeout_);
connection_timer_.async_wait([this, h = h, p = p](const boost::system::error_code& ec) {
if (!ec)
{
Connect(h, p);
}
else
{
flags_ &= ~(flags::kTimerRunning);
}
});
}
}
});
}
void TCPClient::Disconnect()
{
if (!connection_)
return;
if (connection_->is_connected())
connection_->Disconnect();
connection_ = nullptr;
}
bool TCPClient::is_connected()
{
return connection_ ? connection_->is_connected() : false;
}
void TCPClient::set_connection_timeout(boost::asio::deadline_timer::duration_type connection_timeout)
{
connection_timeout_ = connection_timeout;
}
auto TCPClient::connection_timeout() -> boost::asio::deadline_timer::duration_type
{
return connection_timeout_;
}
TConnectionPtr& TCPClient::connection()
{
return connection_;
}
void TCPClient::Send(void* pData, size_t length)
{
if (connection_)
connection_->Send(pData, length);
}
void TCPClient::set_buffer_size(std::uint64_t buffer_size)
{
buffer_size_ = buffer_size_;
}
void TCPClient::set_reconnect(bool b)
{
if (b)
flags_ |= flags::kReconnecting;
else
flags_ &= ~(flags::kReconnecting);
}
}
TCPServer.hpp
#pragma once
namespace nelw
{
class option_wrapper
{
private:
boost::asio::ip::tcp::acceptor& acceptor_;
public:
option_wrapper(boost::asio::ip::tcp::acceptor& acceptor) : acceptor_(acceptor)
{
}
~option_wrapper() = default;
template <typename SettableSocketOption>
option_wrapper& set_option(const SettableSocketOption& option)
{
acceptor_.set_option(option);
return *this;
}
};
class TCPServer
{
public:
using TConnectionMap = std::unordered_map<id_type, TConnectionPtr>;
public:
TCPServer(io_context_ref io_service);
virtual ~TCPServer();
public:
void Run(boost::asio::ip::tcp::endpoint&& e);
void Run(std::uint16_t port, std::string_view ip = "127.0.0.1");
void Stop();
public:
/**********************/
/* getters & setters*/
auto context()->io_context_ref { return context_; }
auto acceptor()->tcp_acceptor& { return acceptor_; }
template <typename SettableSocketOption>
option_wrapper& set_option(const SettableSocketOption& option)
{
return option_wrapper_.set_option(option);
}
/**********************/
protected:
void Disconnect(std::shared_ptr<id_type> id);
virtual void Accept();
virtual TConnectionPtr CreateSession() // Override it
{
return std::make_shared<TCPSession>(context_, buffer_size_);
}
public:
template <class T> void set_connection_cb(T&& cb) { connection_cb_ = cb; }
template <class T> void set_recv_cb(T&& cb) { recv_cb_ = cb; }
template <class T> void set_error_cb(T&& cb) { error_cb_ = cb; }
void set_buffer_size(std::uint64_t buffer_size);
void BroadCastPacket(void* pData, size_t length);
protected:
io_context_ref context_;
tcp_acceptor acceptor_;
id_type next_id_;
TConnectionMap connection_map_;
bool is_running_;
TConnectionCallback connection_cb_;
TErrorCallback error_cb_;
TRecvCallback recv_cb_;
std::uint64_t buffer_size_;
option_wrapper option_wrapper_;
};
}
TCPServer.cpp
#include "nelw/Base.hpp"
#include "nelw/TcpServer.hpp"
#include "nelw/TcpSession.hpp"
#include <algorithm>
namespace nelw
{
TCPServer::TCPServer(io_context_ref io_service) : context_(io_service), acceptor_(context_), next_id_{ 0 }, is_running_(false),
connection_cb_{ nullptr }, error_cb_{ nullptr }, recv_cb_{ nullptr }, buffer_size_{ 4096 }, option_wrapper_(acceptor_)
{
}
TCPServer::~TCPServer()
{
Stop();
}
void TCPServer::Run(boost::asio::ip::tcp::endpoint&& e)
{
acceptor_.open(e.protocol());
acceptor_.bind(e);
acceptor_.listen();
Accept();
is_running_ = true;
}
void TCPServer::Run(std::uint16_t port, std::string_view ip)
{
Run(boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(ip), port));
}
void TCPServer::Disconnect(std::shared_ptr<id_type> id)
{
if (auto it = connection_map_.find(*id); it != connection_map_.end())
{
connection_map_.erase(it);
spdlog::debug("Remove last id {0} left_size {1}", *id, connection_map_.size());
}
}
void TCPServer::Stop()
{
if (!is_running_)
return;
if (!acceptor_.is_open())
return;
for (auto& [_, con] : connection_map_)
con->Disconnect();
acceptor_.cancel();
acceptor_.close();
is_running_ = false;
}
void TCPServer::Accept()
{
if (!acceptor_.is_open())
return;
auto session = CreateSession();
acceptor_.async_accept(session->socket(), [this, session = session](boost::system::error_code const& ec) {
if (!ec)
{
auto id = ++next_id_;
session->set_id(id);
connection_map_[id] = session;
context_.post([this, idPtr = std::make_shared<id_type>(id), session = session, ec = ec]() {
if (error_cb_)
session->set_error_cb(error_cb_);
if (recv_cb_)
session->set_recv_cb(recv_cb_);
if (connection_cb_)
session->set_connection_cb(connection_cb_);
session->set_dc_cb(boost::bind(&TCPServer::Disconnect, this, idPtr));
session->Read();
Accept();
session->OnConnection();
});
}
else
{
if (session)
{
session->OnConnection();
}
}
});
}
void TCPServer::BroadCastPacket(void* pData, size_t length)
{
for (auto& [_, con] : connection_map_)
if (con) con->Send(pData, length);
}
void TCPServer::set_buffer_size(std::uint64_t buffer_size)
{
buffer_size_ = buffer_size_;
}
}
Buffer.hpp
#pragma once
// Modified from evpp project https://github.com/Qihoo360/evpp
// @see https://github.com/Qihoo360/evpp/blob/master/evpp/buffer.h and https://github.com/Qihoo360/evpp/blob/master/evpp/buffer.cc
namespace nelw
{
class Buffer
{
public:
inline static constexpr size_t kCheapPrependSize = 8;
inline static constexpr size_t kInitialSize = 1024;
explicit Buffer(size_t initial_size = kInitialSize, size_t reserved_prepend_size = kCheapPrependSize)
: capacity_(reserved_prepend_size + initial_size)
, read_index_(reserved_prepend_size)
, write_index_(reserved_prepend_size)
, reserved_prepend_size_(reserved_prepend_size) {
buffer_ = new char[capacity_];
assert(length() == 0);
assert(WritableBytes() == initial_size);
assert(PrependableBytes() == reserved_prepend_size);
}
~Buffer() {
delete[] buffer_;
buffer_ = nullptr;
capacity_ = 0;
}
void Swap(Buffer& rhs) {
std::swap(buffer_, rhs.buffer_);
std::swap(capacity_, rhs.capacity_);
std::swap(read_index_, rhs.read_index_);
std::swap(write_index_, rhs.write_index_);
std::swap(reserved_prepend_size_, rhs.reserved_prepend_size_);
}
// Skip advances the reading index of the buffer
void Skip(size_t len) {
if (len < length()) {
read_index_ += len;
}
else {
Reset();
}
}
// Retrieve advances the reading index of the buffer
// Retrieve it the same as Skip.
void Retrieve(size_t len) {
Skip(len);
}
// Truncate discards all but the first n unread bytes from the buffer
// but continues to use the same allocated storage.
// It does nothing if n is greater than the length of the buffer.
void Truncate(size_t n) {
if (n == 0) {
read_index_ = reserved_prepend_size_;
write_index_ = reserved_prepend_size_;
}
else if (write_index_ > read_index_ + n) {
write_index_ = read_index_ + n;
}
}
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
// Reset is the same as Truncate(0).
void Reset() {
Truncate(0);
}
// Increase the capacity of the container to a value that's greater
// or equal to len. If len is greater than the current capacity(),
// new storage is allocated, otherwise the method does nothing.
void Reserve(size_t len) {
if (capacity_ >= len + reserved_prepend_size_) {
return;
}
// TODO add the implementation logic here
grow(len + reserved_prepend_size_);
}
// Make sure there is enough memory space to append more data with length len
void EnsureWritableBytes(size_t len) {
if (WritableBytes() < len) {
grow(len);
}
assert(WritableBytes() >= len);
}
// ToText appends char '0円' to buffer to convert the underlying data to a c-style string text.
// It will not change the length of buffer.
void ToText() {
AppendInt8('0円');
UnwriteBytes(1);
}
// TODO XXX Little-Endian/Big-Endian problem.
#define evppbswap_64(x) \
((((x) & 0xff00000000000000ull) >> 56) \
| (((x) & 0x00ff000000000000ull) >> 40) \
| (((x) & 0x0000ff0000000000ull) >> 24) \
| (((x) & 0x000000ff00000000ull) >> 8) \
| (((x) & 0x00000000ff000000ull) << 8) \
| (((x) & 0x0000000000ff0000ull) << 24) \
| (((x) & 0x000000000000ff00ull) << 40) \
| (((x) & 0x00000000000000ffull) << 56))
// Write
public:
void Write(const void* /*restrict*/ d, size_t len) {
EnsureWritableBytes(len);
memcpy(WriteBegin(), d, len);
assert(write_index_ + len <= capacity_);
write_index_ += len;
}
void Append(const char* /*restrict*/ d, size_t len) {
Write(d, len);
}
void Append(const void* /*restrict*/ d, size_t len) {
Write(d, len);
}
// Append int64_t/int32_t/int16_t with network endian
void AppendInt64(int64_t x) {
int64_t be = evppbswap_64(x);
Write(&be, sizeof be);
}
void AppendInt32(int32_t x) {
int32_t be32 = htonl(x);
Write(&be32, sizeof be32);
}
void AppendInt16(int16_t x) {
int16_t be16 = htons(x);
Write(&be16, sizeof be16);
}
void AppendInt8(int8_t x) {
Write(&x, sizeof x);
}
// Prepend int64_t/int32_t/int16_t with network endian
void PrependInt64(int64_t x) {
int64_t be = evppbswap_64(x);
Prepend(&be, sizeof be);
}
void PrependInt32(int32_t x) {
int32_t be32 = htonl(x);
Prepend(&be32, sizeof be32);
}
void PrependInt16(int16_t x) {
int16_t be16 = htons(x);
Prepend(&be16, sizeof be16);
}
void PrependInt8(int8_t x) {
Prepend(&x, sizeof x);
}
// Insert content, specified by the parameter, into the front of reading index
void Prepend(const void* /*restrict*/ d, size_t len) {
assert(len <= PrependableBytes());
read_index_ -= len;
const char* p = static_cast<const char*>(d);
memcpy(begin() + read_index_, p, len);
}
void UnwriteBytes(size_t n) {
assert(n <= length());
write_index_ -= n;
}
void WriteBytes(size_t n) {
assert(n <= WritableBytes());
write_index_ += n;
}
//Read
public:
// Peek int64_t/int32_t/int16_t/int8_t with network endian
int64_t ReadInt64() {
int64_t result = PeekInt64();
Skip(sizeof result);
return result;
}
int32_t ReadInt32() {
int32_t result = PeekInt32();
Skip(sizeof result);
return result;
}
int16_t ReadInt16() {
int16_t result = PeekInt16();
Skip(sizeof result);
return result;
}
int8_t ReadInt8() {
int8_t result = PeekInt8();
Skip(sizeof result);
return result;
}
std::string ToString() const {
return std::string(data(), length());
}
void Shrink(size_t reserve) {
// Buffer other(length() + reserve);
// other.Append(ToSlice());
// Swap(other);
}
// ReadByte reads and returns the next byte from the buffer.
// If no byte is available, it returns '0円'.
char ReadByte() {
assert(length() >= 1);
if (length() == 0) {
return '0円';
}
return buffer_[read_index_++];
}
// UnreadBytes unreads the last n bytes returned
// by the most recent read operation.
void UnreadBytes(size_t n) {
assert(n < read_index_);
read_index_ -= n;
}
// Peek
public:
// Peek int64_t/int32_t/int16_t/int8_t with network endian
int64_t PeekInt64() const {
assert(length() >= sizeof(int64_t));
int64_t be64 = 0;
::memcpy(&be64, data(), sizeof be64);
return evppbswap_64(be64);
}
int32_t PeekInt32() const {
assert(length() >= sizeof(int32_t));
int32_t be32 = 0;
::memcpy(&be32, data(), sizeof be32);
return ntohl(be32);
}
int16_t PeekInt16() const {
assert(length() >= sizeof(int16_t));
int16_t be16 = 0;
::memcpy(&be16, data(), sizeof be16);
return ntohs(be16);
}
int8_t PeekInt8() const {
assert(length() >= sizeof(int8_t));
int8_t x = *data();
return x;
}
public:
// data returns a pointer of length Buffer.length() holding the unread portion of the buffer.
// The data is valid for use only until the next buffer modification (that is,
// only until the next call to a method like Read, Write, Reset, or Truncate).
// The data aliases the buffer content at least until the next buffer modification,
// so immediate changes to the slice will affect the result of future reads.
const char* data() const {
return buffer_ + read_index_;
}
char* WriteBegin() {
return begin() + write_index_;
}
const char* WriteBegin() const {
return begin() + write_index_;
}
// length returns the number of bytes of the unread portion of the buffer
size_t length() const {
assert(write_index_ >= read_index_);
return write_index_ - read_index_;
}
// size returns the number of bytes of the unread portion of the buffer.
// It is the same as length().
size_t size() const {
return length();
}
// capacity returns the capacity of the buffer's underlying byte slice, that is, the
// total space allocated for the buffer's data.
size_t capacity() const {
return capacity_;
}
size_t WritableBytes() const {
assert(capacity_ >= write_index_);
return capacity_ - write_index_;
}
size_t PrependableBytes() const {
return read_index_;
}
// Helpers
public:
const char* FindCRLF() const {
const char* crlf = std::search(data(), WriteBegin(), kCRLF, kCRLF + 2);
return crlf == WriteBegin() ? nullptr : crlf;
}
const char* FindCRLF(const char* start) const {
assert(data() <= start);
assert(start <= WriteBegin());
const char* crlf = std::search(start, WriteBegin(), kCRLF, kCRLF + 2);
return crlf == WriteBegin() ? nullptr : crlf;
}
const char* FindEOL() const {
const void* eol = memchr(data(), '\n', length());
return static_cast<const char*>(eol);
}
const char* FindEOL(const char* start) const {
assert(data() <= start);
assert(start <= WriteBegin());
const void* eol = memchr(start, '\n', WriteBegin() - start);
return static_cast<const char*>(eol);
}
private:
char* begin() {
return buffer_;
}
const char* begin() const {
return buffer_;
}
void grow(size_t len) {
if (WritableBytes() + PrependableBytes() < len + reserved_prepend_size_) {
//grow the capacity
size_t n = (capacity_ << 1) + len;
size_t m = length();
char* d = new char[n];
memcpy(d + reserved_prepend_size_, begin() + read_index_, m);
write_index_ = m + reserved_prepend_size_;
read_index_ = reserved_prepend_size_;
capacity_ = n;
delete[] buffer_;
buffer_ = d;
}
else {
// move readable data to the front, make space inside buffer
assert(reserved_prepend_size_ < read_index_);
size_t readable = length();
memmove(begin() + reserved_prepend_size_, begin() + read_index_, length());
read_index_ = reserved_prepend_size_;
write_index_ = read_index_ + readable;
assert(readable == length());
assert(WritableBytes() >= len);
}
}
private:
char* buffer_;
size_t capacity_;
size_t read_index_;
size_t write_index_;
size_t reserved_prepend_size_;
inline static constexpr const char kCRLF[3] = { "\r\n" };
};
}
Echo Server example:
#include <nelw/nelw.hpp>
auto main() -> int
{
spdlog::set_pattern("[%l][%t] %v");
boost::asio::io_context context;
std::uint16_t port = 9005;
auto srv = std::make_unique<nelw::TCPServer>(context);
srv->Run(port);
srv->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)).
set_option(boost::asio::ip::tcp::no_delay(true));
srv->set_recv_cb([](const nelw::TConnectionPtr& con, const nelw::TBufferPtr& buf) {
if (auto size = buf->length(); size)
{
auto str = buf->ToString();
con->Send((void*)str.data(), str.size());
buf->Skip(size);
}
});
context.run();
return 0;
}
1 Answer 1
Base.hpp
drags in a lot of stuff that isn't needed for what it defines:
#define SPDLOG_ACTIVE_LEVEL 1 #include <spdlog/spdlog.h> #include <spdlog/sinks/basic_file_sink.h> #include <spdlog/async.h> #include "Utils.hpp"
As far as I can see, all of these can be removed, and only included by the implementation files that need them.