[Python-checkins] [3.8] bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) (GH-16560)

Victor Stinner webhook-mailer at python.org
Thu Oct 3 11:26:41 EDT 2019


https://github.com/python/cpython/commit/de3195c937d5fca0d79cc93dbafa76c0f89ca5b8
commit: de3195c937d5fca0d79cc93dbafa76c0f89ca5b8
branch: 3.8
author: Victor Stinner <vstinner at python.org>
committer: GitHub <noreply at github.com>
date: 2019年10月03日T17:26:25+02:00
summary:
[3.8] bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) (GH-16560)
* bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550)
WindowsLoadTracker.read_output() now uses a short buffer for
incomplete line.
(cherry picked from commit 3e04cd268ee9a57f95dc78d8974b21a6fac3f666)
* bpo-36670: Enhance regrtest WindowsLoadTracker (GH-16553)
The last line is now passed to the parser even if it does not end
with a newline, but only if it's a valid value.
(cherry picked from commit c65119d5bfded03f80a9805889391b66fa7bf551)
* bpo-36670: Enhance regrtest (GH-16556)
* Add log() method: add timestamp and load average prefixes
 to main messages.
* WindowsLoadTracker:
 * LOAD_FACTOR_1 is now computed using SAMPLING_INTERVAL
 * Initialize the load to the arithmetic mean of the first 5 values
 of the Processor Queue Length value (so over 5 seconds), rather
 than 0.0.
 * Handle BrokenPipeError and when typeperf exit.
* format_duration(1.5) now returns '1.5 sec', rather than
 '1 sec 500 ms'
(cherry picked from commit 098e25672f1c3578855d5ded4f5147795c9ed956)
files:
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/runtest_mp.py
M Lib/test/libregrtest/utils.py
M Lib/test/libregrtest/win_utils.py
M Lib/test/test_regrtest.py
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index fd701c452ceb5..76ad3359f2d7a 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -139,16 +139,8 @@ def accumulate_result(self, result, rerun=False):
 print(xml_data, file=sys.__stderr__)
 raise
 
- def display_progress(self, test_index, text):
- if self.ns.quiet:
- return
-
- # "[ 51/405/1] test_tcl passed"
- line = f"{test_index:{self.test_count_width}}{self.test_count}"
- fails = len(self.bad) + len(self.environment_changed)
- if fails and not self.ns.pgo:
- line = f"{line}/{fails}"
- line = f"[{line}] {text}"
+ def log(self, line=''):
+ empty = not line
 
 # add the system load prefix: "load avg: 1.80 "
 load_avg = self.getloadavg()
@@ -159,8 +151,23 @@ def display_progress(self, test_index, text):
 test_time = time.monotonic() - self.start_time
 test_time = datetime.timedelta(seconds=int(test_time))
 line = f"{test_time} {line}"
+
+ if empty:
+ line = line[:-1]
+
 print(line, flush=True)
 
+ def display_progress(self, test_index, text):
+ if self.ns.quiet:
+ return
+
+ # "[ 51/405/1] test_tcl passed"
+ line = f"{test_index:{self.test_count_width}}{self.test_count}"
+ fails = len(self.bad) + len(self.environment_changed)
+ if fails and not self.ns.pgo:
+ line = f"{line}/{fails}"
+ self.log(f"[{line}] {text}")
+
 def parse_args(self, kwargs):
 ns = _parse_args(sys.argv[1:], **kwargs)
 
@@ -302,11 +309,11 @@ def rerun_failed_tests(self):
 
 self.first_result = self.get_tests_result()
 
- print()
- print("Re-running failed tests in verbose mode")
+ self.log()
+ self.log("Re-running failed tests in verbose mode")
 self.rerun = self.bad[:]
 for test_name in self.rerun:
- print(f"Re-running {test_name} in verbose mode", flush=True)
+ self.log(f"Re-running {test_name} in verbose mode")
 self.ns.verbose = True
 result = runtest(self.ns, test_name)
 
@@ -387,7 +394,7 @@ def run_tests_sequential(self):
 
 save_modules = sys.modules.keys()
 
- print("Run tests sequentially")
+ self.log("Run tests sequentially")
 
 previous_test = None
 for test_index, test_name in enumerate(self.tests, 1):
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index a46c78248de39..79afa29fa05bf 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -104,13 +104,14 @@ class ExitThread(Exception):
 
 
 class TestWorkerProcess(threading.Thread):
- def __init__(self, worker_id, pending, output, ns, timeout):
+ def __init__(self, worker_id, runner):
 super().__init__()
 self.worker_id = worker_id
- self.pending = pending
- self.output = output
- self.ns = ns
- self.timeout = timeout
+ self.pending = runner.pending
+ self.output = runner.output
+ self.ns = runner.ns
+ self.timeout = runner.worker_timeout
+ self.regrtest = runner.regrtest
 self.current_test_name = None
 self.start_time = None
 self._popen = None
@@ -294,7 +295,8 @@ def wait_stopped(self, start_time):
 if not self.is_alive():
 break
 dt = time.monotonic() - start_time
- print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
+ self.regrtest.log(f"Waiting for {self} thread "
+ f"for {format_duration(dt)}")
 if dt > JOIN_TIMEOUT:
 print_warning(f"Failed to join {self} in {format_duration(dt)}")
 break
@@ -316,6 +318,7 @@ def get_running(workers):
 class MultiprocessTestRunner:
 def __init__(self, regrtest):
 self.regrtest = regrtest
+ self.log = self.regrtest.log
 self.ns = regrtest.ns
 self.output = queue.Queue()
 self.pending = MultiprocessIterator(self.regrtest.tests)
@@ -326,11 +329,10 @@ def __init__(self, regrtest):
 self.workers = None
 
 def start_workers(self):
- self.workers = [TestWorkerProcess(index, self.pending, self.output,
- self.ns, self.worker_timeout)
+ self.workers = [TestWorkerProcess(index, self)
 for index in range(1, self.ns.use_mp + 1)]
- print("Run tests in parallel using %s child processes"
- % len(self.workers))
+ self.log("Run tests in parallel using %s child processes"
+ % len(self.workers))
 for worker in self.workers:
 worker.start()
 
@@ -364,7 +366,7 @@ def _get_result(self):
 # display progress
 running = get_running(self.workers)
 if running and not self.ns.pgo:
- print('running: %s' % ', '.join(running), flush=True)
+ self.log('running: %s' % ', '.join(running))
 
 def display_result(self, mp_result):
 result = mp_result.result
@@ -384,8 +386,7 @@ def _process_result(self, item):
 if item[0]:
 # Thread got an exception
 format_exc = item[1]
- print(f"regrtest worker thread failed: {format_exc}",
- file=sys.stderr, flush=True)
+ print_warning(f"regrtest worker thread failed: {format_exc}")
 return True
 
 self.test_index += 1
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index fb9971a64f66c..98a60f7a747d9 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -16,11 +16,14 @@ def format_duration(seconds):
 if minutes:
 parts.append('%s min' % minutes)
 if seconds:
- parts.append('%s sec' % seconds)
- if ms:
- parts.append('%s ms' % ms)
+ if parts:
+ # 2 min 1 sec
+ parts.append('%s sec' % seconds)
+ else:
+ # 1.0 sec
+ parts.append('%.1f sec' % (seconds + ms / 1000))
 if not parts:
- return '0 ms'
+ return '%s ms' % ms
 
 parts = parts[:2]
 return ' '.join(parts)
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index f0c17b906f519..028c01106dee0 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -1,4 +1,5 @@
 import _winapi
+import math
 import msvcrt
 import os
 import subprocess
@@ -10,11 +11,14 @@
 
 # Max size of asynchronous reads
 BUFSIZE = 8192
-# Exponential damping factor (see below)
-LOAD_FACTOR_1 = 0.9200444146293232478931553241
-
 # Seconds per measurement
-SAMPLING_INTERVAL = 5
+SAMPLING_INTERVAL = 1
+# Exponential damping factor to compute exponentially weighted moving average
+# on 1 minute (60 seconds)
+LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
+# Initialize the load using the arithmetic mean of the first NVALUE values
+# of the Processor Queue Length
+NVALUE = 5
 # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
 # of typeperf are registered
 COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
@@ -30,9 +34,10 @@ class WindowsLoadTracker():
 """
 
 def __init__(self):
- self.load = 0.0
- self.counter_name = ''
- self.popen = None
+ self._values = []
+ self._load = None
+ self._buffer = ''
+ self._popen = None
 self.start()
 
 def start(self):
@@ -64,7 +69,7 @@ def start(self):
 # Spawn off the load monitor
 counter_name = self._get_counter_name()
 command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
- self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
+ self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
 
 # Close our copy of the write end of the pipe
 os.close(command_stdout)
@@ -84,52 +89,88 @@ def _get_counter_name(self):
 process_queue_length = counters_dict['44']
 return f'"\\{system}\\{process_queue_length}"'
 
- def close(self):
- if self.popen is None:
+ def close(self, kill=True):
+ if self._popen is None:
 return
- self.popen.kill()
- self.popen.wait()
- self.popen = None
+
+ self._load = None
+
+ if kill:
+ self._popen.kill()
+ self._popen.wait()
+ self._popen = None
 
 def __del__(self):
 self.close()
 
- def read_output(self):
+ def _parse_line(self, line):
+ # typeperf outputs in a CSV format like this:
+ # "07/19/2018 01:32:26.605","3.000000"
+ # (date, process queue length)
+ tokens = line.split(',')
+ if len(tokens) != 2:
+ raise ValueError
+
+ value = tokens[1]
+ if not value.startswith('"') or not value.endswith('"'):
+ raise ValueError
+ value = value[1:-1]
+ return float(value)
+
+ def _read_lines(self):
 overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
 bytes_read, res = overlapped.GetOverlappedResult(False)
 if res != 0:
- return
+ return ()
 
 output = overlapped.getbuffer()
- return output.decode('oem', 'replace')
+ output = output.decode('oem', 'replace')
+ output = self._buffer + output
+ lines = output.splitlines(True)
+
+ # bpo-36670: typeperf only writes a newline *before* writing a value,
+ # not after. Sometimes, the written line in incomplete (ex: only
+ # timestamp, without the process queue length). Only pass the last line
+ # to the parser if it's a valid value, otherwise store it in
+ # self._buffer.
+ try:
+ self._parse_line(lines[-1])
+ except ValueError:
+ self._buffer = lines.pop(-1)
+ else:
+ self._buffer = ''
+
+ return lines
 
 def getloadavg(self):
- typeperf_output = self.read_output()
- # Nothing to update, just return the current load
- if not typeperf_output:
- return self.load
+ if self._popen is None:
+ return None
+
+ returncode = self._popen.poll()
+ if returncode is not None:
+ self.close(kill=False)
+ return None
+
+ try:
+ lines = self._read_lines()
+ except BrokenPipeError:
+ self.close()
+ return None
+
+ for line in lines:
+ line = line.rstrip()
 
- # Process the backlog of load values
- for line in typeperf_output.splitlines():
 # Ignore the initial header:
 # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
- if '\\\\' in line:
+ if 'PDH-CSV' in line:
 continue
 
 # Ignore blank lines
- if not line.strip():
+ if not line:
 continue
 
- # typeperf outputs in a CSV format like this:
- # "07/19/2018 01:32:26.605","3.000000"
- # (date, process queue length)
 try:
- tokens = line.split(',')
- if len(tokens) != 2:
- raise ValueError
-
- value = tokens[1].replace('"', '')
- load = float(value)
+ processor_queue_length = self._parse_line(line)
 except ValueError:
 print_warning("Failed to parse typeperf output: %a" % line)
 continue
@@ -137,7 +178,13 @@ def getloadavg(self):
 # We use an exponentially weighted moving average, imitating the
 # load calculation on Unix systems.
 # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
- new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
- self.load = new_load
-
- return self.load
+ # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ if self._load is not None:
+ self._load = (self._load * LOAD_FACTOR_1
+ + processor_queue_length * (1.0 - LOAD_FACTOR_1))
+ elif len(self._values) < NVALUE:
+ self._values.append(processor_queue_length)
+ else:
+ self._load = sum(self._values) / len(self._values)
+
+ return self._load
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 0f44c72069753..d2221b3448fa3 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -25,6 +25,7 @@
 Py_DEBUG = hasattr(sys, 'gettotalrefcount')
 ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
 ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
+LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'
 
 TEST_INTERRUPTED = textwrap.dedent("""
 from signal import SIGINT, raise_signal
@@ -390,8 +391,8 @@ def check_line(self, output, regex):
 self.assertRegex(output, regex)
 
 def parse_executed_tests(self, output):
- regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
- % self.TESTNAME_REGEX)
+ regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
+ % (LOG_PREFIX, self.TESTNAME_REGEX))
 parser = re.finditer(regex, output, re.MULTILINE)
 return list(match.group(1) for match in parser)
 
@@ -451,9 +452,10 @@ def list_regex(line_format, tests):
 if rerun:
 regex = list_regex('%s re-run test%s', rerun)
 self.check_line(output, regex)
- self.check_line(output, "Re-running failed tests in verbose mode")
+ regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
+ self.check_line(output, regex)
 for test_name in rerun:
- regex = f"Re-running {test_name} in verbose mode"
+ regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode"
 self.check_line(output, regex)
 
 if no_test_ran:
@@ -1202,9 +1204,9 @@ def test_format_duration(self):
 self.assertEqual(utils.format_duration(10e-3),
 '10 ms')
 self.assertEqual(utils.format_duration(1.5),
- '1 sec 500 ms')
+ '1.5 sec')
 self.assertEqual(utils.format_duration(1),
- '1 sec')
+ '1.0 sec')
 self.assertEqual(utils.format_duration(2 * 60),
 '2 min')
 self.assertEqual(utils.format_duration(2 * 60 + 1),


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /