@@ -1,7 +1,6 @@
import gzip
import io
import os
from pathlib import Path
import subprocess
import sys
import tarfile
Expand Down
Expand Up
@@ -31,16 +30,16 @@
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_compression_size(obj, method, compression_only):
def test_compression_size(obj, method, compression_only, temp_file ):
if compression_only == "tar":
compression_only = {"method": "tar", "mode": "w:gz"}
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression=compression_only)
compressed_size = os.path.getsize(path)
getattr(obj, method)(path, compression=None)
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size
path = temp_file
getattr(obj, method)(path, compression=compression_only)
compressed_size = os.path.getsize(path)
getattr(obj, method)(path, compression=None)
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size
@pytest.mark.parametrize(
Expand All
@@ -54,22 +53,25 @@ def test_compression_size(obj, method, compression_only):
],
)
@pytest.mark.parametrize("method", ["to_csv", "to_json"])
def test_compression_size_fh(obj, method, compression_only):
with tm.ensure_clean() as path:
with icom.get_handle(
path,
"w:gz" if compression_only == "tar" else "w",
compression=compression_only,
) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
compressed_size = os.path.getsize(path)
with tm.ensure_clean() as path:
with icom.get_handle(path, "w", compression=None) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size
def test_compression_size_fh(obj, method, compression_only, temp_file):
path = temp_file
with icom.get_handle(
path,
"w:gz" if compression_only == "tar" else "w",
compression=compression_only,
) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
compressed_size = os.path.getsize(path)
# Create a new temporary file for uncompressed comparison
path2 = temp_file.parent / f"{temp_file.stem}_uncompressed{temp_file.suffix}"
path2.touch()
with icom.get_handle(path2, "w", compression=None) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
uncompressed_size = os.path.getsize(path2)
assert uncompressed_size > compressed_size
@pytest.mark.parametrize(
Expand All
@@ -81,14 +83,19 @@ def test_compression_size_fh(obj, method, compression_only):
],
)
def test_dataframe_compression_defaults_to_infer(
write_method, write_kwargs, read_method, compression_only, compression_to_extension
write_method,
write_kwargs,
read_method,
compression_only,
compression_to_extension,
temp_file,
):
# GH22004
input = pd.DataFrame([[1.0, 0, -4], [3.4, 5, 2]], columns=["X", "Y", "Z"])
extension = compression_to_extension[compression_only]
with tm.ensure_clean("compressed" + extension) as path:
getattr(input, write_method)(path, **write_kwargs)
output = read_method(path, compression=compression_only)
path = temp_file.parent / f"compressed{ extension}"
getattr(input, write_method)(path, **write_kwargs)
output = read_method(path, compression=compression_only)
tm.assert_frame_equal(output, input)
Expand All
@@ -107,37 +114,38 @@ def test_series_compression_defaults_to_infer(
read_kwargs,
compression_only,
compression_to_extension,
temp_file,
):
# GH22004
input = pd.Series([0, 5, -2, 10], name="X")
extension = compression_to_extension[compression_only]
with tm.ensure_clean("compressed" + extension) as path:
getattr(input, write_method)(path, **write_kwargs)
if "squeeze" in read_kwargs:
kwargs = read_kwargs.copy()
del kwargs["squeeze"]
output = read_method(path, compression=compression_only, **kwargs).squeeze(
"columns"
)
else:
output = read_method(path, compression=compression_only, **read_kwargs)
path = temp_file.parent / f"compressed{ extension}"
getattr(input, write_method)(path, **write_kwargs)
if "squeeze" in read_kwargs:
kwargs = read_kwargs.copy()
del kwargs["squeeze"]
output = read_method(path, compression=compression_only, **kwargs).squeeze(
"columns"
)
else:
output = read_method(path, compression=compression_only, **read_kwargs)
tm.assert_series_equal(output, input, check_names=False)
def test_compression_warning(compression_only):
def test_compression_warning(compression_only, temp_file ):
# Assert that passing a file object to to_csv while explicitly specifying a
# compression protocol triggers a RuntimeWarning, as per GH21227.
df = pd.DataFrame(
100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
columns=["X", "Y", "Z"],
)
with tm.ensure_clean() as path:
with icom.get_handle(path, "w", compression=compression_only) as handles:
with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
df.to_csv(handles.handle, compression=compression_only)
path = temp_file
with icom.get_handle(path, "w", compression=compression_only) as handles:
with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
df.to_csv(handles.handle, compression=compression_only)
def test_compression_binary(compression_only):
def test_compression_binary(compression_only, temp_file ):
"""
Binary file handles support compression.
Expand All
@@ -150,13 +158,13 @@ def test_compression_binary(compression_only):
)
# with a file
with tm.ensure_clean() as path:
with open(path, mode="wb") as file:
df.to_csv(file, mode="wb", compression=compression_only)
file.seek(0) # file shouldn't be closed
tm.assert_frame_equal(
df, pd.read_csv(path, index_col=0, compression=compression_only)
)
path = temp_file
with open(path, mode="wb") as file:
df.to_csv(file, mode="wb", compression=compression_only)
file.seek(0) # file shouldn't be closed
tm.assert_frame_equal(
df, pd.read_csv(path, index_col=0, compression=compression_only)
)
# with BytesIO
file = io.BytesIO()
Expand All
@@ -167,7 +175,7 @@ def test_compression_binary(compression_only):
)
def test_gzip_reproducibility_file_name():
def test_gzip_reproducibility_file_name(temp_file ):
"""
Gzip should create reproducible archives with mtime.
Expand All
@@ -183,13 +191,12 @@ def test_gzip_reproducibility_file_name():
compression_options = {"method": "gzip", "mtime": 1}
# test for filename
with tm.ensure_clean() as path:
path = Path(path)
df.to_csv(path, compression=compression_options)
time.sleep(0.1)
output = path.read_bytes()
df.to_csv(path, compression=compression_options)
assert output == path.read_bytes()
path = temp_file
df.to_csv(path, compression=compression_options)
time.sleep(0.1)
output = path.read_bytes()
df.to_csv(path, compression=compression_options)
assert output == path.read_bytes()
def test_gzip_reproducibility_file_object():
Expand Down
Expand Up
@@ -259,14 +266,14 @@ def test_with_missing_lzma_runtime():
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_gzip_compression_level(obj, method):
def test_gzip_compression_level(obj, method, temp_file ):
# GH33196
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression="gzip")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
path = temp_file
getattr(obj, method)(path, compression="gzip")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
@pytest.mark.parametrize(
Expand All
@@ -280,15 +287,15 @@ def test_gzip_compression_level(obj, method):
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_xz_compression_level_read(obj, method):
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression="xz")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
if method == "to_csv":
pd.read_csv(path, compression="xz")
def test_xz_compression_level_read(obj, method, temp_file ):
path = temp_file
getattr(obj, method)(path, compression="xz")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
if method == "to_csv":
pd.read_csv(path, compression="xz")
@pytest.mark.parametrize(
Expand All
@@ -302,13 +309,13 @@ def test_xz_compression_level_read(obj, method):
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_bzip_compression_level(obj, method):
def test_bzip_compression_level(obj, method, temp_file ):
"""GH33196 bzip needs file size > 100k to show a size difference between
compression levels, so here we just check if the call works when
compression is passed as a dict.
"""
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1})
path = temp_file
getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1})
@pytest.mark.parametrize(
Expand All
@@ -318,21 +325,21 @@ def test_bzip_compression_level(obj, method):
(".tar", tarfile.TarFile),
],
)
def test_empty_archive_zip(suffix, archive, temp_file):
    # Reading an archive (zip or tar, per parametrize) that contains no
    # member files must raise a clear "Zero files found" error.
    path = temp_file.parent / f"archive{suffix}"
    with archive(path, "w"):
        pass
    with pytest.raises(ValueError, match="Zero files found"):
        pd.read_csv(path)
def test_ambiguous_archive_zip(temp_file):
    # A ZIP archive with more than one member is ambiguous: read_csv cannot
    # pick a file, so it must raise.
    path = temp_file.parent / "archive .zip"
    with zipfile.ZipFile(path, "w") as file:
        file.writestr("a.csv", "foo,bar")
        file.writestr("b.csv", "foo,bar")
    with pytest.raises(ValueError, match="Multiple files found in ZIP file"):
        pd.read_csv(path)
def test_ambiguous_archive_tar(tmp_path):
Expand All
@@ -352,24 +359,24 @@ def test_ambiguous_archive_tar(tmp_path):
pd.read_csv(tarpath)
def test_tar_gz_to_different_filename():
with tm.ensure_clean(filename=" .foo") as file:
pd.DataFrame(
[["1", "2"]],
columns=["foo", "bar"],
).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False)
with gzip.open(file) as uncompressed:
with tarfile.TarFile(fileobj=uncompressed) as archive:
members = archive.getmembers()
assert len(members) == 1
content = archive.extractfile(members[0]).read().decode("utf8")
if is_platform_windows():
expected = "foo,bar\r\n1,2\r\n"
else:
expected = "foo,bar\n1,2\n"
assert content == expected
def test_tar_gz_to_different_filename(temp_file ):
file = temp_file.parent / "archive .foo"
pd.DataFrame(
[["1", "2"]],
columns=["foo", "bar"],
).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False)
with gzip.open(file) as uncompressed:
with tarfile.TarFile(fileobj=uncompressed) as archive:
members = archive.getmembers()
assert len(members) == 1
content = archive.extractfile(members[0]).read().decode("utf8")
if is_platform_windows():
expected = "foo,bar\r\n1,2\r\n"
else:
expected = "foo,bar\n1,2\n"
assert content == expected
def test_tar_no_error_on_close():
Expand Down