HTTP Stream A stream that opens an HTTP GET and then acts like a normal C++ istream
Needed a quick stream to get jsonJSON objects.
HTTP Stream A stream that opens an HTTP GET and then acts like a normal C++ istream
Needed a quick stream to get json objects.
Stream that opens an HTTP GET and then acts like a normal C++ istream
Needed a quick stream to get JSON objects.
#ifndef THORSANVIL_STREAM_THOR_STREAM_HTHORSANVIL_SIMPLE_STREAM_THOR_STREAM_H
#define THORSANVIL_STREAM_THOR_STREAM_HTHORSANVIL_SIMPLE_STREAM_THOR_STREAM_H
#include <istream>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <curl/curl.h>
#include <string.h>
namespace ThorsAnvil
{
namespace Stream
{
extern "C" size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
extern "C" size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
class IThorStreamIThorStream;
class IThorSimpleStream: public std::istream
{
friend class IThorStream;
struct SimpleSocketStreamBuffer: public std::streambuf
{
typedef std::streambuf::traits_type traits;
typedef traits::int_type int_type;
SimpleSocketStreamBuffer(std::string const& url, bool useEasyCurl, bool preDownload, std::function<void()> markStreamBad)
: empty(true)
, open(true)
, sizeMarked(false)
, droppedData(false)
, preDownload(preDownload)
, sizeLeft(0)
, markStreamBad(markStreamBad)
{
curl = curl_easy_init();
if(!curl)
{ throw std::runtime_errormarkStreamBad("Failed to create CURL object");
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunc);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headFunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, this);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_PRIVATE, this);
if (useEasyCurl)
{
/* Perform the request, res will get the return code */
if( CURLcode result = curl_easy_perform(curl);
if ((result != CURLE_OK) && (result != CURLE_WRITE_ERROR))
{ curl_easy_cleanupmarkStreamBad(curl);
throw std::runtime_error("Curl perform failure");}
}
}
~SimpleSocketStreamBuffer()
{
curl_easy_cleanup(curl);
}
virtual int_type underflow()
{
std::unique_lock<std::mutex>if lock(mutexdroppedData);
empty = true;
{ curl_easy_pausemarkStreamBad(curl, CURLPAUSE_CONT);
cond.wait(lock, [&empty, &open](){return !(empty && open);});
return empty ? EOF : buffer[0];EOF;
}
privateprotected:
friend size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
std::size_t bytes = size*nmemb;
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
if ((!owner->empty) && (!owner->preDownload))
{
// Its not bad yet.
// It only becomes bad if the user tries
// to read any of this data. Then we mark
// it bad. So the actual marking bad is done
// in underflow().
owner->droppedData=true;
return 0;//CURL_WRITEFUNC_PAUSE;
}
owner->sizeLeft -= bytes;
>empty = false;
owner->open std::size_t oldSize = (owner->sizeLeft != 0>buffer.size();
owner->empty = false;
owner->buffer.resize(oldSize + bytes);
std::copy(ptr, ptr + bytes, &owner->buffer[0]>buffer[oldSize]);
owner->setg(&owner->buffer[0], &owner->buffer[0], &owner->buffer[bytes]>buffer[oldSize + bytes]);
owner->cond.notify_one();
if (owner->sizeMarked)
{
owner->sizeLeft -= bytes;
owner->open = (owner->sizeLeft != 0);
}
return bytes;
}
friend size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
if (strncmp(ptr, "HTTP/", 5) == 0)
{
int respCode = 0;
char* space = strchr(ptr+5, ' ');
if ((space != NULL) && (sscanf(space," %d OK", & respCode) == 1) && (respCode == 200))
{ /* GOOD */ }
else
{
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
owner->markStreamBad();
}
}
if (strncmp(ptr, "Content-Length:", 15) == 0)
{
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
owner->sizeLeft = atoi(ptr+15);
owner->sizeMarked = true;
if (owner->preDownload)
{
owner->buffer.reserve(owner->sizeLeft);
}
}
return size*nmemb;
}
bool empty;
bool open;
bool sizeMarked;
bool droppedData;
bool preDownload;
std::size_t sizeLeft;
std::mutex mutex;
std::condition_variable cond;
std::vector<char> buffer;
CURL* curl;
std::function<void()> markStreamBad;
};
SimpleSocketStreamBuffer buffer;
public:
IThorStreamIThorSimpleStream(std::string const& url, bool preDownload = false)
: std::istream(NULL)
, buffer(url, true, preDownload, [this](){this->setstate(std::ios::badbit);})
{
std::istream::rdbuf(&buffer);
}
};
}
}
#endif
It uses CURL to do the heavy lifting.
Also available here:
https://github.com/Loki-Astari/ThorsStream
Currently it does not check the HTTP status code. But I was thinking that maybe anything other than a "200 OK" should result in the bad bit being set. Any other ideas would also be appreciated.
Note: Limitation. It depends on the server providing the "Content length:" header as part of the response.
###Usage Example:
int main()
{
using ThorsAnvil::Stream::IThorStream;
IThorStream stream("http://iPubCrawlMaps.com/api/version");
std::string line;
while(std::getline(stream, line))
{
std::cout << "Line: " << line << "\n";
}
}
#ifndef THORSANVIL_STREAM_THOR_STREAM_H
#define THORSANVIL_STREAM_THOR_STREAM_H
#include <istream>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <curl/curl.h>
#include <string.h>
namespace ThorsAnvil
{
namespace Stream
{
extern "C" size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
extern "C" size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
class IThorStream: public std::istream
{
struct SimpleSocketStreamBuffer: public std::streambuf
{
typedef std::streambuf::traits_type traits;
typedef traits::int_type int_type;
SimpleSocketStreamBuffer(std::string const& url)
: empty(true)
, open(true)
, sizeLeft(0)
{
curl = curl_easy_init();
if(!curl)
{ throw std::runtime_error("Failed to create CURL object");
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunc);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headFunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, this);
/* Perform the request, res will get the return code */
if(curl_easy_perform(curl) != CURLE_OK)
{ curl_easy_cleanup(curl);
throw std::runtime_error("Curl perform failure");
}
}
~SimpleSocketStreamBuffer()
{
curl_easy_cleanup(curl);
}
virtual int_type underflow()
{
std::unique_lock<std::mutex> lock(mutex);
empty = true;
curl_easy_pause(curl, CURLPAUSE_CONT);
cond.wait(lock, [&empty, &open](){return !(empty && open);});
return empty ? EOF : buffer[0];
}
private:
friend size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
std::size_t bytes = size*nmemb;
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
if (!owner->empty)
{ return CURL_WRITEFUNC_PAUSE;
}
owner->sizeLeft -= bytes;
owner->open = (owner->sizeLeft != 0);
owner->empty = false;
owner->buffer.resize(bytes);
std::copy(ptr, ptr + bytes, &owner->buffer[0]);
owner->setg(&owner->buffer[0], &owner->buffer[0], &owner->buffer[bytes]);
owner->cond.notify_one();
return bytes;
}
friend size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
if (strncmp(ptr, "Content-Length:", 15) == 0)
{
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
owner->sizeLeft = atoi(ptr+15);
}
return size*nmemb;
}
bool empty;
bool open;
std::size_t sizeLeft;
std::mutex mutex;
std::condition_variable cond;
std::vector<char> buffer;
CURL* curl;
};
SimpleSocketStreamBuffer buffer;
public:
IThorStream(std::string const& url)
: std::istream(NULL)
, buffer(url)
{
std::istream::rdbuf(&buffer);
}
};
}
}
#endif
It uses CURL to do the heavy lifting.
Also available here:
https://github.com/Loki-Astari/ThorsStream
Currently it does not check the HTTP status code. But I was thinking that maybe anything other than a "200 OK" should result in the bad bit being set. Any other ideas would also be appreciated.
Note: Limitation. It depends on the server providing the "Content length:" header as part of the response.
###Usage Example:
int main()
{
using ThorsAnvil::Stream::IThorStream;
IThorStream stream("http://iPubCrawlMaps.com/api/version");
std::string line;
while(std::getline(stream, line))
{
std::cout << "Line: " << line << "\n";
}
}
#ifndef THORSANVIL_SIMPLE_STREAM_THOR_STREAM_H
#define THORSANVIL_SIMPLE_STREAM_THOR_STREAM_H
#include <istream>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <curl/curl.h>
#include <string.h>
namespace ThorsAnvil
{
namespace Stream
{
extern "C" size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
extern "C" size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
class IThorStream;
class IThorSimpleStream: public std::istream
{
friend class IThorStream;
struct SimpleSocketStreamBuffer: public std::streambuf
{
typedef std::streambuf::traits_type traits;
typedef traits::int_type int_type;
SimpleSocketStreamBuffer(std::string const& url, bool useEasyCurl, bool preDownload, std::function<void()> markStreamBad)
: empty(true)
, open(true)
, sizeMarked(false)
, droppedData(false)
, preDownload(preDownload)
, sizeLeft(0)
, markStreamBad(markStreamBad)
{
curl = curl_easy_init();
if(!curl)
{ markStreamBad();
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunc);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headFunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, this);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(curl, CURLOPT_PRIVATE, this);
if (useEasyCurl)
{
/* Perform the request, res will get the return code */
CURLcode result = curl_easy_perform(curl);
if ((result != CURLE_OK) && (result != CURLE_WRITE_ERROR))
{ markStreamBad();
}
}
}
~SimpleSocketStreamBuffer()
{
curl_easy_cleanup(curl);
}
virtual int_type underflow()
{
if (droppedData)
{ markStreamBad();
}
return EOF;
}
protected:
friend size_t writeFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
std::size_t bytes = size*nmemb;
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
if ((!owner->empty) && (!owner->preDownload))
{
// Its not bad yet.
// It only becomes bad if the user tries
// to read any of this data. Then we mark
// it bad. So the actual marking bad is done
// in underflow().
owner->droppedData=true;
return 0;//CURL_WRITEFUNC_PAUSE;
}
owner->empty = false;
std::size_t oldSize = owner->buffer.size();
owner->buffer.resize(oldSize + bytes);
std::copy(ptr, ptr + bytes, &owner->buffer[oldSize]);
owner->setg(&owner->buffer[0], &owner->buffer[0], &owner->buffer[oldSize + bytes]);
owner->cond.notify_one();
if (owner->sizeMarked)
{
owner->sizeLeft -= bytes;
owner->open = (owner->sizeLeft != 0);
}
return bytes;
}
friend size_t headFunc(char* ptr, size_t size, size_t nmemb, void* userdata)
{
if (strncmp(ptr, "HTTP/", 5) == 0)
{
int respCode = 0;
char* space = strchr(ptr+5, ' ');
if ((space != NULL) && (sscanf(space," %d OK", & respCode) == 1) && (respCode == 200))
{ /* GOOD */ }
else
{
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
owner->markStreamBad();
}
}
if (strncmp(ptr, "Content-Length:", 15) == 0)
{
SimpleSocketStreamBuffer* owner = reinterpret_cast<SimpleSocketStreamBuffer*>(userdata);
std::unique_lock<std::mutex> lock(owner->mutex);
owner->sizeLeft = atoi(ptr+15);
owner->sizeMarked = true;
if (owner->preDownload)
{
owner->buffer.reserve(owner->sizeLeft);
}
}
return size*nmemb;
}
bool empty;
bool open;
bool sizeMarked;
bool droppedData;
bool preDownload;
std::size_t sizeLeft;
std::mutex mutex;
std::condition_variable cond;
std::vector<char> buffer;
CURL* curl;
std::function<void()> markStreamBad;
};
SimpleSocketStreamBuffer buffer;
public:
IThorSimpleStream(std::string const& url, bool preDownload = false)
: std::istream(NULL)
, buffer(url, true, preDownload, [this](){this->setstate(std::ios::badbit);})
{
std::istream::rdbuf(&buffer);
}
};
}
}
#endif