Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

TST: Assist Replace ensure_clean utility function with the temp_file pytest fixture #62435 #62461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
pandeconscious wants to merge 5 commits into pandas-dev:main
base: main
Choose a base branch
Loading
from pandeconscious:main
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 141 additions & 139 deletions pandas/tests/frame/methods/test_to_csv.py
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def _return_result_expected(
self,
df,
chunksize,
temp_file,
r_dtype=None,
c_dtype=None,
rnlvl=None,
Expand All @@ -260,15 +261,15 @@ def _return_result_expected(
kwargs["index_col"] = list(range(rnlvl))
kwargs["header"] = list(range(cnlvl))

with tm.ensure_clean("__tmp_to_csv_moar__") as path:
df.to_csv(path, encoding="utf8", chunksize=chunksize)
recons = self.read_csv(path, **kwargs)
path = str(temp_file)
df.to_csv(path, encoding="utf8", chunksize=chunksize)
recons = self.read_csv(path, **kwargs)
else:
kwargs["header"] = 0

with tm.ensure_clean("__tmp_to_csv_moar__") as path:
df.to_csv(path, encoding="utf8", chunksize=chunksize)
recons = self.read_csv(path, **kwargs)
path = str(temp_file)
df.to_csv(path, encoding="utf8", chunksize=chunksize)
recons = self.read_csv(path, **kwargs)

def _to_uni(x):
if not isinstance(x, str):
Expand Down Expand Up @@ -353,13 +354,13 @@ def _to_uni(x):
@pytest.mark.parametrize(
"nrows", [2, 10, 99, 100, 101, 102, 198, 199, 200, 201, 202, 249, 250, 251]
)
def test_to_csv_nrows(self, nrows):
def test_to_csv_nrows(self, nrows, temp_file):
df = DataFrame(
np.ones((nrows, 4)),
index=date_range("2020年01月01日", periods=nrows),
columns=Index(list("abcd"), dtype=object),
)
result, expected = self._return_result_expected(df, 1000, "dt", "s")
result, expected = self._return_result_expected(df, 1000, temp_file, "dt", "s")
expected.index = expected.index.astype("M8[ns]")
tm.assert_frame_equal(result, expected, check_names=False)

Expand All @@ -372,7 +373,7 @@ def test_to_csv_nrows(self, nrows):
)
@pytest.mark.parametrize("ncols", [1, 2, 3, 4])
@pytest.mark.filterwarnings(r"ignore:PeriodDtype\[B\] is deprecated:FutureWarning")
def test_to_csv_idx_types(self, nrows, r_idx_type, c_idx_type, ncols):
def test_to_csv_idx_types(self, nrows, r_idx_type, c_idx_type, ncols, temp_file):
axes = {
"i": lambda n: Index(np.arange(n), dtype=np.int64),
"s": lambda n: Index([f"{i}_{chr(i)}" for i in range(97, 97 + n)]),
Expand All @@ -387,6 +388,7 @@ def test_to_csv_idx_types(self, nrows, r_idx_type, c_idx_type, ncols):
result, expected = self._return_result_expected(
df,
1000,
temp_file,
r_idx_type,
c_idx_type,
)
Expand All @@ -401,18 +403,18 @@ def test_to_csv_idx_types(self, nrows, r_idx_type, c_idx_type, ncols):
"nrows", [10, 98, 99, 100, 101, 102, 198, 199, 200, 201, 202, 249, 250, 251]
)
@pytest.mark.parametrize("ncols", [1, 2, 3, 4])
def test_to_csv_idx_ncols(self, nrows, ncols):
def test_to_csv_idx_ncols(self, nrows, ncols, temp_file):
df = DataFrame(
np.ones((nrows, ncols)),
index=Index([f"i-{i}" for i in range(nrows)], name="a"),
columns=Index([f"i-{i}" for i in range(ncols)], name="a"),
)
result, expected = self._return_result_expected(df, 1000)
result, expected = self._return_result_expected(df, 1000, temp_file)
tm.assert_frame_equal(result, expected, check_names=False)

@pytest.mark.slow
@pytest.mark.parametrize("nrows", [10, 98, 99, 100, 101, 102])
def test_to_csv_dup_cols(self, nrows):
def test_to_csv_dup_cols(self, nrows, temp_file):
df = DataFrame(
np.ones((nrows, 3)),
index=Index([f"i-{i}" for i in range(nrows)], name="a"),
Expand All @@ -427,25 +429,29 @@ def test_to_csv_dup_cols(self, nrows):
ix[-2:] = ["rdupe", "rdupe"]
df.index = ix
df.columns = cols
result, expected = self._return_result_expected(df, 1000, dupe_col=True)
result, expected = self._return_result_expected(
df, 1000, temp_file, dupe_col=True
)
tm.assert_frame_equal(result, expected, check_names=False)

@pytest.mark.slow
def test_to_csv_empty(self):
def test_to_csv_empty(self, temp_file):
df = DataFrame(index=np.arange(10, dtype=np.int64))
result, expected = self._return_result_expected(df, 1000)
result, expected = self._return_result_expected(df, 1000, temp_file)
tm.assert_frame_equal(result, expected, check_column_type=False)

@pytest.mark.slow
def test_to_csv_chunksize(self):
def test_to_csv_chunksize(self, temp_file):
chunksize = 1000
rows = chunksize // 2 + 1
df = DataFrame(
np.ones((rows, 2)),
columns=Index(list("ab")),
index=MultiIndex.from_arrays([range(rows) for _ in range(2)]),
)
result, expected = self._return_result_expected(df, chunksize, rnlvl=2)
result, expected = self._return_result_expected(
df, chunksize, temp_file, rnlvl=2
)
tm.assert_frame_equal(result, expected, check_names=False)

@pytest.mark.slow
Expand All @@ -461,7 +467,7 @@ def test_to_csv_chunksize(self):
[{"r_idx_nlevels": 2, "c_idx_nlevels": 2}, {"rnlvl": 2, "cnlvl": 2}],
],
)
def test_to_csv_params(self, nrows, df_params, func_params, ncols):
def test_to_csv_params(self, nrows, df_params, func_params, ncols, temp_file):
if df_params.get("r_idx_nlevels"):
index = MultiIndex.from_arrays(
[f"i-{i}" for i in range(nrows)]
Expand All @@ -478,7 +484,9 @@ def test_to_csv_params(self, nrows, df_params, func_params, ncols):
else:
columns = Index([f"i-{i}" for i in range(ncols)])
df = DataFrame(np.ones((nrows, ncols)), index=index, columns=columns)
result, expected = self._return_result_expected(df, 1000, **func_params)
result, expected = self._return_result_expected(
df, 1000, temp_file, **func_params
)
tm.assert_frame_equal(result, expected, check_names=False)

def test_to_csv_from_csv_w_some_infs(self, temp_file, float_frame):
Expand Down Expand Up @@ -595,108 +603,104 @@ def test_to_csv_multiindex(self, temp_file, float_frame, datetime_frame):
# needed if setUp becomes class method
datetime_frame.index = old_index

with tm.ensure_clean("__tmp_to_csv_multiindex__") as path:
# GH3571, GH1651, GH3141

def _make_frame(names=None):
if names is True:
names = ["first", "second"]
return DataFrame(
np.random.default_rng(2).integers(0, 10, size=(3, 3)),
columns=MultiIndex.from_tuples(
[("bah", "foo"), ("bah", "bar"), ("ban", "baz")], names=names
),
dtype="int64",
)

# column & index are multi-index
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
index=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(5)] for _ in range(2)], names=list("ab")
def _make_frame(names=None):
if names is True:
names = ["first", "second"]
return DataFrame(
np.random.default_rng(2).integers(0, 10, size=(3, 3)),
columns=MultiIndex.from_tuples(
[("bah", "foo"), ("bah", "bar"), ("ban", "baz")], names=names
),
dtype="int64",
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=[0, 1])
tm.assert_frame_equal(df, result)

# column is mi
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=0)
tm.assert_frame_equal(df, result)

# dup column names?
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
index=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(5)] for _ in range(3)], names=list("abc")
),
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=[0, 1, 2])
tm.assert_frame_equal(df, result)

# writing with no index
df = _make_frame()
df.to_csv(path, index=False)
result = read_csv(path, header=[0, 1])
tm.assert_frame_equal(df, result)

# we lose the names here
df = _make_frame(True)
df.to_csv(path, index=False)
result = read_csv(path, header=[0, 1])
assert com.all_none(*result.columns.names)
result.columns.names = df.columns.names
tm.assert_frame_equal(df, result)

# whatsnew example
df = _make_frame()
df.to_csv(path)
result = read_csv(path, header=[0, 1], index_col=[0])
tm.assert_frame_equal(df, result)

df = _make_frame(True)
df.to_csv(path)
result = read_csv(path, header=[0, 1], index_col=[0])
tm.assert_frame_equal(df, result)

# invalid options
df = _make_frame(True)
df.to_csv(path)

for i in [6, 7]:
msg = f"len of {i}, but only 5 lines in file"
with pytest.raises(ParserError, match=msg):
read_csv(path, header=list(range(i)), index_col=0)

# write with cols
msg = "cannot specify cols with a MultiIndex"
with pytest.raises(TypeError, match=msg):
df.to_csv(path, columns=["foo", "bar"])

with tm.ensure_clean("__tmp_to_csv_multiindex__") as path:
# empty
tsframe[:0].to_csv(path)
recons = self.read_csv(path)

exp = tsframe[:0]
exp.index = []

tm.assert_index_equal(recons.columns, exp.columns)
assert len(recons) == 0

# column & index are multi-index
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
index=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(5)] for _ in range(2)], names=list("ab")
),
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=[0, 1])
tm.assert_frame_equal(df, result)

# column is mi
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=0)
tm.assert_frame_equal(df, result)

# dup column names?
df = DataFrame(
np.ones((5, 3)),
columns=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(3)] for _ in range(4)], names=list("abcd")
),
index=MultiIndex.from_arrays(
[[f"i-{i}" for i in range(5)] for _ in range(3)], names=list("abc")
),
)
df.to_csv(path)
result = read_csv(path, header=[0, 1, 2, 3], index_col=[0, 1, 2])
tm.assert_frame_equal(df, result)

# writing with no index
df = _make_frame()
df.to_csv(path, index=False)
result = read_csv(path, header=[0, 1])
tm.assert_frame_equal(df, result)

# we lose the names here
df = _make_frame(True)
df.to_csv(path, index=False)
result = read_csv(path, header=[0, 1])
assert com.all_none(*result.columns.names)
result.columns.names = df.columns.names
tm.assert_frame_equal(df, result)

# whatsnew example
df = _make_frame()
df.to_csv(path)
result = read_csv(path, header=[0, 1], index_col=[0])
tm.assert_frame_equal(df, result)

df = _make_frame(True)
df.to_csv(path)
result = read_csv(path, header=[0, 1], index_col=[0])
tm.assert_frame_equal(df, result)

# invalid options
df = _make_frame(True)
df.to_csv(path)

for i in [6, 7]:
msg = f"len of {i}, but only 5 lines in file"
with pytest.raises(ParserError, match=msg):
read_csv(path, header=list(range(i)), index_col=0)

# write with cols
msg = "cannot specify cols with a MultiIndex"
with pytest.raises(TypeError, match=msg):
df.to_csv(path, columns=["foo", "bar"])

# empty
tsframe[:0].to_csv(path)
recons = self.read_csv(path)

exp = tsframe[:0]
exp.index = []

tm.assert_index_equal(recons.columns, exp.columns)
assert len(recons) == 0

def test_to_csv_interval_index(self, temp_file, using_infer_string):
# GH 28210
Expand Down Expand Up @@ -808,16 +812,15 @@ def test_to_csv_dups_cols(self, temp_file):

df.columns = [0, 1, 2] * 5

with tm.ensure_clean() as filename:
df.to_csv(filename)
result = read_csv(filename, index_col=0)
df.to_csv(path)
result = read_csv(path, index_col=0)

# date cols
for i in ["0.4", "1.4", "2.4"]:
result[i] = to_datetime(result[i])
# date cols
for i in ["0.4", "1.4", "2.4"]:
result[i] = to_datetime(result[i])

result.columns = df.columns
tm.assert_frame_equal(result, df)
result.columns = df.columns
tm.assert_frame_equal(result, df)

def test_to_csv_dups_cols2(self, temp_file):
# GH3457
Expand Down Expand Up @@ -1197,18 +1200,17 @@ def test_to_csv_with_dst_transitions_with_pickle(self, start, end, temp_file):
idx = idx._with_freq(None) # freq does not round-trip
idx._data._freq = None # otherwise there is trouble on unpickle
df = DataFrame({"values": 1, "idx": idx}, index=idx)
with tm.ensure_clean("csv_date_format_with_dst") as path:
df.to_csv(path, index=True)
result = read_csv(path, index_col=0)
result.index = (
to_datetime(result.index, utc=True)
.tz_convert("Europe/Paris")
.as_unit("ns")
)
result["idx"] = to_datetime(result["idx"], utc=True).astype(
"datetime64[ns, Europe/Paris]"
)
tm.assert_frame_equal(result, df)

path = str(temp_file)
df.to_csv(path, index=True)
result = read_csv(path, index_col=0)
result.index = (
to_datetime(result.index, utc=True).tz_convert("Europe/Paris").as_unit("ns")
)
result["idx"] = to_datetime(result["idx"], utc=True).astype(
"datetime64[ns, Europe/Paris]"
)
tm.assert_frame_equal(result, df)

# assert working
df.astype(str)
Expand Down
Loading
Loading

AltStyle によって変換されたページ (->オリジナル) /