123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 |
- import enum
- import time
- import random
- import iofree
- from . import ciphers
- from iofree import schema
- MAX_LIFETIME = 24 * 3600 * 7
- AGE_MOD = 2 ** 32
- class AlertLevel(enum.IntEnum):
- warning = 1
- fatal = 2
- class AlertDescription(enum.IntEnum):
- close_notify = 0
- unexpected_message = 10
- bad_record_mac = 20
- record_overflow = 22
- handshake_failure = 40
- bad_certificate = 42
- unsupported_certificate = 43
- certificate_revoked = 44
- certificate_expired = 45
- certificate_unknown = 46
- illegal_parameter = 47
- unknown_ca = 48
- access_denied = 49
- decode_error = 50
- decrypt_error = 51
- protocol_version = 70
- insufficient_security = 71
- internal_error = 80
- inappropriate_fallback = 86
- user_canceled = 90
- missing_extension = 109
- unsupported_extension = 110
- unrecognized_name = 112
- bad_certificate_status_response = 113
- unknown_psk_identity = 115
- certificate_required = 116
- no_application_protocol = 120
- class KeyUpdateRequest(enum.IntEnum):
- update_not_requested = 0
- update_requested = 1
- class ExtensionType(enum.IntEnum):
- server_name = 0
- max_fragment_length = 1
- status_request = 5
- supported_groups = 10
- signature_algorithms = 13
- use_srtp = 14
- heartbeat = 15
- application_layer_protocol_negotiation = 16
- signed_certificate_timestamp = 18
- client_certificate_type = 19
- server_certificate_type = 20
- padding = 21
- pre_shared_key = 41
- early_data = 42
- supported_versions = 43
- cookie = 44
- psk_key_exchange_modes = 45
- certificate_authorities = 47
- oid_filters = 48
- post_handshake_auth = 49
- signature_algorithms_cert = 50
- key_share = 51
- class HandshakeType(enum.IntEnum):
- client_hello = 1
- server_hello = 2
- new_session_ticket = 4
- end_of_early_data = 5
- encrypted_extensions = 8
- certificate = 11
- certificate_request = 13
- certificate_verify = 15
- finished = 20
- key_update = 24
- message_hash = 254
- class NameType(enum.IntEnum):
- host_name = 0
- class SignatureScheme(enum.IntEnum):
- # RSASSA-PKCS1-v1_5 algorithms
- # rsa_pkcs1_sha256 = 0x0401
- # rsa_pkcs1_sha384 = 0x0501
- # rsa_pkcs1_sha512 = 0x0601
- # # ECDSA algorithms
- # ecdsa_secp256r1_sha256 = 0x0403
- # ecdsa_secp384r1_sha384 = 0x0503
- # ecdsa_secp521r1_sha512 = 0x0603
- # # RSASSA-PSS algorithms with public key OID rsaEncryption
- # rsa_pss_rsae_sha256 = 0x0804
- # rsa_pss_rsae_sha384 = 0x0805
- # rsa_pss_rsae_sha512 = 0x0806
- # EdDSA algorithms
- ed25519 = 0x0807
- # ed448 = 0x0808
- # # RSASSA-PSS algorithms with public key OID RSASSA-PSS
- # rsa_pss_pss_sha256 = 0x0809
- # rsa_pss_pss_sha384 = 0x080a
- # rsa_pss_pss_sha512 = 0x080b
- # # Legacy algorithms
- # rsa_pkcs1_sha1 = 0x0201
- # ecdsa_sha1 = 0x0203
- # # Reserved Code Points
- # # private_use(0xFE00..0xFFFF)
- class NamedGroup(enum.IntEnum):
- # Elliptic Curve Groups (ECDHE)
- # secp256r1 = 0x0017
- # secp384r1 = 0x0018
- # secp521r1 = 0x0019
- x25519 = 0x001D
- # x448 = 0x001E
- # # Finite Field Groups (DHE)
- # ffdhe2048 = 0x0100
- # ffdhe3072 = 0x0101
- # ffdhe4096 = 0x0102
- # ffdhe6144 = 0x0103
- # ffdhe8192 = 0x0104
- # Reserved Code Points
- # ffdhe_private_use(0x01FC..0x01FF)
- # ecdhe_private_use(0xFE00..0xFEFF)
- class PskKeyExchangeMode(enum.IntEnum):
- psk_ke = 0
- psk_dhe_ke = 1
- class CipherSuite(enum.IntEnum):
- TLS_AES_128_GCM_SHA256 = 0x1301
- # TLS_AES_256_GCM_SHA384 = 0x1302
- # TLS_CHACHA20_POLY1305_SHA256 = 0x1303
- # TLS_AES_128_CCM_SHA256 = 0x1304
- # TLS_AES_128_CCM_8_SHA256 = 0x1305
- class ContentType(enum.IntEnum):
- invalid = 0
- change_cipher_spec = 20
- alert = 21
- handshake = 22
- application_data = 23
- heartbeat = 24
- def tls_plaintext(self, payload):
- return TLSPlaintext.pack(self, payload)
- class TLSCiphertext(schema.BinarySchema):
- opaque_type = schema.MustEqual(
- schema.SizedIntEnum(schema.uint8, ContentType), ContentType.application_data
- )
- legacy_record_version = schema.MustEqual(schema.Bytes(2), b"\x03\x03")
- encrypted_record = schema.LengthPrefixedBytes(schema.uint16be)
- class ServerName(schema.BinarySchema):
- name_type = schema.MustEqual(
- schema.SizedIntEnum(schema.uint8, NameType), NameType.host_name
- )
- name = schema.Switch(
- "name_type", {NameType.host_name: schema.LengthPrefixedString(schema.uint16be)}
- )
- class PskIdentity(schema.BinarySchema):
- identity = schema.LengthPrefixedBytes(schema.uint16be)
- obfuscated_ticket_age = schema.uint32be
- class OfferedPsks(schema.BinarySchema):
- identities = schema.LengthPrefixedObjectList(schema.uint16be, PskIdentity)
- binders = schema.LengthPrefixedObjectList(
- schema.uint16be, schema.LengthPrefixedBytes(schema.uint8)
- )
- class KeyShareEntry(schema.BinarySchema):
- group = schema.SizedIntEnum(schema.uint16be, NamedGroup)
- key_exchange = schema.LengthPrefixedBytes(schema.uint16be)
- extensions = {
- ExtensionType.server_name: schema.LengthPrefixedObject(
- schema.uint16be, schema.LengthPrefixedObjectList(schema.uint16be, ServerName)
- ),
- ExtensionType.signature_algorithms: schema.LengthPrefixedObject(
- schema.uint16be,
- schema.LengthPrefixedObjectList(
- schema.uint16be, schema.SizedIntEnum(schema.uint16be, SignatureScheme)
- ),
- ),
- ExtensionType.supported_groups: schema.LengthPrefixedObject(
- schema.uint16be,
- schema.LengthPrefixedObjectList(
- schema.uint16be, schema.SizedIntEnum(schema.uint16be, NamedGroup)
- ),
- ),
- ExtensionType.psk_key_exchange_modes: schema.LengthPrefixedObject(
- schema.uint16be,
- schema.LengthPrefixedObjectList(
- schema.uint8, schema.SizedIntEnum(schema.uint8, PskKeyExchangeMode)
- ),
- ),
- ExtensionType.early_data: schema.LengthPrefixedBytes(schema.uint16be),
- ExtensionType.pre_shared_key: schema.LengthPrefixedObject(
- schema.uint16be, OfferedPsks
- ),
- }
- client_extensions = extensions.copy()
- client_extensions[ExtensionType.supported_versions] = schema.LengthPrefixedObject(
- schema.uint16be, schema.LengthPrefixedObjectList(schema.uint8, schema.Bytes(2))
- )
- client_extensions[ExtensionType.key_share] = schema.LengthPrefixedObject(
- schema.uint16be, schema.LengthPrefixedObjectList(schema.uint16be, KeyShareEntry)
- )
- server_extensions = extensions.copy()
- server_extensions[ExtensionType.supported_versions] = schema.LengthPrefixedBytes(
- schema.uint16be
- )
- server_extensions[ExtensionType.key_share] = schema.LengthPrefixedObject(
- schema.uint16be, KeyShareEntry
- )
- class Extension(schema.BinarySchema):
- @classmethod
- def server_names(cls, names):
- return cls(ExtensionType.server_name, [ServerName(..., name) for name in names])
- @classmethod
- def supported_versions(cls, versions):
- return cls(ExtensionType.supported_versions, versions)
- @classmethod
- def selected_version(cls, version):
- return cls(ExtensionType.supported_versions, version)
- @classmethod
- def signature_algorithms(cls, schemes):
- return cls(ExtensionType.signature_algorithms, schemes)
- @classmethod
- def supported_groups(cls, groups):
- return cls(ExtensionType.supported_groups, groups)
- @classmethod
- def key_share(cls, key_share_entries):
- return cls(ExtensionType.key_share, key_share_entries)
- @classmethod
- def psk_key_exchange_modes(cls, modes):
- return cls(ExtensionType.psk_key_exchange_modes, modes)
- @classmethod
- def early_data(cls, data):
- return cls(ExtensionType.early_data, data)
- @classmethod
- def pre_shared_key(cls, offered_psks: OfferedPsks):
- return cls(ExtensionType.pre_shared_key, offered_psks)
- class ServerExtension(Extension):
- ext_type = schema.SizedIntEnum(schema.uint16be, ExtensionType)
- ext_data = schema.Switch("ext_type", server_extensions)
- class ClientExtension(Extension):
- ext_type = schema.SizedIntEnum(schema.uint16be, ExtensionType)
- ext_data = schema.Switch("ext_type", client_extensions)
- class ClientHello(schema.BinarySchema):
- legacy_version = schema.MustEqual(schema.Bytes(2), b"\x03\x03")
- rand = schema.Bytes(32)
- legacy_session_id = schema.LengthPrefixedBytes(schema.uint8)
- cipher_suites = schema.LengthPrefixedObjectList(
- schema.uint16be, schema.SizedIntEnum(schema.uint16be, CipherSuite)
- )
- legacy_compression_methods = schema.MustEqual(schema.Bytes(2), b"\x01\x00")
- extensions = schema.LengthPrefixedObjectList(schema.uint16be, ClientExtension)
- class ServerHello(schema.BinarySchema):
- legacy_version = schema.MustEqual(schema.Bytes(2), b"\x03\x03")
- rand = schema.Bytes(32)
- legacy_session_id_echo = schema.LengthPrefixedBytes(schema.uint8)
- cipher_suite = schema.SizedIntEnum(schema.uint16be, CipherSuite)
- legacy_compression_method = schema.MustEqual(schema.uint8, 0)
- extensions = schema.LengthPrefixedObjectList(schema.uint16be, ServerExtension)
- def get_cipher(self):
- if self.cipher_suite == CipherSuite.TLS_AES_128_GCM_SHA256:
- return ciphers.TLS_AES_128_GCM_SHA256
- elif self.cipher_suite == CipherSuite.TLS_AES_256_GCM_SHA384:
- return ciphers.TLS_AES_256_GCM_SHA384
- elif self.cipher_suite == CipherSuite.TLS_AES_128_CCM_SHA256:
- return ciphers.TLS_AES_128_CCM_SHA256
- elif self.cipher_suite == CipherSuite.TLS_AES_128_CCM_8_SHA256:
- return ciphers.TLS_AES_128_CCM_8_SHA256
- elif self.cipher_suite == CipherSuite.TLS_CHACHA20_POLY1305_SHA256:
- return ciphers.TLS_CHACHA20_POLY1305_SHA256
- else:
- raise Exception("bad cipher suite")
- @property
- def extensions_dict(self):
- return {ext.ext_type: ext.ext_data for ext in self.extensions}
- class CertificateEntry(schema.BinarySchema):
- cert_data = schema.LengthPrefixedBytes(schema.uint24be)
- extensions = schema.LengthPrefixedObjectList(schema.uint16be, ServerExtension)
- # # x = x509.load_der_x509_certificate(data=cert_data, backend=backend)
- # # x = load_certificate(FILETYPE_ASN1, cert_data)
- # from tls.Crypto.PublicKey import RSA
- # key = RSA.import_key(cert_data)
- class Certificate(schema.BinarySchema):
- certificate_request_context = schema.LengthPrefixedBytes(schema.uint8)
- certificate_list = schema.LengthPrefixedObjectList(
- schema.uint24be, CertificateEntry
- )
- class CertificateVerify(schema.BinarySchema):
- algorithm = schema.SizedIntEnum(schema.uint16be, SignatureScheme)
- signature = schema.LengthPrefixedBytes(schema.uint16be)
- class NewSessionTicket(schema.BinarySchema):
- ticket_lifetime = schema.uint32be
- ticket_age_add = schema.uint32be
- ticket_nonce = schema.LengthPrefixedBytes(schema.uint8)
- ticket = schema.LengthPrefixedBytes(schema.uint16be)
- extensions = schema.LengthPrefixedObjectList(schema.uint16be, ServerExtension)
- def __post_init__(self):
- self.outdated_time = time.time() + min(self.ticket_lifetime, MAX_LIFETIME)
- self.obfuscated_ticket_age = (
- (self.ticket_lifetime * 1000) + self.ticket_age_add
- ) % AGE_MOD
- def is_outdated(self):
- return time.time() >= self.outdated_time
- def to_psk_identity(self):
- return PskIdentity(self.ticket, self.obfuscated_ticket_age)
- class Handshake(schema.BinarySchema):
- msg_type = schema.SizedIntEnum(schema.uint8, HandshakeType)
- msg = schema.LengthPrefixedObject(
- schema.uint24be,
- schema.Switch(
- "msg_type",
- {
- HandshakeType.client_hello: ClientHello,
- HandshakeType.server_hello: ServerHello,
- HandshakeType.encrypted_extensions: schema.LengthPrefixedObjectList(
- schema.uint16be, ServerExtension
- ),
- HandshakeType.certificate: Certificate,
- HandshakeType.certificate_verify: CertificateVerify,
- HandshakeType.finished: schema.Bytes(32),
- HandshakeType.new_session_ticket: NewSessionTicket,
- HandshakeType.end_of_early_data: schema.MustEqual(
- schema.Bytes(-1), b""
- ),
- HandshakeType.key_update: schema.SizedIntEnum(
- schema.uint8, KeyUpdateRequest
- ),
- },
- ),
- )
- class Alert(schema.BinarySchema):
- level = schema.SizedIntEnum(schema.uint8, AlertLevel)
- description = schema.SizedIntEnum(schema.uint8, AlertDescription)
- conten_type_cases = {
- ContentType.handshake: Handshake,
- ContentType.application_data: schema.Bytes(-1),
- ContentType.alert: Alert,
- ContentType.change_cipher_spec: schema.MustEqual(schema.Bytes(1), b"\x01"),
- }
- class TLSPlaintext(schema.BinarySchema):
- content_type = schema.SizedIntEnum(schema.uint8, ContentType)
- legacy_record_version = schema.Bytes(2)
- fragment = schema.LengthPrefixedBytes(schema.uint16be)
- # @classmethod
- # def get_handshake(cls, content_type: ContentType):
- # plaintext = yield from cls.get_value()
- # return Handshake.parse(plaintext.fragment)
- @classmethod
- def pack(cls, content_type: ContentType, data: bytes) -> bytes:
- assert len(data) > 0, "need data"
- data = memoryview(data)
- fragments = []
- while True:
- if len(data) > 16384:
- fragments.append(data[:16384])
- data = data[16384:]
- else:
- fragments.append(data)
- break
- is_handshake = content_type is ContentType.handshake
- return b"".join(
- cls(
- content_type,
- b"\x03\x01" if i == 0 and is_handshake else b"\x03\x03",
- bytes(frg),
- ).binary
- for i, frg in enumerate(fragments)
- )
- def is_overflow(self):
- return (
- self.content_type is ContentType.application_data
- and len(self.fragment) > (16384 + 256)
- ) or (
- self.content_type is not ContentType.application_data
- and len(self.fragment) > 16384
- )
- class TLSInnerPlaintext(schema.BinarySchema):
- content = schema.Bytes(-1)
- content_type = schema.SizedIntEnum(schema.uint8, ContentType)
- padding = schema.Bytes(-1)
- def tls_ciphertext(self, cipher):
- return cipher.tls_ciphertext(self.binary)
- @classmethod
- def pack(cls, content, content_type):
- padding = b"\x00" * random.randint(0, 10)
- return cls(content, content_type, padding)
- @classmethod
- def from_alert(cls, alert: Alert):
- padding = b"\x00" * random.randint(0, 10)
- return cls(alert.binary, ContentType.alert, padding)
- @classmethod
- def from_handshake(cls, handshake: Handshake):
- padding = b"\x00" * random.randint(0, 10)
- return cls(handshake.binary, ContentType.handshake, padding)
- @classmethod
- def from_application_data(cls, payload: bytes):
- padding = b"\x00" * random.randint(0, 10)
- return cls(payload, ContentType.application_data, padding)
- @classmethod
- def get_value(cls):
- yield from iofree.wait()
- bytes_ = yield from iofree.read()
- bytes_without_padding = bytes_.rstrip(b"\x00")
- padding_len = len(bytes_) - len(bytes_without_padding)
- content = bytes_without_padding[:-1]
- content_type = bytes_without_padding[-1]
- return cls(content, ContentType(content_type), b"\x00" * padding_len)
|