blob: 83a3b71b0adfbb6ffc98cc68daecbdbc270f8f12 [file]
import datetime
import http.server
import ipaddress
import os
import shutil
import socketserver
import ssl
import sys
import threading
from functools import partial
import lldb
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
"""
Test debug symbol acquisition from a local SymStore repository. This can work
cross-platform and for arbitrary debug info formats. We only support PDB
currently.
"""
class MockedSymStore:
"""
Context Manager to populate a file structure equivalent to SymStore.exe
"""
def __init__(self, test, exe, pdb):
self._test = test
self._exe = exe
self._pdb = pdb
self.cache_dir = test.getBuildArtifact("cache")
def get_key_pdb(self, exe):
"""
Module UUID: 12345678-1234-5678-9ABC-DEF012345678-00000001
To SymStore key: 12345678123456789ABCDEF0123456781
"""
spec = lldb.SBModuleSpec()
spec.SetFileSpec(lldb.SBFileSpec(self._test.getBuildArtifact(exe)))
module = lldb.SBModule(spec)
raw = module.GetUUIDString().replace("-", "").upper()
if len(raw) != 40:
raise RuntimeError("Unexpected number of bytes in embedded UUID")
guid_hex = raw[:32]
age = int(raw[32:], 16)
return guid_hex + str(age)
def __enter__(self):
"""
Mock local symstore directory tree, move PDB there and report path.
"""
key = None
if self._test.getDebugInfo() == "pdb":
key = self.get_key_pdb(self._exe)
self._test.assertIsNotNone(key)
symstore_dir = self._test.getBuildArtifact("symstore")
pdb_dir = os.path.join(symstore_dir, self._pdb, key)
os.makedirs(pdb_dir, exist_ok=True)
shutil.move(
self._test.getBuildArtifact(self._pdb),
os.path.join(pdb_dir, self._pdb),
)
# We always configure a valid fallback cache, because we might not have
# permission to write outside the test directory.
self._test.runCmd(
f"settings set plugin.symbol-locator.symstore.cache {self.cache_dir}"
)
return symstore_dir
def __exit__(self, *exc_info):
"""
Reset settings
"""
self._test.runCmd("settings clear plugin.symbol-locator.symstore")
class NtSymbolPath:
"""
Context Manager to temporarily set the _NT_SYMBOL_PATH environment variable.
"""
def __init__(self, value):
self._value = value
self._saved = None
def __enter__(self):
self._saved = os.environ.get("_NT_SYMBOL_PATH")
os.environ["_NT_SYMBOL_PATH"] = self._value
def __exit__(self, *exc_info):
if self._saved is None:
os.environ.pop("_NT_SYMBOL_PATH", None)
else:
os.environ["_NT_SYMBOL_PATH"] = self._saved
class HTTPServer:
"""
Context Manager to serve a local directory tree via HTTP.
"""
def __init__(self, dir, handler=None):
address = ("localhost", 0) # auto-select free port
if handler is None:
handler = partial(http.server.SimpleHTTPRequestHandler, directory=dir)
self._server = socketserver.ThreadingTCPServer(address, handler)
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
def __enter__(self):
self._thread.start()
host, port = self._server.server_address
return f"http://{host}:{port}"
def __exit__(self, *exc_info):
if self._server:
self._server.shutdown()
self._server.server_close()
if self._thread:
self._thread.join()
class HTTPSServer:
"""
Context Manager to serve a local directory tree via HTTPS.
"""
class ErrorAwareTCPServer(socketserver.ThreadingTCPServer):
"""TCP layer that will suppress errors of the given type."""
def __init__(self, address, handler, err):
self.err = err
super().__init__(address, handler)
def handle_error(self, request, client_address):
if isinstance(sys.exc_info()[1], self.err):
return
super().handle_error(request, client_address)
def __init__(self, dir=None, handler=None, cert=None):
if handler is None:
handler = partial(http.server.SimpleHTTPRequestHandler, directory=dir)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(cert.file, cert.key_file)
address = ("localhost", 0) # auto-select free port
self._server = self.ErrorAwareTCPServer(address, handler, ssl.SSLError)
self._server.socket = ctx.wrap_socket(self._server.socket, server_side=True)
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
def __enter__(self):
self._thread.start()
host, port = self._server.server_address
return f"https://{host}:{port}"
def __exit__(self, *exc_info):
if self._server:
self._server.shutdown()
self._server.server_close()
if self._thread:
self._thread.join()
class SelfSignedCert:
"""
Self-signed cert/key pair for localhost.
"""
def __init__(self, tmpdir):
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(name)
.issuer_name(name)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=1))
.add_extension(
x509.SubjectAlternativeName(
[
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]
),
critical=False,
)
.sign(key, hashes.SHA256())
)
os.makedirs(tmpdir, exist_ok=False)
self.key_file = os.path.join(tmpdir, "server.key")
self.file = os.path.join(tmpdir, "server.crt")
with open(self.key_file, "wb") as f:
f.write(
key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
)
)
with open(self.file, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
self.fingerprint = cert.fingerprint(hashes.SHA256()).hex()
class RedirectHandler(http.server.BaseHTTPRequestHandler):
base_url = None
def do_GET(self):
self.send_response(301)
self.send_header("Location", self.base_url + self.path)
self.end_headers()
def log_message(self, *args):
pass # suppress request logs
class RequestCounter(http.server.SimpleHTTPRequestHandler):
requests = 0 # class-level so all instances share one counter
def __init__(self, *args, directory=None, **kwargs):
super().__init__(*args, directory=directory, **kwargs)
def do_GET(self):
RequestCounter.requests += 1
super().do_GET()
class SymStoreTests(TestBase):
TEST_WITH_PDB_DEBUG_INFO = True
def build_inferior(self):
if self.getDebugInfo() != "pdb":
self.skipTest("Non-PDB debug info variants not yet supported")
self.build()
exe_file = "a.out"
sym_file = "a.pdb"
self.assertTrue(os.path.isfile(self.getBuildArtifact(exe_file)))
self.assertTrue(os.path.isfile(self.getBuildArtifact(sym_file)))
return exe_file, sym_file
def assertFiles(self, dir, expected):
actual = sum(len(f) for _, _, f in os.walk(dir))
self.assertEqual(actual, expected)
def try_breakpoint(self, exe, should_have_loc, ext_lookup=True):
enable = "true" if ext_lookup else "false"
self.runCmd(f"settings set symbols.enable-external-lookup {enable}")
target = self.dbg.CreateTarget(self.getBuildArtifact(exe))
self.assertTrue(target and target.IsValid(), "Target is valid")
bp = target.BreakpointCreateByName("func")
self.assertTrue(bp and bp.IsValid(), "Breakpoint is valid")
self.assertEqual(bp.GetNumLocations(), 1 if should_have_loc else 0)
self.dbg.DeleteTarget(target)
def test_no_symstore_url(self):
"""
Check that breakpoint doesn't resolve without SymStore.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym):
self.try_breakpoint(exe, should_have_loc=False)
def test_external_lookup_off(self):
"""
Check that breakpoint doesn't resolve with external lookup disabled.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as dir:
self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {dir}")
self.try_breakpoint(exe, ext_lookup=False, should_have_loc=False)
def test_local_dir(self):
"""
Check that breakpoint resolves with local SymStore.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as dir:
self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {dir}")
self.try_breakpoint(exe, should_have_loc=True)
def test_http_not_found(self):
"""
Check that we don't issue a warning for a 404 response from a symbol server.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as symstore_dir:
os.makedirs(f"{symstore_dir}_empty", exist_ok=False)
with HTTPServer(f"{symstore_dir}_empty") as url:
self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
warnings = ""
with open(self.getBuildArtifact("stderr.txt"), "w+b") as err_file:
self.dbg.SetErrorFileHandle(err_file, False)
self.try_breakpoint(exe, should_have_loc=False)
self.dbg.SetErrorFileHandle(sys.stderr, False)
err_file.seek(0)
warnings = err_file.read().decode()
self.assertEqual(warnings, "")
def test_http(self):
"""
Check that breakpoint resolves with remote SymStore.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as dir:
with HTTPServer(dir) as url:
self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
self.try_breakpoint(exe, should_have_loc=True)
def test_sympath_local_dir(self):
"""
Check that breakpoint resolves with plain directory in _NT_SYMBOL_PATH.
The PDB is not copied to the configured cache.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with NtSymbolPath(dir):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 0)
def test_sympath_local_srv(self):
"""
Check that breakpoint resolves with local directory in server notation
in _NT_SYMBOL_PATH. The PDB is not copied to the configured cache.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with NtSymbolPath(f"srv*{dir}"):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 0)
def test_sympath_srv(self):
"""
Check that breakpoint resolves with an HTTP symbol server in _NT_SYMBOL_PATH
using the srv* notation. The PDB is stored in the configured cache.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
self.assertFiles(symstore.cache_dir, 0)
with HTTPServer(dir) as url:
with NtSymbolPath(f"srv*{url}"):
self.try_breakpoint(exe, should_have_loc=True)
key = symstore.get_key_pdb(exe)
cache_file = os.path.join(symstore.cache_dir, sym, key, sym)
self.assertTrue(os.path.isfile(cache_file))
self.assertFiles(symstore.cache_dir, 1)
def test_sympath_cache_explicit(self):
"""
Check PDB storage with explicit cache in _NT_SYMBOL_PATH.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with HTTPServer(dir) as url:
explicit_cache = self.getBuildArtifact("explicit_cache")
with NtSymbolPath(f"srv*{explicit_cache}*{url}"):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 0)
self.assertFiles(explicit_cache, 1)
def test_sympath_cache_implicit(self):
"""
Check PDB storage with implicit cache in _NT_SYMBOL_PATH.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with HTTPServer(dir) as url:
implicit_cache = self.getBuildArtifact("implicit_cache")
with NtSymbolPath(f"cache*{implicit_cache};srv*{url}"):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 0)
self.assertFiles(implicit_cache, 1)
def test_sympath_cache_invalid(self):
"""
Check that PDB is stored in configured default cache
if path in _NT_SYMBOL_PATH is invalid.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with HTTPServer(dir) as url:
invalid_cache = ":\\<invalid_path>"
self.assertFiles(symstore.cache_dir, 0)
with NtSymbolPath(f"cache*{invalid_cache};srv*{url}"):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 1)
def test_sympath_cache_empty(self):
"""
Check that PDB is stored in configured default cache
if path in _NT_SYMBOL_PATH is empty.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
with symstore as dir:
with HTTPServer(dir) as url:
self.assertFiles(symstore.cache_dir, 0)
with NtSymbolPath(f"cache*;srv*{url}"):
self.try_breakpoint(exe, should_have_loc=True)
self.assertFiles(symstore.cache_dir, 1)
def test_lookup_order(self):
"""
Check that _NT_SYMBOL_PATH takes precedence over symstore.urls setting.
"""
exe, sym = self.build_inferior()
symstore = MockedSymStore(self, exe, sym)
RequestCounter.requests = 0
with symstore as dir:
with HTTPServer(dir, RequestCounter) as url:
self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
with NtSymbolPath(dir):
self.try_breakpoint(exe, should_have_loc=True)
self.assertEqual(RequestCounter.requests, 0)
@skipUnlessPackageAvailable("cryptography")
def test_https(self):
"""
Check that breakpoint resolves with remote SymStore via HTTPS.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as symstore_dir:
cert = SelfSignedCert(self.getBuildArtifact("cert"))
with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
# We accept only the self-signed certificate with this fingerprint
self.runCmd(
f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
)
self.runCmd(
f"settings set plugin.symbol-locator.symstore.urls {https_url}"
)
self.try_breakpoint(exe, should_have_loc=True)
@skipUnlessPackageAvailable("cryptography")
def test_https_reject_selfsigned_cert(self):
"""
Check that LLDB rejects an HTTPS server with an untrusted self-signed cert.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as symstore_dir:
cert = SelfSignedCert(self.getBuildArtifact("cert"))
with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
# No fingerprint set
self.runCmd(
f"settings set plugin.symbol-locator.symstore.urls {https_url}"
)
self.try_breakpoint(exe, should_have_loc=False)
# Incorrect fingerprint set
bogus = "DEADBEEFCAFEBABE"
self.runCmd(
f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {bogus}{bogus}{bogus}{bogus}"
)
self.try_breakpoint(exe, should_have_loc=False)
@skipUnlessPackageAvailable("cryptography")
def test_https_reject_redirect_http(self):
"""
Check that LLDB does not retrieve symbols from servers that redirect with security downgrades.
"""
exe, sym = self.build_inferior()
with MockedSymStore(self, exe, sym) as symstore_dir:
with HTTPServer(symstore_dir) as http_url:
RedirectHandler.base_url = http_url
cert = SelfSignedCert(self.getBuildArtifact("cert"))
with HTTPSServer(handler=RedirectHandler, cert=cert) as https_url:
self.runCmd(
f"settings set plugin.symbol-locator.symstore.urls {https_url}"
)
self.runCmd(
f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
)
self.try_breakpoint(exe, should_have_loc=False)