0
\$\begingroup\$

Handles streaming of the response body.

On construction you either pass a content-length value, in which case the stream will allow you to send that many bytes before cutting you off, or you pass the transfer-encoding type, in which case the stream automatically converts the input into the appropriate output format underneath (currently only chunked is supported, but I plan to expand).

StreamOutput.h

#ifndef THORSANVIL_NISSE_NISSEHTTP_STREAMOUTPUT_H
#define THORSANVIL_NISSE_NISSEHTTP_STREAMOUTPUT_H
#include "NisseHTTPConfig.h"
#include "Util.h"
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>
namespace ThorsAnvil::Nisse::NisseHTTP
{
// Output stream buffer that frames an HTTP response body on top of another
// stream's buffer.
//
// The mode is selected by the constructor:
//  - length mode:  forwards at most `length` characters to the underlying
//                  buffer; once the allowance is used up further writes fail.
//  - chunked mode: collects small writes in an internal buffer of
//                  chunkBufferSize characters and emits them as
//                  "<hex size>\r\n<data>\r\n" chunks.
//
// done() (also run by the destructor) writes any buffered chunk data followed
// by "0\r\n" when the buffer is in chunked mode.
class StreamBufOutput: public std::streambuf
{
    public:
        using Complete  = std::function<void()>;
        using traits    = std::streambuf::traits_type;
        using int_type  = traits::int_type;
        using char_type = traits::char_type;
    private:
        // Capacity of the staging buffer used in chunked mode.
        static std::streamsize constexpr chunkBufferSize = 1024;

        // Length mode:  number of characters that may still be written.
        // Chunked mode: free space left in chunkBuffer.
        std::streamsize     remaining;
        // Stream buffer of the wrapped stream (taken from stream.rdbuf()).
        std::streambuf*     buffer;
        // True while the object is producing chunked output.
        bool                chunked;
        // Set by the constructors and carried through move/swap.
        // NOTE(review): not read by any code visible here - confirm it is still needed.
        bool                firstChunk;
        // Staging area for chunked mode (empty in length mode).
        std::vector<char>   chunkBuffer;
    public:
        ~StreamBufOutput();
        StreamBufOutput();
        // Length mode: allow `length` characters to be written to `stream`.
        StreamBufOutput(std::ostream& stream, std::size_t length);
        // Chunked mode: write chunk-framed data to `stream`.
        StreamBufOutput(std::ostream& stream, Encoding encoding);
        StreamBufOutput(StreamBufOutput&& move)                 noexcept;
        StreamBufOutput& operator=(StreamBufOutput&& move)      noexcept;
        StreamBufOutput(StreamBufOutput const&)                 = delete;
        StreamBufOutput& operator=(StreamBufOutput const&)      = delete;

        void swap(StreamBufOutput& other) noexcept;
        friend void swap(StreamBufOutput& lhs, StreamBufOutput& rhs) noexcept {lhs.swap(rhs);}

        // Flushes buffered chunk data and writes the terminating chunk (chunked mode only).
        void done();

    protected:
        virtual int sync() override;

        // Write:
        virtual std::streamsize xsputn(char_type const*,std::streamsize) override;
        virtual int_type        overflow(int_type = traits::eof()) override;

    private:
        // NOTE(review): declared here and called from sync(); its definition is
        // not part of the code shown.
        void checkBuffer();
        // Writes `size` as hexadecimal followed by "\r\n".
        void outputChunkSize(std::streamsize size);
        // Converts a value in 0..15 into its upper-case hexadecimal digit.
        char toHex(int digit);
        // Pushes `size` characters starting at `s` into the underlying buffer.
        void sendAllData(char const* s, std::streamsize size);

        std::streamsize xsputnChunked(char_type const*,std::streamsize);
        std::streamsize xsputnLength(char_type const*,std::streamsize);
        int_type        overflowChunked(int_type = traits::eof());
        int_type        overflowLength(int_type = traits::eof());
        // Emits the content of chunkBuffer as one chunk (chunked mode only).
        void dumpBuffer();
};
// std::ostream front end that owns the StreamBufOutput it writes through.
class StreamOutput: public std::ostream
{
        StreamBufOutput     buffer;

    public:
        // Creates a stream with no stream buffer attached; use addBuffer()
        // to give it somewhere to write.
        StreamOutput()
            : std::ostream(nullptr)
            , buffer{}
        {}

        // Length-limited output on top of `stream`.
        // The base is built with a null buffer first because `buffer` is a
        // member and is only constructed after the std::ostream base.
        StreamOutput(std::ostream& stream, std::size_t length)
            : std::ostream(nullptr)
            , buffer{stream, length}
        {
            this->rdbuf(&buffer);
        }

        // Encoded (chunked) output on top of `stream`.
        StreamOutput(std::ostream& stream, Encoding encoding)
            : std::ostream(nullptr)
            , buffer{stream, encoding}
        {
            this->rdbuf(&buffer);
        }

        // Takes over `newBuffer`, attaches it to this stream and resets the
        // stream's error state.
        void addBuffer(StreamBufOutput&& newBuffer)
        {
            buffer = std::move(newBuffer);
            this->rdbuf(&buffer);
            this->clear();
        }
};
}
#endif

StreamOutput.cpp

#include "StreamOutput.h"
using namespace ThorsAnvil::Nisse::NisseHTTP;
// Finalises the stream on destruction: in chunked mode done() writes any
// buffered data and the terminating chunk.
StreamBufOutput::~StreamBufOutput()
{
    // done() writes to the underlying stream buffer, which may throw.
    // Destructors are implicitly noexcept, so an escaping exception would
    // call std::terminate(); contain it here instead.
    try
    {
        done();
    }
    catch (...)
    {
        // Nothing can be reported from a destructor; the output is left as it is.
    }
}
StreamBufOutput::StreamBufOutput()
 : remaining(0)
 , buffer(nullptr)
 , chunked(false)
 , firstChunk(false)
 , chunkBuffer(0)
{}
StreamBufOutput::StreamBufOutput(std::ostream& stream, std::size_t length)
 : remaining(length)
 , buffer(stream.rdbuf())
 , chunked(false)
 , firstChunk(false)
 , chunkBuffer(0)
{}
StreamBufOutput::StreamBufOutput(std::ostream& stream, Encoding /*encoding*/)
 : remaining(chunkBufferSize)
 , buffer(stream.rdbuf())
 , chunked(true)
 , firstChunk(true)
 , chunkBuffer(chunkBufferSize)
{}
// Move construction: takes over the state of `move` and leaves it as an
// inert length-mode buffer (zero allowance, no underlying stream buffer), so
// its destructor has nothing to finalise.
StreamBufOutput::StreamBufOutput(StreamBufOutput&& move) noexcept
    : remaining(move.remaining)
    , buffer(move.buffer)
    , chunked(move.chunked)
    , firstChunk(move.firstChunk)
    , chunkBuffer(std::move(move.chunkBuffer))
{
    move.remaining  = 0;
    move.buffer     = nullptr;
    move.chunked    = false;
    move.firstChunk = false;
}
// Move assignment: finalises whatever this object was writing, then takes
// over the state of `move`, leaving `move` as an inert length-mode buffer.
StreamBufOutput& StreamBufOutput::operator=(StreamBufOutput&& move) noexcept
{
    // Resetting and then swapping with ourselves would wipe the object.
    if (this == &move)
    {
        return *this;
    }

    // The state being replaced may hold buffered chunk data and still owe the
    // terminating chunk. Finish it the same way the destructor does rather
    // than silently dropping it. done() writes to the underlying buffer and
    // may throw, which must not escape a noexcept function.
    try
    {
        done();
    }
    catch (...)
    {
        // Best effort only: the old output is abandoned.
    }

    remaining   = 0;
    buffer      = nullptr;
    chunked     = false;
    firstChunk  = false;
    chunkBuffer.resize(0);
    swap(move);
    return *this;
}
// Exchanges the complete state of two buffers: the std::streambuf base
// first, then every data member.
void StreamBufOutput::swap(StreamBufOutput& other) noexcept
{
    std::streambuf::swap(other);

    std::swap(chunkBuffer, other.chunkBuffer);
    std::swap(firstChunk,  other.firstChunk);
    std::swap(chunked,     other.chunked);
    std::swap(buffer,      other.buffer);
    std::swap(remaining,   other.remaining);
}
// Control:
// Flush request: emits any buffered chunk data and then synchronises the
// underlying stream buffer, returning that buffer's result.
int StreamBufOutput::sync()
{
    // NOTE(review): checkBuffer() is not defined in the code shown here;
    // presumably it validates `buffer` before it is dereferenced below - confirm.
    checkBuffer();
    dumpBuffer();

    int const status = buffer->pubsync();
    return status;
}
// Chunked mode only: writes the staged characters as a single chunk
// ("<hex size>\r\n<data>\r\n") and marks the staging buffer as empty.
// Does nothing in length mode or when nothing is staged.
void StreamBufOutput::dumpBuffer()
{
    if (!chunked)
    {
        return;
    }

    std::streamsize const pending = chunkBufferSize - remaining;
    if (pending == 0)
    {
        return;
    }

    outputChunkSize(pending);
    sendAllData(chunkBuffer.data(), pending);
    sendAllData("\r\n", 2);
    remaining = chunkBufferSize;
}
// Ends a chunked body: flushes staged data, writes the zero-size chunk line
// and drops back to length mode with no allowance so nothing more can be
// written. Calling it again, or calling it in length mode, is a no-op.
//
// NOTE(review): only "0\r\n" is written. The chunked format also needs a
// final "\r\n" after the (optional) trailer section; presumably the caller
// writes trailers and that final CRLF - confirm.
void StreamBufOutput::done()
{
    if (!chunked)
    {
        return;
    }

    dumpBuffer();
    sendAllData("0\r\n", 3);
    chunked   = false;
    remaining = 0;
}
// Writes `size` characters starting at `s` to the underlying stream buffer,
// retrying after partial writes.
//
// If the underlying buffer stops accepting data (sputn() reports that nothing
// was written) the loop gives up instead of spinning forever; the unsent
// characters are dropped because this function has no way to report failure.
void StreamBufOutput::sendAllData(char const* s, std::streamsize size)
{
    while (size > 0)
    {
        std::streamsize const sent = buffer->sputn(s, size);
        if (sent <= 0)
        {
            // No progress: retrying the same call would loop indefinitely.
            break;
        }
        s    += sent;
        size -= sent;
    }
}
// Maps a value in 0..15 to its upper-case hexadecimal digit
// ('0'..'9', 'A'..'F').
char StreamBufOutput::toHex(int digit)
{
    if (digit < 10)
    {
        return static_cast<char>('0' + digit);
    }
    return static_cast<char>('A' + (digit - 10));
}
// Writes the chunk-size line: `size` in upper-case hexadecimal without
// leading zeros, followed by "\r\n".
//
// Any positive size is supported. A fixed four-digit conversion is not
// enough because xsputnChunked() emits oversized writes as a single chunk,
// so `size` can be 65536 or more. Callers never pass zero (both call sites
// only emit non-empty chunks); for zero no digits are written.
void StreamBufOutput::outputChunkSize(std::streamsize size)
{
    // Two hexadecimal digits per byte always fit the largest std::streamsize.
    char        digits[sizeof(std::streamsize) * 2];
    char* const end   = digits + sizeof(digits);
    char*       first = end;

    // Produce digits from least to most significant, filling from the back.
    for (; size > 0; size /= 16)
    {
        --first;
        *first = toHex(static_cast<int>(size % 16));
    }

    sendAllData(first, end - first);
    sendAllData("\r\n", 2);
}
// Write:
// Chunked-mode write of `count` characters starting at `s`.
//
// Data that fits into the free part of the staging buffer is only staged.
// Anything larger is sent immediately as one chunk made of the already
// staged characters followed by the new data, after which the staging
// buffer is empty again. Always reports `count` characters as written.
std::streamsize StreamBufOutput::xsputnChunked(char_type const* s,std::streamsize count)
{
    std::streamsize const staged = chunkBufferSize - remaining;

    if (count <= remaining)
    {
        // Room left: append behind the characters that are already staged.
        std::copy(s, s + count, chunkBuffer.begin() + staged);
        remaining -= count;
        return count;
    }

    outputChunkSize(staged + count);
    sendAllData(chunkBuffer.data(), staged);
    sendAllData(s, count);
    sendAllData("\r\n", 2);
    remaining = chunkBufferSize;
    return count;
}
// Length-mode write of up to `count` characters starting at `s`.
//
// Forwards data to the underlying buffer until either everything was sent or
// the remaining allowance is exhausted, and returns the number of characters
// actually written. A short count tells the caller that the write failed.
// Once the allowance reaches zero the output is synchronised.
std::streamsize StreamBufOutput::xsputnLength(char_type const* s,std::streamsize count)
{
    std::streamsize result = 0;
    while (remaining > 0 && count > 0)
    {
        std::streamsize const limit = std::min(count, remaining);
        std::streamsize const sent  = buffer->sputn(s, limit);
        if (sent <= 0)
        {
            // The underlying buffer accepts nothing more; retrying would
            // loop forever, so report the short write instead.
            break;
        }
        s           += sent;
        count       -= sent;
        result      += sent;
        remaining   -= sent;
    }
    if (remaining == 0)
    {
        sync();
    }
    return result;
}
// Bulk write entry point of std::streambuf: dispatches to the handler for
// the current mode.
std::streamsize StreamBufOutput::xsputn(char_type const* s,std::streamsize count)
{
    return chunked ? xsputnChunked(s, count)
                   : xsputnLength(s, count);
}
// Chunked-mode single character write. A real character is routed through
// xsputnChunked(); eof is ignored. Always returns 1 (a non-eof value).
StreamBufOutput::int_type StreamBufOutput::overflowChunked(int_type ch)
{
    if (ch == traits::eof())
    {
        return 1;
    }

    char_type const single = traits::to_char_type(ch);
    xsputnChunked(&single, 1);
    return 1;
}
// Length-mode single character write.
//
// For eof: returns eof when the allowance is used up, otherwise 1.
// For a character: returns eof without writing when no allowance is left;
// otherwise forwards the character, counts it against the allowance if the
// write succeeded, synchronises once the allowance reaches zero and returns
// the result of the underlying sputc().
StreamBufOutput::int_type StreamBufOutput::overflowLength(int_type ch)
{
    int_type const eofValue = traits::eof();

    if (ch == eofValue)
    {
        if (remaining == 0)
        {
            return eofValue;
        }
        return 1;
    }

    if (remaining == 0)
    {
        return eofValue;
    }

    int_type const written = buffer->sputc(traits::to_char_type(ch));
    if (written != eofValue)
    {
        --remaining;
    }
    if (remaining == 0)
    {
        sync();
    }
    return written;
}
// Single character write entry point of std::streambuf: dispatches to the
// handler for the current mode.
StreamBufOutput::int_type StreamBufOutput::overflow(int_type ch)
{
    return chunked ? overflowChunked(ch)
                   : overflowLength(ch);
}
asked Oct 20, 2024 at 21:02
\$\endgroup\$

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.