1/*-------------------------------------------------------------------------
3 * walmethods.c - implementations of different ways to write received wal
5 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * src/bin/pg_basebackup/walmethods.c
9 *-------------------------------------------------------------------------
32/* Size of zlib buffer for .tar.gz */
33 #define ZLIB_OUT_SIZE 4096
35/* Size of LZ4 input chunk for .lz4 */
36 #define LZ4_IN_SIZE 4096
38/*-------------------------------------------------------------------------
39 * WalDirectoryMethod - write wal to a directory looking like pg_wal
40 *-------------------------------------------------------------------------
45 const char *temp_suffix,
50 const char *pathname);
52 const char *pathname,
const char *temp_suffix);
71 * Global static data for this method
92 LZ4F_compressionContext_t ctx;
98 #define clear_error(wwmethod) \
99 ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
103 const char *pathname,
const char *temp_suffix)
111 temp_suffix ? temp_suffix :
"");
118 const char *temp_suffix,
size_t pad_to_size)
129 LZ4F_compressionContext_t ctx = NULL;
130 size_t lz4bufsize = 0;
137 snprintf(tmppath,
sizeof(tmppath),
"%s/%s",
142 * Open a file for non-compressed as well as compressed files. Tracking
143 * the file descriptor is important for dir_sync() method as gzflush()
144 * does not do any system calls to fsync() to make changes permanent on
157 gzfp = gzdopen(
fd,
"wb");
166 Z_DEFAULT_STRATEGY) != Z_OK)
179 LZ4F_preferences_t prefs;
181 ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
182 if (LZ4F_isError(ctx_out))
189 lz4bufsize = LZ4F_compressBound(
LZ4_IN_SIZE, NULL);
192 /* assign the compression level, default is 0 */
193 memset(&prefs, 0,
sizeof(prefs));
197 header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
198 if (LZ4F_isError(header_size))
201 (void) LZ4F_freeCompressionContext(ctx);
208 if (
write(
fd, lz4buf, header_size) != header_size)
210 /* If write didn't set errno, assume problem is no disk space */
211 wwmethod->
lasterrno = errno ? errno : ENOSPC;
212 (void) LZ4F_freeCompressionContext(ctx);
220 /* Do pre-padding on non-compressed files */
235 * pg_pwrite() (called via pg_pwrite_zeros()) may have moved the file
236 * position, so reset it (see win32pwrite.c).
238 if (lseek(
fd, 0, SEEK_SET) != 0)
247 * fsync WAL file and containing directory, to ensure the file is
248 * persistently created and zeroed (if padded). That's particularly
249 * important when using synchronous mode, where the file is modified and
250 * fsynced in-place, without a directory fsync.
266 (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
267 (void) LZ4F_freeCompressionContext(ctx);
288 f->lz4bufsize = lz4bufsize;
316 r = (ssize_t) gzwrite(df->gzfp,
buf, count);
319 /* If write didn't set errno, assume problem is no disk space */
330 const void *inbuf =
buf;
343 compressed = LZ4F_compressUpdate(df->ctx,
344 df->lz4buf, df->lz4bufsize,
348 if (LZ4F_isError(compressed))
355 if (
write(df->
fd, df->lz4buf, compressed) != compressed)
357 /* If write didn't set errno, assume problem is no disk space */
362 inbuf = ((
char *) inbuf) + chunk;
365 /* Our caller keeps track of the uncompressed size. */
375 /* If write didn't set errno, assume problem is no disk space */
399 errno = 0;
/* in case gzclose() doesn't set it */
400 r = gzclose(df->gzfp);
409 compressed = LZ4F_compressEnd(df->ctx,
410 df->lz4buf, df->lz4bufsize,
413 if (LZ4F_isError(compressed))
420 if (
write(df->
fd, df->lz4buf, compressed) != compressed)
422 /* If write didn't set errno, assume problem is no disk space */
435 /* Build path to the current version of the file */
442 * If we have a temp prefix, normal operation is to rename the
447 snprintf(tmppath,
sizeof(tmppath),
"%s/%s",
451 /* permanent name, so no need for the prefix */
453 snprintf(tmppath2,
sizeof(tmppath2),
"%s/%s",
460 if (rename(tmppath, tmppath2) != 0)
462 pg_log_error(
"could not rename file \"%s\" to \"%s\": %m",
472 /* Unlink the file once it's closed */
475 snprintf(tmppath,
sizeof(tmppath),
"%s/%s",
483 * Else either CLOSE_NORMAL and no temp suffix, or
484 * CLOSE_NO_RENAME. In this case, fsync the file and containing
485 * directory if sync mode is requested.
501 /* supports free on NULL */
502 LZ4F_freeCompressionContext(df->ctx);
529 f->wwmethod->lasterrno = errno;
540 /* Flush any internal buffers */
541 compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542 if (LZ4F_isError(compressed))
549 if (
write(df->
fd, df->lz4buf, compressed) != compressed)
551 /* If write didn't set errno, assume problem is no disk space */
571 snprintf(tmppath,
sizeof(tmppath),
"%s/%s",
574 if (
stat(tmppath, &statbuf) != 0)
592 snprintf(tmppath,
sizeof(tmppath),
"%s/%s",
599 * Skip setting dir_data->lasterrno here because we are only checking
617 * Files are fsynced when they are closed, but we need to fsync the
618 * directory entry here as well.
642 int compression_level,
bool sync)
655 return &wwmethod->
base;
659/*-------------------------------------------------------------------------
660 * WalTarMethod - write wal to a tar file containing pg_wal contents
661 *-------------------------------------------------------------------------
665 const char *pathname,
666 const char *temp_suffix,
671 const char *pathname);
673 const char *pathname,
const char *temp_suffix);
713tar_write_compressed_data(
TarMethodData *tar_data,
const void *
buf,
size_t count,
716 tar_data->zp->next_in =
buf;
717 tar_data->zp->avail_in = count;
719 while (tar_data->zp->avail_in || flush)
723 r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
724 if (r == Z_STREAM_ERROR)
737 /* If write didn't set errno, assume problem is no disk space */
742 tar_data->zp->next_out = tar_data->zlibOut;
746 if (r == Z_STREAM_END)
752 /* Reset the stream for writing */
753 if (deflateReset(tar_data->zp) != Z_OK)
773 /* Tarfile will always be positioned at the end */
780 /* If write didn't set errno, assume problem is no disk space */
790 if (!tar_write_compressed_data(tar_data,
buf, count,
false))
798 /* Can't happen - compression enabled with no method set */
808 size_t bytesleft = bytes;
810 memset(zerobuf.
data, 0, XLOG_BLCKSZ);
813 size_t bytestowrite =
Min(bytesleft, XLOG_BLCKSZ);
826 const char *temp_suffix)
831 pathname, temp_suffix ? temp_suffix :
"");
838 const char *temp_suffix,
size_t pad_to_size)
845 if (tar_data->
fd < 0)
848 * We open the tar file only when we first try to write to it.
853 if (tar_data->
fd < 0)
862 tar_data->zp = (z_streamp)
pg_malloc(
sizeof(z_stream));
863 tar_data->zp->zalloc = Z_NULL;
864 tar_data->zp->zfree = Z_NULL;
865 tar_data->zp->opaque = Z_NULL;
866 tar_data->zp->next_out = tar_data->zlibOut;
870 * Initialize deflation library. Adding the magic value 16 to the
871 * default 15 for the windowBits parameter makes the output be
872 * gzip instead of zlib.
875 Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
880 _(
"could not initialize compression library");
886 /* There's no tar header itself, the file starts with regular files */
892 _(
"implementation error: tar files can't have more than one open file");
901 /* Create a header with size set to 0 - we will fill out the size on close */
916 /* Flush existing data */
917 if (!tar_write_compressed_data(tar_data, NULL, 0,
true))
920 /* Turn off compression for header */
921 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
924 _(
"could not change compression parameters");
946 /* If write didn't set errno, assume problem is no disk space */
947 wwmethod->
lasterrno = errno ? errno : ENOSPC;
956 /* Write header through the zlib APIs but with no compression */
961 /* Re-enable compression for the rest of the file */
963 Z_DEFAULT_STRATEGY) != Z_OK)
965 wwmethod->
lasterrstring =
_(
"could not change compression parameters");
979 * Uncompressed files are padded on creation, but for compression we can't
987 /* Uncompressed, so pad now */
990 /* Seek back to start */
991 if (lseek(tar_data->
fd,
1011 /* Currently not used, so not supported */
1029 * Always sync the whole tarfile, because that's all we can do. This makes
1030 * no sense on compressed files, so just ignore those.
1061 * Unlink the file that we just wrote to the tar. We do this by
1062 * truncating it to the start of the header. This is safe as we only
1063 * allow writing of the very last file.
1065 if (ftruncate(tar_data->
fd, tf->ofs_start) != 0)
1079 * Pad the file itself with zeroes if necessary. Note that this is
1080 * different from the tar format padding -- this is the padding we asked
1081 * for when the file was opened.
1083 if (tf->pad_to_size)
1088 * A compressed tarfile is padded on close since we cannot know
1089 * the size of the compressed output until the end.
1091 size_t sizeleft = tf->
pad_to_size - tf->base.currpos;
1102 * An uncompressed tarfile was padded on creation, so just adjust
1103 * the current position as if we seeked to the end.
1105 tf->base.currpos = tf->pad_to_size;
1110 * Get the size of the file, and pad out to a multiple of the tar block
1119 if (
tar_write(f, zerobuf, padding) != padding)
1127 /* Flush the current buffer */
1128 if (!tar_write_compressed_data(tar_data, NULL, 0,
true))
1134 * Now go back and update the header with the correct filesize and
1135 * possibly also renaming the file. We overwrite the entire current header
1136 * when done, including the checksum.
1143 * We overwrite it with what it was before if we have no tempname,
1144 * since we're going to write the buffer anyway.
1150 if (lseek(tar_data->
fd, tf->ofs_start, SEEK_SET) != ((
TarMethodFile *) f)->ofs_start)
1160 /* If write didn't set errno, assume problem is no disk space */
1168 /* Turn off compression */
1169 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1175 /* Overwrite the header, assuming the size will be the same */
1180 /* Turn compression back on */
1182 Z_DEFAULT_STRATEGY) != Z_OK)
1195 /* Move file pointer back down to end, so we can write the next file */
1196 if (lseek(tar_data->
fd, 0, SEEK_END) < 0)
1202 /* Always fsync on close, so the padding gets fsynced */
1205 /* XXX this seems pretty bogus; why is only this case fatal? */
1206 pg_fatal(
"could not fsync file \"%s\": %s",
1210 /* Clean up and done */
1222 /* We only deal with new tarfiles, so nothing externally created exists */
1230 char zerobuf[1024] = {0};
1240 /* A tarfile always ends with two empty blocks */
1244 if (
write(tar_data->
fd, zerobuf,
sizeof(zerobuf)) !=
sizeof(zerobuf))
1246 /* If write didn't set errno, assume problem is no disk space */
1247 wwmethod->
lasterrno = errno ? errno : ENOSPC;
1254 if (!tar_write_compressed_data(tar_data, zerobuf,
sizeof(zerobuf),
1258 /* Also flush all data to make sure the gzip stream is finished */
1259 tar_data->zp->next_in = NULL;
1260 tar_data->zp->avail_in = 0;
1265 r = deflate(tar_data->zp, Z_FINISH);
1267 if (r == Z_STREAM_ERROR)
1280 * If write didn't set errno, assume problem is no disk
1283 wwmethod->
lasterrno = errno ? errno : ENOSPC;
1287 if (r == Z_STREAM_END)
1291 if (deflateEnd(tar_data->zp) != Z_OK)
1293 wwmethod->
lasterrstring =
_(
"could not close compression stream");
1304 /* sync the empty blocks as well, since they're after the last file */
1349 * The argument compression_algorithm is currently ignored. It is in place for
1350 * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1351 * between the different compression methods. CreateWalTarMethod and its family
1352 * of functions handle only zlib compression.
1357 int compression_level,
bool sync)
1379 return &wwmethod->
base;
int durable_rename(const char *oldfile, const char *newfile, int elevel)
void fsync_fname(const char *fname, bool isdir)
static int fsync_parent_path(const char *fname, int elevel)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
void * pg_malloc0(size_t size)
ssize_t pg_pwrite_zeros(int fd, size_t size, off_t offset)
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
static pg_compress_algorithm compression_algorithm
static size_t tarPaddingBytesRequired(size_t len)
int tarChecksum(char *header)
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
void print_tar_number(char *s, int len, uint64 val)
size_t strlcpy(char *dst, const char *src, size_t siz)
static int fd(const char *x, int i)
TarMethodFile * currentfile
char header[TAR_BLOCK_SIZE]
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
const char * lasterrstring
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
WalWriteMethod * wwmethod
WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
static char * dir_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
static Walfile * dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
struct DirectoryMethodData DirectoryMethodData
static Walfile * tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
static ssize_t tar_write(Walfile *f, const void *buf, size_t count)
static void tar_free(WalWriteMethod *wwmethod)
static const WalWriteMethodOps WalTarMethodOps
struct TarMethodData TarMethodData
static bool dir_finish(WalWriteMethod *wwmethod)
#define clear_error(wwmethod)
static const WalWriteMethodOps WalDirectoryMethodOps
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
struct DirectoryMethodFile DirectoryMethodFile
static void dir_free(WalWriteMethod *wwmethod)
static int dir_sync(Walfile *f)
static int tar_sync(Walfile *f)
static bool tar_finish(WalWriteMethod *wwmethod)
static int dir_close(Walfile *f, WalCloseMethod method)
static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
struct TarMethodFile TarMethodFile
static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
static int tar_close(Walfile *f, WalCloseMethod method)
static ssize_t dir_write(Walfile *f, const void *buf, size_t count)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
static char * tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)