PostgreSQL Source Code: src/bin/pg_dump/compress_zstd.c Source File

PostgreSQL Source Code git master
compress_zstd.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * compress_zstd.c
4 * Routines for archivers to write a Zstd compressed data stream.
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/bin/pg_dump/compress_zstd.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres_fe.h"
16#include <unistd.h>
17
18#include "compress_zstd.h"
19#include "pg_backup_utils.h"
20
21#ifndef USE_ZSTD
22
23void
24 InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
25{
26 pg_fatal("this build does not support compression with %s", "ZSTD");
27}
28
29void
30 InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
31{
32 pg_fatal("this build does not support compression with %s", "ZSTD");
33}
34
35#else
36
37#include <zstd.h>
38
39typedef struct ZstdCompressorState
40{
41 /* This is a normal file to which we read/write compressed data */
42 FILE *fp;
43
44 ZSTD_CStream *cstream;
45 ZSTD_DStream *dstream;
46 ZSTD_outBuffer output;
47 ZSTD_inBuffer input;
48
49 /* pointer to a static string like from strerror(), for Zstd_write() */
50 const char *zstderror;
51} ZstdCompressorState;
52
53static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
54static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
55static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
56 const void *data, size_t dLen);
57static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
58
59static void
60_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
61 ZSTD_cParameter param, int value, char *paramname)
62{
63 size_t res;
64
65 res = ZSTD_CCtx_setParameter(cstream, param, value);
66 if (ZSTD_isError(res))
67 pg_fatal("could not set compression parameter \"%s\": %s",
68 paramname, ZSTD_getErrorName(res));
69}
70
71/* Return a compression stream with parameters set per argument */
72static ZSTD_CStream *
73_ZstdCStreamParams(pg_compress_specification compress)
74{
75 ZSTD_CStream *cstream;
76
77 cstream = ZSTD_createCStream();
78 if (cstream == NULL)
79 pg_fatal("could not initialize compression library");
80
81 _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
82 compress.level, "level");
83
84 if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
85 _Zstd_CCtx_setParam_or_die(cstream,
86 ZSTD_c_enableLongDistanceMatching,
87 compress.long_distance, "long");
88
89 return cstream;
90}
91
92/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
93static void
94_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
95{
96 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
97 ZSTD_inBuffer *input = &zstdcs->input;
98 ZSTD_outBuffer *output = &zstdcs->output;
99
100 /* Loop while there's any input or until flushed */
101 while (input->pos != input->size || flush)
102 {
103 size_t res;
104
105 output->pos = 0;
106 res = ZSTD_compressStream2(zstdcs->cstream, output,
107 input, flush ? ZSTD_e_end : ZSTD_e_continue);
108
109 if (ZSTD_isError(res))
110 pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
111
112 /*
113 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
114 * is the EOF marker in the custom format. This should never happen
115 * but...
116 */
117 if (output->pos > 0)
118 cs->writeF(AH, output->dst, output->pos);
119
120 if (res == 0)
121 break; /* End of frame or all input consumed */
122 }
123}
124
125static void
126EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
127{
128 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
129
130 if (cs->readF != NULL)
131 {
132 Assert(zstdcs->cstream == NULL);
133 ZSTD_freeDStream(zstdcs->dstream);
134 pg_free(unconstify(void *, zstdcs->input.src));
135 }
136 else if (cs->writeF != NULL)
137 {
138 Assert(zstdcs->dstream == NULL);
139 _ZstdWriteCommon(AH, cs, true);
140 ZSTD_freeCStream(zstdcs->cstream);
141 }
142
143 /* output buffer may be allocated in either mode */
144 pg_free(zstdcs->output.dst);
145 pg_free(zstdcs);
146 cs->private_data = NULL;
147}
148
149static void
150WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
151 const void *data, size_t dLen)
152{
153 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
154
155 zstdcs->input.src = data;
156 zstdcs->input.size = dLen;
157 zstdcs->input.pos = 0;
158
159 _ZstdWriteCommon(AH, cs, false);
160}
161
162static void
163ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
164{
165 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
166 ZSTD_outBuffer *output = &zstdcs->output;
167 ZSTD_inBuffer *input = &zstdcs->input;
168 size_t input_allocated_size = ZSTD_DStreamInSize();
169 size_t res;
170
171 for (;;)
172 {
173 size_t cnt;
174
175 /*
176 * Read compressed data. Note that readF can resize the buffer; the
177 * new size is tracked and used for future loops.
178 */
179 input->size = input_allocated_size;
180 cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
181
182 /* ensure that readF didn't *shrink* the buffer */
183 Assert(input->size >= input_allocated_size);
184 input_allocated_size = input->size;
185 input->size = cnt;
186 input->pos = 0;
187
188 if (cnt == 0)
189 break;
190
191 /* Now decompress */
192 while (input->pos < input->size)
193 {
194 output->pos = 0;
195 res = ZSTD_decompressStream(zstdcs->dstream, output, input);
196 if (ZSTD_isError(res))
197 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
198
199 /*
200 * then write the decompressed data to the output handle
201 */
202 ((char *) output->dst)[output->pos] = '0円';
203 ahwrite(output->dst, 1, output->pos, AH);
204
205 if (res == 0)
206 break; /* End of frame */
207 }
208 }
209}
210
211/* Public routine that supports Zstd compressed data I/O */
212void
213InitCompressorZstd(CompressorState *cs,
214 const pg_compress_specification compression_spec)
215{
216 ZstdCompressorState *zstdcs;
217
218 cs->readData = ReadDataFromArchiveZstd;
219 cs->writeData = WriteDataToArchiveZstd;
220 cs->end = EndCompressorZstd;
221
222 cs->compression_spec = compression_spec;
223
224 zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
225 cs->private_data = zstdcs;
226
227 /* We expect that exactly one of readF/writeF is specified */
228 Assert((cs->readF == NULL) != (cs->writeF == NULL));
229
230 if (cs->readF != NULL)
231 {
232 zstdcs->dstream = ZSTD_createDStream();
233 if (zstdcs->dstream == NULL)
234 pg_fatal("could not initialize compression library");
235
236 zstdcs->input.size = ZSTD_DStreamInSize();
237 zstdcs->input.src = pg_malloc(zstdcs->input.size);
238
239 /*
240 * output.size is the buffer size we tell zstd it can output to.
241 * Allocate an additional byte such that ReadDataFromArchiveZstd() can
242 * call ahwrite() with a null-terminated string, which is an optimized
243 * case in ExecuteSqlCommandBuf().
244 */
245 zstdcs->output.size = ZSTD_DStreamOutSize();
246 zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
247 }
248 else if (cs->writeF != NULL)
249 {
250 zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
251
252 zstdcs->output.size = ZSTD_CStreamOutSize();
253 zstdcs->output.dst = pg_malloc(zstdcs->output.size);
254 zstdcs->output.pos = 0;
255 }
256}
257
258/*
259 * Compressed stream API
260 */
261
262static size_t
263Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on_error)
264{
265 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
266 ZSTD_inBuffer *input = &zstdcs->input;
267 ZSTD_outBuffer *output = &zstdcs->output;
268 size_t input_allocated_size = ZSTD_DStreamInSize();
269 size_t res,
270 cnt;
271
272 /*
273 * If this is the first call to the reading function, initialize the
274 * required datastructures.
275 */
276 if (zstdcs->dstream == NULL)
277 {
278 zstdcs->input.src = pg_malloc0(input_allocated_size);
279 zstdcs->dstream = ZSTD_createDStream();
280 if (zstdcs->dstream == NULL)
281 {
282 if (exit_on_error)
283 pg_fatal("could not initialize compression library");
284 return -1;
285 }
286 }
287
288 output->size = size;
289 output->dst = ptr;
290 output->pos = 0;
291
292 for (;;)
293 {
294 Assert(input->pos <= input->size);
295 Assert(input->size <= input_allocated_size);
296
297 /*
298 * If the input is completely consumed, start back at the beginning
299 */
300 if (input->pos == input->size)
301 {
302 /* input->size is size produced by "fread" */
303 input->size = 0;
304 /* input->pos is position consumed by decompress */
305 input->pos = 0;
306 }
307
308 /* read compressed data if we must produce more input */
309 if (input->pos == input->size)
310 {
311 cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
312 if (ferror(zstdcs->fp))
313 {
314 if (exit_on_error)
315 pg_fatal("could not read from input file: %m");
316 return -1;
317 }
318
319 input->size = cnt;
320
321 Assert(cnt <= input_allocated_size);
322
323 /* If we have no more input to consume, we're done */
324 if (cnt == 0)
325 break;
326 }
327
328 while (input->pos < input->size)
329 {
330 /* now decompress */
331 res = ZSTD_decompressStream(zstdcs->dstream, output, input);
332
333 if (ZSTD_isError(res))
334 {
335 if (exit_on_error)
336 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
337 return -1;
338 }
339
340 if (output->pos == output->size)
341 break; /* No more room for output */
342
343 if (res == 0)
344 break; /* End of frame */
345 }
346
347 if (output->pos == output->size)
348 break; /* We read all the data that fits */
349 }
350
351 return output->pos;
352}
353
354static void
355Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
356{
357 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
358 ZSTD_inBuffer *input = &zstdcs->input;
359 ZSTD_outBuffer *output = &zstdcs->output;
360 size_t res,
361 cnt;
362
363 input->src = ptr;
364 input->size = size;
365 input->pos = 0;
366
367 if (zstdcs->cstream == NULL)
368 {
369 zstdcs->output.size = ZSTD_CStreamOutSize();
370 zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
371 zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
372 if (zstdcs->cstream == NULL)
373 pg_fatal("could not initialize compression library");
374 }
375
376 /* Consume all input, to be flushed later */
377 while (input->pos != input->size)
378 {
379 output->pos = 0;
380 res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
381 if (ZSTD_isError(res))
382 pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
383
384 errno = 0;
385 cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
386 if (cnt != output->pos)
387 {
388 errno = (errno) ? errno : ENOSPC;
389 pg_fatal("could not write to file: %m");
390 }
391 }
392}
393
394static int
395Zstd_getc(CompressFileHandle *CFH)
396{
397 unsigned char ret;
398
399 if (CFH->read_func(&ret, 1, CFH) != 1)
400 pg_fatal("could not read from input file: end of file");
401 return ret;
402}
403
404static char *
405Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
406{
407 int i;
408
409 Assert(len > 0);
410
411 /*
412 * Read one byte at a time until newline or EOF. This is only used to read
413 * the list of LOs, and the I/O is buffered anyway.
414 */
415 for (i = 0; i < len - 1; ++i)
416 {
417 if (Zstd_read_internal(&buf[i], 1, CFH, false) != 1)
418 break;
419 if (buf[i] == '\n')
420 {
421 ++i;
422 break;
423 }
424 }
425 buf[i] = '0円';
426 return i > 0 ? buf : NULL;
427}
428
429static size_t
430Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
431{
432 return Zstd_read_internal(ptr, size, CFH, true);
433}
434
435static bool
436Zstd_close(CompressFileHandle *CFH)
437{
438 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
439 bool success = true;
440
441 if (zstdcs->cstream)
442 {
443 size_t res,
444 cnt;
445 ZSTD_inBuffer *input = &zstdcs->input;
446 ZSTD_outBuffer *output = &zstdcs->output;
447
448 /* Loop until the compression buffers are fully consumed */
449 for (;;)
450 {
451 output->pos = 0;
452 res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
453 if (ZSTD_isError(res))
454 {
455 zstdcs->zstderror = ZSTD_getErrorName(res);
456 success = false;
457 break;
458 }
459
460 errno = 0;
461 cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
462 if (cnt != output->pos)
463 {
464 errno = (errno) ? errno : ENOSPC;
465 zstdcs->zstderror = strerror(errno);
466 success = false;
467 break;
468 }
469
470 if (res == 0)
471 break; /* End of frame */
472 }
473
474 ZSTD_freeCStream(zstdcs->cstream);
475 pg_free(zstdcs->output.dst);
476 }
477
478 if (zstdcs->dstream)
479 {
480 ZSTD_freeDStream(zstdcs->dstream);
481 pg_free(unconstify(void *, zstdcs->input.src));
482 }
483
484 errno = 0;
485 if (fclose(zstdcs->fp) != 0)
486 {
487 zstdcs->zstderror = strerror(errno);
488 success = false;
489 }
490
491 pg_free(zstdcs);
492 CFH->private_data = NULL;
493 return success;
494}
495
496static bool
497Zstd_eof(CompressFileHandle *CFH)
498{
499 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
500
501 return feof(zstdcs->fp);
502}
503
504static bool
505Zstd_open(const char *path, int fd, const char *mode,
506 CompressFileHandle *CFH)
507{
508 FILE *fp;
509 ZstdCompressorState *zstdcs;
510
511 /*
512 * Clear state storage to avoid having the fd point to non-NULL memory on
513 * error return.
514 */
515 CFH->private_data = NULL;
516
517 zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
518 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
519 if (!zstdcs)
520 {
521 errno = ENOMEM;
522 return false;
523 }
524
525 if (fd >= 0)
526 fp = fdopen(dup(fd), mode);
527 else
528 fp = fopen(path, mode);
529
530 if (fp == NULL)
531 {
532 pg_free(zstdcs);
533 return false;
534 }
535
536 zstdcs->fp = fp;
537 CFH->private_data = zstdcs;
538
539 return true;
540}
541
542static bool
543Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
544{
545 char fname[MAXPGPATH];
546
547 sprintf(fname, "%s.zst", path);
548 return CFH->open_func(fname, -1, mode, CFH);
549}
550
551static const char *
552Zstd_get_error(CompressFileHandle *CFH)
553{
554 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
555
556 return zstdcs->zstderror;
557}
558
559void
560InitCompressFileHandleZstd(CompressFileHandle *CFH,
561 const pg_compress_specification compression_spec)
562{
563 CFH->open_func = Zstd_open;
564 CFH->open_write_func = Zstd_open_write;
565 CFH->read_func = Zstd_read;
566 CFH->write_func = Zstd_write;
567 CFH->gets_func = Zstd_gets;
568 CFH->getc_func = Zstd_getc;
569 CFH->close_func = Zstd_close;
570 CFH->eof_func = Zstd_eof;
571 CFH->get_error_func = Zstd_get_error;
572
573 CFH->compression_spec = compression_spec;
574
575 CFH->private_data = NULL;
576}
577
578#endif /* USE_ZSTD */
#define unconstify(underlying_type, expr)
Definition: c.h:1244
void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:24
void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:30
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void * pg_malloc_extended(size_t size, int flags)
Definition: fe_memutils.c:59
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define MCXT_ALLOC_ZERO
Definition: fe_memutils.h:30
#define MCXT_ALLOC_NO_OOM
Definition: fe_memutils.h:29
Assert(PointerIsAligned(start, uint64))
FILE * input
FILE * output
static struct @169 value
static bool success
Definition: initdb.c:187
i
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
#define pg_fatal(...)
static PgChecksumMode mode
Definition: pg_checksums.c:55
#define MAXPGPATH
const void size_t len
const void * data
while(p+4<=pend)
static char * buf
Definition: pg_test_fsync.c:72
#define sprintf
Definition: port.h:241
#define strerror
Definition: port.h:252
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
Definition: compress_io.h:153
void * private_data
Definition: compress_io.h:192
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:122
int(* getc_func)(CompressFileHandle *CFH)
Definition: compress_io.h:162
const char *(* get_error_func)(CompressFileHandle *CFH)
Definition: compress_io.h:182
bool(* eof_func)(CompressFileHandle *CFH)
Definition: compress_io.h:169
size_t(* read_func)(void *ptr, size_t size, CompressFileHandle *CFH)
Definition: compress_io.h:132
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:111
pg_compress_specification compression_spec
Definition: compress_io.h:187
bool(* close_func)(CompressFileHandle *CFH)
Definition: compress_io.h:176
void(* write_func)(const void *ptr, size_t size, CompressFileHandle *CFH)
Definition: compress_io.h:140
void * private_data
Definition: compress_io.h:87
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
pg_compress_specification compression_spec
Definition: compress_io.h:82
void(* end)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:67
ReadFunc readF
Definition: compress_io.h:72
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.h:61
WriteFunc writeF
Definition: compress_io.h:77

AltStyle によって変換されたページ (->オリジナル) /