123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import json
- from collections import namedtuple
- from .symmetries import (
- decode_string, encode_string, get_byte, get_character, parse_url)
- EngineIOSession = namedtuple('EngineIOSession', [
- 'id', 'ping_interval', 'ping_timeout', 'transport_upgrades'])
- SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args'])
- def parse_host(host, port, resource):
- if not host.startswith('http'):
- host = 'http://' + host
- url_pack = parse_url(host)
- is_secure = url_pack.scheme == 'https'
- port = port or url_pack.port or (443 if is_secure else 80)
- url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource)
- return is_secure, url
- def parse_engineIO_session(engineIO_packet_data):
- d = json.loads(decode_string(engineIO_packet_data))
- return EngineIOSession(
- id=d['sid'],
- ping_interval=d['pingInterval'] / float(1000),
- ping_timeout=d['pingTimeout'] / float(1000),
- transport_upgrades=d['upgrades'])
- def encode_engineIO_content(engineIO_packets):
- content = bytearray()
- for packet_type, packet_data in engineIO_packets:
- packet_text = format_packet_text(packet_type, packet_data)
- content.extend(_make_packet_prefix(packet_text) + packet_text)
- return content
- def decode_engineIO_content(content):
- content_index = 0
- content_length = len(content)
- while content_index < content_length:
- try:
- content_index, packet_length = _read_packet_length(
- content, content_index)
- except IndexError:
- break
- content_index, packet_text = _read_packet_text(
- content, content_index, packet_length)
- engineIO_packet_type, engineIO_packet_data = parse_packet_text(
- packet_text)
- yield engineIO_packet_type, engineIO_packet_data
- def format_socketIO_packet_data(path=None, ack_id=None, args=None):
- socketIO_packet_data = json.dumps(args, ensure_ascii=False) if args else ''
- if ack_id is not None:
- socketIO_packet_data = str(ack_id) + socketIO_packet_data
- if path:
- socketIO_packet_data = path + ',' + socketIO_packet_data
- return socketIO_packet_data
- def parse_socketIO_packet_data(socketIO_packet_data):
- # fix decoding if server doesn't use UTF-8 (decode_string will try UTF-8 and latin-1)
- try:
- data = decode_string(socketIO_packet_data)
- except Exception as ex:
- print("Error on decoding data:")
- print(socketIO_packet_data)
- data = ""
- if data.startswith('/'):
- try:
- path, data = data.split(',', 1)
- except ValueError:
- path = data
- data = ''
- else:
- path = ''
- try:
- ack_id_string, data = data.split('[', 1)
- data = '[' + data
- ack_id = int(ack_id_string)
- except (ValueError, IndexError):
- ack_id = None
- try:
- args = json.loads(data)
- except ValueError:
- args = []
- return SocketIOData(path=path, ack_id=ack_id, args=args)
- def format_packet_text(packet_type, packet_data):
- return encode_string(str(packet_type) + packet_data)
- def parse_packet_text(packet_text):
- packet_type = int(get_character(packet_text, 0))
- packet_data = packet_text[1:]
- return packet_type, packet_data
- def get_namespace_path(socketIO_packet_data):
- if not socketIO_packet_data.startswith(b'/'):
- return ''
- # Loop incrementally in case there is binary data
- parts = []
- for i in range(len(socketIO_packet_data)):
- character = get_character(socketIO_packet_data, i)
- if ',' == character:
- break
- parts.append(character)
- return ''.join(parts)
- def _make_packet_prefix(packet):
- length_string = str(len(packet))
- header_digits = bytearray([0])
- for i in range(len(length_string)):
- header_digits.append(ord(length_string[i]) - 48)
- header_digits.append(255)
- return header_digits
- def _read_packet_length(content, content_index):
- while get_byte(content, content_index) != 0:
- content_index += 1
- content_index += 1
- packet_length_string = ''
- byte = get_byte(content, content_index)
- while byte != 255:
- packet_length_string += str(byte)
- content_index += 1
- byte = get_byte(content, content_index)
- return content_index, int(packet_length_string)
- def _read_packet_text(content, content_index, packet_length):
- while get_byte(content, content_index) == 255:
- content_index += 1
- packet_text = content[content_index:content_index + packet_length]
- return content_index + packet_length, packet_text
|