# blob: 1a3cc4a0350fe7f6809b31bd7390f89caf2b5652 [file] [log] [blame]
import gdbremote_testcase
class GdbRemoteForkTestBase(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Reusable fork-event scenarios for gdb-remote packet-sequence tests.

    Every helper queues the packets it expects on ``self.test_sequence`` and
    then replays them with ``self.expect_gdbremote_sequence()``.  PIDs and
    TIDs are passed around exactly as captured from the fork stop packet.
    """

    # Template for the stop reply announcing a fork-style event.  Format it
    # with the variant name; the doubled braces survive ``str.format`` as the
    # regex quantifier ``{2}``.  Groups 1-4 are named by ``fork_capture``.
    fork_regex = ("[$]T[0-9a-fA-F]{{2}}thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
                  "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
    # Same event, delivered as a ``%Stop:`` notification (used when the
    # ``nonstop`` flag is set).
    fork_regex_nonstop = ("%Stop:T[0-9a-fA-F]{{2}}"
                          "thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
                          "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
    # Regex group number -> key under which the captured value is returned.
    fork_capture = {1: "parent_pid", 2: "parent_tid",
                    3: "child_pid", 4: "child_tid"}

    # Template for a signal stop of one specific thread; format it with
    # (pid, tid).  The pid/tid separator is written as ``[.]`` so it matches
    # a literal dot only, consistent with the fork regexes above.
    stop_regex_base = "T[0-9a-fA-F]{{2}}thread:p{}[.]{};.*reason:signal.*"
    stop_regex = "^[$]" + stop_regex_base

    def start_fork_test(self, args, variant="fork", nonstop=False):
        """Start the inferior and run it up to the fork-style event.

        Builds the test program, prepares the debug monitor and inferior,
        requests ``multiprocess+`` and ``<variant>-events+`` via qSupported
        (asserting the latter appears in the response), then continues and
        expects the fork stop packet.

        Args:
            args: Inferior arguments, forwarded as ``inferior_args``.
            variant: Event name used both in the qSupported feature and in
                the stop packet key (default ``"fork"``).
            nonstop: When true, enable ``QNonStop:1`` first and expect the
                event as a ``%Stop:`` notification acknowledged through
                ``vStopped``.

        Returns:
            The tuple ``(parent_pid, parent_tid, child_pid, child_tid)`` as
            captured from the stop packet.
        """
        events_feature = "{}-events+".format(variant)

        self.build()
        self.prep_debug_monitor_and_inferior(inferior_args=args)
        self.add_qSupported_packets(["multiprocess+", events_feature])
        ret = self.expect_gdbremote_sequence()
        self.assertIn(events_feature, ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        if nonstop:
            self.test_sequence.add_log_lines([
                "read packet: $QNonStop:1#00",
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                {"direction": "send",
                 "regex": self.fork_regex_nonstop.format(variant),
                 "capture": self.fork_capture},
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ], True)
        else:
            self.test_sequence.add_log_lines([
                "read packet: $c#00",
                {"direction": "send", "regex": self.fork_regex.format(variant),
                 "capture": self.fork_capture},
            ], True)
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        return tuple(ret[x] for x in ("parent_pid", "parent_tid",
                                      "child_pid", "child_tid"))

    def fork_and_detach_test(self, variant, nonstop=False):
        """Detach the forked child and check only the parent stays selectable.

        Args:
            variant: Event name; also passed to the inferior as its only
                argument.
            nonstop: Forwarded to ``start_fork_test``.

        Returns:
            The tuple ``(parent_pid, parent_tid)``.  The test sequence is
            reset before returning so the caller can queue further packets.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test([variant], variant, nonstop=nonstop))

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            # verify that the current process is correct
            "read packet: $qC#00",
            "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
        ], True)
        self.expect_gdbremote_sequence()
        self.reset_test_sequence()
        return parent_pid, parent_tid

    def fork_and_follow_test(self, variant, nonstop=False):
        """Switch to the forked child, detach the parent, run the child out.

        After the parent is detached, selecting it must fail with ``Eff``
        while the child stays selectable; the child is then resumed and is
        expected to report ``W00`` for its own PID.

        Args:
            variant: Event name; also passed to the inferior as its only
                argument.
            nonstop: Forwarded to ``start_fork_test``; also selects the
                ``%Stop:``/``vStopped`` form of the exit report.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test([variant], variant, nonstop=nonstop))

        # switch to the forked child
        self.test_sequence.add_log_lines([
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach the parent
            "read packet: $D;{}#00".format(parent_pid),
            "send packet: $OK#00",
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # then resume the child
            "read packet: $c#00",
        ], True)

        if nonstop:
            self.test_sequence.add_log_lines([
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(child_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ], True)
        else:
            self.test_sequence.add_log_lines([
                "send packet: $W00;process:{}#00".format(child_pid),
            ], True)
        self.expect_gdbremote_sequence()

    def detach_all_test(self, nonstop=False):
        """Detach every process with a bare ``D`` and check both are gone.

        Both PIDs must be selectable before the detach and must be rejected
        with ``Eff`` afterwards.

        Args:
            nonstop: Forwarded to ``start_fork_test``.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork"], nonstop=nonstop))

        self.test_sequence.add_log_lines([
            # double-check our PIDs
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach all processes
            "read packet: $D#00",
            "send packet: $OK#00",
            # verify that both PIDs are invalid now
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False):
        """Kill the parent and/or child with ``vKill`` and check what is left.

        A killed process must be rejected with ``Eff`` when selected; a
        process that was not killed must still answer ``OK``.

        Args:
            kill_parent: Kill the parent process.
            kill_child: Kill the child process.
            nonstop: Forwarded to ``start_fork_test``.

        Raises:
            AssertionError: If neither ``kill_parent`` nor ``kill_child`` is
                set (reported as a test failure).
        """
        # Use the TestCase assertion instead of a bare ``assert`` so the
        # check is not stripped when Python runs with -O.
        self.assertTrue(kill_parent or kill_child,
                        "vkill_test requires kill_parent and/or kill_child")
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork"], nonstop=nonstop))

        if kill_parent:
            self.test_sequence.add_log_lines([
                # kill the process
                "read packet: $vKill;{}#00".format(parent_pid),
                "send packet: $OK#00",
            ], True)
        if kill_child:
            self.test_sequence.add_log_lines([
                # kill the process
                "read packet: $vKill;{}#00".format(child_pid),
                "send packet: $OK#00",
            ], True)
        self.test_sequence.add_log_lines([
            # check child PID/TID
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: ${}#00".format("Eff" if kill_child else "OK"),
            # check parent PID/TID
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
        ], True)
        self.expect_gdbremote_sequence()

    def resume_one_test(self, run_order, use_vCont=False, nonstop=False):
        """Resume parent and child one at a time in the given order.

        The inferior is started with the arguments ``["fork", "stop"]``.
        Each process has two queued expectations that are consumed in order:
        first a signal stop of its thread, then a ``W00`` exit for its PID.

        Args:
            run_order: Iterable of ``"parent"`` / ``"child"`` naming which
                process to resume at each step.  Each name may appear at
                most twice, since each process has only two expectations.
            use_vCont: Resume with ``vCont;c:p<pid>.<tid>`` instead of
                selecting the thread with ``Hc`` and sending ``c``.
            nonstop: Forwarded to ``start_fork_test``; also selects the
                ``%Stop:``/``vStopped`` form of each stop report.

        Raises:
            AssertionError: If ``run_order`` contains any other value
                (reported as a test failure).
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "stop"], nonstop=nonstop))

        parent_expect = [
            self.stop_regex_base.format(parent_pid, parent_tid),
            "W00;process:{}#.*".format(parent_pid),
        ]
        child_expect = [
            self.stop_regex_base.format(child_pid, child_tid),
            "W00;process:{}#.*".format(child_pid),
        ]

        for x in run_order:
            if x == "parent":
                pidtid = (parent_pid, parent_tid)
                expect = parent_expect.pop(0)
            elif x == "child":
                pidtid = (child_pid, child_tid)
                expect = child_expect.pop(0)
            else:
                # self.fail() is not stripped under -O, unlike ``assert``.
                self.fail("unexpected x={}".format(x))

            if use_vCont:
                self.test_sequence.add_log_lines([
                    # continue the selected process
                    "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
                ], True)
            else:
                self.test_sequence.add_log_lines([
                    # continue the selected process
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                ], True)

            if nonstop:
                self.test_sequence.add_log_lines([
                    "send packet: $OK#00",
                    {"direction": "send", "regex": "%Stop:" + expect},
                    "read packet: $vStopped#00",
                    "send packet: $OK#00",
                ], True)
            else:
                self.test_sequence.add_log_lines([
                    {"direction": "send", "regex": "[$]" + expect},
                ], True)

            # if at least one process remained, check both PIDs
            # NOTE(review): placed inside the loop so the check runs after
            # every resume step; a process whose expectations are exhausted
            # has exited and must answer Eff -- confirm the nesting.
            if parent_expect or child_expect:
                self.test_sequence.add_log_lines([
                    "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                    "send packet: ${}#00".format(
                        "OK" if parent_expect else "Eff"),
                    "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                    "send packet: ${}#00".format(
                        "OK" if child_expect else "Eff"),
                ], True)
        self.expect_gdbremote_sequence()