\$\begingroup\$
\$\endgroup\$
Handles streaming of the response body.
In construction you either pass a content-length
value and the stream will allow you to send that many bites before cutting you off. Or you can pass the transfer-encoding
type in which case the stream will automatically convert the input into the appropriate output stream underneath (currently only support chunked but plan to expand).
StreamOutput.h
#ifndef THORSANVIL_NISSE_NISSEHTTP_STREAMOUTPUT_H
#define THORSANVIL_NISSE_NISSEHTTP_STREAMOUTPUT_H
#include "NisseHTTPConfig.h"
#include "Util.h"
#include <iostream>
namespace ThorsAnvil::Nisse::NisseHTTP
{
class StreamBufOutput: public std::streambuf
{
public:
using Complete = std::function<void()>;
typedef std::streambuf::traits_type traits;
typedef traits::int_type int_type;
typedef traits::char_type char_type;
private:
static std::streamsize constexpr chunkBufferSize = 1024;
std::streamsize remaining;
std::streambuf* buffer;
bool chunked;
bool firstChunk;
std::vector<char> chunkBuffer;
public:
~StreamBufOutput();
StreamBufOutput();
StreamBufOutput(std::ostream& stream, std::size_t length;
StreamBufOutput(std::ostream& stream, Encoding encoding);
StreamBufOutput(StreamBufOutput&& move) noexcept;
StreamBufOutput& operator=(StreamBufOutput&& move) noexcept;
StreamBufOutput(StreamBufOutput const&) = delete;
StreamBufOutput& operator=(StreamBufOutput const&) = delete;
void swap(StreamBufOutput& other) noexcept;
friend void swap(StreamBufOutput& lhs, StreamBufOutput& rhs) {lhs.swap(rhs);}
void done();
protected:
virtual int sync() override;
// Write:
virtual std::streamsize xsputn(char_type const*,std::streamsize) override;
virtual int_type overflow(int_type = traits::eof()) override;
private:
void checkBuffer();
void outputChunkSize(std::streamsize size);
char toHex(int digit);
void sendAllData(char const* s, std::streamsize size);
std::streamsize xsputnChunked(char_type const*,std::streamsize);
std::streamsize xsputnLength(char_type const*,std::streamsize);
int_type overflowChunked(int_type = traits::eof());
int_type overflowLength(int_type = traits::eof());
void dumpBuffer();
};
class StreamOutput: public std::ostream
{
StreamBufOutput buffer;
public:
StreamOutput()
: std::ostream(nullptr)
, buffer()
{}
StreamOutput(std::ostream& stream, std::size_t length)
: std::ostream(nullptr)
, buffer(stream, length)
{
rdbuf(&buffer);
}
StreamOutput(std::ostream& stream, Encoding encoding)
: std::ostream(nullptr)
, buffer(stream, encoding)
{
rdbuf(&buffer);
}
void addBuffer(StreamBufOutput&& newBuffer)
{
buffer = std::move(newBuffer);
rdbuf(&buffer);
clear();
}
};
}
#endif
StreamOutput.cpp
#include "StreamOutput.h"
using namespace ThorsAnvil::Nisse::NisseHTTP;
StreamBufOutput::~StreamBufOutput()
{
done();
}
StreamBufOutput::StreamBufOutput()
: remaining(0)
, buffer(nullptr)
, chunked(false)
, firstChunk(false)
, chunkBuffer(0)
{}
StreamBufOutput::StreamBufOutput(std::ostream& stream, std::size_t length)
: remaining(length)
, buffer(stream.rdbuf())
, chunked(false)
, firstChunk(false)
, chunkBuffer(0)
{}
StreamBufOutput::StreamBufOutput(std::ostream& stream, Encoding /*encoding*/)
: remaining(chunkBufferSize)
, buffer(stream.rdbuf())
, chunked(true)
, firstChunk(true)
, chunkBuffer(chunkBufferSize)
{}
StreamBufOutput::StreamBufOutput(StreamBufOutput&& move) noexcept
: remaining(std::exchange(move.remaining, 0))
, buffer(std::exchange(move.buffer, nullptr))
, chunked(std::exchange(move.chunked, false))
, firstChunk(std::exchange(move.firstChunk, false))
, chunkBuffer(std::move(move.chunkBuffer))
{}
StreamBufOutput& StreamBufOutput::operator=(StreamBufOutput&& move) noexcept
{
remaining = 0;
buffer = nullptr;
chunked = false;
firstChunk = false;
chunkBuffer.resize(0);
swap(move);
return *this;
}
void StreamBufOutput::swap(StreamBufOutput& other) noexcept
{
std::streambuf::swap(other);
using std::swap;
swap(remaining, other.remaining);
swap(buffer, other.buffer);
swap(chunked, other.chunked);
swap(firstChunk, other.firstChunk);
swap(chunkBuffer, other.chunkBuffer);
}
// Control:
int StreamBufOutput::sync()
{
checkBuffer();
dumpBuffer();
return buffer->pubsync();
}
void StreamBufOutput::dumpBuffer()
{
if (chunked)
{
std::streamsize chunkSize = chunkBufferSize - remaining;
if (chunkSize != 0)
{
outputChunkSize(chunkSize);
sendAllData(&chunkBuffer[0], chunkBufferSize - remaining);
sendAllData("\r\n", 2);
remaining = chunkBufferSize;
}
}
}
void StreamBufOutput::done()
{
if (chunked)
{
dumpBuffer();
sendAllData("0\r\n", 3);
remaining = 0;
chunked = false;
}
}
void StreamBufOutput::sendAllData(char const* s, std::streamsize size)
{
while (size != 0)
{
std::streamsize sent = buffer->sputn(s, size);
s += sent;
size -= sent;
}
}
char StreamBufOutput::toHex(int digit)
{
return digit >= 10 ? 'A' + (digit - 10)
: '0' + digit;
}
void StreamBufOutput::outputChunkSize(std::streamsize size)
{
bool started = false;
for (auto x: { 4096, 256, 16, 1})
{
int digit = size / x;
if (digit != 0 || started)
{
started = true;
buffer->sputc(toHex(digit));
}
size = size - (digit * x);
}
sendAllData("\r\n", 2);
}
// Write:
std::streamsize StreamBufOutput::xsputnChunked(char_type const* s,std::streamsize count)
{
if (count > remaining)
{
std::streamsize chunkSize = chunkBufferSize - remaining + count;
outputChunkSize(chunkSize);
sendAllData(&chunkBuffer[0], chunkBufferSize - remaining);
sendAllData(s, count);
sendAllData("\r\n", 2);
remaining = chunkBufferSize;
}
else
{
std::copy(s, s + count, std::begin(chunkBuffer) + ( chunkBufferSize - remaining));
remaining -= count;
}
return count;
}
std::streamsize StreamBufOutput::xsputnLength(char_type const* s,std::streamsize count)
{
std::streamsize result = 0;
while (remaining != 0 && count != 0)
{
std::size_t max = std::min(count, remaining);
std::streamsize sent = buffer->sputn(s, max);
s += sent;
count -= sent;
result += sent;
remaining -= sent;
}
if (remaining == 0)
{
sync();
}
return result;
}
std::streamsize StreamBufOutput::xsputn(char_type const* s,std::streamsize count)
{
if (chunked) {
return xsputnChunked(s, count);
}
else {
return xsputnLength(s, count);
}
}
StreamBufOutput::int_type StreamBufOutput::overflowChunked(int_type ch)
{
if (ch != traits::eof())
{
char_type v = ch;
xsputnChunked(&v, 1);
}
return 1;
}
StreamBufOutput::int_type StreamBufOutput::overflowLength(int_type ch)
{
if (ch == traits::eof()) {
return remaining == 0 ? traits::eof() : 1;
}
int_type result = traits::eof();
if (remaining != 0)
{
if ((result = buffer->sputc(ch)) != traits::eof()) {
--remaining;
}
if (remaining == 0) {
sync();
}
}
return result;
}
StreamBufOutput::int_type StreamBufOutput::overflow(int_type ch)
{
if (chunked) {
return overflowChunked(ch);
}
else {
return overflowLength(ch);
}
}
asked Oct 20, 2024 at 21:02
lang-cpp