]> www.infradead.org Git - users/dwmw2/openconnect.git/commitdiff
tncc-emulate.py: update to modernized Python 3.x version
authorDaniel Lenski <dlenski@gmail.com>
Sun, 5 Apr 2020 00:17:06 +0000 (17:17 -0700)
committerDaniel Lenski <dlenski@gmail.com>
Wed, 8 Apr 2020 03:55:08 +0000 (20:55 -0700)
Copied from:
    https://github.com/dlenski/juniper-vpn-py/blob/5c5c6c021a80b926990e2598d27f18d3aba60513/tncc.py

Signed-off-by: Daniel Lenski <dlenski@gmail.com>
trojans/tncc-emulate.py
www/changelog.xml

index e3f436bb4c067cd760f52f01d9396196134e02d9..ed1de9bd2afe40a16f23172ccb7fed20388263db 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 
 # Juniper/Pulse TNCC emulator
 import sys
 import os
 import logging
-import StringIO
-import mechanize
-import cookielib
+from io import StringIO
+from http.cookiejar import Cookie, CookieJar
 import struct
-import socket
 import ssl
 import base64
 import collections
 import zlib
-import HTMLParser
+import html.parser as HTMLParser
 import socket
-import netifaces
-import urlgrabber
-import urllib2
 import platform
-import json
 import datetime
-import pyasn1_modules.pem
-import pyasn1_modules.rfc2459
-import pyasn1.codec.der.decoder
+import hashlib
 import xml.etree.ElementTree
 
+import mechanize
+try:
+    import asn1crypto.pem, asn1crypto.x509
+except ImportError:
+    asn1crypto = None
+try:
+    import netifaces
+except ImportError:
+    netifaces = None
+
 ssl._create_default_https_context = ssl._create_unverified_context
 
 debug = False
@@ -92,7 +94,7 @@ def decode_0ce4(buf, indent):
 def decode_0ce5(buf, indent):
     s = struct.unpack(str(len(buf)) + "s", buf)[0]
     logging.debug('%scmd 0ce5 (string) %d bytes', indent, len(buf))
-    s = s.rstrip('\0')
+    s = s.rstrip(b'\0')
     logging.debug('%s', s)
     return s
 
@@ -101,11 +103,11 @@ def decode_0ce7(buf, indent):
     id, s = struct.unpack(">I" + str(len(buf) - 4) + "s", buf)
     logging.debug('%scmd 0ce7 (id %08x string) %d bytes', indent, id, len(buf))
 
-    if s.startswith('COMPRESSED:'):
+    if s.startswith(b'COMPRESSED:'):
         typ, length, data = s.split(':', 2)
         s = zlib.decompress(data)
 
-    s = s.rstrip('\0')
+    s = s.rstrip(b'\0')
     logging.debug('%s', s)
     return (id, s)
 
@@ -121,7 +123,7 @@ def decode_0cf0(buf, indent):
 def decode_0cf1(buf, indent):
     s = struct.unpack(str(len(buf)) + "s", buf)[0]
     logging.debug('%scmd 0cf1 (string) %d bytes', indent, len(buf))
-    s = s.rstrip('\0')
+    s = s.rstrip(b'\0')
     logging.debug('%s', s)
     return s
 
@@ -191,7 +193,7 @@ def encode_0ce5(s):
 
 # 0ce7 - string with hex prefixer
 def encode_0ce7(s, prefix):
-    s += '\0'
+    s += b'\0'
     return encode_packet(0x0ce7, 1, struct.pack(">I" + str(len(s)) + "sx",
                                 prefix, s))
 
@@ -201,7 +203,7 @@ def encode_0cf0(buf):
 
 # 0cf1 - string without hex prefixer
 def encode_0cf1(s):
-    s += '\0'
+    s += b'\0'
     return encode_packet(0x0ce5, 1, struct.pack(str(len(s)) + "s", s))
 
 # 0cf3 - u32
@@ -211,68 +213,25 @@ def encode_0cf3(i):
 class x509cert(object):
 
     @staticmethod
-    def decode_names(data):
+    def decode_names(names):
         ret = dict()
-        for i in range(0, len(data)):
-            for attr in data[i]:
-                type = str(attr.getComponentByPosition(0).getComponentByName('type'))
-                value = str(attr.getComponentByPosition(0).getComponentByName('value'))
-                value = str(pyasn1.codec.der.decoder.decode(value)[0])
-                try:
-                    ret[type].append(value)
-                except:
-                    ret[type] = [value]
+        for name in names.chosen:
+            for attr in name:
+                type = attr['type'].dotted    # dotted-quad value (e.g. '2.5.4.10' = organization)
+                value = attr['value'].native  # literal string value (e.g. 'Bigcorp Inc.')
+                ret.setdefault(type, []).append(value)
         return ret
 
-    @staticmethod
-    def decode_time(tm):
-
-        tm_str = tm.getComponent()._value
-        tz = 0
-
-        if tm_str[-1] == 'Z':
-            tz = 0
-            tm_str = tm_str[:-1]
-        elif '-' in tm_str:
-            tm_str, tz = tm_str.split('-')
-            tz = datetime.datetime.strptime(tz, '%H%M')
-            tz = -(tz.hour * 60 + tz.minute)
-        elif '+' in tm_str:
-            tm_str, tz = tm_str.split('+')
-            tz = datetime.datetime.strptime(tz, '%H%M')
-            tz = tz.hour * 60 + tz.minute
-        else:
-            logging.warn('No timezone in certificate')
-
-        if tm.getName() == 'generalTime':
-            formats = ['%Y%m%d%H%M%S.%f', '%Y%m%d%H%M%S', '%Y%m%d%H%M', '%Y%m%d%H']
-        elif tm.getName() == 'utcTime':
-            formats = ['%y%m%d%H%M%S', '%y%m%d%H%M']
-        else:
-            raise Exception('Unknown time format')
-
-        for fmt in formats:
-            try:
-                ret = datetime.datetime.strptime(tm_str, fmt)
-                ret += datetime.timedelta(minutes=tz)
-                return ret
-            except:
-                pass
-
-        raise Exception('Could not parse certificate time')
-
     def __init__(self, cert_file):
         with open(cert_file, 'r') as f:
             self.data = f.read()
-        f = StringIO.StringIO(self.data)
-        substrate = pyasn1_modules.pem.readPemFromFile(f)
-        cert = pyasn1.codec.der.decoder.decode(substrate, pyasn1_modules.rfc2459.Certificate())[0]
-        tbs = cert.getComponentByName('tbsCertificate')
-        self.issuer = self.decode_names(tbs.getComponentByName('issuer'))
-        validity = tbs.getComponentByName('validity')
-        self.not_before = self.decode_time(validity.getComponentByName("notBefore"))
-        self.not_after = self.decode_time(validity.getComponentByName("notAfter"))
-        self.subject = self.decode_names(tbs.getComponentByName('subject'))
+        type_name, headers, der_bytes = asn1crypto.pem.unarmor(self.data.encode())
+        cert = asn1crypto.x509.Certificate.load(der_bytes)
+        tbs = cert['tbs_certificate']
+        self.issuer = self.decode_names(tbs['issuer'])
+        self.not_before = tbs['validity']['not_before'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None)
+        self.not_after = tbs['validity']['not_after'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None)
+        self.subject = self.decode_names(tbs['subject'])
 
 class tncc(object):
     def __init__(self, vpn_host, device_id=None, funk=None, platform=None, hostname=None, mac_addrs=[], certs=[]):
@@ -289,7 +248,7 @@ class tncc(object):
 
         self.br = mechanize.Browser()
 
-        self.cj = cookielib.LWPCookieJar()
+        self.cj = CookieJar()
         self.br.set_cookiejar(self.cj)
 
         # Browser options
@@ -318,7 +277,7 @@ class tncc(object):
         return None
 
     def set_cookie(self, name, value):
-        cookie = cookielib.Cookie(version=0, name=name, value=value,
+        cookie = Cookie(version=0, name=name, value=value,
                 port=None, port_specified=False, domain=self.vpn_host,
                 domain_specified=True, domain_initial_dot=False, path=self.path,
                 path_specified=True, secure=True, expires=None, discard=True,
@@ -330,7 +289,7 @@ class tncc(object):
         response = dict()
         last_key = ''
         for line in self.r.readlines():
-            line = line.strip()
+            line = line.strip().decode()
             # Note that msg is too long and gets wrapped, handle it special
             if last_key == 'msg' and len(line):
                 response['msg'] += line
@@ -339,9 +298,10 @@ class tncc(object):
                 try:
                     key, val = line.split('=', 1)
                     response[key] = val
-                except:
+                except ValueError:
                     pass
                 last_key = key
+        logging.debug('Parsed response:\n\t%s', '\n\t'.join('%r: %r,' % pair for pair in response.items()))
         return response
 
     def parse_policy_response(self, msg_data):
@@ -361,7 +321,7 @@ class tncc(object):
                                 try:
                                     key, value = field.split('=', 1)
                                     d[key] = value
-                                except:
+                                except ValueError:
                                     pass
                             objs.append(d)
         p = ParamHTMLParser()
@@ -406,24 +366,24 @@ class tncc(object):
 
         msg += "</ClientAttributes>  </FunkMessage>"
 
-        return encode_0ce7(msg, MSG_FUNK_PLATFORM)
+        return encode_0ce7(msg.encode(), MSG_FUNK_PLATFORM)
 
     def gen_funk_present(self):
         msg = "<FunkMessage VendorID='2636' ProductID='1' Version='1' Platform='%s' ClientType='Agentless'> " % self.platform
         msg += "<Present SequenceID='0'></Present>  </FunkMessage>"
-        return encode_0ce7(msg, MSG_FUNK)
+        return encode_0ce7(msg.encode(), MSG_FUNK)
 
     def gen_funk_response(self, certs):
 
         msg = "<FunkMessage VendorID='2636' ProductID='1' Version='1' Platform='%s' ClientType='Agentless'> " % self.platform
         msg += "<ClientAttributes SequenceID='0'> "
         msg += "<Attribute Name='Platform' Value='%s' />" % self.platform
-        for name, value in certs.iteritems():
+        for name, value in certs.items():
             msg += "<Attribute Name='%s' Value='%s' />" % (name, value.data.strip())
             msg += "<Attribute Name='%s' Value='%s' />" % (name, value.data.strip())
         msg += "</ClientAttributes>  </FunkMessage>"
 
-        return encode_0ce7(msg, MSG_FUNK)
+        return encode_0ce7(msg.encode(), MSG_FUNK)
 
     def gen_policy_request(self):
         policy_blocks = collections.OrderedDict({
@@ -445,11 +405,11 @@ class tncc(object):
         })
 
         msg = ''
-        for policy_key, policy_val in policy_blocks.iteritems():
-            v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.iteritems()])
+        for policy_key, policy_val in policy_blocks.items():
+            v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.items()])
             msg += '<parameter name="%s" value="%s">' % (policy_key, v)
 
-        return encode_0ce7(msg, 0xa4c18)
+        return encode_0ce7(msg.encode(), 0xa4c18)
 
     def gen_policy_response(self, policy_objs):
         # Make a set of policies
@@ -471,7 +431,7 @@ class tncc(object):
                 # Default action
                 msg += 'OK\n'
 
-        return encode_0ce7(msg, MSG_POLICY)
+        return encode_0ce7(msg.encode(), MSG_POLICY)
 
     def get_cookie(self, dspreauth=None, dssignin=None):
 
@@ -488,25 +448,25 @@ class tncc(object):
                 self.set_cookie('DSSIGNIN', dssignin)
 
         inner = self.gen_policy_request()
-        inner += encode_0ce7('policy request\x00v4', MSG_POLICY)
+        inner += encode_0ce7(b'policy request\x00v4', MSG_POLICY)
         if self.funk:
             inner += self.gen_funk_platform()
             inner += self.gen_funk_present()
 
-        msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en') + encode_0cf3(1))
+        msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en') + encode_0cf3(1))
         logging.debug('Sending packet -')
         decode_packet(msg_raw)
 
         post_attrs = {
             'connID': '0',
             'timestamp': '0',
-            'msg': base64.b64encode(msg_raw),
+            'msg': base64.b64encode(msg_raw).decode(),
             'firsttime': '1'
         }
         if self.deviceid:
             post_attrs['deviceid'] = self.deviceid
 
-        post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()])
+        post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()])
         self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data)
 
         # Parse the data returned into a key/value dict
@@ -525,25 +485,25 @@ class tncc(object):
         req_certs = dict()
         for str_id, sub_str in sub_strings:
             if str_id == MSG_POLICY:
-                policy_objs += self.parse_policy_response(sub_str)
+                policy_objs += self.parse_policy_response(sub_str.decode())
             elif str_id == MSG_FUNK:
-                req_certs = self.parse_funk_response(sub_str)
+                req_certs = self.parse_funk_response(sub_str.decode())
 
         if debug:
             for obj in policy_objs:
                 if 'policy' in obj:
                     logging.debug('policy %s', obj['policy'])
-                    for key, val in obj.iteritems():
+                    for key, val in obj.items():
                         if key != 'policy':
                             logging.debug('\t%s %s', key, val)
 
         # Try to locate the required certificates
         certs = dict()
-        for cert_id, req_dns in req_certs.iteritems():
+        for cert_id, req_dns in req_certs.items():
             for cert in self.avail_certs:
                 fail = False
-                for dn_name, dn_vals in req_dns.iteritems():
-                    for name, val in dn_vals.iteritems():
+                for dn_name, dn_vals in req_dns.items():
+                    for name, val in dn_vals.items():
                         try:
                             if dn_name == 'IssuerDN':
                                 assert val in cert.issuer[name]
@@ -561,22 +521,22 @@ class tncc(object):
             if cert_id not in certs:
                 logging.warn('Could not find certificate for %s', str(req_dns))
 
-        inner = ''
+        inner = b''
         if certs:
             inner += self.gen_funk_response(certs)
         inner += self.gen_policy_response(policy_objs)
 
-        msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en'))
+        msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en'))
         logging.debug('Sending packet -')
         decode_packet(msg_raw)
 
         post_attrs = {
             'connID': '1',
-            'msg': base64.b64encode(msg_raw),
+            'msg': base64.b64encode(msg_raw).decode(),
             'firsttime': '1'
         }
 
-        post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()])
+        post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()])
         self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data)
 
         # We have a new DSPREAUTH cookie
@@ -607,6 +567,19 @@ class tncc_server(object):
             # FIXME: Support for periodic updates
             dsid_value = args['Cookie']
 
+def fingerprint_checking_SSLSocket(_fingerprint):
+    class SSLSocket(ssl.SSLSocket):
+        fingerprint = _fingerprint
+        def do_handshake(self, *args, **kw):
+            res = super().do_handshake(*args, **kw)
+            der_bytes = self.getpeercert(True)
+            cert = asn1crypto.x509.Certificate.load(der_bytes)
+            pubkey = cert.public_key.dump()
+            pin_sha256 = base64.b64encode(hashlib.sha256(pubkey).digest()).decode()
+            if pin_sha256 != self.fingerprint:
+                raise Exception("Server fingerprint %s does not match expected pin-sha256:%s" % (pin_sha256, self.fingerprint))
+    return SSLSocket
+
 if __name__ == "__main__":
     vpn_host = sys.argv[1]
 
@@ -618,26 +591,43 @@ if __name__ == "__main__":
         mac_addrs = [n.strip() for n in os.environ['TNCC_HWADDR'].split(',')]
     else:
         mac_addrs = []
-        for iface in netifaces.interfaces():
-            try:
-                mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr']
-                assert mac != '00:00:00:00:00:00'
-                mac_addrs.append(mac)
-            except:
-                pass
+        if netifaces is None:
+            logging.warn("No netifaces module; mac_addrs will be empty.")
+        else:
+            for iface in netifaces.interfaces():
+                try:
+                    mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr']
+                    assert mac != '00:00:00:00:00:00'
+                    mac_addrs.append(mac)
+                except:
+                    pass
 
     hostname = os.environ.get('TNCC_HOSTNAME', socket.gethostname())
 
+    fingerprint = os.environ.get('TNCC_SHA256')
+    if not fingerprint:
+        logging.warn("TNCC_SHA256 not set, will not validate server certificate")
+    elif not asn1crypto:
+        logging.warn("asn1crypto module not available, will not validate server certificate")
+    else:
+        # we need to monkey-patch this, because SSLContext.sslsocket_class isn't
+        # available until Python 3.7
+        # https://docs.python.org/3/library/ssl.html#ssl.SSLContext.wrap_socket
+        ssl.SSLSocket = fingerprint_checking_SSLSocket(fingerprint)
+
     certs = []
-    if 'TNCC_CERTS' in os.environ:
-        now = datetime.datetime.now()
-        for f in os.environ['TNCC_CERTS'].split(','):
-            cert = x509cert(f.strip())
-            if now < cert.not_before:
-                logging.warn('WARNING: %s is not yet valid', f)
-            if now > cert.not_after:
-                logging.warn('WARNING: %s is expired', f)
-            certs.append(cert)
+    if asn1crypto:
+        if 'TNCC_CERTS' in os.environ:
+            now = datetime.datetime.utcnow()
+            for f in os.environ['TNCC_CERTS'].split(','):
+                cert = x509cert(f.strip())
+                if now < cert.not_before:
+                    logging.warn('WARNING: %s is not yet valid', f)
+                if now > cert.not_after:
+                    logging.warn('WARNING: %s is expired', f)
+                certs.append(cert)
+    else:
+        raise Exception('TNCC_CERTS environment variable set, but asn1crypto module is not available')
 
     # \HKEY_CURRENT_USER\Software\Juniper Networks\Device Id
     device_id = os.environ.get('TNCC_DEVICE_ID')
@@ -648,7 +638,7 @@ if __name__ == "__main__":
         dspreauth_value = sys.argv[2]
         dssignin_value = sys.argv[3]
         'TNCC ', dspreauth_value, dssignin_value
-        print t.get_cookie(dspreauth, dssignin).value
+        print(t.get_cookie(dspreauth, dssignin).value)
     else:
         sock = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_SEQPACKET)
         server = tncc_server(sock, t)
index 6e95c8347415cce0ef44668747dc2e710f4baaab..077d758d25fe0ad8a36b4fcea525c4532c4f2df6 100644 (file)
@@ -17,7 +17,7 @@
      <ul>
        <li>Don't abort Pulse connection when server-provided certificate MD5 doesn't match.</li>
        <li>Fix off-by-one in check for bad GnuTLS versions, and add build and run time checks.</li>
-       <li>Convert <tt>tncc-wrapper.py</tt> to Python 3, and include <tt>tncc-emulate.py</tt> as well.</li>
+       <li>Convert <tt>tncc-wrapper.py</tt> to Python 3, and include modernized <tt>tncc-emulate.py</tt> as well.</li>
      </ul><br/>
   </li>
   <li><b><a href="ftp://ftp.infradead.org/pub/openconnect/openconnect-8.06.tar.gz">OpenConnect v8.06</a></b>