-#!/usr/bin/python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Juniper/Pulse TNCC emulator
import sys
import os
import logging
-import StringIO
-import mechanize
-import cookielib
+from io import StringIO
+from http.cookiejar import Cookie, CookieJar
import struct
-import socket
import ssl
import base64
import collections
import zlib
-import HTMLParser
+import html.parser as HTMLParser
import socket
-import netifaces
-import urlgrabber
-import urllib2
import platform
-import json
import datetime
-import pyasn1_modules.pem
-import pyasn1_modules.rfc2459
-import pyasn1.codec.der.decoder
+import hashlib
import xml.etree.ElementTree
+import mechanize
+try:
+ import asn1crypto.pem, asn1crypto.x509
+except ImportError:
+ asn1crypto = None
+try:
+ import netifaces
+except ImportError:
+ netifaces = None
+
ssl._create_default_https_context = ssl._create_unverified_context
debug = False
def decode_0ce5(buf, indent):
s = struct.unpack(str(len(buf)) + "s", buf)[0]
logging.debug('%scmd 0ce5 (string) %d bytes', indent, len(buf))
- s = s.rstrip('\0')
+ s = s.rstrip(b'\0')
logging.debug('%s', s)
return s
id, s = struct.unpack(">I" + str(len(buf) - 4) + "s", buf)
logging.debug('%scmd 0ce7 (id %08x string) %d bytes', indent, id, len(buf))
- if s.startswith('COMPRESSED:'):
+ if s.startswith(b'COMPRESSED:'):
typ, length, data = s.split(':', 2)
s = zlib.decompress(data)
- s = s.rstrip('\0')
+ s = s.rstrip(b'\0')
logging.debug('%s', s)
return (id, s)
def decode_0cf1(buf, indent):
s = struct.unpack(str(len(buf)) + "s", buf)[0]
logging.debug('%scmd 0cf1 (string) %d bytes', indent, len(buf))
- s = s.rstrip('\0')
+ s = s.rstrip(b'\0')
logging.debug('%s', s)
return s
# 0ce7 - string with hex prefixer
def encode_0ce7(s, prefix):
- s += '\0'
+ s += b'\0'
return encode_packet(0x0ce7, 1, struct.pack(">I" + str(len(s)) + "sx",
prefix, s))
# 0cf1 - string without hex prefixer
def encode_0cf1(s):
- s += '\0'
+ s += b'\0'
return encode_packet(0x0ce5, 1, struct.pack(str(len(s)) + "s", s))
# 0cf3 - u32
class x509cert(object):
@staticmethod
- def decode_names(data):
+ def decode_names(names):
ret = dict()
- for i in range(0, len(data)):
- for attr in data[i]:
- type = str(attr.getComponentByPosition(0).getComponentByName('type'))
- value = str(attr.getComponentByPosition(0).getComponentByName('value'))
- value = str(pyasn1.codec.der.decoder.decode(value)[0])
- try:
- ret[type].append(value)
- except:
- ret[type] = [value]
+ for name in names.chosen:
+ for attr in name:
+ type = attr['type'].dotted # dotted-quad value (e.g. '2.5.4.10' = organization)
+ value = attr['value'].native # literal string value (e.g. 'Bigcorp Inc.')
+ ret.setdefault(type, []).append(value)
return ret
- @staticmethod
- def decode_time(tm):
-
- tm_str = tm.getComponent()._value
- tz = 0
-
- if tm_str[-1] == 'Z':
- tz = 0
- tm_str = tm_str[:-1]
- elif '-' in tm_str:
- tm_str, tz = tm_str.split('-')
- tz = datetime.datetime.strptime(tz, '%H%M')
- tz = -(tz.hour * 60 + tz.minute)
- elif '+' in tm_str:
- tm_str, tz = tm_str.split('+')
- tz = datetime.datetime.strptime(tz, '%H%M')
- tz = tz.hour * 60 + tz.minute
- else:
- logging.warn('No timezone in certificate')
-
- if tm.getName() == 'generalTime':
- formats = ['%Y%m%d%H%M%S.%f', '%Y%m%d%H%M%S', '%Y%m%d%H%M', '%Y%m%d%H']
- elif tm.getName() == 'utcTime':
- formats = ['%y%m%d%H%M%S', '%y%m%d%H%M']
- else:
- raise Exception('Unknown time format')
-
- for fmt in formats:
- try:
- ret = datetime.datetime.strptime(tm_str, fmt)
- ret += datetime.timedelta(minutes=tz)
- return ret
- except:
- pass
-
- raise Exception('Could not parse certificate time')
-
def __init__(self, cert_file):
with open(cert_file, 'r') as f:
self.data = f.read()
- f = StringIO.StringIO(self.data)
- substrate = pyasn1_modules.pem.readPemFromFile(f)
- cert = pyasn1.codec.der.decoder.decode(substrate, pyasn1_modules.rfc2459.Certificate())[0]
- tbs = cert.getComponentByName('tbsCertificate')
- self.issuer = self.decode_names(tbs.getComponentByName('issuer'))
- validity = tbs.getComponentByName('validity')
- self.not_before = self.decode_time(validity.getComponentByName("notBefore"))
- self.not_after = self.decode_time(validity.getComponentByName("notAfter"))
- self.subject = self.decode_names(tbs.getComponentByName('subject'))
+ type_name, headers, der_bytes = asn1crypto.pem.unarmor(self.data.encode())
+ cert = asn1crypto.x509.Certificate.load(der_bytes)
+ tbs = cert['tbs_certificate']
+ self.issuer = self.decode_names(tbs['issuer'])
+ self.not_before = tbs['validity']['not_before'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None)
+ self.not_after = tbs['validity']['not_after'].native.astimezone(datetime.timezone.utc).replace(tzinfo=None)
+ self.subject = self.decode_names(tbs['subject'])
class tncc(object):
def __init__(self, vpn_host, device_id=None, funk=None, platform=None, hostname=None, mac_addrs=[], certs=[]):
self.br = mechanize.Browser()
- self.cj = cookielib.LWPCookieJar()
+ self.cj = CookieJar()
self.br.set_cookiejar(self.cj)
# Browser options
return None
def set_cookie(self, name, value):
- cookie = cookielib.Cookie(version=0, name=name, value=value,
+ cookie = Cookie(version=0, name=name, value=value,
port=None, port_specified=False, domain=self.vpn_host,
domain_specified=True, domain_initial_dot=False, path=self.path,
path_specified=True, secure=True, expires=None, discard=True,
response = dict()
last_key = ''
for line in self.r.readlines():
- line = line.strip()
+ line = line.strip().decode()
# Note that msg is too long and gets wrapped, handle it special
if last_key == 'msg' and len(line):
response['msg'] += line
try:
key, val = line.split('=', 1)
response[key] = val
- except:
+ except ValueError:
pass
last_key = key
+ logging.debug('Parsed response:\n\t%s', '\n\t'.join('%r: %r,' % pair for pair in response.items()))
return response
def parse_policy_response(self, msg_data):
try:
key, value = field.split('=', 1)
d[key] = value
- except:
+ except ValueError:
pass
objs.append(d)
p = ParamHTMLParser()
msg += "</ClientAttributes> </FunkMessage>"
- return encode_0ce7(msg, MSG_FUNK_PLATFORM)
+ return encode_0ce7(msg.encode(), MSG_FUNK_PLATFORM)
def gen_funk_present(self):
msg = "<FunkMessage VendorID='2636' ProductID='1' Version='1' Platform='%s' ClientType='Agentless'> " % self.platform
msg += "<Present SequenceID='0'></Present> </FunkMessage>"
- return encode_0ce7(msg, MSG_FUNK)
+ return encode_0ce7(msg.encode(), MSG_FUNK)
def gen_funk_response(self, certs):
msg = "<FunkMessage VendorID='2636' ProductID='1' Version='1' Platform='%s' ClientType='Agentless'> " % self.platform
msg += "<ClientAttributes SequenceID='0'> "
msg += "<Attribute Name='Platform' Value='%s' />" % self.platform
- for name, value in certs.iteritems():
+ for name, value in certs.items():
msg += "<Attribute Name='%s' Value='%s' />" % (name, value.data.strip())
msg += "<Attribute Name='%s' Value='%s' />" % (name, value.data.strip())
msg += "</ClientAttributes> </FunkMessage>"
- return encode_0ce7(msg, MSG_FUNK)
+ return encode_0ce7(msg.encode(), MSG_FUNK)
def gen_policy_request(self):
policy_blocks = collections.OrderedDict({
})
msg = ''
- for policy_key, policy_val in policy_blocks.iteritems():
- v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.iteritems()])
+ for policy_key, policy_val in policy_blocks.items():
+ v = ''.join([ '%s=%s;' % (k, v) for k, v in policy_val.items()])
msg += '<parameter name="%s" value="%s">' % (policy_key, v)
- return encode_0ce7(msg, 0xa4c18)
+ return encode_0ce7(msg.encode(), 0xa4c18)
def gen_policy_response(self, policy_objs):
# Make a set of policies
# Default action
msg += 'OK\n'
- return encode_0ce7(msg, MSG_POLICY)
+ return encode_0ce7(msg.encode(), MSG_POLICY)
def get_cookie(self, dspreauth=None, dssignin=None):
self.set_cookie('DSSIGNIN', dssignin)
inner = self.gen_policy_request()
- inner += encode_0ce7('policy request\x00v4', MSG_POLICY)
+ inner += encode_0ce7(b'policy request\x00v4', MSG_POLICY)
if self.funk:
inner += self.gen_funk_platform()
inner += self.gen_funk_present()
- msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en') + encode_0cf3(1))
+ msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en') + encode_0cf3(1))
logging.debug('Sending packet -')
decode_packet(msg_raw)
post_attrs = {
'connID': '0',
'timestamp': '0',
- 'msg': base64.b64encode(msg_raw),
+ 'msg': base64.b64encode(msg_raw).decode(),
'firsttime': '1'
}
if self.deviceid:
post_attrs['deviceid'] = self.deviceid
- post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()])
+ post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()])
self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data)
# Parse the data returned into a key/value dict
req_certs = dict()
for str_id, sub_str in sub_strings:
if str_id == MSG_POLICY:
- policy_objs += self.parse_policy_response(sub_str)
+ policy_objs += self.parse_policy_response(sub_str.decode())
elif str_id == MSG_FUNK:
- req_certs = self.parse_funk_response(sub_str)
+ req_certs = self.parse_funk_response(sub_str.decode())
if debug:
for obj in policy_objs:
if 'policy' in obj:
logging.debug('policy %s', obj['policy'])
- for key, val in obj.iteritems():
+ for key, val in obj.items():
if key != 'policy':
logging.debug('\t%s %s', key, val)
# Try to locate the required certificates
certs = dict()
- for cert_id, req_dns in req_certs.iteritems():
+ for cert_id, req_dns in req_certs.items():
for cert in self.avail_certs:
fail = False
- for dn_name, dn_vals in req_dns.iteritems():
- for name, val in dn_vals.iteritems():
+ for dn_name, dn_vals in req_dns.items():
+ for name, val in dn_vals.items():
try:
if dn_name == 'IssuerDN':
assert val in cert.issuer[name]
if cert_id not in certs:
logging.warn('Could not find certificate for %s', str(req_dns))
- inner = ''
+ inner = b''
if certs:
inner += self.gen_funk_response(certs)
inner += self.gen_policy_response(policy_objs)
- msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5('Accept-Language: en'))
+ msg_raw = encode_0013(encode_0ce4(inner) + encode_0ce5(b'Accept-Language: en'))
logging.debug('Sending packet -')
decode_packet(msg_raw)
post_attrs = {
'connID': '1',
- 'msg': base64.b64encode(msg_raw),
+ 'msg': base64.b64encode(msg_raw).decode(),
'firsttime': '1'
}
- post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.iteritems()])
+ post_data = ''.join([ '%s=%s;' % (k, v) for k, v in post_attrs.items()])
self.r = self.br.open('https://' + self.vpn_host + self.path + 'hc/tnchcupdate.cgi', post_data)
# We have a new DSPREAUTH cookie
# FIXME: Support for periodic updates
dsid_value = args['Cookie']
+def fingerprint_checking_SSLSocket(_fingerprint):
+ class SSLSocket(ssl.SSLSocket):
+ fingerprint = _fingerprint
+ def do_handshake(self, *args, **kw):
+ res = super().do_handshake(*args, **kw)
+ der_bytes = self.getpeercert(True)
+ cert = asn1crypto.x509.Certificate.load(der_bytes)
+ pubkey = cert.public_key.dump()
+ pin_sha256 = base64.b64encode(hashlib.sha256(pubkey).digest()).decode()
+ if pin_sha256 != self.fingerprint:
+ raise Exception("Server fingerprint %s does not match expected pin-sha256:%s" % (pin_sha256, self.fingerprint))
+ return SSLSocket
+
if __name__ == "__main__":
vpn_host = sys.argv[1]
mac_addrs = [n.strip() for n in os.environ['TNCC_HWADDR'].split(',')]
else:
mac_addrs = []
- for iface in netifaces.interfaces():
- try:
- mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr']
- assert mac != '00:00:00:00:00:00'
- mac_addrs.append(mac)
- except:
- pass
+ if netifaces is None:
+ logging.warn("No netifaces module; mac_addrs will be empty.")
+ else:
+ for iface in netifaces.interfaces():
+ try:
+ mac = netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr']
+ assert mac != '00:00:00:00:00:00'
+ mac_addrs.append(mac)
+ except:
+ pass
hostname = os.environ.get('TNCC_HOSTNAME', socket.gethostname())
+ fingerprint = os.environ.get('TNCC_SHA256')
+ if not fingerprint:
+ logging.warn("TNCC_SHA256 not set, will not validate server certificate")
+ elif not asn1crypto:
+ logging.warn("asn1crypto module not available, will not validate server certificate")
+ else:
+ # we need to monkey-patch this, because SSLContext.sslsocket_class isn't
+ # available until Python 3.7
+ # https://docs.python.org/3/library/ssl.html#ssl.SSLContext.wrap_socket
+ ssl.SSLSocket = fingerprint_checking_SSLSocket(fingerprint)
+
certs = []
- if 'TNCC_CERTS' in os.environ:
- now = datetime.datetime.now()
- for f in os.environ['TNCC_CERTS'].split(','):
- cert = x509cert(f.strip())
- if now < cert.not_before:
- logging.warn('WARNING: %s is not yet valid', f)
- if now > cert.not_after:
- logging.warn('WARNING: %s is expired', f)
- certs.append(cert)
+ if asn1crypto:
+ if 'TNCC_CERTS' in os.environ:
+ now = datetime.datetime.utcnow()
+ for f in os.environ['TNCC_CERTS'].split(','):
+ cert = x509cert(f.strip())
+ if now < cert.not_before:
+ logging.warn('WARNING: %s is not yet valid', f)
+ if now > cert.not_after:
+ logging.warn('WARNING: %s is expired', f)
+ certs.append(cert)
+ else:
+ raise Exception('TNCC_CERTS environment variable set, but asn1crypto module is not available')
# \HKEY_CURRENT_USER\Software\Juniper Networks\Device Id
device_id = os.environ.get('TNCC_DEVICE_ID')
dspreauth_value = sys.argv[2]
dssignin_value = sys.argv[3]
'TNCC ', dspreauth_value, dssignin_value
- print t.get_cookie(dspreauth, dssignin).value
+ print(t.get_cookie(dspreauth, dssignin).value)
else:
sock = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_SEQPACKET)
server = tncc_server(sock, t)