1/*-------------------------------------------------------------------------
4 * Routines for archivers to write an LZ4-compressed data stream.
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/bin/pg_dump/compress_lz4.c
12 *-------------------------------------------------------------------------
24 * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
25 * Define it ourselves for installations with an older version.
27#ifndef LZ4F_HEADER_SIZE_MAX
28#define LZ4F_HEADER_SIZE_MAX 32
31/*---------------------------------
32 * Common to both compression APIs
33 *---------------------------------
37 * (de)compression state used by both the Compressor and Stream APIs.
39typedef struct LZ4State
42 * Used by the Stream API to keep track of the file stream.
46 LZ4F_preferences_t prefs;
48 LZ4F_compressionContext_t ctx;
49 LZ4F_decompressionContext_t dtx;
52 * Used by the Stream API's lazy initialization.
57 * Used by the Stream API to distinguish between compression and
58 * decompression operations.
63 * Used by the Compressor API to mark if the compression headers have been
64 * written after initialization.
66 bool needs_header_flush;
72 * Used by the Stream API to store already uncompressed data that the
73 * caller has not consumed.
75 size_t overflowalloclen;
80 * Used by both APIs to keep track of the compressed data length stored in
86 * Used by both APIs to keep track of error codes.
92 * LZ4State_compression_init
93 * Initialize the required LZ4State members for compression.
95 * Write the LZ4 frame header into a buffer, keeping track of its length. Users of
96 * this function can choose when and how to write the header to a file stream.
98 * Returns true on success. On failure, returns false and stores the
99 * error code in state->errcode.
102LZ4State_compression_init(LZ4State *
state)
109 * LZ4F_compressBegin requires a buffer whose size is greater than or equal to
110 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
112 if (
state->buflen < LZ4F_HEADER_SIZE_MAX)
113 state->buflen = LZ4F_HEADER_SIZE_MAX;
115 status = LZ4F_createCompressionContext(&
state->ctx, LZ4F_VERSION);
116 if (LZ4F_isError(status))
118 state->errcode = status;
123 status = LZ4F_compressBegin(
state->ctx,
126 if (LZ4F_isError(status))
128 state->errcode = status;
132 state->compressedlen = status;
137/*----------------------
139 *----------------------
142/* Private routines that support LZ4 compressed data I/O */
151 LZ4F_decompressionContext_t ctx = NULL;
152 LZ4F_decompressOptions_t dec_opt;
153 LZ4F_errorCode_t status;
155 memset(&dec_opt, 0,
sizeof(dec_opt));
156 status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
157 if (LZ4F_isError(status))
158 pg_fatal(
"could not create LZ4 decompression context: %s",
159 LZ4F_getErrorName(status));
164 while ((r = cs->
readF(AH, &readbuf, &readbuflen)) > 0)
169 /* Process one chunk */
171 readend = readbuf + r;
172 while (readp < readend)
175 size_t read_size = readend - readp;
178 status = LZ4F_decompress(ctx, outbuf, &out_size,
179 readp, &read_size, &dec_opt);
180 if (LZ4F_isError(status))
181 pg_fatal(
"could not decompress: %s",
182 LZ4F_getErrorName(status));
184 ahwrite(outbuf, 1, out_size, AH);
192 status = LZ4F_freeDecompressionContext(ctx);
193 if (LZ4F_isError(status))
194 pg_fatal(
"could not free LZ4 decompression context: %s",
195 LZ4F_getErrorName(status));
200 const void *
data,
size_t dLen)
207 /* Write the header if not yet written. */
211 state->needs_header_flush =
false;
223 status = LZ4F_compressUpdate(
state->ctx,
227 if (LZ4F_isError(status))
228 pg_fatal(
"could not compress data: %s",
229 LZ4F_getErrorName(status));
243 /* Nothing needs to be done */
248 * Write the header if not yet written. The caller is not required to call
249 * writeData if the relation does not contain any data. Thus it is
250 * possible to reach here without having flushed the header. Do it before
251 * ending the compression.
253 if (
state->needs_header_flush)
256 status = LZ4F_compressEnd(
state->ctx,
259 if (LZ4F_isError(status))
260 pg_fatal(
"could not end compression: %s",
261 LZ4F_getErrorName(status));
265 status = LZ4F_freeCompressionContext(
state->ctx);
266 if (LZ4F_isError(status))
267 pg_fatal(
"could not end compression: %s",
268 LZ4F_getErrorName(status));
277 * Public routines that support LZ4 compressed data I/O
284 cs->
readData = ReadDataFromArchiveLZ4;
286 cs->
end = EndCompressorLZ4;
291 * Read operations have access to the whole input. No state needs to be
292 * carried between calls.
301 if (!LZ4State_compression_init(
state))
302 pg_fatal(
"could not initialize LZ4 compression: %s",
303 LZ4F_getErrorName(
state->errcode));
305 /* Remember that the header has not been written. */
306 state->needs_header_flush =
true;
310/*----------------------
311 * Compress Stream API
312 *----------------------
317 * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
318 * decompressed output in the overflow buffer and the end of the backing file
326 return state->overflowlen == 0 && feof(
state->fp);
335 if (LZ4F_isError(
state->errcode))
344 * Initialize an already alloc'ed LZ4State struct for subsequent calls.
346 * Creates the necessary contexts for either compression or decompression. When
347 * compressing data (indicated by compressing=true), it additionally writes the
348 * LZ4 header in the output stream.
350 * Returns true on success. On failure, returns false and stores the
351 * error code in state->errcode.
354LZ4Stream_init(LZ4State *
state,
int size,
bool compressing)
361 state->compressing = compressing;
363 /* When compressing, write LZ4 header to the output stream. */
364 if (
state->compressing)
367 if (!LZ4State_compression_init(
state))
373 errno = (errno) ? errno : ENOSPC;
379 status = LZ4F_createDecompressionContext(&
state->dtx, LZ4F_VERSION);
380 if (LZ4F_isError(status))
382 state->errcode = status;
391 state->overflowlen = 0;
394 state->inited =
true;
399 * Read already decompressed content from the overflow buffer into 'ptr' up to
400 * 'size' bytes, if available. If the eol_flag is set, then stop at the first
401 * occurrence of the newline char prior to 'size' bytes.
403 * Any unread content in the overflow buffer is moved to the beginning.
405 * Returns the number of bytes read from the overflow buffer (and copied into
406 * the 'ptr' buffer), or 0 if the overflow buffer is empty.
409LZ4Stream_read_overflow(LZ4State *
state,
void *ptr,
int size,
bool eol_flag)
414 if (
state->overflowlen == 0)
417 if (
state->overflowlen >= size)
420 readlen =
state->overflowlen;
422 if (eol_flag && (p = memchr(
state->overflowbuf,
'\n', readlen)))
423 /* Include the line terminating char */
424 readlen = p -
state->overflowbuf + 1;
426 memcpy(ptr,
state->overflowbuf, readlen);
427 state->overflowlen -= readlen;
429 if (
state->overflowlen > 0)
430 memmove(
state->overflowbuf,
state->overflowbuf + readlen,
state->overflowlen);
436 * The workhorse for reading decompressed content out of an LZ4 compressed
439 * It will read up to 'ptrsize' bytes of decompressed content, or up to the newline
440 * char if found first when the eol_flag is set. It is possible that the
441 * decompressed output generated by reading any compressed input via the
442 * LZ4F API exceeds 'ptrsize'. Any excess decompressed content is stored
443 * in an overflow buffer within LZ4State. Of course, when the function is
444 * called, it will first try to consume any decompressed content already
445 * present in the overflow buffer, before decompressing new content.
447 * Returns the number of bytes of decompressed data copied into the ptr
448 * buffer, or -1 in case of error.
451LZ4Stream_read_internal(LZ4State *
state,
void *ptr,
int ptrsize,
bool eol_flag)
456 bool eol_found =
false;
461 if (!LZ4Stream_init(
state, size,
false /* decompressing */ ))
464 LZ4F_getErrorName(
state->errcode));
468 /* No work needs to be done for a zero-sized output buffer */
472 /* Verify that there is enough space in the outbuf */
473 if (size >
state->buflen)
475 state->buflen = size;
479 /* use already decompressed content if available */
480 dsize = LZ4Stream_read_overflow(
state, ptr, size, eol_flag);
481 if (dsize == size || (eol_flag && memchr(ptr,
'\n', dsize)))
491 rsize = fread(readbuf, 1, size,
state->fp);
492 if (rsize < size && !feof(
state->fp))
498 rp = (
char *) readbuf;
499 rend = (
char *) readbuf + rsize;
504 size_t outlen =
state->buflen;
505 size_t read_remain = rend - rp;
507 memset(
state->buffer, 0, outlen);
508 status = LZ4F_decompress(
state->dtx,
state->buffer, &outlen,
509 rp, &read_remain, NULL);
510 if (LZ4F_isError(status))
512 state->errcode = status;
514 LZ4F_getErrorName(
state->errcode));
521 * fill in what space is available in ptr if the eol flag is set,
522 * either skip if one already found or fill up to EOL if present
525 if (outlen > 0 && dsize < size && eol_found ==
false)
528 size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
529 size_t len = outlen < lib ? outlen : lib;
532 (p = memchr(
state->buffer,
'\n', outlen)) &&
533 (size_t) (p -
state->buffer + 1) <=
len)
539 memcpy((
char *) ptr + dsize,
state->buffer,
len);
542 /* move what did not fit, if any, at the beginning of the buf */
548 /* if there is available output, save it */
551 while (
state->overflowlen + outlen >
state->overflowalloclen)
553 state->overflowalloclen *= 2;
555 state->overflowalloclen);
558 memcpy(
state->overflowbuf +
state->overflowlen,
state->buffer, outlen);
559 state->overflowlen += outlen;
562 }
while (rsize == size && dsize < size && eol_found ==
false);
570 * Compress size bytes from ptr and write them to the stream.
580 if (!LZ4Stream_init(
state, size,
true))
581 pg_fatal(
"unable to initialize LZ4 library: %s",
582 LZ4F_getErrorName(
state->errcode));
592 if (LZ4F_isError(status))
593 pg_fatal(
"error during writing: %s", LZ4F_getErrorName(status));
596 if (fwrite(
state->buffer, 1, status,
state->fp) != status)
598 errno = (errno) ? errno : ENOSPC;
599 pg_fatal(
"error during writing: %m");
602 ptr = ((
const char *) ptr) + chunk;
607 * fread() equivalent implementation for LZ4 compressed files.
615 if ((ret = LZ4Stream_read_internal(
state, ptr, size,
false)) < 0)
616 pg_fatal(
"could not read from input file: %s", LZ4Stream_get_error(CFH));
622 * fgetc() equivalent implementation for LZ4 compressed files.
630 if (LZ4Stream_read_internal(
state, &
c, 1,
false) <= 0)
632 if (!LZ4Stream_eof(CFH))
633 pg_fatal(
"could not read from input file: %s", LZ4Stream_get_error(CFH));
635 pg_fatal(
"could not read from input file: end of file");
642 * fgets() equivalent implementation for LZ4 compressed files.
650 ret = LZ4Stream_read_internal(
state, ptr, size - 1,
true);
653 * LZ4Stream_read_internal returning 0 or -1 means that it was either an
654 * EOF or an error, but gets_func is defined to return NULL in either case
655 * so we can treat both the same here.
661 * Our caller expects the returned string to be NUL-terminated and we know
662 * that ret is greater than zero.
670 * Finalize (de)compression of a stream. When compressing it will write any
671 * remaining content and/or generated footer from the LZ4 API.
684 if (
state->compressing)
686 status = LZ4F_compressEnd(
state->ctx,
state->buffer,
state->buflen, NULL);
687 if (LZ4F_isError(status))
690 LZ4F_getErrorName(status));
695 if (fwrite(
state->buffer, 1, status,
state->fp) != status)
697 errno = (errno) ? errno : ENOSPC;
702 status = LZ4F_freeCompressionContext(
state->ctx);
703 if (LZ4F_isError(status))
705 LZ4F_getErrorName(status));
709 status = LZ4F_freeDecompressionContext(
state->dtx);
710 if (LZ4F_isError(status))
712 LZ4F_getErrorName(status));
734LZ4Stream_open(
const char *path,
int fd,
const char *
mode,
743 if (
state->fp == NULL)
745 state->errcode = errno;
800 pg_fatal(
"this build does not support compression with %s",
"LZ4");
807 pg_fatal(
"this build does not support compression with %s",
"LZ4");
#define DEFAULT_IO_BUFFER_SIZE
void InitCompressFileHandleLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
void InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
void * pg_malloc(size_t size)
void * pg_malloc0(size_t size)
void * pg_realloc(void *ptr, size_t size)
if(TABLE==NULL||TABLE_index==NULL)
#define pg_log_error(...)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
static PgChecksumMode mode
static int fd(const char *x, int i)
char * psprintf(const char *fmt,...)
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
int(* getc_func)(CompressFileHandle *CFH)
const char *(* get_error_func)(CompressFileHandle *CFH)
bool(* eof_func)(CompressFileHandle *CFH)
size_t(* read_func)(void *ptr, size_t size, CompressFileHandle *CFH)
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
pg_compress_specification compression_spec
bool(* close_func)(CompressFileHandle *CFH)
void(* write_func)(const void *ptr, size_t size, CompressFileHandle *CFH)
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
pg_compress_specification compression_spec
void(* end)(ArchiveHandle *AH, CompressorState *cs)
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)