master
/ .localenv / lib / python3.5 / site-packages / prompt_toolkit / contrib / telnet / protocol.py

protocol.py @master

4d078e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
"""
Parser for the Telnet protocol. (Not a complete implementation of the telnet
specification, but sufficient for a command line interface.)

Inspired by `Twisted.conch.telnet`.
"""
from __future__ import unicode_literals

import struct
from six import int2byte, binary_type, iterbytes

from .log import logger

__all__ = (
    'TelnetProtocolParser',
)

# Telnet constants.
NOP      = int2byte(0)
SGA      = int2byte(3)

IAC      = int2byte(255)
DO       = int2byte(253)
DONT     = int2byte(254)
LINEMODE = int2byte(34)
SB       = int2byte(250)
WILL     = int2byte(251)
WONT     = int2byte(252)
MODE     = int2byte(1)
SE       = int2byte(240)
ECHO     = int2byte(1)
NAWS     = int2byte(31)
LINEMODE = int2byte(34)
SUPPRESS_GO_AHEAD = int2byte(3)

DM       = int2byte(242)
BRK      = int2byte(243)
IP       = int2byte(244)
AO       = int2byte(245)
AYT      = int2byte(246)
EC       = int2byte(247)
EL       = int2byte(248)
GA       = int2byte(249)


class TelnetProtocolParser(object):
    """
    Parser for the Telnet protocol.
    Usage::

        def data_received(data):
            print(data)

        def size_received(rows, columns):
            print(rows, columns)

        p = TelnetProtocolParser(data_received, size_received)
        p.feed(binary_data)
    """
    def __init__(self, data_received_callback, size_received_callback):
        self.data_received_callback = data_received_callback
        self.size_received_callback = size_received_callback

        self._parser = self._parse_coroutine()
        self._parser.send(None)

    def received_data(self, data):
        self.data_received_callback(data)

    def do_received(self, data):
        """ Received telnet DO command. """
        logger.info('DO %r', data)

    def dont_received(self, data):
        """ Received telnet DONT command. """
        logger.info('DONT %r', data)

    def will_received(self, data):
        """ Received telnet WILL command. """
        logger.info('WILL %r', data)

    def wont_received(self, data):
        """ Received telnet WONT command. """
        logger.info('WONT %r', data)

    def command_received(self, command, data):
        if command == DO:
            self.do_received(data)

        elif command == DONT:
            self.dont_received(data)

        elif command == WILL:
            self.will_received(data)

        elif command == WONT:
            self.wont_received(data)

        else:
            logger.info('command received %r %r', command, data)

    def naws(self, data):
        """
        Received NAWS. (Window dimensions.)
        """
        if len(data) == 4:
            # NOTE: the first parameter of struct.unpack should be
            # a 'str' object. Both on Py2/py3. This crashes on OSX
            # otherwise.
            columns, rows = struct.unpack(str('!HH'), data)
            self.size_received_callback(rows, columns)
        else:
            logger.warning('Wrong number of NAWS bytes')

    def negotiate(self, data):
        """
        Got negotiate data.
        """
        command, payload = data[0:1], data[1:]
        assert isinstance(command, bytes)

        if command == NAWS:
            self.naws(payload)
        else:
            logger.info('Negotiate (%r got bytes)', len(data))

    def _parse_coroutine(self):
        """
        Parser state machine.
        Every 'yield' expression returns the next byte.
        """
        while True:
            d = yield

            if d == int2byte(0):
                pass  # NOP

            # Go to state escaped.
            elif d == IAC:
                d2 = yield

                if d2 == IAC:
                    self.received_data(d2)

                # Handle simple commands.
                elif d2 in (NOP, DM, BRK, IP, AO, AYT, EC, EL, GA):
                    self.command_received(d2, None)

                # Handle IAC-[DO/DONT/WILL/WONT] commands.
                elif d2 in (DO, DONT, WILL, WONT):
                    d3 = yield
                    self.command_received(d2, d3)

                # Subnegotiation
                elif d2 == SB:
                    # Consume everything until next IAC-SE
                    data = []

                    while True:
                        d3 = yield

                        if d3 == IAC:
                            d4 = yield
                            if d4 == SE:
                                break
                            else:
                                data.append(d4)
                        else:
                            data.append(d3)

                    self.negotiate(b''.join(data))
            else:
                self.received_data(d)

    def feed(self, data):
        """
        Feed data to the parser.
        """
        assert isinstance(data, binary_type)
        for b in iterbytes(data):
            self._parser.send(int2byte(b))