1/*-------------------------------------------------------------------------
4 * Routines for archivers to write a Zstd compressed data stream.
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/bin/pg_dump/compress_zstd.c
12 *-------------------------------------------------------------------------
/* This build has no Zstd support, so the request cannot be honored: fail. */
26 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
/* Same failure again (presumably from the other public init routine -- TODO confirm). */
32 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
/*
 * State kept for one Zstd stream: the compression and decompression
 * stream handles plus the text of the last recorded error.
 */
39typedef struct ZstdCompressorState
41 /* This is a normal file to which we read/write compressed data */
/* compression stream handle, passed to ZSTD_compressStream2() */
44 ZSTD_CStream *cstream;
/* decompression stream handle, passed to ZSTD_decompressStream() */
45 ZSTD_DStream *dstream;
49 /* pointer to a static string like from strerror(), for Zstd_write() */
50 const char *zstderror;
/* data points at the bytes to be written; dLen is how many there are */
56 const void *
data,
size_t dLen);
/*
 * Set one compression parameter (param = value) on cstream.  If the library
 * rejects it, exit through pg_fatal() with a message that names the
 * parameter (paramname) and includes the Zstd error text.
 */
60_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
61 ZSTD_cParameter param,
int value,
char *paramname)
65 res = ZSTD_CCtx_setParameter(cstream, param,
value);
/* the return value doubles as an error code; ZSTD_isError() tells them apart */
66 if (ZSTD_isError(res))
67 pg_fatal(
"could not set compression parameter \"%s\": %s",
68 paramname, ZSTD_getErrorName(res));
71/* Return a compression stream with parameters set per argument */
75 ZSTD_CStream *cstream;
/* ask the library for a fresh compression stream */
77 cstream = ZSTD_createCStream();
/* fatal: the compression library could not be initialized */
79 pg_fatal(
"could not initialize compression library");
/* apply the requested compression level; the helper aborts on failure */
81 _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
82 compress.
level,
"level");
/* additionally configure the long-distance matching parameter */
85 _Zstd_CCtx_setParam_or_die(cstream,
86 ZSTD_c_enableLongDistanceMatching,
92/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
/* input and output are shorthands for the buffers kept in the private state */
96 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
97 ZSTD_inBuffer *
input = &zstdcs->input;
98 ZSTD_outBuffer *
output = &zstdcs->output;
100 /* Loop while there's any input or until flushed */
/* flush selects ZSTD_e_end (finish the frame) instead of ZSTD_e_continue */
106 res = ZSTD_compressStream2(zstdcs->cstream,
output,
107 input, flush ? ZSTD_e_end : ZSTD_e_continue);
/* a compression error is unrecoverable here: report it and exit */
109 if (ZSTD_isError(res))
110 pg_fatal(
"could not compress data: %s", ZSTD_getErrorName(res));
113 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
114 * is the EOF marker in the custom format. This should never happen
121 break;
/* End of frame or all input consumed */
/* Tear down the Zstd state attached to cs, releasing whichever stream exists. */
128 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
/* presumably the read-mode branch: no compression stream may exist here */
132 Assert(zstdcs->cstream == NULL);
133 ZSTD_freeDStream(zstdcs->dstream);
136 else if (cs->
writeF != NULL)
/* write mode: no decompression stream may exist here */
138 Assert(zstdcs->dstream == NULL);
/* flush = true: finish the frame before the compression stream is freed */
139 _ZstdWriteCommon(AH, cs,
true);
140 ZSTD_freeCStream(zstdcs->cstream);
143 /* output buffer may be allocated in either mode */
/* Compress dLen bytes starting at data through the shared write helper. */
151 const void *
data,
size_t dLen)
153 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
/* point the Zstd input buffer at the caller's data, nothing consumed yet */
155 zstdcs->input.src =
data;
156 zstdcs->input.size = dLen;
157 zstdcs->input.pos = 0;
/* flush = false: keep the frame open for further writes */
159 _ZstdWriteCommon(AH, cs,
false);
/* Decompress archive data using the buffers held in the private state. */
165 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
166 ZSTD_outBuffer *
output = &zstdcs->output;
167 ZSTD_inBuffer *
input = &zstdcs->input;
/* start from the input buffer size that the Zstd library reports */
168 size_t input_allocated_size = ZSTD_DStreamInSize();
176 * Read compressed data. Note that readF can resize the buffer; the
177 * new size is tracked and used for future loops.
/* reset size to the tracked allocation before each read */
179 input->size = input_allocated_size;
182 /* ensure that readF didn't *shrink* the buffer */
/* remember the buffer size reported back, for the next iteration */
184 input_allocated_size =
input->size;
/* decompress from input into output; a library error is fatal */
195 res = ZSTD_decompressStream(zstdcs->dstream,
output,
input);
196 if (ZSTD_isError(res))
197 pg_fatal(
"could not decompress data: %s", ZSTD_getErrorName(res));
200 * then write the decompressed data to the output handle
206 break;
/* End of frame */
211/* Public routine that supports Zstd compressed data I/O */
216 ZstdCompressorState *zstdcs;
/* install the Zstd implementations of the compressor callbacks */
218 cs->
readData = ReadDataFromArchiveZstd;
220 cs->
end = EndCompressorZstd;
/* private state starts out zero-filled (pg_malloc0) */
224 zstdcs = (ZstdCompressorState *)
pg_malloc0(
sizeof(*zstdcs));
227 /* We expect that exactly one of readF/writeF is specified */
/* read mode: create the decompression stream and allocate both buffers */
230 if (cs->
readF != NULL)
232 zstdcs->dstream = ZSTD_createDStream();
233 if (zstdcs->dstream == NULL)
234 pg_fatal(
"could not initialize compression library");
/* input buffer sized as reported by the Zstd library */
236 zstdcs->input.size = ZSTD_DStreamInSize();
237 zstdcs->input.src =
pg_malloc(zstdcs->input.size);
240 * output.size is the buffer size we tell zstd it can output to.
241 * Allocate an additional byte such that ReadDataFromArchiveZstd() can
242 * call ahwrite() with a null-terminated string, which is an optimized
243 * case in ExecuteSqlCommandBuf().
245 zstdcs->output.size = ZSTD_DStreamOutSize();
246 zstdcs->output.dst =
pg_malloc(zstdcs->output.size + 1);
/* write mode: only an output buffer is allocated in the lines shown here */
248 else if (cs->
writeF != NULL)
252 zstdcs->output.size = ZSTD_CStreamOutSize();
253 zstdcs->output.dst =
pg_malloc(zstdcs->output.size);
/* nothing has been written into the output buffer yet */
254 zstdcs->output.pos = 0;
259 * Compressed stream API
/*
 * Read and decompress data from the file behind CFH into ptr (capacity size).
 * NOTE(review): exit_on_error presumably decides whether failures call
 * pg_fatal() or are returned to the caller -- confirm against the error
 * branches.
 */
263Zstd_read_internal(
void *ptr,
size_t size,
CompressFileHandle *CFH,
bool exit_on_error)
265 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
266 ZSTD_inBuffer *
input = &zstdcs->input;
267 ZSTD_outBuffer *
output = &zstdcs->output;
/* input buffer capacity, as reported by the Zstd library */
268 size_t input_allocated_size = ZSTD_DStreamInSize();
273 * If this is the first call to the reading function, initialize the
274 * required datastructures.
/* a NULL dstream marks the first call: allocate the buffer, create the stream */
276 if (zstdcs->dstream == NULL)
278 zstdcs->input.src =
pg_malloc0(input_allocated_size);
279 zstdcs->dstream = ZSTD_createDStream();
280 if (zstdcs->dstream == NULL)
283 pg_fatal(
"could not initialize compression library");
298 * If the input is completely consumed, start back at the beginning
302 /* input->size is size produced by "fread" */
304 /* input->pos is position consumed by decompress */
308 /* read compressed data if we must produce more input */
/* unconstify() strips the const from input->src so fread() can fill the buffer */
311 cnt = fread(
unconstify(
void *,
input->src), 1, input_allocated_size, zstdcs->fp);
/* a stdio error on the underlying file is reported with errno text (%m) */
312 if (ferror(zstdcs->fp))
315 pg_fatal(
"could not read from input file: %m");
/* fread() was asked for at most input_allocated_size bytes */
321 Assert(cnt <= input_allocated_size);
323 /* If we have no more input to consume, we're done */
331 res = ZSTD_decompressStream(zstdcs->dstream,
output,
input);
333 if (ZSTD_isError(res))
336 pg_fatal(
"could not decompress data: %s", ZSTD_getErrorName(res));
341 break;
/* No more room for output */
344 break;
/* End of frame */
348 break;
/* We read all the data that fits */
/* Compress caller data into the output buffer kept in the private state. */
357 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
358 ZSTD_inBuffer *
input = &zstdcs->input;
359 ZSTD_outBuffer *
output = &zstdcs->output;
/* a NULL cstream marks the first call: allocate the (zeroed) output buffer */
367 if (zstdcs->cstream == NULL)
369 zstdcs->output.size = ZSTD_CStreamOutSize();
370 zstdcs->output.dst =
pg_malloc0(zstdcs->output.size);
/* presumably re-checked after the stream has been created -- TODO confirm */
372 if (zstdcs->cstream == NULL)
373 pg_fatal(
"could not initialize compression library");
376 /* Consume all input, to be flushed later */
/* ZSTD_e_continue: the frame stays open; it is not ended here */
380 res = ZSTD_compressStream2(zstdcs->cstream,
output,
input, ZSTD_e_continue);
381 if (ZSTD_isError(res))
382 pg_fatal(
"could not write to file: %s", ZSTD_getErrorName(res));
/* if the failing call left errno at zero, report ENOSPC instead */
388 errno = (errno) ? errno : ENOSPC;
389 pg_fatal(
"could not write to file: %m");
/* hitting end of file at this point is reported as a fatal read error */
400 pg_fatal(
"could not read from input file: end of file");
412 * Read one byte at a time until newline or EOF. This is only used to read
413 * the list of LOs, and the I/O is buffered anyway.
/* fill buf one byte per iteration, stopping at len - 1 bytes at the latest */
415 for (
i = 0;
i <
len - 1; ++
i)
/* single-byte read with exit_on_error = false; anything but 1 means no byte was read */
417 if (Zstd_read_internal(&
buf[
i], 1, CFH,
false) != 1)
/* NULL tells the caller that not even one byte was stored */
426 return i > 0 ?
buf : NULL;
/* thin wrapper: Zstd_read_internal() with exit_on_error = true */
432 return Zstd_read_internal(ptr, size, CFH,
true);
/* Finish any pending compression, release the streams and close the file. */
438 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
445 ZSTD_inBuffer *
input = &zstdcs->input;
446 ZSTD_outBuffer *
output = &zstdcs->output;
448 /* Loop until the compression buffers are fully consumed */
/* ZSTD_e_end: ask the library to finish the frame */
452 res = ZSTD_compressStream2(zstdcs->cstream,
output,
input, ZSTD_e_end);
453 if (ZSTD_isError(res))
/* keep the library's error text in zstderror instead of exiting */
455 zstdcs->zstderror = ZSTD_getErrorName(res);
/* if errno was left at zero, use ENOSPC; then keep the strerror() text */
464 errno = (errno) ? errno : ENOSPC;
465 zstdcs->zstderror =
strerror(errno);
471 break;
/* End of frame */
/* release the compression stream */
474 ZSTD_freeCStream(zstdcs->cstream);
/* release the decompression stream */
480 ZSTD_freeDStream(zstdcs->dstream);
/* a failing fclose() is recorded in zstderror as well */
485 if (fclose(zstdcs->fp) != 0)
487 zstdcs->zstderror =
strerror(errno);
/*
 * Report end-of-file using the stdio EOF indicator of the underlying file.
 * NOTE(review): this reflects the compressed file only, not whether
 * decompressed data is still buffered -- confirm callers expect that.
 */
499 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
501 return feof(zstdcs->fp);
/* Open the underlying file either from an existing descriptor or by path. */
505Zstd_open(
const char *path,
int fd,
const char *
mode,
509 ZstdCompressorState *zstdcs;
512 * Clear state storage to avoid having the fd point to non-NULL memory on
/*
 * Wrap a duplicate of fd in a stdio stream.
 * NOTE(review): the dup() result goes straight into fdopen(); a dup()
 * failure (-1) is not checked on this line -- confirm it is handled.
 */
526 fp = fdopen(dup(
fd),
mode);
/* alternative: open the file named by path */
528 fp = fopen(path,
mode);
/*
 * Build the target name by appending the ".zst" suffix to path.
 * NOTE(review): sprintf() does no bounds checking; confirm fname is large
 * enough for path plus the suffix, or switch to snprintf().
 */
547 sprintf(fname,
"%s.zst", path);
/* Return the error string currently stored in the private state. */
554 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
556 return zstdcs->zstderror;
#define unconstify(underlying_type, expr)
void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
void * pg_malloc(size_t size)
void * pg_malloc_extended(size_t size, int flags)
void * pg_malloc0(size_t size)
#define MCXT_ALLOC_NO_OOM
Assert(PointerIsAligned(start, uint64))
if(TABLE==NULL||TABLE_index==NULL)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
static PgChecksumMode mode
static int fd(const char *x, int i)
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
int(* getc_func)(CompressFileHandle *CFH)
const char *(* get_error_func)(CompressFileHandle *CFH)
bool(* eof_func)(CompressFileHandle *CFH)
size_t(* read_func)(void *ptr, size_t size, CompressFileHandle *CFH)
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
pg_compress_specification compression_spec
bool(* close_func)(CompressFileHandle *CFH)
void(* write_func)(const void *ptr, size_t size, CompressFileHandle *CFH)
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
pg_compress_specification compression_spec
void(* end)(ArchiveHandle *AH, CompressorState *cs)
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)