From: Daniel Lenski Date: Sun, 5 Apr 2020 00:17:06 +0000 (-0700) Subject: tncc-emulate.py: update to modernized Python 3.x version X-Git-Tag: v8.09~14^2~5 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=c63ce8c58ec88d42b6e87d7cc79b41b84a97ed24;p=users%2Fdwmw2%2Fopenconnect.git tncc-emulate.py: update to modernized Python 3.x version Copied from: https://github.com/dlenski/juniper-vpn-py/blob/5c5c6c021a80b926990e2598d27f18d3aba60513/tncc.py Signed-off-by: Daniel Lenski --- diff --git a/trojans/tncc-emulate.py b/trojans/tncc-emulate.py index e3f436bb..ed1de9bd 100755 --- a/trojans/tncc-emulate.py +++ b/trojans/tncc-emulate.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # Juniper/Pulse TNCC emulator @@ -19,28 +19,30 @@ import sys import os import logging -import StringIO -import mechanize -import cookielib +from io import StringIO +from http.cookiejar import Cookie, CookieJar import struct -import socket import ssl import base64 import collections import zlib -import HTMLParser +import html.parser as HTMLParser import socket -import netifaces -import urlgrabber -import urllib2 import platform -import json import datetime -import pyasn1_modules.pem -import pyasn1_modules.rfc2459 -import pyasn1.codec.der.decoder +import hashlib import xml.etree.ElementTree +import mechanize +try: + import asn1crypto.pem, asn1crypto.x509 +except ImportError: + asn1crypto = None +try: + import netifaces +except ImportError: + netifaces = None + ssl._create_default_https_context = ssl._create_unverified_context debug = False @@ -92,7 +94,7 @@ def decode_0ce4(buf, indent): def decode_0ce5(buf, indent): s = struct.unpack(str(len(buf)) + "s", buf)[0] logging.debug('%scmd 0ce5 (string) %d bytes', indent, len(buf)) - s = s.rstrip('\0') + s = s.rstrip(b'\0') logging.debug('%s', s) return s @@ -101,11 +103,11 @@ def decode_0ce7(buf, indent): id, s = struct.unpack(">I" + str(len(buf) - 4) + "s", buf) logging.debug('%scmd 0ce7 (id %08x string) %d bytes', indent, id, len(buf)) - if s.startswith('COMPRESSED:'): + if s.startswith(b'COMPRESSED:'): typ, length, data = s.split(':', 2) s = zlib.decompress(data) - s = s.rstrip('\0') + s = s.rstrip(b'\0') logging.debug('%s', s) return (id, s) @@ -121,7 +123,7 @@ def decode_0cf0(buf, indent): def decode_0cf1(buf, indent): s = struct.unpack(str(len(buf)) + "s", buf)[0] logging.debug('%scmd 0cf1 (string) %d bytes', indent, len(buf)) - s = s.rstrip('\0') + s = s.rstrip(b'\0') logging.debug('%s', s) return s @@ -191,7 +193,7 @@ def encode_0ce5(s): # 0ce7 - string with hex prefixer def encode_0ce7(s, prefix): - s += '\0' + s += b'\0' return encode_packet(0x0ce7, 1, struct.pack(">I" + str(len(s)) + "sx", prefix, s)) @@ -201,7 +203,7 @@ def encode_0cf0(buf): # 0cf1 - string without hex prefixer def encode_0cf1(s): - s += '\0' + s += b'\0' return encode_packet(0x0ce5, 1, struct.pack(str(len(s)) + "s", s)) # 0cf3 - u32 @@ -211,68 +213,25 @@ def encode_0cf3(i): class x509cert(object): @staticmethod - def decode_names(data): + def decode_names(names): ret = dict() - for i in range(0, len(data)): - for attr in data[i]: - type = str(attr.getComponentByPosition(0).getComponentByName('type')) - value = str(attr.getComponentByPosition(0).getComponentByName('value')) - value = str(pyasn1.codec.der.decoder.decode(value)[0]) - try: - ret[type].append(value) - except: - ret[type] = [value] + for name in names.chosen: + for attr in name: + type = attr['type'].dotted # dotted-quad value (e.g. '2.5.4.10' = organization) + value = attr['value'].native # literal string value (e.g. 'Bigcorp Inc.') + ret.setdefault(type, []).append(value) return ret - @staticmethod - def decode_time(tm): - - tm_str = tm.getComponent()._value - tz = 0 - - if tm_str[-1] == 'Z': - tz = 0 - tm_str = tm_str[:-1] - elif '-' in tm_str: - tm_str, tz = tm_str.split('-') - tz = datetime.datetime.strptime(tz, '%H%M') - tz = -(tz.hour * 60 + tz.minute) - elif '+' in tm_str: - tm_str, tz = tm_str.split('+') - tz = datetime.datetime.strptime(tz, '%H%M') - tz = tz.hour * 60 + tz.minute - else: - logging.warn('No timezone in certificate') - - if tm.getName() == 'generalTime': - formats = ['%Y%m%d%H%M%S.%f', '%Y%m%d%H%M%S', '%Y%m%d%H%M', '%Y%m%d%H'] - elif tm.getName() == 'utcTime': - formats = ['%y%m%d%H%M%S', '%y%m%d%H%M'] - else: - raise Exception('Unknown time format') - - for fmt in formats: - try: - ret = datetime.datetime.strptime(tm_str, fmt) - ret += datetime.timedelta(minutes=tz) - return ret - except: - pass - - raise Exception('Could not parse certificate time') - def __init__(self, cert_file): with open(cert_file, 'r') as f: self.data = f.read() - f = StringIO.StringIO(self.data) - substrate = pyasn1_modules.pem.readPemFromFile(f) - cert = pyasn1.codec.der.decoder.decode(substrate, pyasn1_modules.rfc2459.Certificate())[0] - tbs = cert.getComponentByName('tbsCertificate') - self.issuer = self.decode_names(tbs.getComponentByName('issuer')) - validity = tbs.getComponentByName('validity') - self.not_before = self.decode_time(validity.getComponentByName("notBefore")) - self.not_after = self.decode_time(validity.getComponentByName("notAfter")) - self.subject = self.decode_names(tbs.getComponentByName('subject')) + type_name, headers, der_bytes = asn1crypto.pem.unarmor(self.data.encode()) + cert = asn1crypto.x509.Certificate.load(der_bytes) + tbs = cert['tbs_certificate'] + self.issuer = self.decode_names(tbs['issuer']) + self.not_before = tbs['validity']['not_before'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None) + self.not_after = tbs['validity']['not_after'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None) + self.subject = self.decode_names(tbs['subject']) class tncc(object): def __init__(self, vpn_host, device_id=None, funk=None, platform=None, hostname=None, mac_addrs=[], certs=[]): @@ -289,7 +248,7 @@ class tncc(object): self.br = mechanize.Browser() - self.cj = cookielib.LWPCookieJar() + self.cj = CookieJar() self.br.set_cookiejar(self.cj) # Browser options @@ -318,7 +277,7 @@ class tncc(object): return None def set_cookie(self, name, value): - cookie = cookielib.Cookie(version=0, name=name, value=value, + cookie = Cookie(version=0, name=name, value=value, port=None, port_specified=False, domain=self.vpn_host, domain_specified=True, domain_initial_dot=False, path=self.path, path_specified=True, secure=True, expires=None, discard=True, @@ -330,7 +289,7 @@ class tncc(object): response = dict() last_key = '' for line in self.r.readlines(): - line = line.strip() + line = line.strip().decode() # Note that msg is too long and gets wrapped, handle it special if last_key == 'msg' and len(line): response['msg'] += line @@ -339,9 +298,10 @@ class tncc(object): try: key, val = line.split('=', 1) response[key] = val - except: + except ValueError: pass last_key = key + logging.debug('Parsed response:\n\t%s', '\n\t'.join('%r: %r,' % pair for pair in response.items())) return response def parse_policy_response(self, msg_data): @@ -361,7 +321,7 @@ class tncc(object): try: key, value = field.split('=', 1) d[key] = value - except: + except ValueError: pass objs.append(d) p = ParamHTMLParser() @@ -406,24 +366,24 @@ class tncc(object): msg += " " - return encode_0ce7(msg, MSG_FUNK_PLATFORM) + return encode_0ce7(msg.encode(), MSG_FUNK_PLATFORM) def gen_funk_present(self): msg = " " % self.platform msg += " " - return encode_0ce7(msg, MSG_FUNK) + return encode_0ce7(msg.encode(), MSG_FUNK) def gen_funk_response(self, certs): msg = " " % self.platform msg += " " msg += "" % self.platform - for name, value in certs.iteritems(): + for name, value in certs.items(): msg += "" % (name, value.data.strip()) msg += "" % (name, value.data.strip()) msg += " " - return encode_0ce7(msg, MSG_FUNK) + return encode_0ce7(msg.encode(), MSG_FUNK) def gen_policy_request(self): policy_blocks = collections.OrderedDict({ @@ -445,11 +405,11 @@ class tncc(object): }) msg = '' - for policy_key, policy_val in policy_blocks.iteritems(): - v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.iteritems()]) + for policy_key, policy_val in policy_blocks.items(): + v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.items()]) msg += '' % (policy_key, v) - return encode_0ce7(msg, 0xa4c18) + return encode_0ce7(msg.encode(), 0xa4c18) def gen_policy_response(self, policy_objs): # Make a set of policies @@ -471,7 +431,7 @@ class tncc(object): # Default action msg += 'OK\n' - return encode_0ce7(msg, MSG_POLICY) + return encode_0ce7(msg.encode(), MSG_POLICY) def get_cookie(self, dspreauth=None, dssignin=None): @@ -488,25 +448,25 @@ class tncc(object): self.set_cookie('DSSIGNIN', dssignin) inner = self.gen_policy_request() - inner += encode_0ce7('policy request\x00v4', MSG_POLICY) + inner += encode_0ce7(b'policy request\x00v4', MSG_POLICY) if self.funk: inner += self.gen_funk_platform() inner += self.gen_funk_present() - msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en') + encode_0cf3(1)) + msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en') + encode_0cf3(1)) logging.debug('Sending packet -') decode_packet(msg_raw) post_attrs = { 'connID': '0', 'timestamp': '0', - 'msg': base64.b64encode(msg_raw), + 'msg': base64.b64encode(msg_raw).decode(), 'firsttime': '1' } if self.deviceid: post_attrs['deviceid'] = self.deviceid - post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()]) + post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()]) self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data) # Parse the data returned into a key/value dict @@ -525,25 +485,25 @@ class tncc(object): req_certs = dict() for str_id, sub_str in sub_strings: if str_id == MSG_POLICY: - policy_objs += self.parse_policy_response(sub_str) + policy_objs += self.parse_policy_response(sub_str.decode()) elif str_id == MSG_FUNK: - req_certs = self.parse_funk_response(sub_str) + req_certs = self.parse_funk_response(sub_str.decode()) if debug: for obj in policy_objs: if 'policy' in obj: logging.debug('policy %s', obj['policy']) - for key, val in obj.iteritems(): + for key, val in obj.items(): if key != 'policy': logging.debug('\t%s %s', key, val) # Try to locate the required certificates certs = dict() - for cert_id, req_dns in req_certs.iteritems(): + for cert_id, req_dns in req_certs.items(): for cert in self.avail_certs: fail = False - for dn_name, dn_vals in req_dns.iteritems(): - for name, val in dn_vals.iteritems(): + for dn_name, dn_vals in req_dns.items(): + for name, val in dn_vals.items(): try: if dn_name == 'IssuerDN': assert val in cert.issuer[name] @@ -561,22 +521,22 @@ class tncc(object): if cert_id not in certs: logging.warn('Could not find certificate for %s', str(req_dns)) - inner = '' + inner = b'' if certs: inner += self.gen_funk_response(certs) inner += self.gen_policy_response(policy_objs) - msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en')) + msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en')) logging.debug('Sending packet -') decode_packet(msg_raw) post_attrs = { 'connID': '1', - 'msg': base64.b64encode(msg_raw), + 'msg': base64.b64encode(msg_raw).decode(), 'firsttime': '1' } - post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()]) + post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()]) self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data) # We have a new DSPREAUTH cookie @@ -607,6 +567,19 @@ class tncc_server(object): # FIXME: Support for periodic updates dsid_value = args['Cookie'] +def fingerprint_checking_SSLSocket(_fingerprint): + class SSLSocket(ssl.SSLSocket): + fingerprint = _fingerprint + def do_handshake(self, *args, **kw): + res = super().do_handshake(*args, **kw) + der_bytes = self.getpeercert(True) + cert = asn1crypto.x509.Certificate.load(der_bytes) + pubkey = cert.public_key.dump() + pin_sha256 = base64.b64encode(hashlib.sha256(pubkey).digest()).decode() + if pin_sha256 != self.fingerprint: + raise Exception("Server fingerprint %s does not match expected pin-sha256:%s" % (pin_sha256, self.fingerprint)) + return SSLSocket + if __name__ == "__main__": vpn_host = sys.argv[1] @@ -618,26 +591,43 @@ if __name__ == "__main__": mac_addrs = [n.strip() for n in os.environ['TNCC_HWADDR'].split(',')] else: mac_addrs = [] - for iface in netifaces.interfaces(): - try: - mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr'] - assert mac != '00:00:00:00:00:00' - mac_addrs.append(mac) - except: - pass + if netifaces is None: + logging.warn("No netifaces module; mac_addrs will be empty.") + else: + for iface in netifaces.interfaces(): + try: + mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr'] + assert mac != '00:00:00:00:00:00' + mac_addrs.append(mac) + except: + pass hostname = os.environ.get('TNCC_HOSTNAME', socket.gethostname()) + fingerprint = os.environ.get('TNCC_SHA256') + if not fingerprint: + logging.warn("TNCC_SHA256 not set, will not validate server certificate") + elif not asn1crypto: + logging.warn("asn1crypto module not available, will not validate server certificate") + else: + # we need to monkey-patch this, because SSLContext.sslsocket_class isn't + # available until Python 3.7 + # https://docs.python.org/3/library/ssl.html#ssl.SSLContext.wrap_socket + ssl.SSLSocket = fingerprint_checking_SSLSocket(fingerprint) + certs = [] - if 'TNCC_CERTS' in os.environ: - now = datetime.datetime.now() - for f in os.environ['TNCC_CERTS'].split(','): - cert = x509cert(f.strip()) - if now < cert.not_before: - logging.warn('WARNING: %s is not yet valid', f) - if now > cert.not_after: - logging.warn('WARNING: %s is expired', f) - certs.append(cert) + if asn1crypto: + if 'TNCC_CERTS' in os.environ: + now = datetime.datetime.utcnow() + for f in os.environ['TNCC_CERTS'].split(','): + cert = x509cert(f.strip()) + if now < cert.not_before: + logging.warn('WARNING: %s is not yet valid', f) + if now > cert.not_after: + logging.warn('WARNING: %s is expired', f) + certs.append(cert) + else: + raise Exception('TNCC_CERTS environment variable set, but asn1crypto module is not available') # \HKEY_CURRENT_USER\Software\Juniper Networks\Device Id device_id = os.environ.get('TNCC_DEVICE_ID') @@ -648,7 +638,7 @@ if __name__ == "__main__": dspreauth_value = sys.argv[2] dssignin_value = sys.argv[3] 'TNCC ', dspreauth_value, dssignin_value - print t.get_cookie(dspreauth, dssignin).value + print(t.get_cookie(dspreauth, dssignin).value) else: sock = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_SEQPACKET) server = tncc_server(sock, t) diff --git a/www/changelog.xml b/www/changelog.xml index 6e95c834..077d758d 100644 --- a/www/changelog.xml +++ b/www/changelog.xml @@ -17,7 +17,7 @@
  • Don't abort Pulse connection when server-provided certificate MD5 doesn't match.
  • Fix off-by-one in check for bad GnuTLS versions, and add build and run time checks.
  • -
  • Convert tncc-wrapper.py to Python 3, and include tncc-emulate.py as well.
  • +
  • Convert tncc-wrapper.py to Python 3, and include modernized tncc-emulate.py as well.

  • OpenConnect v8.06