diff --git a/pandas/tests/io/test_compression.py b/pandas/tests/io/test_compression.py
index fd1e9b4fdf211..af250ced01d00 100644
--- a/pandas/tests/io/test_compression.py
+++ b/pandas/tests/io/test_compression.py
@@ -1,7 +1,6 @@
 import gzip
 import io
 import os
-from pathlib import Path
 import subprocess
 import sys
 import tarfile
@@ -31,16 +30,16 @@
     ],
 )
 @pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
-def test_compression_size(obj, method, compression_only):
+def test_compression_size(obj, method, compression_only, temp_file):
     if compression_only == "tar":
         compression_only = {"method": "tar", "mode": "w:gz"}
-    with tm.ensure_clean() as path:
-        getattr(obj, method)(path, compression=compression_only)
-        compressed_size = os.path.getsize(path)
-        getattr(obj, method)(path, compression=None)
-        uncompressed_size = os.path.getsize(path)
-        assert uncompressed_size > compressed_size
+    path = temp_file
+    getattr(obj, method)(path, compression=compression_only)
+    compressed_size = os.path.getsize(path)
+    getattr(obj, method)(path, compression=None)
+    uncompressed_size = os.path.getsize(path)
+    assert uncompressed_size > compressed_size
@@ -54,22 +53,25 @@ def test_compression_size(obj, method, compression_only):
     ],
 )
 @pytest.mark.parametrize("method", ["to_csv", "to_json"])
-def test_compression_size_fh(obj, method, compression_only):
-    with tm.ensure_clean() as path:
-        with icom.get_handle(
-            path,
-            "w:gz" if compression_only == "tar" else "w",
-            compression=compression_only,
-        ) as handles:
-            getattr(obj, method)(handles.handle)
-            assert not handles.handle.closed
-        compressed_size = os.path.getsize(path)
-    with tm.ensure_clean() as path:
-        with icom.get_handle(path, "w", compression=None) as handles:
-            getattr(obj, method)(handles.handle)
-            assert not handles.handle.closed
-        uncompressed_size = os.path.getsize(path)
-        assert uncompressed_size > compressed_size
+def test_compression_size_fh(obj, method, compression_only, temp_file):
+    path = temp_file
+    with icom.get_handle(
+        path,
+        "w:gz" if compression_only == "tar" else "w",
+        compression=compression_only,
+    ) as handles:
+        getattr(obj, method)(handles.handle)
+        assert not handles.handle.closed
+    compressed_size = os.path.getsize(path)
+
+    # Create a new temporary file for uncompressed comparison
+    path2 = temp_file.parent / f"{temp_file.stem}_uncompressed{temp_file.suffix}"
+    path2.touch()
+    with icom.get_handle(path2, "w", compression=None) as handles:
+        getattr(obj, method)(handles.handle)
+        assert not handles.handle.closed
+    uncompressed_size = os.path.getsize(path2)
+    assert uncompressed_size > compressed_size
@@ -81,14 +83,19 @@ def test_compression_size_fh(obj, method, compression_only):
     ],
 )
 def test_dataframe_compression_defaults_to_infer(
-    write_method, write_kwargs, read_method, compression_only, compression_to_extension
+    write_method,
+    write_kwargs,
+    read_method,
+    compression_only,
+    compression_to_extension,
+    temp_file,
 ):
     # GH22004
     input = pd.DataFrame([[1.0, 0, -4], [3.4, 5, 2]], columns=["X", "Y", "Z"])
     extension = compression_to_extension[compression_only]
-    with tm.ensure_clean("compressed" + extension) as path:
-        getattr(input, write_method)(path, **write_kwargs)
-        output = read_method(path, compression=compression_only)
+    path = temp_file.parent / f"compressed{extension}"
+    getattr(input, write_method)(path, **write_kwargs)
+    output = read_method(path, compression=compression_only)
     tm.assert_frame_equal(output, input)
@@ -107,37 +114,38 @@ def test_series_compression_defaults_to_infer(
     read_kwargs,
     compression_only,
     compression_to_extension,
+    temp_file,
 ):
     # GH22004
     input = pd.Series([0, 5, -2, 10], name="X")
     extension = compression_to_extension[compression_only]
-    with tm.ensure_clean("compressed" + extension) as path:
-        getattr(input, write_method)(path, **write_kwargs)
-        if "squeeze" in read_kwargs:
-            kwargs = read_kwargs.copy()
-            del kwargs["squeeze"]
-            output = read_method(path, compression=compression_only, **kwargs).squeeze(
-                "columns"
-            )
-        else:
-            output = read_method(path, compression=compression_only, **read_kwargs)
+    path = temp_file.parent / f"compressed{extension}"
+    getattr(input, write_method)(path, **write_kwargs)
+    if "squeeze" in read_kwargs:
+        kwargs = read_kwargs.copy()
+        del kwargs["squeeze"]
+        output = read_method(path, compression=compression_only, **kwargs).squeeze(
+            "columns"
+        )
+    else:
+        output = read_method(path, compression=compression_only, **read_kwargs)
     tm.assert_series_equal(output, input, check_names=False)


-def test_compression_warning(compression_only):
+def test_compression_warning(compression_only, temp_file):
     # Assert that passing a file object to to_csv while explicitly specifying a
     # compression protocol triggers a RuntimeWarning, as per GH21227.
     df = pd.DataFrame(
         100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
         columns=["X", "Y", "Z"],
     )
-    with tm.ensure_clean() as path:
-        with icom.get_handle(path, "w", compression=compression_only) as handles:
-            with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
-                df.to_csv(handles.handle, compression=compression_only)
+    path = temp_file
+    with icom.get_handle(path, "w", compression=compression_only) as handles:
+        with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
+            df.to_csv(handles.handle, compression=compression_only)


-def test_compression_binary(compression_only):
+def test_compression_binary(compression_only, temp_file):
     """
     Binary file handles support compression.
@@ -150,13 +158,13 @@ def test_compression_binary(compression_only):
     )

     # with a file
-    with tm.ensure_clean() as path:
-        with open(path, mode="wb") as file:
-            df.to_csv(file, mode="wb", compression=compression_only)
-            file.seek(0)  # file shouldn't be closed
-        tm.assert_frame_equal(
-            df, pd.read_csv(path, index_col=0, compression=compression_only)
-        )
+    path = temp_file
+    with open(path, mode="wb") as file:
+        df.to_csv(file, mode="wb", compression=compression_only)
+        file.seek(0)  # file shouldn't be closed
+    tm.assert_frame_equal(
+        df, pd.read_csv(path, index_col=0, compression=compression_only)
+    )

     # with BytesIO
     file = io.BytesIO()
@@ -167,7 +175,7 @@ def test_compression_binary(compression_only):
     )


-def test_gzip_reproducibility_file_name():
+def test_gzip_reproducibility_file_name(temp_file):
     """
     Gzip should create reproducible archives with mtime.
@@ -183,13 +191,12 @@ def test_gzip_reproducibility_file_name():
     compression_options = {"method": "gzip", "mtime": 1}

     # test for filename
-    with tm.ensure_clean() as path:
-        path = Path(path)
-        df.to_csv(path, compression=compression_options)
-        time.sleep(0.1)
-        output = path.read_bytes()
-        df.to_csv(path, compression=compression_options)
-        assert output == path.read_bytes()
+    path = temp_file
+    df.to_csv(path, compression=compression_options)
+    time.sleep(0.1)
+    output = path.read_bytes()
+    df.to_csv(path, compression=compression_options)
+    assert output == path.read_bytes()


 def test_gzip_reproducibility_file_object():
@@ -259,14 +266,14 @@ def test_with_missing_lzma_runtime():
     ],
 )
 @pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
-def test_gzip_compression_level(obj, method):
+def test_gzip_compression_level(obj, method, temp_file):
     # GH33196
-    with tm.ensure_clean() as path:
-        getattr(obj, method)(path, compression="gzip")
-        compressed_size_default = os.path.getsize(path)
-        getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
-        compressed_size_fast = os.path.getsize(path)
-        assert compressed_size_default < compressed_size_fast
+    path = temp_file
+    getattr(obj, method)(path, compression="gzip")
+    compressed_size_default = os.path.getsize(path)
+    getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
+    compressed_size_fast = os.path.getsize(path)
+    assert compressed_size_default < compressed_size_fast
@@ -280,15 +287,15 @@ def test_gzip_compression_level(obj, method):
     ],
 )
 @pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
-def test_xz_compression_level_read(obj, method):
-    with tm.ensure_clean() as path:
-        getattr(obj, method)(path, compression="xz")
-        compressed_size_default = os.path.getsize(path)
-        getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
-        compressed_size_fast = os.path.getsize(path)
-        assert compressed_size_default < compressed_size_fast
-        if method == "to_csv":
-            pd.read_csv(path, compression="xz")
+def test_xz_compression_level_read(obj, method, temp_file):
+    path = temp_file
+    getattr(obj, method)(path, compression="xz")
+    compressed_size_default = os.path.getsize(path)
+    getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
+    compressed_size_fast = os.path.getsize(path)
+    assert compressed_size_default < compressed_size_fast
+    if method == "to_csv":
+        pd.read_csv(path, compression="xz")
@@ -302,13 +309,13 @@ def test_xz_compression_level_read(obj, method):
     ],
 )
 @pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
-def test_bzip_compression_level(obj, method):
+def test_bzip_compression_level(obj, method, temp_file):
     """GH33196 bzip needs file size > 100k to show a size difference between
     compression levels, so here we just check if the call works when
     compression is passed as a dict.
""" - with tm.ensure_clean() as path: - getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1}) + path = temp_file + getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1}) @pytest.mark.parametrize( @@ -318,21 +325,21 @@ def test_bzip_compression_level(obj, method): (".tar", tarfile.TarFile), ], ) -def test_empty_archive_zip(suffix, archive): - with tm.ensure_clean(filename=suffix) as path: - with archive(path, "w"): - pass - with pytest.raises(ValueError, match="Zero files found"): - pd.read_csv(path) +def test_empty_archive_zip(suffix, archive, temp_file): + path = temp_file.parent / f"archive{suffix}" + with archive(path, "w"): + pass + with pytest.raises(ValueError, match="Zero files found"): + pd.read_csv(path) -def test_ambiguous_archive_zip(): - with tm.ensure_clean(filename=".zip") as path: - with zipfile.ZipFile(path, "w") as file: - file.writestr("a.csv", "foo,bar") - file.writestr("b.csv", "foo,bar") - with pytest.raises(ValueError, match="Multiple files found in ZIP file"): - pd.read_csv(path) +def test_ambiguous_archive_zip(temp_file): + path = temp_file.parent / "archive.zip" + with zipfile.ZipFile(path, "w") as file: + file.writestr("a.csv", "foo,bar") + file.writestr("b.csv", "foo,bar") + with pytest.raises(ValueError, match="Multiple files found in ZIP file"): + pd.read_csv(path) def test_ambiguous_archive_tar(tmp_path): @@ -352,24 +359,24 @@ def test_ambiguous_archive_tar(tmp_path): pd.read_csv(tarpath) -def test_tar_gz_to_different_filename(): - with tm.ensure_clean(filename=".foo") as file: - pd.DataFrame( - [["1", "2"]], - columns=["foo", "bar"], - ).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False) - with gzip.open(file) as uncompressed: - with tarfile.TarFile(fileobj=uncompressed) as archive: - members = archive.getmembers() - assert len(members) == 1 - content = archive.extractfile(members[0]).read().decode("utf8") - - if is_platform_windows(): - expected = "foo,bar\r\n1,2\r\n" - else: - expected = "foo,bar\n1,2\n" - - assert content == expected +def test_tar_gz_to_different_filename(temp_file): + file = temp_file.parent / "archive.foo" + pd.DataFrame( + [["1", "2"]], + columns=["foo", "bar"], + ).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False) + with gzip.open(file) as uncompressed: + with tarfile.TarFile(fileobj=uncompressed) as archive: + members = archive.getmembers() + assert len(members) == 1 + content = archive.extractfile(members[0]).read().decode("utf8") + + if is_platform_windows(): + expected = "foo,bar\r\n1,2\r\n" + else: + expected = "foo,bar\n1,2\n" + + assert content == expected def test_tar_no_error_on_close(): diff --git a/pandas/tests/io/test_pickle.py b/pandas/tests/io/test_pickle.py index bab2c1561eb99..52878d71777b2 100644 --- a/pandas/tests/io/test_pickle.py +++ b/pandas/tests/io/test_pickle.py @@ -165,26 +165,26 @@ def flatten(data: dict) -> list[tuple[str, Any]]: ) @pytest.mark.parametrize("writer", [pd.to_pickle, python_pickler]) @pytest.mark.parametrize("typ, expected", flatten(create_pickle_data())) -def test_round_trip_current(typ, expected, pickle_writer, writer): - with tm.ensure_clean() as path: - # test writing with each pickler - pickle_writer(expected, path) +def test_round_trip_current(typ, expected, pickle_writer, writer, temp_file): + path = temp_file + # test writing with each pickler + pickle_writer(expected, path) - # test reading with each unpickler - result = pd.read_pickle(path) - compare_element(result, expected, typ) + # test 
reading with each unpickler + result = pd.read_pickle(path) + compare_element(result, expected, typ) - result = python_unpickler(path) - compare_element(result, expected, typ) + result = python_unpickler(path) + compare_element(result, expected, typ) - # and the same for file objects (GH 35679) - with open(path, mode="wb") as handle: - writer(expected, path) - handle.seek(0) # shouldn't close file handle - with open(path, mode="rb") as handle: - result = pd.read_pickle(handle) - handle.seek(0) # shouldn't close file handle - compare_element(result, expected, typ) + # and the same for file objects (GH 35679) + with open(path, mode="wb") as handle: + writer(expected, path) + handle.seek(0) # shouldn't close file handle + with open(path, mode="rb") as handle: + result = pd.read_pickle(handle) + handle.seek(0) # shouldn't close file handle + compare_element(result, expected, typ) def test_pickle_path_pathlib(): @@ -242,112 +242,100 @@ def compress_file(self, src_path, dest_path, compression): with f: f.write(fh.read()) - def test_write_explicit(self, compression, get_random_path): - base = get_random_path - path1 = base + ".compressed" - path2 = base + ".raw" - - with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2: - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) + def test_write_explicit(self, compression, get_random_path, temp_file): + p1 = temp_file.parent / f"{temp_file.stem}.compressed" + p2 = temp_file.parent / f"{temp_file.stem}.raw" + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) - # write to compressed file - df.to_pickle(p1, compression=compression) + # write to compressed file + df.to_pickle(p1, compression=compression) - # decompress - with tm.decompress_file(p1, compression=compression) as f: - with open(p2, "wb") as fh: - fh.write(f.read()) + # decompress + with tm.decompress_file(p1, compression=compression) as f: + with open(p2, "wb") as fh: + fh.write(f.read()) - # read decompressed file - df2 = pd.read_pickle(p2, compression=None) + # read decompressed file + df2 = pd.read_pickle(p2, compression=None) - tm.assert_frame_equal(df, df2) + tm.assert_frame_equal(df, df2) @pytest.mark.parametrize("compression", ["", "None", "bad", "7z"]) - def test_write_explicit_bad(self, compression, get_random_path): + def test_write_explicit_bad(self, compression, get_random_path, temp_file): df = DataFrame( 1.1 * np.arange(120).reshape((30, 4)), columns=Index(list("ABCD"), dtype=object), index=Index([f"i-{i}" for i in range(30)], dtype=object), ) - with tm.ensure_clean(get_random_path) as path: - with pytest.raises(ValueError, match="Unrecognized compression type"): - df.to_pickle(path, compression=compression) - - def test_write_infer(self, compression_ext, get_random_path): - base = get_random_path - path1 = base + compression_ext - path2 = base + ".raw" - compression = self._extension_to_compression.get(compression_ext.lower()) - - with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2: - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) + path = temp_file + with pytest.raises(ValueError, match="Unrecognized compression type"): + df.to_pickle(path, compression=compression) - # write to compressed file by inferred 
compression method - df.to_pickle(p1) + def test_write_infer(self, compression_ext, get_random_path, temp_file): + p1 = temp_file.parent / f"{temp_file.stem}{compression_ext}" + p2 = temp_file.parent / f"{temp_file.stem}.raw" + compression = self._extension_to_compression.get(compression_ext.lower()) + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) - # decompress - with tm.decompress_file(p1, compression=compression) as f: - with open(p2, "wb") as fh: - fh.write(f.read()) + # write to compressed file by inferred compression method + df.to_pickle(p1) - # read decompressed file - df2 = pd.read_pickle(p2, compression=None) + # decompress + with tm.decompress_file(p1, compression=compression) as f: + with open(p2, "wb") as fh: + fh.write(f.read()) - tm.assert_frame_equal(df, df2) + # read decompressed file + df2 = pd.read_pickle(p2, compression=None) - def test_read_explicit(self, compression, get_random_path): - base = get_random_path - path1 = base + ".raw" - path2 = base + ".compressed" + tm.assert_frame_equal(df, df2) - with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2: - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) + def test_read_explicit(self, compression, get_random_path, temp_file): + p1 = temp_file.parent / f"{temp_file.stem}.raw" + p2 = temp_file.parent / f"{temp_file.stem}.compressed" + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) - # write to uncompressed file - df.to_pickle(p1, compression=None) + # write to uncompressed file + df.to_pickle(p1, compression=None) - # compress - self.compress_file(p1, p2, compression=compression) + # compress + self.compress_file(p1, p2, compression=compression) - # read compressed file - df2 = pd.read_pickle(p2, compression=compression) - tm.assert_frame_equal(df, df2) + # read compressed file + df2 = pd.read_pickle(p2, compression=compression) + tm.assert_frame_equal(df, df2) - def test_read_infer(self, compression_ext, get_random_path): - base = get_random_path - path1 = base + ".raw" - path2 = base + compression_ext + def test_read_infer(self, compression_ext, get_random_path, temp_file): + p1 = temp_file.parent / f"{temp_file.stem}.raw" + p2 = temp_file.parent / f"{temp_file.stem}{compression_ext}" compression = self._extension_to_compression.get(compression_ext.lower()) + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) - with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2: - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) - - # write to uncompressed file - df.to_pickle(p1, compression=None) + # write to uncompressed file + df.to_pickle(p1, compression=None) - # compress - self.compress_file(p1, p2, compression=compression) + # compress + self.compress_file(p1, p2, compression=compression) - # read compressed file by inferred compression method - df2 = pd.read_pickle(p2) - tm.assert_frame_equal(df, df2) + # read compressed file by inferred compression method + df2 = pd.read_pickle(p2) + tm.assert_frame_equal(df, df2) # 
--------------------- @@ -357,44 +345,44 @@ def test_read_infer(self, compression_ext, get_random_path): class TestProtocol: @pytest.mark.parametrize("protocol", [-1, 0, 1, 2]) - def test_read(self, protocol, get_random_path): - with tm.ensure_clean(get_random_path) as path: - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) - df.to_pickle(path, protocol=protocol) - df2 = pd.read_pickle(path) - tm.assert_frame_equal(df, df2) - - -def test_pickle_buffer_roundtrip(): - with tm.ensure_clean() as path: + def test_read(self, protocol, get_random_path, temp_file): + path = temp_file df = DataFrame( 1.1 * np.arange(120).reshape((30, 4)), columns=Index(list("ABCD"), dtype=object), index=Index([f"i-{i}" for i in range(30)], dtype=object), ) - with open(path, "wb") as fh: - df.to_pickle(fh) - with open(path, "rb") as fh: - result = pd.read_pickle(fh) - tm.assert_frame_equal(df, result) + df.to_pickle(path, protocol=protocol) + df2 = pd.read_pickle(path) + tm.assert_frame_equal(df, df2) + + +def test_pickle_buffer_roundtrip(temp_file): + path = temp_file + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) + with open(path, "wb") as fh: + df.to_pickle(fh) + with open(path, "rb") as fh: + result = pd.read_pickle(fh) + tm.assert_frame_equal(df, result) -def test_pickle_fsspec_roundtrip(): +def test_pickle_fsspec_roundtrip(temp_file): pytest.importorskip("fsspec") - with tm.ensure_clean(): - mockurl = "memory://mockfile" - df = DataFrame( - 1.1 * np.arange(120).reshape((30, 4)), - columns=Index(list("ABCD"), dtype=object), - index=Index([f"i-{i}" for i in range(30)], dtype=object), - ) - df.to_pickle(mockurl) - result = pd.read_pickle(mockurl) - tm.assert_frame_equal(df, result) + # Using temp_file for context, but fsspec uses memory URL + mockurl = "memory://mockfile" + df = DataFrame( + 1.1 * np.arange(120).reshape((30, 4)), + columns=Index(list("ABCD"), dtype=object), + index=Index([f"i-{i}" for i in range(30)], dtype=object), + ) + df.to_pickle(mockurl) + result = pd.read_pickle(mockurl) + tm.assert_frame_equal(df, result) class MyTz(datetime.tzinfo): @@ -411,7 +399,7 @@ def test_read_pickle_with_subclass(): assert isinstance(result[1], MyTz) -def test_pickle_binary_object_compression(compression): +def test_pickle_binary_object_compression(compression, temp_file): """ Read/write from binary file-objects w/wo compression. 
@@ -424,9 +412,9 @@ def test_pickle_binary_object_compression(compression):
     )

     # reference for compression
-    with tm.ensure_clean() as path:
-        df.to_pickle(path, compression=compression)
-        reference = Path(path).read_bytes()
+    path = temp_file
+    df.to_pickle(path, compression=compression)
+    reference = path.read_bytes()

     # write
     buffer = io.BytesIO()
diff --git a/pandas/tests/io/xml/test_to_xml.py b/pandas/tests/io/xml/test_to_xml.py
index ccd7e5d894943..a10c4d684bba0 100644
--- a/pandas/tests/io/xml/test_to_xml.py
+++ b/pandas/tests/io/xml/test_to_xml.py
@@ -170,30 +170,30 @@ def parser(request):
 # FILE OUTPUT


-def test_file_output_str_read(xml_books, parser, from_file_expected):
+def test_file_output_str_read(xml_books, parser, from_file_expected, tmp_path):
     df_file = read_xml(xml_books, parser=parser)

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(path, parser=parser)
-        with open(path, "rb") as f:
-            output = f.read().decode("utf-8").strip()
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, parser=parser)
+    with open(path, "rb") as f:
+        output = f.read().decode("utf-8").strip()

-        output = equalize_decl(output)
+    output = equalize_decl(output)

-        assert output == from_file_expected
+    assert output == from_file_expected


-def test_file_output_bytes_read(xml_books, parser, from_file_expected):
+def test_file_output_bytes_read(xml_books, parser, from_file_expected, tmp_path):
     df_file = read_xml(xml_books, parser=parser)

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(path, parser=parser)
-        with open(path, "rb") as f:
-            output = f.read().decode("utf-8").strip()
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, parser=parser)
+    with open(path, "rb") as f:
+        output = f.read().decode("utf-8").strip()

-        output = equalize_decl(output)
+    output = equalize_decl(output)

-        assert output == from_file_expected
+    assert output == from_file_expected


 def test_str_output(xml_books, parser, from_file_expected):
@@ -218,7 +218,7 @@ def test_wrong_file_path(parser, geom_df):
 # INDEX


-def test_index_false(xml_books, parser):
+def test_index_false(xml_books, parser, tmp_path):
     expected = """\
@@ -247,17 +247,17 @@ def test_index_false(xml_books, parser):

     df_file = read_xml(xml_books, parser=parser)

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(path, index=False, parser=parser)
-        with open(path, "rb") as f:
-            output = f.read().decode("utf-8").strip()
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, index=False, parser=parser)
+    with open(path, "rb") as f:
+        output = f.read().decode("utf-8").strip()

-        output = equalize_decl(output)
+    output = equalize_decl(output)

-        assert output == expected
+    assert output == expected


-def test_index_false_rename_row_root(xml_books, parser):
+def test_index_false_rename_row_root(xml_books, parser, tmp_path):
     expected = """\
@@ -286,12 +286,10 @@ def test_index_false_rename_row_root(xml_books, parser):

     df_file = read_xml(xml_books, parser=parser)

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(
-            path, index=False, root_name="books", row_name="book", parser=parser
-        )
-        with open(path, "rb") as f:
-            output = f.read().decode("utf-8").strip()
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, index=False, root_name="books", row_name="book", parser=parser)
+    with open(path, "rb") as f:
+        output = f.read().decode("utf-8").strip()

     output = equalize_decl(output)

@@ -864,21 +862,21 @@ def test_encoding_option_str(xml_baby_names, parser):
     assert output == encoding_expected


-def test_correct_encoding_file(xml_baby_names):
+def test_correct_encoding_file(xml_baby_names, tmp_path):
     pytest.importorskip("lxml")
     df_file = read_xml(xml_baby_names, encoding="ISO-8859-1", parser="lxml")

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(path, index=False, encoding="ISO-8859-1", parser="lxml")
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, index=False, encoding="ISO-8859-1", parser="lxml")


 @pytest.mark.parametrize("encoding", ["UTF-8", "UTF-16", "ISO-8859-1"])
-def test_wrong_encoding_option_lxml(xml_baby_names, parser, encoding):
+def test_wrong_encoding_option_lxml(xml_baby_names, parser, encoding, tmp_path):
     pytest.importorskip("lxml")
     df_file = read_xml(xml_baby_names, encoding="ISO-8859-1", parser="lxml")

-    with tm.ensure_clean("test.xml") as path:
-        df_file.to_xml(path, index=False, encoding=encoding, parser=parser)
+    path = tmp_path / "test.xml"
+    df_file.to_xml(path, index=False, encoding=encoding, parser=parser)


 def test_misspelled_encoding(parser, geom_df):
@@ -1125,7 +1123,7 @@ def test_incorrect_xsl_eval(geom_df):
         geom_df.to_xml(stylesheet=StringIO(xsl))


-def test_incorrect_xsl_apply(geom_df):
+def test_incorrect_xsl_apply(geom_df, tmp_path):
     lxml_etree = pytest.importorskip("lxml.etree")

     xsl = """\
@@ -1141,8 +1139,8 @@ def test_incorrect_xsl_apply(geom_df):
     """

     with pytest.raises(lxml_etree.XSLTApplyError, match="Cannot resolve URI"):
-        with tm.ensure_clean("test.xml") as path:
-            geom_df.to_xml(path, stylesheet=StringIO(xsl))
+        path = tmp_path / "test.xml"
+        geom_df.to_xml(path, stylesheet=StringIO(xsl))


 def test_stylesheet_with_etree(geom_df):
@@ -1296,16 +1294,16 @@ def test_style_to_json(geom_df):
     """


-def test_compression_output(parser, compression_only, geom_df):
-    with tm.ensure_clean() as path:
-        geom_df.to_xml(path, parser=parser, compression=compression_only)
+def test_compression_output(parser, compression_only, geom_df, temp_file):
+    path = temp_file
+    geom_df.to_xml(path, parser=parser, compression=compression_only)

-        with get_handle(
-            path,
-            "r",
-            compression=compression_only,
-        ) as handle_obj:
-            output = handle_obj.handle.read()
+    with get_handle(
+        path,
+        "r",
+        compression=compression_only,
+    ) as handle_obj:
+        output = handle_obj.handle.read()

     output = equalize_decl(output)

@@ -1313,18 +1311,18 @@ def test_compression_output(parser, compression_only, geom_df):

 def test_filename_and_suffix_comp(
-    parser, compression_only, geom_df, compression_to_extension
+    parser, compression_only, geom_df, compression_to_extension, tmp_path
 ):
     compfile = "xml." + compression_to_extension[compression_only]
-    with tm.ensure_clean(filename=compfile) as path:
-        geom_df.to_xml(path, parser=parser, compression=compression_only)
+    path = tmp_path / compfile
+    geom_df.to_xml(path, parser=parser, compression=compression_only)

-        with get_handle(
-            path,
-            "r",
-            compression=compression_only,
-        ) as handle_obj:
-            output = handle_obj.handle.read()
+    with get_handle(
+        path,
+        "r",
+        compression=compression_only,
+    ) as handle_obj:
+        output = handle_obj.handle.read()

     output = equalize_decl(output)

@@ -1345,10 +1343,10 @@ def test_ea_dtypes(any_numeric_ea_dtype, parser):
     assert equalize_decl(result).strip() == expected


-def test_unsupported_compression(parser, geom_df):
+def test_unsupported_compression(parser, geom_df, temp_file):
     with pytest.raises(ValueError, match="Unrecognized compression type"):
-        with tm.ensure_clean() as path:
-            geom_df.to_xml(path, parser=parser, compression="7z")
+        path = temp_file
+        geom_df.to_xml(path, parser=parser, compression="7z")


 # STORAGE OPTIONS
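Note on the fixtures these tests now request: tmp_path is pytest's built-in per-test temporary directory, while temp_file is a pandas-level fixture defined in a shared conftest that this diff never shows. For reference, a minimal sketch of what such a fixture might look like follows; the uuid4-based filename and the eager touch() are assumptions, not lines from this PR (the touch matters because several tests above immediately call os.path.getsize or read_bytes on the path).

# conftest.py (hypothetical sketch, not part of this diff)
import uuid

import pytest


@pytest.fixture
def temp_file(tmp_path):
    """Return a unique, already-created pathlib.Path under pytest's tmp dir.

    pytest removes tmp_path after the test finishes, which replaces the
    cleanup previously handled by the tm.ensure_clean context manager.
    """
    file_path = tmp_path / str(uuid.uuid4())
    file_path.touch()  # make os.path.getsize()/path.read_bytes() safe to call
    return file_path

Because the fixture returns a pathlib.Path, tests can derive sibling files with expressions such as temp_file.parent / f"{temp_file.stem}.raw" rather than concatenating strings onto get_random_path, which is exactly the pattern the pickle tests above switch to.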