1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * ==--==
17 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
18 *
19 * This file defines a stream buffer that is based on a raw pointer and block size. Unlike a vector-based
20 * stream buffer, the buffer cannot be expanded or contracted; it has a fixed capacity.
21 *
22 * For the latest on this and related APIs, please see http://casablanca.codeplex.com.
23 *
24 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
25 ****/
26 #pragma once
27
28 #ifndef _CASA_RAWPTR_STREAMS_H
29 #define _CASA_RAWPTR_STREAMS_H
30
31 #include <vector>
32 #include <queue>
33 #include <algorithm>
34 #include <iterator>
35
36 #include "pplx/pplxtasks.h"
37 #include "cpprest/astreambuf.h"
38 #include "cpprest/streams.h"
39
41
42 // Forward declarations
44
45 namespace details {
46
51 template<typename _CharType>
53 {
54 public:
55 typedef _CharType char_type;
56
57 typedef typename basic_streambuf<_CharType>::traits traits;
58 typedef typename basic_streambuf<_CharType>::int_type int_type;
59 typedef typename basic_streambuf<_CharType>::pos_type pos_type;
60 typedef typename basic_streambuf<_CharType>::off_type off_type;
61
67 m_data(nullptr),
68 m_current_position(0),
69 m_size(0)
70 {
71 }
72
77 {
80 }
81
82 protected:
83
88
93
98 virtual utility::size64_t
size()
const
99 {
100 return utility::size64_t(m_size);
101 }
102
108 virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in)
const
109 {
110 return 0;
111 }
112
121 {
122 return;
123 }
124
131 {
132 // See the comment in seek around the restriction that we do not allow read head to
133 // seek beyond the current size.
134 _ASSERTE(m_current_position <= m_size);
135
138 return (size_t)(writeend - readhead);
139 }
140
146 {
147 if (mode & std::ios_base::in)
148 {
150 }
151
152 if (mode & std::ios_base::out)
153 {
155 }
156
158 {
159 m_data = nullptr;
160 }
161
162 // Exceptions will be propagated out of _close_read or _close_write
163 return pplx::task_from_result();
164 }
165
167 {
168 return pplx::task_from_result(true);
169 }
170
172 {
173 if (m_current_position >= m_size)
174 return pplx::task_from_result<int_type>(traits::eof());
175 int_type retVal = (this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof();
176 return pplx::task_from_result<int_type>(retVal);
177 }
178
180 {
182 if ( newSize > m_size )
183 return pplx::task_from_exception<size_t>(std::make_exception_ptr(std::runtime_error("Writing past the end of the buffer")));
184 return pplx::task_from_result<size_t>(this->write(ptr, count));
185 }
186
193 {
195
198 size_t space_left = (size_t)(writeend - readhead);
199
200 if (space_left < count) return nullptr;
201
202 // Let the caller copy the data
203 return (_CharType*)(m_data+m_current_position);
204 }
205
211 {
212 // Update the write position and satisfy any pending reads
213 update_current_position(m_current_position+actual);
214 }
215
229 virtual bool acquire(_Out_ _CharType*& ptr, _Out_
size_t& count)
230 {
231 count = 0;
232 ptr = nullptr;
233
234 if (!this->
can_read())
return false;
235
237
238 if (count > 0)
239 {
240 ptr = (_CharType*)(m_data+m_current_position);
241 return true;
242 }
243 else
244 {
245 ptr = nullptr;
246
247 // Can only be open for read OR write, not both. If there is no data then
248 // we have reached the end of the stream so indicate such with true.
249 return true;
250 }
251 }
252
259 virtual void release(_Out_writes_opt_ (count) _CharType *ptr, _In_
size_t count)
260 {
261 if (ptr != nullptr)
262 update_current_position(m_current_position + count);
263 }
264
265 virtual pplx::task<size_t> _getn(_Out_writes_ (count) _CharType *ptr, _In_
size_t count)
266 {
267 return pplx::task_from_result(this->read(ptr, count));
268 }
269
270 size_t _sgetn(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
271 {
272 return this->read(ptr, count);
273 }
274
275 virtual size_t _scopy(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
276 {
277 return this->read(ptr, count, false);
278 }
279
281 {
282 return pplx::task_from_result(this->read_byte(true));
283 }
284
285 virtual int_type _sbumpc()
286 {
287 return this->read_byte(true);
288 }
289
291 {
292 return pplx::task_from_result(this->read_byte(false));
293 }
294
295 int_type _sgetc()
296 {
297 return this->read_byte(false);
298 }
299
301 {
302 if (m_current_position >= m_size-1)
303 return pplx::task_from_result(basic_streambuf<_CharType>::traits::eof());
304
305 this->read_byte(true);
306 return pplx::task_from_result(this->read_byte(false));
307 }
308
310 {
311 auto pos =
seekoff(-1, std::ios_base::cur, std::ios_base::in);
312 if ( pos == (pos_type)traits::eof())
313 return pplx::task_from_result(traits::eof());
315 }
316
324 virtual pos_type
getpos(std::ios_base::openmode mode)
const
325 {
326 if ( ((mode & std::ios_base::in) && !this->
can_read()) ||
327 ((mode & std::ios_base::out) && !this->
can_write()))
328 return static_cast<pos_type>(traits::eof());
329
330 if (mode == std::ios_base::in)
331 return (pos_type)m_current_position;
332 else if (mode == std::ios_base::out)
333 return (pos_type)m_current_position;
334 else
335 return (pos_type)traits::eof();
336 }
337
345 virtual pos_type
seekpos(pos_type position, std::ios_base::openmode mode)
346 {
347 pos_type beg(0);
348 pos_type end(m_size);
349
350 if (position >= beg)
351 {
352 auto pos = static_cast<size_t>(position);
353
354 // Read head
355 if ((mode & std::ios_base::in) && this->
can_read())
356 {
357 if (position <= end)
358 {
359 // We do not allow reads to seek beyond the end or before the start position.
360 update_current_position(pos);
361 return static_cast<pos_type>(m_current_position);
362 }
363 }
364
365 // Write head
366 if ((mode & std::ios_base::out) && this->
can_write())
367 {
368 // Update write head and satisfy read requests if any
369 update_current_position(pos);
370
371 return static_cast<pos_type>(m_current_position);
372 }
373 }
374
375 return static_cast<pos_type>(traits::eof());
376 }
377
387 virtual pos_type
seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
388 {
389 pos_type beg = 0;
390 pos_type cur = static_cast<pos_type>(m_current_position);
391 pos_type end = static_cast<pos_type>(m_size);
392
393 switch ( way )
394 {
395 case std::ios_base::beg:
396 return seekpos(beg + offset, mode);
397
398 case std::ios_base::cur:
399 return seekpos(cur + offset, mode);
400
401 case std::ios_base::end:
402 return seekpos(end + offset, mode);
403
404 default:
405 return static_cast<pos_type>(traits::eof());
406 }
407 }
408
409 private:
410 template<typename _CharType1> friend class ::concurrency::streams::rawptr_buffer;
411
419 m_data(const_cast<_CharType*>(data)),
420 m_size(size),
421 m_current_position(0)
422 {
423 validate_mode(std::ios_base::in);
424 }
425
433 : streambuf_state_manager<_CharType>(mode),
434 m_data(data),
435 m_size(size),
436 m_current_position(0)
437 {
438 validate_mode(mode);
439 }
440
/// <summary>
/// Rejects open modes that request reading and writing at the same time.
/// </summary>
/// <param name="mode">The open mode to check.</param>
/// <exception cref="std::invalid_argument">Thrown when both <c>std::ios_base::in</c> and
/// <c>std::ios_base::out</c> are set in <paramref name="mode"/>.</exception>
static void validate_mode(std::ios_base::openmode mode)
{
    // A raw pointer stream buffer can be used for reading or for writing, never both.
    const std::ios_base::openmode both = std::ios_base::in | std::ios_base::out;

    if ((mode & both) == both)
    {
        throw std::invalid_argument("this combination of modes on raw pointer stream not supported");
    }
}
447
451 bool can_satisfy(size_t) const
452 {
453 // We can always satisfy a read, at least partially, unless the
454 // read position is at the very end of the buffer.
456 }
457
462 int_type read_byte(bool advance = true)
463 {
464 _CharType value;
465 auto read_size = this->read(&value, 1, advance);
466 return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
467 }
468
474 size_t read(_Out_writes_ (count) _CharType *ptr, _In_ size_t count, bool advance = true)
475 {
476 if (!can_satisfy(count))
477 return 0;
478
481
482 size_t newPos = m_current_position + read_size;
483
484 auto readBegin = m_data + m_current_position;
485 auto readEnd = m_data + newPos;
486
487 #ifdef _WIN32
488 // Avoid warning C4996: Use checked iterators under SECURE_SCL
489 std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType *>(ptr, count));
490 #else
491 std::copy(readBegin, readEnd, ptr);
492 #endif // _WIN32
493
494 if (advance)
495 {
496 update_current_position(newPos);
497 }
498
499 return (size_t) read_size;
500 }
501
505 size_t write(const _CharType *ptr, size_t count)
506 {
507 if (!this->
can_write() || (count == 0))
return 0;
508
510
511 if ( newSize > m_size )
512 throw std::runtime_error("Writing past the end of the buffer");
513
514 // Copy the data
515 #ifdef _WIN32
516 // Avoid warning C4996: Use checked iterators under SECURE_SCL
517 std::copy(ptr, ptr + count, stdext::checked_array_iterator<_CharType *>(m_data, m_size, m_current_position));
518 #else
519 std::copy(ptr, ptr + count, m_data+m_current_position);
520 #endif // _WIN32
521
522 // Update write head and satisfy pending reads if any
523 update_current_position(newSize);
524
525 return count;
526 }
527
531 void update_current_position(size_t newPos)
532 {
533 // The new write head
534 m_current_position = newPos;
535
536 _ASSERTE(m_current_position <= m_size);
537 }
538
539 // The actual memory block
540 _CharType* m_data;
541
542 // The size of the memory block
543 size_t m_size;
544
545 // Read/write head
546 size_t m_current_position;
547 };
548
549 } // namespace details
550
558 template<typename _CharType>
559 class rawptr_buffer : public streambuf<_CharType>
560 {
561 public:
562 typedef _CharType char_type;
563
570 :
streambuf<char_type>(std::shared_ptr<details::basic_rawptr_buffer<char_type>>(new details::basic_rawptr_buffer<char_type>(data, size)))
571 {
572 }
573
580 :
streambuf<char_type>(std::shared_ptr<details::basic_rawptr_buffer<char_type>>(new details::basic_rawptr_buffer<char_type>(data, size, mode)))
581 {
582 }
583
588 {
589 }
590 };
591
599 template<typename _CharType>
601 {
602 public:
603 typedef _CharType char_type;
605
612 static concurrency::streams::basic_istream<char_type>
open_istream(
const char_type* data,
size_t size)
613 {
614 return concurrency::streams::basic_istream<char_type>(buffer_type(data, size));
615 }
616
623 static concurrency::streams::basic_istream<char_type>
open_istream(char_type* data,
size_t size)
624 {
625 return concurrency::streams::basic_istream<char_type>(buffer_type(data, size, std::ios::in));
626 }
627
634 static concurrency::streams::basic_ostream<char_type>
open_ostream(char_type* data,
size_t size)
635 {
636 return concurrency::streams::basic_ostream<char_type>(buffer_type(data, size, std::ios::out));
637 }
638 };
639
640 }} // namespaces
641
642 #endif
virtual pplx::task< int_type > getc()
Reads a single character from the stream without advancing the read position.
Definition: astreambuf.h:526
The rawptr_stream class is used to create memory-backed streams that support writing or reading seque...
Definition: rawptrstream.h:600
rawptr_buffer(const char_type *data, size_t size)
Create a rawptr_buffer given a pointer to a memory block and the size of the block.
Definition: rawptrstream.h:569
virtual bool is_open() const
Checks if the stream buffer is open.
Definition: astreambuf.h:390
Reference-counted stream buffer.
Definition: astreambuf.h:804
rawptr_buffer(char_type *data, size_t size, std::ios_base::openmode mode=std::ios::out)
Create a rawptr_buffer given a pointer to a memory block and the size of the block.
Definition: rawptrstream.h:579
virtual utility::size64_t size() const
Gets the size of the stream, if known. Calls to has_size will determine whether the result of size ca...
Definition: rawptrstream.h:98
static concurrency::streams::basic_istream< char_type > open_istream(char_type *data, size_t size)
Create a rawptr-stream given a pointer to a writable memory block and the size of the block...
Definition: rawptrstream.h:623
rawptr_buffer()
Default constructor.
Definition: rawptrstream.h:587
void get() const
Returns the result this task produced. If the task is not in a terminal state, a call to get will wai...
Definition: pplxtasks.h:4440
virtual bool can_read() const
can_read is used to determine whether a stream buffer will support read operations (get)...
Definition: astreambuf.h:373
virtual pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: astreambuf.h:711
virtual size_t buffer_size(std::ios_base::openmode=std::ios_base::in) const
Get the stream buffer size, if one has been set.
Definition: rawptrstream.h:108
virtual pplx::task< void > close(std::ios_base::openmode mode)
Closes the stream buffer, preventing further read or write operations.
Definition: rawptrstream.h:145
void _commit(size_t actual)
Submits a block already allocated by the stream buffer.
Definition: rawptrstream.h:210
virtual ~basic_rawptr_buffer()
Destructor
Definition: rawptrstream.h:76
virtual pplx::task< void > _close_write()
The real write head close operation, implementation should override it if there is any resource to be...
Definition: astreambuf.h:720
The basic_rawptr_buffer class serves as a memory-based stream buffer that supports both writing and re...
Definition: rawptrstream.h:52
virtual bool can_write() const
can_write is used to determine whether a stream buffer will support write operations (put)...
Definition: astreambuf.h:381
static concurrency::streams::basic_ostream< char_type > open_ostream(char_type *data, size_t size)
Create a rawptr-stream given a pointer to a writable memory block and the size of the block...
Definition: rawptrstream.h:634
Definition: astreambuf.h:362
virtual bool can_seek() const
can_seek is used to determine whether a stream buffer supports seeking.
Definition: rawptrstream.h:87
basic_rawptr_buffer()
Constructor
Definition: rawptrstream.h:65
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:176
virtual bool acquire(_Out_ _CharType *&ptr, _Out_ size_t &count)
Gets a pointer to the next already allocated contiguous block of data.
Definition: rawptrstream.h:229
virtual pos_type getpos(std::ios_base::openmode mode) const
Gets the current read or write position in the stream.
Definition: rawptrstream.h:324
virtual utility::size64_t size() const
Gets the total number of characters in the stream buffer, if known. Calls to has_size will determine ...
Definition: astreambuf.h:913
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:4173
virtual void release(_Out_writes_opt_(count) _CharType *ptr, _In_ size_t count)
Releases a block of data acquired using ::acquire method. This frees the stream buffer to de-allocate...
Definition: rawptrstream.h:259
static concurrency::streams::basic_istream< char_type > open_istream(const char_type *data, size_t size)
Create a rawptr-stream given a pointer to a read-only memory block and the size of the block...
Definition: rawptrstream.h:612
Definition: astreambuf.h:37
The rawptr_buffer class serves as a memory-based stream buffer that supports reading sequences of cha...
Definition: rawptrstream.h:43
_CharType * _alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: rawptrstream.h:192
virtual size_t in_avail() const
For any input stream, in_avail returns the number of characters that are immediately available to be ...
Definition: rawptrstream.h:130
virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
Seeks to a position given by a relative offset.
Definition: rawptrstream.h:387
virtual void set_buffer_size(size_t, std::ios_base::openmode=std::ios_base::in)
Set the stream buffer implementation to buffer or not buffer.
Definition: rawptrstream.h:120
virtual bool has_size() const
has_size is used to determine whether a stream buffer supports size().
Definition: rawptrstream.h:92
virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
Seeks to the given position.
Definition: rawptrstream.h:345