You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
628 lines
19 KiB
628 lines
19 KiB
from __future__ import print_function |
|
|
|
import sys, os, re, time, socket, select, subprocess, errno, shutil, random, string, json |
|
from subprocess import check_call, Popen |
|
from optparse import OptionParser |
|
|
|
__all__ = [] |
|
|
|
################################################################## |
|
# Test structure |
|
# |
|
|
|
__all__ += ["test", "end_part", "run_tests", "get_current_test"] |
|
|
|
TESTS = [] |
|
TOTAL = POSSIBLE = 0 |
|
PART_TOTAL = PART_POSSIBLE = 0 |
|
CURRENT_TEST = None |
|
GRADES = {} |
|
|
|
def test(points, title=None, parent=None): |
|
"""Decorator for declaring test functions. If title is None, the |
|
title of the test will be derived from the function name by |
|
stripping the leading "test_" and replacing underscores with |
|
spaces.""" |
|
|
|
def register_test(fn, title=title): |
|
if not title: |
|
assert fn.__name__.startswith("test_") |
|
title = fn.__name__[5:].replace("_", " ") |
|
if parent: |
|
title = " " + title |
|
|
|
def run_test(): |
|
global TOTAL, POSSIBLE, CURRENT_TEST, GRADES |
|
|
|
# Handle test dependencies |
|
if run_test.complete: |
|
return run_test.ok |
|
run_test.complete = True |
|
parent_failed = False |
|
if parent: |
|
parent_failed = not parent() |
|
|
|
# Run the test |
|
fail = None |
|
start = time.time() |
|
CURRENT_TEST = run_test |
|
sys.stdout.write("== Test %s == " % title) |
|
if parent: |
|
sys.stdout.write("\n") |
|
sys.stdout.flush() |
|
try: |
|
if parent_failed: |
|
raise AssertionError('Parent failed: %s' % parent.__name__) |
|
fn() |
|
except AssertionError as e: |
|
fail = str(e) |
|
|
|
# Display and handle test result |
|
POSSIBLE += points |
|
if points: |
|
print("%s: %s" % (title, \ |
|
(color("red", "FAIL") if fail else color("green", "OK"))), end=' ') |
|
if time.time() - start > 0.1: |
|
print("(%.1fs)" % (time.time() - start), end=' ') |
|
print() |
|
if fail: |
|
print(" %s" % fail.replace("\n", "\n ")) |
|
else: |
|
TOTAL += points |
|
if points: |
|
GRADES[title] = 0 if fail else points |
|
|
|
for callback in run_test.on_finish: |
|
callback(fail) |
|
CURRENT_TEST = None |
|
|
|
run_test.ok = not fail |
|
return run_test.ok |
|
|
|
# Record test metadata on the test wrapper function |
|
run_test.__name__ = fn.__name__ |
|
run_test.title = title |
|
run_test.complete = False |
|
run_test.ok = False |
|
run_test.on_finish = [] |
|
TESTS.append(run_test) |
|
return run_test |
|
return register_test |
|
|
|
def end_part(name): |
|
def show_part(): |
|
global PART_TOTAL, PART_POSSIBLE |
|
print("Part %s score: %d/%d" % \ |
|
(name, TOTAL - PART_TOTAL, POSSIBLE - PART_POSSIBLE)) |
|
print() |
|
PART_TOTAL, PART_POSSIBLE = TOTAL, POSSIBLE |
|
show_part.title = "" |
|
TESTS.append(show_part) |
|
|
|
def write_results(): |
|
global options |
|
if not options.results: |
|
return |
|
try: |
|
with open(options.results, "w") as f: |
|
f.write(json.dumps(GRADES)) |
|
except OSError as e: |
|
print("Provided a bad results path. Error:", e) |
|
|
|
def run_tests(): |
|
"""Set up for testing and run the registered test functions.""" |
|
|
|
# Handle command line |
|
global options |
|
parser = OptionParser(usage="usage: %prog [-v] [filters...]") |
|
parser.add_option("-v", "--verbose", action="store_true", |
|
help="print commands") |
|
parser.add_option("--color", choices=["never", "always", "auto"], |
|
default="auto", help="never, always, or auto") |
|
parser.add_option("--results", help="results file path") |
|
(options, args) = parser.parse_args() |
|
|
|
# Start with a full build to catch build errors |
|
make() |
|
|
|
# Clean the file system if there is one |
|
reset_fs() |
|
|
|
# Run tests |
|
limit = list(map(str.lower, args)) |
|
try: |
|
for test in TESTS: |
|
if not limit or any(l in test.title.lower() for l in limit): |
|
test() |
|
if not limit: |
|
write_results() |
|
print("Score: %d/%d" % (TOTAL, POSSIBLE)) |
|
except KeyboardInterrupt: |
|
pass |
|
if TOTAL < POSSIBLE: |
|
sys.exit(1) |
|
|
|
def get_current_test(): |
|
if not CURRENT_TEST: |
|
raise RuntimeError("No test is running") |
|
return CURRENT_TEST |
|
|
|
################################################################## |
|
# Assertions |
|
# |
|
|
|
__all__ += ["assert_equal", "assert_lines_match"] |
|
|
|
def assert_equal(got, expect, msg=""): |
|
if got == expect: |
|
return |
|
if msg: |
|
msg += "\n" |
|
raise AssertionError("%sgot:\n %s\nexpected:\n %s" % |
|
(msg, str(got).replace("\n", "\n "), |
|
str(expect).replace("\n", "\n "))) |
|
|
|
def assert_lines_match(text, *regexps, **kw): |
|
"""Assert that all of regexps match some line in text. If a 'no' |
|
keyword argument is given, it must be a list of regexps that must |
|
*not* match any line in text.""" |
|
|
|
def assert_lines_match_kw(no=[]): |
|
return no |
|
no = assert_lines_match_kw(**kw) |
|
|
|
# Check text against regexps |
|
lines = text.splitlines() |
|
good = set() |
|
bad = set() |
|
for i, line in enumerate(lines): |
|
if any(re.match(r, line) for r in regexps): |
|
good.add(i) |
|
regexps = [r for r in regexps if not re.match(r, line)] |
|
if any(re.match(r, line) for r in no): |
|
bad.add(i) |
|
|
|
if not regexps and not bad: |
|
return |
|
|
|
# We failed; construct an informative failure message |
|
show = set() |
|
for lineno in good.union(bad): |
|
for offset in range(-2, 3): |
|
show.add(lineno + offset) |
|
if regexps: |
|
show.update(n for n in range(len(lines) - 5, len(lines))) |
|
|
|
msg = [] |
|
last = -1 |
|
for lineno in sorted(show): |
|
if 0 <= lineno < len(lines): |
|
if lineno != last + 1: |
|
msg.append("...") |
|
last = lineno |
|
msg.append("%s %s" % (color("red", "BAD ") if lineno in bad else |
|
color("green", "GOOD") if lineno in good |
|
else " ", |
|
lines[lineno])) |
|
if last != len(lines) - 1: |
|
msg.append("...") |
|
if bad: |
|
msg.append("unexpected lines in output") |
|
for r in regexps: |
|
msg.append(color("red", "MISSING") + " '%s'" % r) |
|
raise AssertionError("\n".join(msg)) |
|
|
|
################################################################## |
|
# Utilities |
|
# |
|
|
|
__all__ += ["make", "maybe_unlink", "reset_fs", "color", "random_str", "check_time", "check_answers"] |
|
|
|
MAKE_TIMESTAMP = 0 |
|
|
|
def pre_make(): |
|
"""Delay prior to running make to ensure file mtimes change.""" |
|
while int(time.time()) == MAKE_TIMESTAMP: |
|
time.sleep(0.1) |
|
|
|
def post_make(): |
|
"""Record the time after make completes so that the next run of |
|
make can be delayed if needed.""" |
|
global MAKE_TIMESTAMP |
|
MAKE_TIMESTAMP = int(time.time()) |
|
|
|
def make(*target): |
|
pre_make() |
|
if Popen(("make",) + target).wait(): |
|
sys.exit(1) |
|
post_make() |
|
|
|
def show_command(cmd): |
|
from pipes import quote |
|
print("\n$", " ".join(map(quote, cmd))) |
|
|
|
def maybe_unlink(*paths): |
|
for path in paths: |
|
try: |
|
os.unlink(path) |
|
except EnvironmentError as e: |
|
if e.errno != errno.ENOENT: |
|
raise |
|
|
|
COLORS = {"default": "\033[0m", "red": "\033[31m", "green": "\033[32m"} |
|
|
|
def color(name, text): |
|
if options.color == "always" or (options.color == "auto" and os.isatty(1)): |
|
return COLORS[name] + text + COLORS["default"] |
|
return text |
|
|
|
def reset_fs(): |
|
if os.path.exists("obj/fs/clean-fs.img"): |
|
shutil.copyfile("obj/fs/clean-fs.img", "obj/fs/fs.img") |
|
|
|
def random_str(n=8): |
|
letters = string.ascii_letters + string.digits |
|
return ''.join(random.choice(letters) for _ in range(n)) |
|
|
|
def check_time(): |
|
try: |
|
print("") |
|
with open('time.txt') as f: |
|
d = f.read().strip() |
|
if not re.match(r'^\d+$', d): |
|
raise AssertionError('time.txt does not contain a single integer (number of hours spent on the lab)') |
|
except IOError: |
|
raise AssertionError('Cannot read time.txt') |
|
|
|
def check_answers(file, n=10): |
|
try: |
|
print("") |
|
with open(file) as f: |
|
d = f.read().strip() |
|
if len(d) < n: |
|
raise AssertionError('%s does not seem to contain enough text' % file) |
|
except IOError: |
|
raise AssertionError('Cannot read %s' % file) |
|
|
|
|
|
################################################################## |
|
# Controllers |
|
# |
|
|
|
__all__ += ["QEMU", "GDBClient"] |
|
|
|
class QEMU(object): |
|
_GDBPORT = None |
|
|
|
def __init__(self, *make_args): |
|
# Check that QEMU is not currently running |
|
try: |
|
GDBClient(self.get_gdb_port(), timeout=0).close() |
|
except socket.error: |
|
pass |
|
else: |
|
print("""\ |
|
GDB stub found on port %d. |
|
QEMU appears to already be running. Please exit it if possible or use |
|
'killall qemu' or 'killall qemu.real'.""" % self.get_gdb_port(), file=sys.stderr) |
|
sys.exit(1) |
|
|
|
if options.verbose: |
|
show_command(("make",) + make_args) |
|
cmd = ("make", "-s", "--no-print-directory") + make_args |
|
self.proc = Popen(cmd, stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT, |
|
stdin=subprocess.PIPE) |
|
# Accumulated output as a string |
|
self.output = "" |
|
# Accumulated output as a bytearray |
|
self.outbytes = bytearray() |
|
self.on_output = [] |
|
|
|
@staticmethod |
|
def get_gdb_port(): |
|
if QEMU._GDBPORT is None: |
|
p = Popen(["make", "-s", "--no-print-directory", "print-gdbport"], |
|
stdout=subprocess.PIPE) |
|
(out, _) = p.communicate() |
|
if p.returncode: |
|
raise RuntimeError( |
|
"Failed to get gdbport: make exited with %d" % |
|
p.returncode) |
|
QEMU._GDBPORT = int(out) |
|
return QEMU._GDBPORT |
|
|
|
def fileno(self): |
|
if self.proc: |
|
return self.proc.stdout.fileno() |
|
|
|
def handle_read(self): |
|
buf = os.read(self.proc.stdout.fileno(), 4096) |
|
self.outbytes.extend(buf) |
|
self.output = self.outbytes.decode("utf-8", "replace") |
|
for callback in self.on_output: |
|
callback(buf) |
|
if buf == b"": |
|
self.wait() |
|
return |
|
|
|
def write(self, buf): |
|
if isinstance(buf, str): |
|
buf = buf.encode('utf-8') |
|
self.proc.stdin.write(buf) |
|
self.proc.stdin.flush() |
|
|
|
def wait(self): |
|
if self.proc: |
|
self.proc.wait() |
|
self.proc = None |
|
|
|
def kill(self): |
|
if self.proc: |
|
self.proc.terminate() |
|
|
|
class GDBClient(object): |
|
def __init__(self, port, timeout=15): |
|
start = time.time() |
|
while True: |
|
self.sock = socket.socket() |
|
try: |
|
self.sock.settimeout(1) |
|
self.sock.connect(("localhost", port)) |
|
break |
|
except socket.error: |
|
if time.time() >= start + timeout: |
|
raise |
|
self.__buf = "" |
|
|
|
def fileno(self): |
|
if self.sock: |
|
return self.sock.fileno() |
|
|
|
def handle_read(self): |
|
try: |
|
data = self.sock.recv(4096).decode("ascii", "replace") |
|
except socket.error: |
|
data = "" |
|
if data == "": |
|
self.sock.close() |
|
self.sock = None |
|
return |
|
self.__buf += data |
|
|
|
while True: |
|
m = re.search(r"\$([^#]*)#[0-9a-zA-Z]{2}", self.__buf) |
|
if not m: |
|
break |
|
pkt = m.group(1) |
|
self.__buf = self.__buf[m.end():] |
|
|
|
if pkt.startswith("T05"): |
|
# Breakpoint |
|
raise TerminateTest |
|
|
|
def __send(self, cmd): |
|
packet = "$%s#%02x" % (cmd, sum(map(ord, cmd)) % 256) |
|
self.sock.sendall(packet.encode("ascii")) |
|
|
|
def __send_break(self): |
|
self.sock.sendall(b"\x03") |
|
|
|
def close(self): |
|
if self.sock: |
|
self.sock.close() |
|
self.sock = None |
|
|
|
def cont(self): |
|
self.__send("c") |
|
|
|
def breakpoint(self, addr): |
|
self.__send("Z1,%x,1" % addr) |
|
|
|
|
|
################################################################## |
|
# QEMU test runner |
|
# |
|
|
|
__all__ += ["TerminateTest", "Runner"] |
|
|
|
class TerminateTest(Exception): |
|
pass |
|
|
|
class Runner(): |
|
def __init__(self, *default_monitors): |
|
self.__default_monitors = default_monitors |
|
|
|
def run_qemu(self, *monitors, **kw): |
|
"""Run a QEMU-based test. monitors should functions that will |
|
be called with this Runner instance once QEMU and GDB are |
|
started. Typically, they should register callbacks that throw |
|
TerminateTest when stop events occur. The target_base |
|
argument gives the make target to run. The make_args argument |
|
should be a list of additional arguments to pass to make. The |
|
timeout argument bounds how long to run before returning.""" |
|
|
|
def run_qemu_kw(target_base="qemu", make_args=[], timeout=30): |
|
return target_base, make_args, timeout |
|
target_base, make_args, timeout = run_qemu_kw(**kw) |
|
|
|
# Start QEMU |
|
pre_make() |
|
self.qemu = QEMU(target_base + "-gdb", *make_args) |
|
self.gdb = None |
|
|
|
try: |
|
# Wait for QEMU to start or make to fail. This will set |
|
# self.gdb if QEMU starts. |
|
self.qemu.on_output = [self.__monitor_start] |
|
self.__react([self.qemu], timeout=90) |
|
self.qemu.on_output = [] |
|
if self.gdb is None: |
|
print("Failed to connect to QEMU; output:") |
|
print(self.qemu.output) |
|
sys.exit(1) |
|
post_make() |
|
|
|
# QEMU and GDB are up |
|
self.reactors = [self.qemu, self.gdb] |
|
|
|
# Start monitoring |
|
for m in self.__default_monitors + monitors: |
|
m(self) |
|
|
|
# Run and react |
|
self.gdb.cont() |
|
self.__react(self.reactors, timeout) |
|
finally: |
|
# Shutdown QEMU |
|
try: |
|
if self.gdb is None: |
|
sys.exit(1) |
|
self.qemu.kill() |
|
self.__react(self.reactors, 5) |
|
self.gdb.close() |
|
self.qemu.wait() |
|
except: |
|
print("""\ |
|
Failed to shutdown QEMU. You might need to 'killall qemu' or |
|
'killall qemu.real'. |
|
""") |
|
raise |
|
|
|
def __monitor_start(self, output): |
|
if b"\n" in output: |
|
try: |
|
self.gdb = GDBClient(self.qemu.get_gdb_port(), timeout=2) |
|
raise TerminateTest |
|
except socket.error: |
|
pass |
|
if not len(output): |
|
raise TerminateTest |
|
|
|
def __react(self, reactors, timeout): |
|
deadline = time.time() + timeout |
|
try: |
|
while True: |
|
timeleft = deadline - time.time() |
|
if timeleft < 0: |
|
sys.stdout.write("Timeout! ") |
|
sys.stdout.flush() |
|
return |
|
|
|
rset = [r for r in reactors if r.fileno() is not None] |
|
if not rset: |
|
return |
|
|
|
rset, _, _ = select.select(rset, [], [], timeleft) |
|
for reactor in rset: |
|
reactor.handle_read() |
|
except TerminateTest: |
|
pass |
|
|
|
def user_test(self, binary, *monitors, **kw): |
|
"""Run a user test using the specified binary. Monitors and |
|
keyword arguments are as for run_qemu. This runs on a disk |
|
snapshot unless the keyword argument 'snapshot' is False.""" |
|
|
|
maybe_unlink("obj/kern/init.o", "obj/kern/kernel") |
|
if kw.pop("snapshot", True): |
|
kw.setdefault("make_args", []).append("QEMUEXTRA+=-snapshot") |
|
self.run_qemu(target_base="run-%s" % binary, *monitors, **kw) |
|
|
|
def match(self, *args, **kwargs): |
|
"""Shortcut to call assert_lines_match on the most recent QEMU |
|
output.""" |
|
|
|
assert_lines_match(self.qemu.output, *args, **kwargs) |
|
|
|
################################################################## |
|
# Monitors |
|
# |
|
|
|
__all__ += ["save", "stop_breakpoint", "call_on_line", "stop_on_line", "shell_script"] |
|
|
|
def save(path): |
|
"""Return a monitor that writes QEMU's output to path. If the |
|
test fails, copy the output to path.test-name.""" |
|
|
|
def setup_save(runner): |
|
f.seek(0) |
|
f.truncate() |
|
runner.qemu.on_output.append(f.write) |
|
get_current_test().on_finish.append(save_on_finish) |
|
|
|
def save_on_finish(fail): |
|
f.flush() |
|
save_path = path + "." + get_current_test().__name__[5:] |
|
if fail: |
|
shutil.copyfile(path, save_path) |
|
print(" QEMU output saved to %s" % save_path) |
|
elif os.path.exists(save_path): |
|
os.unlink(save_path) |
|
print(" (Old %s failure log removed)" % save_path) |
|
|
|
f = open(path, "wb") |
|
return setup_save |
|
|
|
def stop_breakpoint(addr): |
|
"""Returns a monitor that stops when addr is reached. addr may be |
|
a number or the name of a symbol.""" |
|
|
|
def setup_breakpoint(runner): |
|
if isinstance(addr, str): |
|
addrs = [int(sym[:16], 16) for sym in open("kernel/kernel.sym") |
|
if sym[17:].strip() == addr] |
|
assert len(addrs), "Symbol %s not found" % addr |
|
runner.gdb.breakpoint(addrs[0]) |
|
else: |
|
runner.gdb.breakpoint(addr) |
|
return setup_breakpoint |
|
|
|
def call_on_line(regexp, callback): |
|
"""Returns a monitor that calls 'callback' when QEMU prints a line |
|
matching 'regexp'.""" |
|
|
|
def setup_call_on_line(runner): |
|
buf = bytearray() |
|
def handle_output(output): |
|
buf.extend(output) |
|
while b"\n" in buf: |
|
line, buf[:] = buf.split(b"\n", 1) |
|
line = line.decode("utf-8", "replace") |
|
if re.match(regexp, line): |
|
callback(line) |
|
runner.qemu.on_output.append(handle_output) |
|
return setup_call_on_line |
|
|
|
def stop_on_line(regexp): |
|
"""Returns a monitor that stops when QEMU prints a line matching |
|
'regexp'.""" |
|
|
|
def stop(line): |
|
raise TerminateTest |
|
return call_on_line(regexp, stop) |
|
|
|
def shell_script(script, terminate_match=None): |
|
"""Returns a monitor that plays the script, and stops when the script is |
|
done executing.""" |
|
|
|
def setup_call_on_line(runner): |
|
class context: |
|
n = 0 |
|
buf = bytearray() |
|
def handle_output(output): |
|
context.buf.extend(output) |
|
if terminate_match is not None: |
|
if re.match(terminate_match, context.buf.decode('utf-8', 'replace')): |
|
raise TerminateTest |
|
if b'$ ' in context.buf: |
|
context.buf = bytearray() |
|
if context.n < len(script): |
|
runner.qemu.write(script[context.n]) |
|
runner.qemu.write('\n') |
|
context.n += 1 |
|
else: |
|
if terminate_match is None: |
|
raise TerminateTest |
|
runner.qemu.on_output.append(handle_output) |
|
return setup_call_on_line
|
|
|