| from __future__ import print_function |
| |
| import gdbremote_testcase |
| import select |
| import socket |
| from lldbsuite.test.decorators import * |
| from lldbsuite.test.lldbtest import * |
| import lldbsuite.test.lldbplatformutil |
| import random |
| |
| if lldbplatformutil.getHostPlatform() == "windows": |
| import ctypes |
| import ctypes.wintypes |
| from ctypes.wintypes import (BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID) |
| |
| kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) |
| |
| PIPE_ACCESS_INBOUND = 1 |
| FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000 |
| FILE_FLAG_OVERLAPPED = 0x40000000 |
| PIPE_TYPE_BYTE = 0 |
| PIPE_REJECT_REMOTE_CLIENTS = 8 |
| INVALID_HANDLE_VALUE = -1 |
| ERROR_ACCESS_DENIED = 5 |
| ERROR_IO_PENDING = 997 |
| |
| |
| class OVERLAPPED(ctypes.Structure): |
| _fields_ = [("Internal", LPVOID), ("InternalHigh", LPVOID), ("Offset", |
| DWORD), ("OffsetHigh", DWORD), ("hEvent", HANDLE)] |
| |
| def __init__(self): |
| super(OVERLAPPED, self).__init__(Internal=0, InternalHigh=0, |
| Offset=0, OffsetHigh=0, hEvent=None) |
| LPOVERLAPPED = ctypes.POINTER(OVERLAPPED) |
| |
| CreateNamedPipe = kernel32.CreateNamedPipeW |
| CreateNamedPipe.restype = HANDLE |
| CreateNamedPipe.argtypes = (LPCWSTR, DWORD, DWORD, DWORD, DWORD, DWORD, |
| DWORD, LPVOID) |
| |
| ConnectNamedPipe = kernel32.ConnectNamedPipe |
| ConnectNamedPipe.restype = BOOL |
| ConnectNamedPipe.argtypes = (HANDLE, LPOVERLAPPED) |
| |
| CreateEvent = kernel32.CreateEventW |
| CreateEvent.restype = HANDLE |
| CreateEvent.argtypes = (LPVOID, BOOL, BOOL, LPCWSTR) |
| |
| GetOverlappedResultEx = kernel32.GetOverlappedResultEx |
| GetOverlappedResultEx.restype = BOOL |
| GetOverlappedResultEx.argtypes = (HANDLE, LPOVERLAPPED, LPDWORD, DWORD, |
| BOOL) |
| |
| ReadFile = kernel32.ReadFile |
| ReadFile.restype = BOOL |
| ReadFile.argtypes = (HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED) |
| |
| CloseHandle = kernel32.CloseHandle |
| CloseHandle.restype = BOOL |
| CloseHandle.argtypes = (HANDLE,) |
| |
| class Pipe(object): |
| def __init__(self, prefix): |
| while True: |
| self.name = "lldb-" + str(random.randrange(1e10)) |
| full_name = "\\\\.\\pipe\\" + self.name |
| self._handle = CreateNamedPipe(full_name, PIPE_ACCESS_INBOUND | |
| FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, |
| PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS, 1, 4096, |
| 4096, 0, None) |
| if self._handle != INVALID_HANDLE_VALUE: |
| break |
| if ctypes.get_last_error() != ERROR_ACCESS_DENIED: |
| raise ctypes.WinError(ctypes.get_last_error()) |
| |
| self._overlapped = OVERLAPPED() |
| self._overlapped.hEvent = CreateEvent(None, True, False, None) |
| result = ConnectNamedPipe(self._handle, self._overlapped) |
| assert result == 0 |
| if ctypes.get_last_error() != ERROR_IO_PENDING: |
| raise ctypes.WinError(ctypes.get_last_error()) |
| |
| def finish_connection(self, timeout): |
| if not GetOverlappedResultEx(self._handle, self._overlapped, |
| ctypes.byref(DWORD(0)), timeout*1000, True): |
| raise ctypes.WinError(ctypes.get_last_error()) |
| |
| def read(self, size, timeout): |
| buf = ctypes.create_string_buffer(size) |
| if not ReadFile(self._handle, ctypes.byref(buf), size, None, |
| self._overlapped): |
| if ctypes.get_last_error() != ERROR_IO_PENDING: |
| raise ctypes.WinError(ctypes.get_last_error()) |
| read = DWORD(0) |
| if not GetOverlappedResultEx(self._handle, self._overlapped, |
| ctypes.byref(read), timeout*1000, True): |
| raise ctypes.WinError(ctypes.get_last_error()) |
| return buf.raw[0:read.value] |
| |
| def close(self): |
| CloseHandle(self._overlapped.hEvent) |
| CloseHandle(self._handle) |
| |
| |
| else: |
| class Pipe(object): |
| def __init__(self, prefix): |
| self.name = os.path.join(prefix, "stub_port_number") |
| os.mkfifo(self.name) |
| self._fd = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK) |
| |
| def finish_connection(self, timeout): |
| pass |
| |
| def read(self, size, timeout): |
| (readers, _, _) = select.select([self._fd], [], [], timeout) |
| if self._fd not in readers: |
| raise TimeoutError |
| return os.read(self._fd, size) |
| |
| def close(self): |
| os.close(self._fd) |
| |
| |
| class TestGdbRemoteConnection(gdbremote_testcase.GdbRemoteTestCaseBase): |
| |
| mydir = TestBase.compute_mydir(__file__) |
| |
| @skipIfRemote # reverse connect is not a supported use case for now |
| def test_reverse_connect_llgs(self): |
| # Reverse connect is the default connection method. |
| self.connect_to_debug_monitor() |
| # Verify we can do the handshake. If that works, we'll call it good. |
| self.do_handshake(self.sock) |
| |
| @skipIfRemote |
| def test_named_pipe_llgs(self): |
| family, type, proto, _, addr = socket.getaddrinfo( |
| self.stub_hostname, 0, proto=socket.IPPROTO_TCP)[0] |
| self.sock = socket.socket(family, type, proto) |
| self.sock.settimeout(self.DEFAULT_TIMEOUT) |
| |
| self.addTearDownHook(lambda: self.sock.close()) |
| |
| pipe = Pipe(self.getBuildDir()) |
| |
| self.addTearDownHook(lambda: pipe.close()) |
| |
| args = self.debug_monitor_extra_args |
| if lldb.remote_platform: |
| args += ["*:0"] |
| else: |
| args += ["localhost:0"] |
| |
| args += ["--named-pipe", pipe.name] |
| |
| server = self.spawnSubprocess( |
| self.debug_monitor_exe, |
| args, |
| install_remote=False) |
| |
| pipe.finish_connection(self.DEFAULT_TIMEOUT) |
| port = pipe.read(10, self.DEFAULT_TIMEOUT) |
| # Trim null byte, convert to int |
| addr = (addr[0], int(port[:-1])) |
| self.sock.connect(addr) |
| |
| # Verify we can do the handshake. If that works, we'll call it good. |
| self.do_handshake(self.sock) |