# SECUREAUTH LABS. Copyright 2018 SecureAuth Corporation. All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Author: Alberto Solino (@agsolino)
#
# Description:
#   [MS-NRPC] Interface implementation
#
#   Best way to learn how to use these calls is to grab the protocol standard
#   so you understand what the call does, and then read the test case located
#   at https://github.com/SecureAuthCorp/impacket/tree/master/tests/SMB_RPC
#
#   Some calls have helper functions, which makes it even easier to use.
#   They are located at the end of this file.
#   Helper functions start with "h".
#   There are test cases for them too.
#
from struct import pack
from six import b
from impacket.dcerpc.v5.ndr import NDRCALL, NDRSTRUCT, NDRENUM, NDRUNION, NDRPOINTER, NDRUniConformantArray, \
    NDRUniFixedArray, NDRUniConformantVaryingArray
from impacket.dcerpc.v5.dtypes import WSTR, LPWSTR, DWORD, ULONG, USHORT, PGUID, NTSTATUS, NULL, LONG, UCHAR, PRPC_SID, \
    GUID, RPC_UNICODE_STRING, SECURITY_INFORMATION, LPULONG
from impacket import system_errors, nt_errors
from impacket.uuid import uuidtup_to_bin
from impacket.dcerpc.v5.enum import Enum
from impacket.dcerpc.v5.samr import OLD_LARGE_INTEGER
from impacket.dcerpc.v5.lsad import PLSA_FOREST_TRUST_INFORMATION
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.structure import Structure
from impacket import ntlm, crypto, LOG
import hmac
import hashlib
try:
    from Cryptodome.Cipher import DES, AES, ARC4
except ImportError:
    # Only logged: DES/AES/ARC4 stay undefined, so the crypto helpers below
    # fail with NameError when they are actually called.
    LOG.critical("Warning: You don't have any crypto installed. You need pycryptodomex")
    LOG.critical("See https://pypi.org/project/pycryptodomex/")

MSRPC_UUID_NRPC = uuidtup_to_bin(('12345678-1234-ABCD-EF00-01234567CFFB', '1.0'))


class DCERPCSessionError(DCERPCException):
    """DCERPC exception whose text is resolved from the impacket error tables."""

    def __init__(self, error_string=None, error_code=None, packet=None):
        DCERPCException.__init__(self, error_string, error_code, packet)

    def __str__(self):
        """Describe error_code using system_errors first, then nt_errors."""
        key = self.error_code
        for messages in (system_errors.ERROR_MESSAGES, nt_errors.ERROR_MESSAGES):
            if key in messages:
                error_msg_short = messages[key][0]
                error_msg_verbose = messages[key][1]
                return 'NRPC SessionError: code: 0x%x - %s - %s' % (key, error_msg_short, error_msg_verbose)
        return 'NRPC SessionError: unknown error code: 0x%x' % key


################################################################################
# CONSTANTS
################################################################################
# 2.2.1.2.5 NL_DNS_NAME_INFO
# Type
NlDnsLdapAtSite = 22
NlDnsGcAtSite = 25
NlDnsDsaCname = 28
NlDnsKdcAtSite = 30
NlDnsDcAtSite = 32
NlDnsRfc1510KdcAtSite = 34
NlDnsGenericGcAtSite = 36

# DnsDomainInfoType
NlDnsDomainName = 1
NlDnsDomainNameAlias = 2
NlDnsForestName = 3
NlDnsForestNameAlias = 4
NlDnsNdncDomainName = 5
NlDnsRecordName = 6

# 2.2.1.3.15 NL_OSVERSIONINFO_V1
# wSuiteMask
VER_SUITE_BACKOFFICE = 0x00000004
VER_SUITE_BLADE = 0x00000400
VER_SUITE_COMPUTE_SERVER = 0x00004000
VER_SUITE_DATACENTER = 0x00000080
VER_SUITE_ENTERPRISE = 0x00000002
VER_SUITE_EMBEDDEDNT = 0x00000040
VER_SUITE_PERSONAL = 0x00000200
VER_SUITE_SINGLEUSERTS = 0x00000100
VER_SUITE_SMALLBUSINESS = 0x00000001
VER_SUITE_SMALLBUSINESS_RESTRICTED = 0x00000020
VER_SUITE_STORAGE_SERVER = 0x00002000
VER_SUITE_TERMINAL = 0x00000010

# wProductType
VER_NT_DOMAIN_CONTROLLER = 0x00000002
VER_NT_SERVER = 0x00000003
VER_NT_WORKSTATION = 0x00000001

# 2.2.1.4.18 NETLOGON Specific Access Masks
NETLOGON_UAS_LOGON_ACCESS = 0x0001
NETLOGON_UAS_LOGOFF_ACCESS = 0x0002
NETLOGON_CONTROL_ACCESS = 0x0004
NETLOGON_QUERY_ACCESS = 0x0008
NETLOGON_SERVICE_ACCESS = 0x0010
NETLOGON_FTINFO_ACCESS = 0x0020
NETLOGON_WKSTA_RPC_ACCESS = 0x0040

# 3.5.4.9.1 NetrLogonControl2Ex (Opnum 18)
# FunctionCode
NETLOGON_CONTROL_QUERY = 0x00000001
NETLOGON_CONTROL_REPLICATE = 0x00000002
NETLOGON_CONTROL_SYNCHRONIZE = 0x00000003
NETLOGON_CONTROL_PDC_REPLICATE = 0x00000004
NETLOGON_CONTROL_REDISCOVER = 0x00000005
NETLOGON_CONTROL_TC_QUERY = 0x00000006
NETLOGON_CONTROL_TRANSPORT_NOTIFY = 0x00000007
NETLOGON_CONTROL_FIND_USER = 0x00000008
NETLOGON_CONTROL_CHANGE_PASSWORD = 0x00000009
NETLOGON_CONTROL_TC_VERIFY = 0x0000000A
NETLOGON_CONTROL_FORCE_DNS_REG = 0x0000000B
NETLOGON_CONTROL_QUERY_DNS_REG = 0x0000000C
NETLOGON_CONTROL_BACKUP_CHANGE_LOG = 0x0000FFFC
NETLOGON_CONTROL_TRUNCATE_LOG = 0x0000FFFD
NETLOGON_CONTROL_SET_DBFLAG = 0x0000FFFE
NETLOGON_CONTROL_BREAKPOINT = 0x0000FFFF

################################################################################
# STRUCTURES
################################################################################
# 3.5.4.1 RPC Binding Handles for Netlogon Methods
LOGONSRV_HANDLE = WSTR
PLOGONSRV_HANDLE = LPWSTR

# 2.2.1.1.1 CYPHER_BLOCK
class CYPHER_BLOCK(NDRSTRUCT):
    structure = (
        ('Data', '8s=b""'),
    )
    def getAlignment(self):
        return 1

NET_API_STATUS = DWORD

# 2.2.1.1.2 STRING
from impacket.dcerpc.v5.lsad import STRING

# 2.2.1.1.3 LM_OWF_PASSWORD
class CYPHER_BLOCK_ARRAY(NDRUniFixedArray):
    def getDataLen(self, data, offset=0):
        # Fixed size: two CYPHER_BLOCKs, whatever data/offset are.
        return len(CYPHER_BLOCK())*2

class LM_OWF_PASSWORD(NDRSTRUCT):
    structure = (
        ('Data', CYPHER_BLOCK_ARRAY),
    )

# 2.2.1.1.4 NT_OWF_PASSWORD
NT_OWF_PASSWORD = LM_OWF_PASSWORD

ENCRYPTED_NT_OWF_PASSWORD = NT_OWF_PASSWORD

# 2.2.1.3.4 NETLOGON_CREDENTIAL
class UCHAR_FIXED_ARRAY(NDRUniFixedArray):
    align = 1
    def getDataLen(self, data, offset=0):
        # Fixed size: one CYPHER_BLOCK, whatever data/offset are.
        return len(CYPHER_BLOCK())

class NETLOGON_CREDENTIAL(NDRSTRUCT):
    structure = (
        ('Data',UCHAR_FIXED_ARRAY),
    )
    def getAlignment(self):
        return 1

# 2.2.1.1.5 NETLOGON_AUTHENTICATOR
class NETLOGON_AUTHENTICATOR(NDRSTRUCT):
    structure = (
        ('Credential', NETLOGON_CREDENTIAL),
        ('Timestamp', DWORD),
    )

class PNETLOGON_AUTHENTICATOR(NDRPOINTER):
    referent = (
        ('Data', NETLOGON_AUTHENTICATOR),
    )

# 2.2.1.2.1 DOMAIN_CONTROLLER_INFOW
class DOMAIN_CONTROLLER_INFOW(NDRSTRUCT):
    structure = (
        ('DomainControllerName', LPWSTR),
        ('DomainControllerAddress', LPWSTR),
        ('DomainControllerAddressType', ULONG),
        ('DomainGuid', GUID),
        ('DomainName', LPWSTR),
        ('DnsForestName', LPWSTR),
        ('Flags', ULONG),
        ('DcSiteName', LPWSTR),
        ('ClientSiteName', LPWSTR),
    )

class PDOMAIN_CONTROLLER_INFOW(NDRPOINTER):
    referent = (
        ('Data', DOMAIN_CONTROLLER_INFOW),
    )

# 2.2.1.2.2 NL_SITE_NAME_ARRAY
class RPC_UNICODE_STRING_ARRAY(NDRUniConformantArray):
    item = RPC_UNICODE_STRING

class PRPC_UNICODE_STRING_ARRAY(NDRPOINTER):
    referent = (
        ('Data', RPC_UNICODE_STRING_ARRAY),
    )

class NL_SITE_NAME_ARRAY(NDRSTRUCT):
    structure = (
        ('EntryCount', ULONG),
        ('SiteNames', PRPC_UNICODE_STRING_ARRAY),
    )

class PNL_SITE_NAME_ARRAY(NDRPOINTER):
    referent = (
        ('Data', NL_SITE_NAME_ARRAY),
    )

# 2.2.1.2.3 NL_SITE_NAME_EX_ARRAY
# RPC_UNICODE_STRING_ARRAY used to be declared a second time here with an
# identical body; the definition above is the one PRPC_UNICODE_STRING_ARRAY
# refers to, so the duplicate was dropped.
class NL_SITE_NAME_EX_ARRAY(NDRSTRUCT):
    structure = (
        ('EntryCount', ULONG),
        ('SiteNames', PRPC_UNICODE_STRING_ARRAY),
        ('SubnetNames', PRPC_UNICODE_STRING_ARRAY),
    )

class PNL_SITE_NAME_EX_ARRAY(NDRPOINTER):
    referent = (
        ('Data', NL_SITE_NAME_EX_ARRAY),
    )

# 2.2.1.2.4 NL_SOCKET_ADDRESS
# 2.2.1.2.4.1 IPv4 Address Structure
# NOTE(review): the reviewed text is damaged from this point. Everything
# between the first field format of IPv4Address and the middle of
# deriveSequenceNumber() is missing. The surviving residue of the class is:
#     class IPv4Address(Structure): structure = ( ('AddressFamily', '
# IPv4Address and every definition that followed it must be restored from the
# original file. That includes names the code below relies on:
# NL_AUTH_SIGNATURE, NL_AUTH_MESSAGE, NL_SIGNATURE_HMAC_MD5,
# NL_SIGNATURE_HMAC_SHA256, NL_SEAL_NOT_ENCRYPTED, NL_SEAL_RC4,
# NL_SEAL_AES128 and the NL_AUTH_MESSAGE_NETBIOS_* flags.


def deriveSequenceNumber(sequenceNum):
    """Pack a sequence number as two big-endian 32-bit words.

    The low word is emitted first, then the high word with its top bit set.
    """
    # NOTE(review): the signature and the next two assignments were lost with
    # the damaged span above. They are reconstructed from the call sites
    # (deriveSequenceNumber(sequenceNum)) and from the surviving tail
    # "> 32) & 0xffffffff" - confirm against the original file.
    sequenceLow = sequenceNum & 0xffffffff
    sequenceHigh = (sequenceNum >> 32) & 0xffffffff
    sequenceHigh |= 0x80000000

    res = pack('>L', sequenceLow)
    res += pack('>L', sequenceHigh)
    return res


def ComputeNetlogonSignatureAES(authSignature, message, confounder, sessionKey):
    """Return the HMAC-SHA256 based checksum field for a signature.

    The HMAC is keyed with sessionKey and covers the first 8 bytes of
    authSignature.getData(), then confounder, then message. The result is the
    first 8 digest bytes followed by 24 zero bytes (32 bytes in total).
    """
    # [MS-NRPC] Section 3.3.4.2.1, point 7
    hm = hmac.new(key=sessionKey, digestmod=hashlib.sha256)
    hm.update(authSignature.getData()[:8])
    # If no confidentiality requested, it should be ''
    hm.update(confounder)
    hm.update(bytes(message))
    # The padding must be bytes: digest() is bytes, and bytes + str raises
    # TypeError on Python 3.
    return hm.digest()[:8]+b'\x00'*24


def ComputeNetlogonSignatureMD5(authSignature, message, confounder, sessionKey):
    """Return the 8-byte HMAC-MD5 based checksum for a signature.

    An MD5 digest is taken over four zero bytes, the first 8 bytes of
    authSignature.getData(), confounder and message; that digest is then fed
    to HMAC-MD5 keyed with sessionKey and truncated to 8 bytes.
    """
    # [MS-NRPC] Section 3.3.4.2.1, point 7
    md5 = hashlib.new('md5')
    md5.update(b'\x00'*4)
    md5.update(authSignature.getData()[:8])
    # If no confidentiality requested, it should be ''
    md5.update(confounder)
    md5.update(bytes(message))
    finalMD5 = md5.digest()
    hm = hmac.new(sessionKey, digestmod=hashlib.md5)
    hm.update(finalMD5)
    return hm.digest()[:8]


def encryptSequenceNumberRC4(sequenceNum, checkSum, sessionKey):
    """RC4-encrypt sequenceNum with a key derived from sessionKey and checkSum."""
    # [MS-NRPC] Section 3.3.4.2.1, point 9
    hm = hmac.new(sessionKey, digestmod=hashlib.md5)
    hm.update(b'\x00'*4)
    hm2 = hmac.new(hm.digest(), digestmod=hashlib.md5)
    hm2.update(checkSum)
    encryptionKey = hm2.digest()

    cipher = ARC4.new(encryptionKey)
    return cipher.encrypt(sequenceNum)


def decryptSequenceNumberRC4(sequenceNum, checkSum, sessionKey):
    """Decrypt an RC4-protected sequence number (delegates to the encrypt routine)."""
    # [MS-NRPC] Section 3.3.4.2.2, point 5
    return encryptSequenceNumberRC4(sequenceNum, checkSum, sessionKey)


def encryptSequenceNumberAES(sequenceNum, checkSum, sessionKey):
    """AES-CFB encrypt sequenceNum; the IV is checkSum[:8] repeated twice."""
    # [MS-NRPC] Section 3.3.4.2.1, point 9
    IV = checkSum[:8] + checkSum[:8]
    Cipher = AES.new(sessionKey, AES.MODE_CFB, IV)
    return Cipher.encrypt(sequenceNum)


def decryptSequenceNumberAES(sequenceNum, checkSum, sessionKey):
    """AES-CFB decrypt sequenceNum; the IV is checkSum[:8] repeated twice."""
    # [MS-NRPC] Section 3.3.4.2.1, point 9
    IV = checkSum[:8] + checkSum[:8]
    Cipher = AES.new(sessionKey, AES.MODE_CFB, IV)
    return Cipher.decrypt(sequenceNum)


def SIGN(data, confounder, sequenceNum, key, aes = False):
    """Build the NL_AUTH_SIGNATURE for data.

    :param data: message to sign
    :param confounder: confounder bytes; empty ('' or b'') when the message
        is only signed, not sealed
    :param sequenceNum: integer sequence number, packed with
        deriveSequenceNumber() before being encrypted
    :param key: session key
    :param aes: HMAC-MD5/RC4 is used only when this is exactly False; any
        other value selects HMAC-SHA256/AES
    :return: the populated NL_AUTH_SIGNATURE
    """
    if aes is False:
        signatureAlgorithm = NL_SIGNATURE_HMAC_MD5
        sealAlgorithm = NL_SEAL_RC4
        computeChecksum = ComputeNetlogonSignatureMD5
        encryptSequenceNumber = encryptSequenceNumberRC4
    else:
        signatureAlgorithm = NL_SIGNATURE_HMAC_SHA256
        sealAlgorithm = NL_SEAL_AES128
        computeChecksum = ComputeNetlogonSignatureAES
        encryptSequenceNumber = encryptSequenceNumberAES

    signature = NL_AUTH_SIGNATURE()
    signature['SignatureAlgorithm'] = signatureAlgorithm
    # Test for emptiness rather than `== ''`: on Python 3 a bytes confounder
    # never equals '', which marked sign-only messages as sealed.
    if not confounder:
        signature['SealAlgorithm'] = NL_SEAL_NOT_ENCRYPTED
    else:
        signature['SealAlgorithm'] = sealAlgorithm
    # The checksum covers the first 8 bytes of the signature, so both
    # algorithm fields have to be set before it is computed.
    signature['Checksum'] = computeChecksum(signature, data, confounder, key)
    signature['SequenceNumber'] = encryptSequenceNumber(deriveSequenceNumber(sequenceNum), signature['Checksum'], key)
    return signature


def _xorSessionKey(key):
    # Sealing key: every byte of the session key XORed with 0xf0.
    return bytes(bytearray(octet ^ 0xf0 for octet in bytearray(key)))


def _deriveRC4SealKey(xorKey, sequenceNum):
    # RC4 sealing key: HMAC-MD5(HMAC-MD5(xorKey, four zero bytes), sequenceNum).
    hm = hmac.new(xorKey, digestmod=hashlib.md5)
    hm.update(b'\x00'*4)
    hm2 = hmac.new(hm.digest(), digestmod=hashlib.md5)
    hm2.update(sequenceNum)
    return hm2.digest()


def SEAL(data, confounder, sequenceNum, key, aes = False):
    """Sign and encrypt data.

    The signature is produced by SIGN() over the plaintext; data and
    confounder are then encrypted with a key derived from the session key
    XORed with 0xf0, and the encrypted confounder is stored in the signature.

    :return: tuple (encrypted data, NL_AUTH_SIGNATURE)
    """
    signature = SIGN(data, confounder, sequenceNum, key, aes)
    sequenceNum = deriveSequenceNumber(sequenceNum)

    XorKey = _xorSessionKey(key)
    if aes is False:
        encryptionKey = _deriveRC4SealKey(XorKey, sequenceNum)

        # Confounder and data each get a fresh RC4 state with the same key.
        cipher = ARC4.new(encryptionKey)
        cfounder = cipher.encrypt(confounder)
        cipher = ARC4.new(encryptionKey)
        encrypted = cipher.encrypt(data)
    else:
        IV = sequenceNum + sequenceNum
        # A single CFB stream: the confounder first, the data right after it.
        cipher = AES.new(XorKey, AES.MODE_CFB, IV)
        cfounder = cipher.encrypt(confounder)
        encrypted = cipher.encrypt(data)

    signature['Confounder'] = cfounder
    return encrypted, signature


def UNSEAL(data, auth_data, key, aes = False):
    """Decrypt data that was protected by SEAL().

    :param data: encrypted message
    :param auth_data: raw signature, parsed here as NL_AUTH_SIGNATURE
    :param key: session key
    :param aes: RC4 is used only when this is exactly False, AES otherwise
    :return: tuple (plain data, decrypted confounder)
    """
    auth_data = NL_AUTH_SIGNATURE(auth_data)
    XorKey = _xorSessionKey(key)
    if aes is False:
        sequenceNum = decryptSequenceNumberRC4(auth_data['SequenceNumber'], auth_data['Checksum'], key)
        encryptionKey = _deriveRC4SealKey(XorKey, sequenceNum)

        # RC4 decryption is the same operation as encryption.
        cipher = ARC4.new(encryptionKey)
        cfounder = cipher.encrypt(auth_data['Confounder'])
        cipher = ARC4.new(encryptionKey)
        plain = cipher.encrypt(data)
    else:
        sequenceNum = decryptSequenceNumberAES(auth_data['SequenceNumber'], auth_data['Checksum'], key)
        IV = sequenceNum + sequenceNum
        # A single CFB stream: the confounder first, the data right after it.
        cipher = AES.new(XorKey, AES.MODE_CFB, IV)
        cfounder = cipher.decrypt(auth_data['Confounder'])
        plain = cipher.decrypt(data)

    return plain, cfounder


def getSSPType1(workstation='', domain='', signingRequired=False):
    """Start building an NL_AUTH_MESSAGE carrying domain and host names.

    NOTE(review): only the first part of this function is present in the
    reviewed text (see the note at the end of the body).
    """
    auth = NL_AUTH_MESSAGE()
    auth['Flags'] = 0
    auth['Buffer'] = b''

    auth['Flags'] |= NL_AUTH_MESSAGE_NETBIOS_DOMAIN
    if domain != '':
        auth['Buffer'] = auth['Buffer'] + b(domain) + b'\x00'
    else:
        auth['Buffer'] += b'WORKGROUP\x00'

    auth['Flags'] |= NL_AUTH_MESSAGE_NETBIOS_HOST
    if workstation != '':
        auth['Buffer'] = auth['Buffer'] + b(workstation) + b'\x00'
    else:
        auth['Buffer'] += b'MYHOST\x00'

    auth['Flags'] |= NL_AUTH_MESSAGE_NETBIOS_HOST_UTF8
    # NOTE(review): the reviewed text stops in the middle of the next
    # statement. The surviving residue is:
    #     if workstation != '': auth['Buffer'] += pack('
    # The remainder of this function (including whatever it returns) is
    # outside the reviewed text and must be restored from the original file.