blob: f177d18f77b856f7009c39d8171d4487885fbf81 [file] [log] [blame]
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Author: Alberto Solino (beto@coresecurity.com)
#
# Description:
# SPNEGO functions used by SMB, SMB2/3 and DCERPC
#
from struct import pack, unpack, calcsize
############### GSS Stuff ################
# All OIDs below are raw byte strings (the encoded OID body, without the
# tag and length octets).  They are written as b'' literals so that they
# compare equal to data sliced out of a received packet on both Python 2
# (where b'' is plain str) and Python 3.

# Encoded body of the SPNEGO OID 1.3.6.1.5.5.2.
GSS_API_SPNEGO_UUID = b'\x2b\x06\x01\x05\x05\x02'

# ASN.1 tag bytes used while building/parsing the tokens.
ASN1_SEQUENCE = 0x30
ASN1_AID = 0x60             # application tag that opens a GSS-API token
ASN1_OID = 0x06
ASN1_OCTET_STRING = 0x04
ASN1_MECH_TYPE = 0xa0       # context tag [0]
ASN1_MECH_TOKEN = 0xa2      # context tag [2]
ASN1_SUPPORTED_MECH = 0xa1  # context tag [1]
ASN1_RESPONSE_TOKEN = 0xa2  # context tag [2] (same value as ASN1_MECH_TOKEN)
ASN1_ENUMERATED = 0x0a

# Encoded mechanism OID -> human readable description.
MechTypes = {
    b'+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
    b'+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
    b'*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
    b'*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
    b'*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User'
}

# Reverse lookup: description -> encoded mechanism OID.
# .items() is used instead of the Python-2-only .iteritems().
TypesMech = dict((description, oid) for oid, description in MechTypes.items())
def asn1encode(data = b''):
    """Return *data* prefixed with its BER definite-form length octets.

    Lengths up to 0x7F use the single-byte short form.  Longer payloads
    use the long form: a marker byte 0x81..0x84 followed by the length
    in 1 to 4 big-endian bytes.

    data -- byte string to wrap (str on Python 2, bytes on Python 3).

    Raises Exception('Error in asn1encode') when the payload is longer
    than 0xFFFFFFFF bytes.
    """
    size = len(data)
    if size <= 0x7F:
        header = pack('B', size)
    elif size <= 0xFF:
        header = pack('BB', 0x81, size)
    elif size <= 0xFFFF:
        header = pack('!BH', 0x82, size)
    elif size <= 0xFFFFFF:
        # struct has no 3-byte integer: emit the high byte and low word.
        header = pack('!BBH', 0x83, size >> 16, size & 0xFFFF)
    elif size <= 0xFFFFFFFF:
        header = pack('!BL', 0x84, size)
    else:
        raise Exception('Error in asn1encode')
    # bytes is str on Python 2, so this matches the former str(res) there;
    # on Python 3 str() would have produced the "b'...'" repr text instead.
    return bytes(header + data)
def asn1decode(data = b''):
    """Strip a BER length prefix from *data* and return the content.

    *data* must start at the length octets (the tag byte has already
    been consumed by the caller).

    Returns a tuple ``(content, consumed)`` where *content* is the slice
    covered by the decoded length and *consumed* is the number of bytes
    of *data* taken up by the length octets plus the content, so the
    caller can skip to the next element with ``data[consumed:]``.

    A first byte of 0x81..0x84 selects the long form with 1..4 length
    bytes; any other value is taken as the length itself.
    """
    first = unpack('B', data[:1])[0]
    data = data[1:]
    if first == 0x81:
        pad = calcsize('B')
        length = unpack('B', data[:pad])[0]
    elif first == 0x82:
        pad = calcsize('H')
        length = unpack('!H', data[:pad])[0]
    elif first == 0x83:
        pad = calcsize('B') + calcsize('!H')
        high, low = unpack('!BH', data[:pad])
        # Parenthesised on purpose: '+' binds tighter than '<<', so the
        # former `len2 << 16 + len3` shifted by (16 + len3) bits.
        length = (high << 16) + low
    elif first == 0x84:
        pad = calcsize('!L')
        length = unpack('!L', data[:pad])[0]
    else:
        # 1 byte length, string <= 0x7F
        pad = 0
        length = first
    ans = data[pad:pad + length]
    # +1 accounts for the first length byte consumed above.
    return ans, len(ans) + pad + 1
class GSSAPI:
    """Generic GSS-API token header wrapped around an opaque payload.

    Encoded layout (see getData): the ASN1_AID tag, a BER length, the
    ASN1_OID tag, the length-prefixed mechanism OID and then the raw
    payload.

    Values live in ``self.fields`` and are accessed with item syntax:
      'UUID'    -- OID written by getData(); set to GSS_API_SPNEGO_UUID
                   by the constructor
      'OID'     -- OID read from the wire by fromString()
      'Payload' -- bytes following the OID
    """
    # Generic GSSAPI Header Format
    def __init__(self, data = None):
        """Create an empty token, or parse *data* when it is non-empty."""
        self.fields = {}
        self['UUID'] = GSS_API_SPNEGO_UUID
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        """Length in bytes of the encoded token."""
        return len(self.getData())

    def __str__(self):
        """Return the encoded token.

        This used to return len(self.getData()), an int, which made
        str(obj) raise TypeError.
        """
        raw = self.getData()
        if isinstance(raw, str):
            # Python 2: the packed data already is a str.
            return raw
        # Python 3: __str__ must return text; latin-1 maps every byte
        # value to exactly one character.
        return raw.decode('latin-1')

    def fromString(self, data = None):
        """Parse an encoded GSS-API token into 'OID' and 'Payload'.

        Raises Exception when the leading ASN1_AID tag or the following
        ASN1_OID tag is missing.
        """
        # Manual parse of the GSSAPI Header Format
        # It should be something like
        # AID = 0x60 TAG, BER Length
        # OID = 0x06 TAG
        # GSSAPI OID
        # UUID data (BER Encoded)
        # Payload
        next_byte = unpack('B', data[:1])[0]
        if next_byte != ASN1_AID:
            raise Exception('Unknown AID=%x' % next_byte)
        data = data[1:]
        decode_data, total_bytes = asn1decode(data)
        # Now we should have a OID tag
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_OID:
            raise Exception('OID tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        # Now the OID contents, should be SPNEGO UUID
        uuid, total_bytes = asn1decode(decode_data)
        # NOTE(review): the parsed OID is stored under 'OID', while
        # getData() emits 'UUID'; a re-encoded token therefore carries
        # the constructor default rather than the parsed OID.
        self['OID'] = uuid
        # the rest should be the data
        self['Payload'] = decode_data[total_bytes:]

    def dump(self):
        """Print every stored field as ``name: {repr(value)}``."""
        for name in self.fields:
            print("%s: {%r}" % (name, self[name]))

    def getData(self):
        """Return the encoded token built from 'UUID' and 'Payload'.

        Raises KeyError when 'Payload' has not been set.
        """
        ans = pack('B', ASN1_AID)
        ans += asn1encode(
            pack('B', ASN1_OID) +
            asn1encode(self['UUID']) +
            self['Payload'])
        return ans
class SPNEGO_NegTokenResp:
    """SPNEGO NegTokenResp message (parser and builder).

    Values live in ``self.fields`` and are accessed with item syntax:
      'NegResult'     -- content bytes of the negState ENUMERATED
      'SupportedMech' -- encoded OID of the selected mechanism
      'ResponseToken' -- content of the responseToken OCTET STRING
    """
    # http://tools.ietf.org/html/rfc4178#page-9
    # NegTokenResp ::= SEQUENCE {
    #     negState       [0] ENUMERATED {
    #         accept-completed    (0),
    #         accept-incomplete   (1),
    #         reject              (2),
    #         request-mic         (3)
    #     }                                 OPTIONAL,
    #       -- REQUIRED in the first reply from the target
    #     supportedMech   [1] MechType      OPTIONAL,
    #       -- present only in the first reply from the target
    #     responseToken   [2] OCTET STRING  OPTIONAL,
    #     mechListMIC     [3] OCTET STRING  OPTIONAL,
    #     ...
    # }
    # This structure is not prepended by a GSS generic header!
    SPNEGO_NEG_TOKEN_RESP = 0xa1
    SPNEGO_NEG_TOKEN_TARG = 0xa0

    def __init__(self, data = None):
        """Create an empty message, or parse *data* when it is non-empty."""
        self.fields = {}
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        """Length in bytes of the encoded message."""
        return len(self.getData())

    def __str__(self):
        """Return the encoded message.

        This used to return len(self.getData()), an int, which made
        str(obj) raise TypeError.
        """
        raw = self.getData()
        if isinstance(raw, str):
            # Python 2: the packed data already is a str.
            return raw
        # Python 3: __str__ must return text; latin-1 maps every byte
        # value to exactly one character.
        return raw.decode('latin-1')

    def fromString(self, data = 0):
        """Parse an encoded NegTokenResp into the fields it carries.

        Accepts a message that starts either with negState (optionally
        followed by supportedMech and responseToken) or directly with
        responseToken.  Parsing stops silently when the message ends
        after negState or after supportedMech.

        Raises Exception when an expected tag is not found.
        """
        payload = data
        next_byte = unpack('B', payload[:1])[0]
        if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
            raise Exception('NegTokenResp not found %x' % next_byte)
        payload = payload[1:]
        decode_data, total_bytes = asn1decode(payload)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]

        if next_byte != ASN1_MECH_TYPE:
            # MechType not found, could be an AUTH answer
            if next_byte != ASN1_RESPONSE_TOKEN:
                raise Exception('MechType/ResponseToken tag not found %x' % next_byte)
        else:
            # [0] negState: an ENUMERATED wrapped in the context tag.
            decode_data2 = decode_data[1:]
            decode_data2, total_bytes = asn1decode(decode_data2)
            next_byte = unpack('B', decode_data2[:1])[0]
            if next_byte != ASN1_ENUMERATED:
                raise Exception('Enumerated tag not found %x' % next_byte)
            # Decode the ENUMERATED content itself.  The previous code
            # decoded the outer buffer again and stored unrelated bytes.
            item, total_bytes2 = asn1decode(decode_data2[1:])
            self['NegResult'] = item
            # Skip the [0] tag byte, then its length octets + content.
            decode_data = decode_data[1:]
            decode_data = decode_data[total_bytes:]

            # Do we have more data?
            if len(decode_data) == 0:
                return

            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte != ASN1_SUPPORTED_MECH:
                if next_byte != ASN1_RESPONSE_TOKEN:
                    raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte)
            else:
                # [1] supportedMech: an OID wrapped in the context tag.
                decode_data2 = decode_data[1:]
                decode_data2, total_bytes = asn1decode(decode_data2)
                next_byte = unpack('B', decode_data2[:1])[0]
                if next_byte != ASN1_OID:
                    raise Exception('OID tag not found %x' % next_byte)
                decode_data2 = decode_data2[1:]
                item, total_bytes2 = asn1decode(decode_data2)
                # Stored under the key getData() reads, so a parsed
                # message can be re-encoded.
                self['SupportedMech'] = item
                # Deprecated: misspelled key kept only so existing
                # callers that read it keep working.
                self['SuportedMech'] = item

                decode_data = decode_data[1:]
                decode_data = decode_data[total_bytes:]
                # responseToken is OPTIONAL: stop here instead of
                # failing in unpack() on an empty buffer.
                if len(decode_data) == 0:
                    return
                next_byte = unpack('B', decode_data[:1])[0]
                if next_byte != ASN1_RESPONSE_TOKEN:
                    raise Exception('Response token tag not found %x' % next_byte)

        # [2] responseToken: an OCTET STRING wrapped in the context tag.
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_OCTET_STRING:
            raise Exception('Octet string token tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        self['ResponseToken'] = decode_data

    def dump(self):
        """Print every stored field as ``name: {repr(value)}``."""
        for name in self.fields:
            print("%s: {%r}" % (name, self[name]))

    def getData(self):
        """Return the encoded NegTokenResp.

        The elements emitted depend on which fields are set:
          'NegResult' and 'SupportedMech' -> negState, supportedMech and
                                             responseToken
          'NegResult' only                -> negState
          neither                         -> responseToken

        Raises KeyError when 'ResponseToken' is needed but not set.
        """
        has_result = 'NegResult' in self.fields
        has_mech = 'SupportedMech' in self.fields

        if has_result and has_mech:
            # Server resp
            body = (
                pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
                asn1encode(
                    pack('B', ASN1_ENUMERATED) +
                    asn1encode(self['NegResult'])) +
                pack('B', ASN1_SUPPORTED_MECH) +
                asn1encode(
                    pack('B', ASN1_OID) +
                    asn1encode(self['SupportedMech'])) +
                pack('B', ASN1_RESPONSE_TOKEN) +
                asn1encode(
                    pack('B', ASN1_OCTET_STRING) +
                    asn1encode(self['ResponseToken'])))
        elif has_result:
            # Server resp
            body = (
                pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
                asn1encode(
                    pack('B', ASN1_ENUMERATED) +
                    asn1encode(self['NegResult'])))
        else:
            # Client resp
            body = (
                pack('B', ASN1_RESPONSE_TOKEN) +
                asn1encode(
                    pack('B', ASN1_OCTET_STRING) +
                    asn1encode(self['ResponseToken'])))

        ans = pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP)
        ans += asn1encode(pack('B', ASN1_SEQUENCE) + asn1encode(body))
        return ans
class SPNEGO_NegTokenInit(GSSAPI):
    """SPNEGO NegTokenInit message carried inside a GSSAPI header.

    Adds these fields to the ones handled by GSSAPI:
      'MechTypes' -- list of encoded mechanism OIDs
      'MechToken' -- content of the mechToken OCTET STRING (optional)
    """
    # http://tools.ietf.org/html/rfc4178#page-8
    # NegTokenInit ::= SEQUENCE {
    #   mechTypes   [0] MechTypeList,
    #   reqFlags    [1] ContextFlags OPTIONAL,
    #   mechToken   [2] OCTET STRING OPTIONAL,
    #   mechListMIC [3] OCTET STRING OPTIONAL,
    # }
    SPNEGO_NEG_TOKEN_INIT = 0xa0

    def fromString(self, data = 0):
        """Parse an encoded token into 'MechTypes' and, if present, 'MechToken'.

        The generic header is parsed by GSSAPI.fromString first; the
        NegTokenInit is then read out of the resulting 'Payload'.

        Raises Exception when an expected tag is not found.
        """
        GSSAPI.fromString(self, data)
        payload = self['Payload']
        next_byte = unpack('B', payload[:1])[0]
        if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
            raise Exception('NegTokenInit not found %x' % next_byte)
        payload = payload[1:]
        decode_data, total_bytes = asn1decode(payload)
        # Now we should have a SEQUENCE Tag
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes2 = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_MECH_TYPE:
            raise Exception('MechType tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        # Remember where the [0] element starts so that whatever follows
        # it can be located once its size (total_bytes3) is known.
        remaining_data = decode_data
        decode_data, total_bytes3 = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes4 = asn1decode(decode_data)
        # And finally we should have the MechTypes
        self['MechTypes'] = []
        while decode_data:
            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte != ASN1_OID:
                # Not a valid OID, there must be something else we won't unpack
                break
            decode_data = decode_data[1:]
            item, total_bytes = asn1decode(decode_data)
            self['MechTypes'].append(item)
            decode_data = decode_data[total_bytes:]

        # Do we have MechTokens as well?
        decode_data = remaining_data[total_bytes3:]
        if len(decode_data) > 0:
            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte == ASN1_MECH_TOKEN:
                # We have tokens in here!
                decode_data = decode_data[1:]
                decode_data, total_bytes = asn1decode(decode_data)
                next_byte = unpack('B', decode_data[:1])[0]
                if next_byte == ASN1_OCTET_STRING:
                    decode_data = decode_data[1:]
                    decode_data, total_bytes = asn1decode(decode_data)
                    self['MechToken'] = decode_data

    def getData(self):
        """Return the encoded token (GSSAPI header plus NegTokenInit).

        Rebuilds 'Payload' from 'MechTypes' and the optional 'MechToken'
        and stores it on the instance before delegating to
        GSSAPI.getData.

        Raises KeyError when 'MechTypes' has not been set.
        """
        # Each mechanism is emitted as an OID tag plus its
        # length-prefixed encoded OID; join avoids repeated copying.
        mechTypes = b''.join(
            pack('B', ASN1_OID) + asn1encode(oid)
            for oid in self['MechTypes'])

        mechToken = b''
        # Do we have tokens to send?
        if 'MechToken' in self.fields:
            mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode(
                pack('B', ASN1_OCTET_STRING) + asn1encode(
                    self['MechToken']))

        ans = pack('B', SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT)
        ans += asn1encode(
            pack('B', ASN1_SEQUENCE) +
            asn1encode(
                pack('B', ASN1_MECH_TYPE) +
                asn1encode(
                    pack('B', ASN1_SEQUENCE) +
                    asn1encode(mechTypes)) + mechToken))

        self['Payload'] = ans
        return GSSAPI.getData(self)