| import lldb |
| import time |
| import unittest |
| from lldbsuite.test.lldbtest import * |
| from lldbsuite.test.decorators import * |
| from lldbsuite.test.gdbclientutils import * |
| from lldbsuite.test.lldbreverse import ReverseTestBase |
| from lldbsuite.test import lldbutil |
| |
| |
class TestReverseContinueWatchpoints(ReverseTestBase):
    """Reverse-continue tests that involve write watchpoints.

    Each scenario is run twice, once with the debugger in synchronous mode
    and once in asynchronous mode. The recording helpers used here
    (`connect`, `start_recording`, `stop_recording`) are not defined in this
    class; presumably they come from `ReverseTestBase`.
    """

    @skipIfRemote
    # Watchpoints don't work in single-step mode
    @skipIf(macos_version=["<", "15.0"], archs=["arm64"])
    def test_reverse_continue_watchpoint(self):
        self.reverse_continue_watchpoint_internal(async_mode=False)

    @skipIfRemote
    # Watchpoints don't work in single-step mode
    @skipIf(macos_version=["<", "15.0"], archs=["arm64"])
    def test_reverse_continue_watchpoint_async(self):
        self.reverse_continue_watchpoint_internal(async_mode=True)

    def reverse_continue_watchpoint_internal(self, async_mode):
        """Reverse-continue onto a watchpoint, then step forward over the write.

        Checks that the watched value is 2 at the end of the recording, 1
        after reverse-continuing to the watchpoint, and 2 again after
        stepping one instruction forward.
        """
        target, process, initial_threads, watch_addr = self.setup_recording(async_mode)

        self._create_modify_watchpoint(target, watch_addr)

        watch_var = target.EvaluateExpression("*g_watched_var_ptr")
        self.assertEqual(watch_var.GetValueAsSigned(-1), 2)

        # Reverse-continue to the function "trigger_watchpoint".
        status = process.ContinueInDirection(lldb.eRunReverse)
        self.assertSuccess(status)
        self.expect_async_state_changes(
            async_mode, process, [lldb.eStateRunning, lldb.eStateStopped]
        )
        # We should stop at the point just before the location was modified.
        watch_var = target.EvaluateExpression("*g_watched_var_ptr")
        self.assertEqual(watch_var.GetValueAsSigned(-1), 1)
        self.expect(
            "thread list",
            STOPPED_DUE_TO_WATCHPOINT,
            substrs=["stopped", "trigger_watchpoint", "stop reason = watchpoint 1"],
        )

        # Stepping forward one instruction should change the value of the variable.
        initial_threads[0].StepInstruction(False)
        self.expect_async_state_changes(
            async_mode, process, [lldb.eStateRunning, lldb.eStateStopped]
        )
        watch_var = target.EvaluateExpression("*g_watched_var_ptr")
        self.assertEqual(watch_var.GetValueAsSigned(-1), 2)
        self.expect(
            "thread list",
            STOPPED_DUE_TO_WATCHPOINT,
            substrs=["stopped", "trigger_watchpoint", "stop reason = watchpoint 1"],
        )

    @skipIfRemote
    # Watchpoints don't work in single-step mode
    @skipIf(macos_version=["<", "15.0"], archs=["arm64"])
    def test_reverse_continue_skip_watchpoint(self):
        self.reverse_continue_skip_watchpoint_internal(async_mode=False)

    @skipIfRemote
    # Watchpoints don't work in single-step mode
    @skipIf(macos_version=["<", "15.0"], archs=["arm64"])
    def test_reverse_continue_skip_watchpoint_async(self):
        self.reverse_continue_skip_watchpoint_internal(async_mode=True)

    def reverse_continue_skip_watchpoint_internal(self, async_mode):
        """Reverse-continue past a watchpoint whose condition is false.

        The process is expected to stop with a "history boundary" stop
        reason rather than at the watchpoint.
        """
        target, process, initial_threads, watch_addr = self.setup_recording(async_mode)

        # Reverse-continue over a watchpoint whose condition is false
        # (via function call).
        # This tests that we continue in the correct direction after hitting
        # the watchpoint.
        watchpoint = self._create_modify_watchpoint(target, watch_addr)
        watchpoint.SetCondition("false_condition()")
        status = process.ContinueInDirection(lldb.eRunReverse)
        # Check the resume status before waiting for events, so a failed
        # resume is reported directly instead of as a missing state change.
        self.assertSuccess(status)
        self.expect_async_state_changes(
            async_mode, process, [lldb.eStateRunning, lldb.eStateStopped]
        )
        self.expect(
            "thread list",
            STOPPED_DUE_TO_HISTORY_BOUNDARY,
            substrs=["stopped", "stop reason = history boundary"],
        )

    def _create_modify_watchpoint(self, target, watch_addr, size=4):
        """Create a write-on-modify watchpoint and assert that it is valid.

        Args:
            target: The SBTarget on which to set the watchpoint.
            watch_addr: Address to watch.
            size: Number of bytes to watch; defaults to 4, the size both
                tests in this class use.

        Returns:
            The newly created SBWatchpoint.
        """
        error = lldb.SBError()
        wp_opts = lldb.SBWatchpointOptions()
        wp_opts.SetWatchpointTypeWrite(lldb.eWatchpointWriteTypeOnModify)
        watchpoint = target.WatchpointCreateByAddress(watch_addr, size, wp_opts, error)
        # Surface the SBError text so a creation failure is diagnosable.
        self.assertTrue(
            watchpoint, "failed to create watchpoint: %s" % error.GetCString()
        )
        return watchpoint

    def setup_recording(self, async_mode):
        """
        Record execution of code between "start_recording" and "stop_recording" breakpoints.

        Returns with the target stopped at "stop_recording", with recording disabled,
        ready to reverse-execute.

        The return value is the tuple (target, process, initial_threads,
        watch_addr), where initial_threads are the threads stopped at
        "start_recording" and watch_addr is the value read from the global
        "g_watched_var_ptr" at that stop.
        """
        self.build()
        target = self.dbg.CreateTarget(self.getBuildArtifact("a.out"))
        process = self.connect(target)

        # Record execution from the start of the function "start_recording"
        # to the start of the function "stop_recording". We want to keep the
        # interval that we record as small as possible to minimize the run-time
        # of our single-stepping recorder.
        start_recording_bkpt = target.BreakpointCreateByName("start_recording", None)
        self.assertGreater(start_recording_bkpt.GetNumLocations(), 0)
        initial_threads = lldbutil.continue_to_breakpoint(process, start_recording_bkpt)
        self.assertEqual(len(initial_threads), 1)
        target.BreakpointDelete(start_recording_bkpt.GetID())

        frame0 = initial_threads[0].GetFrameAtIndex(0)
        watched_var_ptr = frame0.FindValue(
            "g_watched_var_ptr", lldb.eValueTypeVariableGlobal
        )
        # GetValueAsUnsigned(0) yields 0 on failure, so a zero address means
        # the pointer could not be read (or is null).
        watch_addr = watched_var_ptr.GetValueAsUnsigned(0)
        self.assertGreater(watch_addr, 0)

        self.start_recording()
        stop_recording_bkpt = target.BreakpointCreateByName("stop_recording", None)
        self.assertGreater(stop_recording_bkpt.GetNumLocations(), 0)
        lldbutil.continue_to_breakpoint(process, stop_recording_bkpt)
        target.BreakpointDelete(stop_recording_bkpt.GetID())
        self.stop_recording()

        self.dbg.SetAsync(async_mode)
        self.expect_async_state_changes(async_mode, process, [lldb.eStateStopped])

        return target, process, initial_threads, watch_addr

    def expect_async_state_changes(self, async_mode, process, states):
        """Wait for the given process state changes when in async mode.

        Does nothing in synchronous mode. In async mode, the expected
        `states` are checked against events on the debugger's listener.
        """
        if not async_mode:
            return
        listener = self.dbg.GetListener()
        lldbutil.expect_state_changes(self, listener, process, states)