|  | """ | 
|  | The LLVM Compiler Infrastructure | 
|  |  | 
|  | This file is distributed under the University of Illinois Open Source | 
|  | License. See LICENSE.TXT for details. | 
|  |  | 
|  | Sync lldb and related source from a local machine to a remote machine. | 
|  |  | 
|  | This facilitates working on the lldb source code on multiple machines | 
|  | and multiple OS types, verifying changes across all. | 
|  |  | 
|  |  | 
|  | This module provides asyncore channels used within the LLDB test | 
|  | framework. | 
|  | """ | 
|  |  | 
|  | from __future__ import print_function | 
|  | from __future__ import absolute_import | 
|  |  | 
|  |  | 
|  | # System modules | 
|  | import asyncore | 
|  | import socket | 
|  |  | 
|  | # Third-party modules | 
|  | from six.moves import cPickle | 
|  |  | 
|  | # LLDB modules | 
|  |  | 
|  |  | 
class UnpicklingForwardingReaderChannel(asyncore.dispatcher):
    """Provides an unpickling, forwarding asyncore dispatch channel reader.

    Inferior dotest.py processes with side-channel-based test results will
    send test result event data in a pickled format, one event at a time.
    This class supports reconstructing the pickled data and forwarding it
    on to its final destination.

    The channel data is a sequence of packets, each of the form:
    {payload_length}{payload_bytes}
    where payload_length is a 4-byte unsigned integer in network byte
    order (struct format "!I") and payload_bytes is the pickled event.

    The bulk of this class is devoted to reading and parsing out
    the payload bytes, which may arrive split across any number of reads.
    """

    # Layout of the fixed-size length header that precedes every payload.
    _HEADER_FORMAT = "!I"
    _HEADER_LENGTH = 4

    def __init__(self, file_object, async_map, forwarding_func):
        """Registers the channel and acknowledges the connection.

        @param file_object the connected socket to read packets from.
        @param async_map the asyncore map this channel is registered in.
        @param forwarding_func callable invoked with each unpickled object.
        @raise ValueError (an Exception subclass) if forwarding_func is None.
        """
        # Validate before touching asyncore: this whole class is useless
        # if we do nothing with the unpickled results, and failing first
        # avoids leaving a half-constructed channel in the async map.
        if forwarding_func is None:
            raise ValueError("forwarding function must be set")

        asyncore.dispatcher.__init__(self, sock=file_object, map=async_map)

        self.header_contents = b""
        self.packet_bytes_remaining = 0
        self.reading_header = True
        self.ibuffer = b''
        self.forwarding_func = forwarding_func

        # Initiate all connections by sending an ack.  This allows
        # the initiators of the socket to await this to ensure
        # that this end is up and running (and therefore already
        # into the async map).
        file_object.send(b'*')

    def deserialize_payload(self):
        """Unpickles the collected input buffer bytes and forwards.

        Does nothing when the buffer is empty; otherwise the buffer is
        cleared once its contents have been forwarded.
        """
        if not self.ibuffer:
            return
        # NOTE(review): unpickling can execute arbitrary code.  This
        # presumably only ever sees data from trusted test inferiors;
        # confirm before exposing the listener beyond the local machine.
        self.forwarding_func(cPickle.loads(self.ibuffer))
        self.ibuffer = b''

    def consume_header_bytes(self, data):
        """Consumes header bytes from the front of data.

        @param data the incoming data stream bytes
        @return any data leftover after consuming header bytes, or None
        if data was empty or the header is still incomplete.
        """
        import struct

        # We're done if there is no content.
        if not data:
            return None

        assert len(self.header_contents) < self._HEADER_LENGTH

        # Slicing clamps to what is available, so this takes at most the
        # bytes still missing from the header.
        bytes_needed = self._HEADER_LENGTH - len(self.header_contents)
        self.header_contents += data[:bytes_needed]

        if len(self.header_contents) < self._HEADER_LENGTH:
            # We've exhausted the data and we're still parsing header
            # content.
            return None

        # End of header: decode the payload length and switch modes.
        self.packet_bytes_remaining = struct.unpack(
            self._HEADER_FORMAT, self.header_contents)[0]
        self.header_contents = b""
        self.reading_header = False
        return data[bytes_needed:]

    def consume_payload_bytes(self, data):
        """Consumes payload bytes from the front of data.

        When the payload becomes complete it is unpickled and forwarded,
        and the channel switches back to reading a header.

        @param data the incoming data stream bytes
        @return any data leftover after consuming remaining payload bytes,
        or None if nothing is left over.
        """
        if not data:
            # We're done and there's nothing to do.
            return None

        # Take no more than what the current packet still expects.
        take_count = min(len(data), self.packet_bytes_remaining)
        self.ibuffer += data[:take_count]
        self.packet_bytes_remaining -= take_count
        leftover = data[take_count:]

        # If we're no longer waiting for payload bytes, we flip back to
        # parsing header bytes and we unpickle the payload contents.
        if self.packet_bytes_remaining < 1:
            self.reading_header = True
            self.deserialize_payload()

        return leftover if leftover else None

    def handle_read(self):
        """Reads available socket data and feeds it to the packet parser.

        Errors raised while reading are reported on stdout and re-raised.
        """
        # Read some data from the socket.
        try:
            data = self.recv(8192)
        except socket.error as socket_error:
            print(
                "\nINFO: received socket error when reading data "
                "from test inferior:\n{}".format(socket_error))
            raise
        except Exception as general_exception:
            print(
                "\nERROR: received non-socket error when reading data "
                "from the test inferior:\n{}".format(general_exception))
            raise

        # Consume the message content.  A single read may hold several
        # packets or only a fragment of one, so keep alternating between
        # header and payload parsing until the data is used up.
        while data:
            if self.reading_header:
                data = self.consume_header_bytes(data)
            else:
                data = self.consume_payload_bytes(data)

    def handle_close(self):
        """Closes the channel when asyncore reports the connection closed."""
        self.close()
|  |  | 
|  |  | 
class UnpicklingForwardingListenerChannel(asyncore.dispatcher):
    """Provides a socket listener asyncore channel for unpickling/forwarding.

    This channel will listen on a socket port (use 0 for host-selected).  Any
    client that connects will have an UnpicklingForwardingReaderChannel handle
    communication over the connection.

    The dotest parallel test runners, when collecting test results, open the
    test results side channel over a socket.  This channel handles connections
    from inferiors back to the test runner.  Doing so through the shared
    async map simplifies the asyncore.loop() usage, one of the reasons for
    implementing with asyncore.

    Note that accepting a connection does not close the listener: it keeps
    accepting until close() is called on it or handle_close() fires.  Only
    the most recently created reader channel is kept in self.handler.
    """

    def __init__(self, async_map, host, port, backlog_count, forwarding_func):
        """Creates the listening socket and registers it in async_map.

        @param async_map the asyncore map shared with the reader channels.
        @param host the interface/host name to bind to.
        @param port the port to bind to; 0 lets the host pick one, and the
        bound address is then available as self.address.
        @param backlog_count the backlog passed to listen().
        @param forwarding_func callable handed to each reader channel.
        @raise ValueError (an Exception subclass) if forwarding_func is None.
        """
        # Validate first: this whole class is useless if we do nothing
        # with the unpickled results, and failing before the socket exists
        # avoids leaking a bound, listening socket in the async map.
        if forwarding_func is None:
            raise ValueError("forwarding function must be set")

        asyncore.dispatcher.__init__(self, map=async_map)
        self.handler = None
        self.async_map = async_map
        self.forwarding_func = forwarding_func

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        # Record the actual bound address so callers that asked for port 0
        # can find out which port was selected.
        self.address = self.socket.getsockname()
        self.listen(backlog_count)

    def handle_accept(self):
        """Accepts a pending connection and attaches a reader channel to it."""
        # Use the dispatcher's accept(): unlike socket.accept() it returns
        # None instead of raising when the pending connection has already
        # gone away (e.g. EWOULDBLOCK / ECONNABORTED).
        accepted = self.accept()
        if accepted is None:
            return

        (sock, addr) = accepted
        if sock and addr:
            self.handler = UnpicklingForwardingReaderChannel(
                sock, self.async_map, self.forwarding_func)

    def handle_close(self):
        """Closes the listening socket."""
        self.close()