202 lines
11 KiB
Python
202 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import ctypes
|
|
import ctypes.wintypes
|
|
import winreg
|
|
import contextlib
|
|
from typing import Any, Optional, Generator
|
|
|
|
# Fallbacks used by get_sso_token() / get_sso_token_async() when the caller
# supplies no ``ua`` / ``issuer`` argument.
DEFAULT_USER_AGENT = 'Borland SOAP 1.2'
DEFAULT_ISSUER = 'https://ssoi.sts.va.gov/Issuer/smtoken/SAML2'

# Opaque handle / pointer types; each one is marshalled as a plain void*.
HCERTSTORE = ctypes.c_void_p
PCERT_INFO = ctypes.c_void_p
HCRYPTPROV_LEGACY = ctypes.c_void_p

# Store provider name passed to CertOpenStore for the in-memory scratch store.
CERT_STORE_PROV_MEMORY = b'Memory'

# Encoding-type flags; OR-ed together wherever an encoding type is passed.
X509_ASN_ENCODING = 0x1
PKCS_7_ASN_ENCODING = 0x10000

# Find type: a compare kind shifted into the upper bits.
CERT_COMPARE_SHIFT = 16
CERT_COMPARE_ANY = 0
CERT_FIND_ANY = CERT_COMPARE_ANY << CERT_COMPARE_SHIFT

# dwType passed to CertGetNameStringW.
CERT_NAME_FRIENDLY_DISPLAY_TYPE = 5

# Bit tested against the byte filled in by CertGetIntendedKeyUsage.
CERT_DIGITAL_SIGNATURE_KEY_USAGE = 0x80

# Disposition passed to CertAddCertificateContextToStore.
CERT_STORE_ADD_ALWAYS = 4

# Property id passed to CertGetCertificateContextProperty; both names share
# the same value.
CERT_SHA1_HASH_PROP_ID = 3
CERT_HASH_PROP_ID = CERT_SHA1_HASH_PROP_ID
|
|
|
|
class CERT_CONTEXT(ctypes.Structure):
    """ctypes layout of a certificate context as handed out by crypt32.

    Only ``pCertInfo`` is read by this module (it is forwarded to
    CertGetIntendedKeyUsage and CertVerifyTimeValidity); the remaining
    fields exist so that the structure has the right shape.

    NOTE(review): field names and order are assumed to follow the Win32
    ``CERT_CONTEXT`` declaration in wincrypt.h -- do not reorder.
    """
    _fields_ = [
        ('dwCertEncodingType', ctypes.wintypes.DWORD),
        ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
        ('cbCertEncoded', ctypes.wintypes.DWORD),
        ('pCertInfo', PCERT_INFO),  # opaque here: declared as void*
        ('hCertStore', HCERTSTORE),
    ]
|
|
# Pointer-to-CERT_CONTEXT; the type crypt32/cryptui calls below return and accept.
PCCERT_CONTEXT = ctypes.POINTER(CERT_CONTEXT)

# --- crypt32.dll bindings ---------------------------------------------------
# Every function gets an explicit restype/argtypes pair. Without them ctypes
# treats return values as C int, which would truncate pointer-sized handles.
crypt32 = ctypes.WinDLL('crypt32')

# Opens a store by provider name (used here for the in-memory scratch store).
CertOpenStore = crypt32.CertOpenStore
CertOpenStore.restype = HCERTSTORE
CertOpenStore.argtypes = (ctypes.wintypes.LPCSTR, ctypes.wintypes.DWORD, HCRYPTPROV_LEGACY, ctypes.wintypes.DWORD, ctypes.c_void_p)

# Opens a named system store (this module opens 'MY').
CertOpenSystemStoreW = crypt32.CertOpenSystemStoreW
CertOpenSystemStoreW.restype = HCERTSTORE
CertOpenSystemStoreW.argtypes = (HCRYPTPROV_LEGACY, ctypes.wintypes.LPCWSTR)

# Releases a store handle; called from the Managed* context managers.
CertCloseStore = crypt32.CertCloseStore
CertCloseStore.restype = ctypes.wintypes.BOOL
CertCloseStore.argtypes = (HCERTSTORE, ctypes.wintypes.DWORD)

# Iterator-style enumeration: takes the previous context, returns the next.
CertEnumCertificatesInStore = crypt32.CertEnumCertificatesInStore
CertEnumCertificatesInStore.restype = PCCERT_CONTEXT
CertEnumCertificatesInStore.argtypes = (HCERTSTORE, PCCERT_CONTEXT)

CertFindCertificateInStore = crypt32.CertFindCertificateInStore
CertFindCertificateInStore.restype = PCCERT_CONTEXT
CertFindCertificateInStore.argtypes = (HCERTSTORE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p, PCCERT_CONTEXT)

# Last argument is an out-parameter receiving the context created in the
# destination store (callers pass ctypes.byref(<PCCERT_CONTEXT>)).
CertAddCertificateContextToStore = crypt32.CertAddCertificateContextToStore
CertAddCertificateContextToStore.restype = ctypes.wintypes.BOOL
CertAddCertificateContextToStore.argtypes = (HCERTSTORE, PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.POINTER(PCCERT_CONTEXT))

# Called twice by this module: once with no buffer to learn the size, then
# again with a buffer of that size.
CertGetNameStringW = crypt32.CertGetNameStringW
CertGetNameStringW.restype = ctypes.wintypes.DWORD
CertGetNameStringW.argtypes = (PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p, ctypes.wintypes.LPWSTR, ctypes.wintypes.DWORD)

# Fills a caller-supplied byte buffer with the key-usage bits.
CertGetIntendedKeyUsage = crypt32.CertGetIntendedKeyUsage
CertGetIntendedKeyUsage.restype = ctypes.wintypes.BOOL
CertGetIntendedKeyUsage.argtypes = (ctypes.wintypes.DWORD, PCERT_INFO, ctypes.POINTER(ctypes.wintypes.BYTE), ctypes.wintypes.DWORD)

# Same two-call size/then-data convention as CertGetNameStringW, with the
# size carried in the trailing DWORD pointer.
CertGetCertificateContextProperty = crypt32.CertGetCertificateContextProperty
CertGetCertificateContextProperty.restype = ctypes.wintypes.BOOL
CertGetCertificateContextProperty.argtypes = (PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.c_void_p, ctypes.POINTER(ctypes.wintypes.DWORD))

# Returns a signed LONG; get_vista_certificate() accepts a certificate only
# when the result is 0.
CertVerifyTimeValidity = crypt32.CertVerifyTimeValidity
CertVerifyTimeValidity.restype = ctypes.wintypes.LONG
CertVerifyTimeValidity.argtypes = (ctypes.wintypes.LPFILETIME, PCERT_INFO)
|
|
|
|
# --- cryptui.dll binding ----------------------------------------------------
cryptui = ctypes.WinDLL('cryptui')

# Certificate picker dialog; returns the context the user selected.
# Arguments as used by get_vista_certificate(): store, owner window, title,
# display text, and three trailing values passed as 0, 0, None.
CryptUIDlgSelectCertificateFromStore = cryptui.CryptUIDlgSelectCertificateFromStore
CryptUIDlgSelectCertificateFromStore.restype = PCCERT_CONTEXT
CryptUIDlgSelectCertificateFromStore.argtypes = (HCERTSTORE, ctypes.wintypes.HWND, ctypes.wintypes.LPCWSTR, ctypes.wintypes.LPCWSTR, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p)

# --- kernel32.dll binding ---------------------------------------------------
# Used as the dialog owner when the caller passes hwnd=None.
# Only restype is declared; the function is called without arguments.
GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow
GetConsoleWindow.restype = ctypes.wintypes.HWND
|
|
|
|
@contextlib.contextmanager
def ManagedCertOpenStore(lpszStoreProvider: ctypes.wintypes.LPCSTR, dwEncodingType: ctypes.wintypes.DWORD, hCryptProv: HCRYPTPROV_LEGACY, dwFlags: ctypes.wintypes.DWORD, pvPara: ctypes.c_void_p) -> Generator[HCERTSTORE, None, None]:
    """Open a certificate store with CertOpenStore and close it on exit.

    The arguments are forwarded unchanged to CertOpenStore. The yielded
    value is the store handle; it is released with ``CertCloseStore(h, 0)``
    when the ``with`` block ends, whether or not the block raised.

    Raises:
        OSError: if CertOpenStore returns a NULL handle. Previously the NULL
            handle was yielded to the caller and later passed to
            CertCloseStore.
    """
    store = CertOpenStore(lpszStoreProvider, dwEncodingType, hCryptProv, dwFlags, pvPara)
    if not store:
        # Nothing was opened, so there is nothing to close; surface the
        # Win32 error instead of handing a NULL handle to the caller.
        raise ctypes.WinError()
    try:
        yield store
    finally:
        CertCloseStore(store, 0)
|
|
|
|
@contextlib.contextmanager
def ManagedCertOpenSystemStore(hProv: HCRYPTPROV_LEGACY, szSubsystemProtocol: ctypes.wintypes.LPCWSTR) -> Generator[HCERTSTORE, None, None]:
    """Open a system certificate store and close it on exit.

    The arguments are forwarded unchanged to CertOpenSystemStoreW. The
    yielded value is the store handle; it is released with
    ``CertCloseStore(h, 0)`` when the ``with`` block ends, whether or not
    the block raised.

    Raises:
        OSError: if CertOpenSystemStoreW returns a NULL handle. Previously
            the NULL handle was yielded to the caller and later passed to
            CertCloseStore.
    """
    store = CertOpenSystemStoreW(hProv, szSubsystemProtocol)
    if not store:
        # Nothing was opened, so there is nothing to close; surface the
        # Win32 error instead of handing a NULL handle to the caller.
        raise ctypes.WinError()
    try:
        yield store
    finally:
        CertCloseStore(store, 0)
|
|
|
|
def get_vista_certificate(show_cert_dialog: bool=True, hwnd: Optional[int]=0) -> PCCERT_CONTEXT:
    """Pick a certificate from the 'MY' system store for VistA logon.

    Every certificate in the 'MY' system store is examined. A certificate is
    a candidate when all of the following hold:

    * its intended key usage includes CERT_DIGITAL_SIGNATURE_KEY_USAGE,
    * CertVerifyTimeValidity returns 0 for it,
    * its friendly display name contains none of 'Card Authentication',
      '0,' or 'Signature'.

    Candidates are copied into a temporary in-memory store.

    Args:
        show_cert_dialog: when true, show the CryptUI selection dialog over
            the candidates and return the user's choice. When false, return
            the last candidate found without any UI.
        hwnd: owner window for the dialog. ``None`` means "use the console
            window"; the default ``0`` is passed through as-is.

    Returns:
        A PCCERT_CONTEXT for the selection. The result is falsy when nothing
        was selected: ``None`` if no candidate matched and no dialog was
        requested, or whatever (null) pointer the dialog returns.

    NOTE(review): contexts handed back by CertAddCertificateContextToStore
    and by the dialog are never released here; CertFreeCertificateContext is
    not bound in this module. Confirm whether callers are expected to free
    the returned context.
    """
    encoding = X509_ASN_ENCODING | PKCS_7_ASN_ENCODING
    rejected_name_parts = ('Card Authentication', '0,', 'Signature')

    with ManagedCertOpenSystemStore(0, 'MY') as store_system, ManagedCertOpenStore(CERT_STORE_PROV_MEMORY, 0, None, 0, None) as store_memory:
        cert_selection = cert_iter = None
        # CertEnumCertificatesInStore takes the previous context and returns
        # the next one, so the loop variable doubles as the cursor.
        while cert_iter := CertEnumCertificatesInStore(store_system, cert_iter):
            cert_info = cert_iter.contents.pCertInfo

            key_usage_bits = ctypes.wintypes.BYTE()
            CertGetIntendedKeyUsage(encoding, cert_info, ctypes.byref(key_usage_bits), ctypes.sizeof(key_usage_bits))
            can_sign = (key_usage_bits.value & CERT_DIGITAL_SIGNATURE_KEY_USAGE) == CERT_DIGITAL_SIGNATURE_KEY_USAGE
            time_valid = CertVerifyTimeValidity(None, cert_info) == 0
            name_string = get_certificate_friendly_display_name(cert_iter)
            name_ok = not any(part in name_string for part in rejected_name_parts)
            if not (can_sign and time_valid and name_ok):
                continue

            # Fresh out-parameter per candidate. The previous code obtained
            # this holder from CertFindCertificateInStore(CERT_FIND_ANY),
            # which allocated a context per iteration that was overwritten
            # without ever being freed.
            copied = PCCERT_CONTEXT()
            if CertAddCertificateContextToStore(store_memory, cert_iter, CERT_STORE_ADD_ALWAYS, ctypes.byref(copied)):
                # Only a successful copy may become the selection.
                cert_selection = copied

        if not show_cert_dialog:
            return cert_selection
        owner = hwnd if hwnd is not None else GetConsoleWindow()
        # Must run while store_memory is still open.
        return CryptUIDlgSelectCertificateFromStore(store_memory, owner, 'VistA Logon - Certificate Selection', 'Select a certificate for VistA authentication', 0, 0, None)
|
|
|
|
def get_certificate_thumbprint(certificate: PCCERT_CONTEXT) -> bytes:
    """Return the CERT_HASH_PROP_ID property of *certificate* as raw bytes.

    CertGetCertificateContextProperty is called twice: first without a
    buffer to obtain the property size, then with a buffer of that size.
    Callers turn the result into text with ``.hex()``.

    The return codes of both calls are not checked; if the property cannot
    be read the result is whatever the zero-initialised buffer holds
    (``b''`` when the reported size is 0).
    """
    size = ctypes.wintypes.DWORD()
    CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, None, ctypes.byref(size))
    digest = ctypes.create_string_buffer(size.value)
    CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, digest, ctypes.byref(size))
    # The hash is binary data: ``.value`` would stop at the first 0x00 byte
    # and silently shorten the thumbprint, so read the full buffer via
    # ``.raw`` and trim to the size the API reported.
    return digest.raw[:size.value]
|
|
|
|
def get_certificate_friendly_display_name(certificate: PCCERT_CONTEXT) -> str:
    """Return the CERT_NAME_FRIENDLY_DISPLAY_TYPE name string of *certificate*.

    CertGetNameStringW is invoked twice with the same leading arguments:
    once with no buffer, whose return value sizes the buffer, and once
    with the buffer to receive the text.
    """
    query = (certificate, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, None)
    length = CertGetNameStringW(*query, None, 0)
    text = ctypes.create_unicode_buffer(length)
    CertGetNameStringW(*query, text, length)
    return text.value
|
|
|
|
# WS-Trust STS endpoints from VDL documentation
# https://www.va.gov/vdl/documents/Infrastructure/KAAJEE/kaajee_ssowap_8_791_depg_r.pdf
# https://www.va.gov/vdl/documents/Financial_Admin/Bed_Management_Solution_(BMS)/bms_4_0_tm.pdf
# https://www.va.gov/vdl/documents/VistA_GUI_Hybrids/National_Utilization_Management_Integration_Archive/numi_server_setup_guide_v15_9.pdf
#
# Each getter reads the default value of one key under
# HKLM\SOFTWARE\Vista\Common and falls back to a built-in default when the
# key is missing.
def get_registry_iam() -> Any:
    """Return the IAM registry setting, or the built-in STS URL (port 9301)."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM', default='https://services.eauth.va.gov:9301/STS/RequestSecurityToken')


def get_registry_iam_ad() -> Any:
    """Return the IAM_AD registry setting, or the built-in STS URL (port 9201)."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM_AD', default='https://services.eauth.va.gov:9201/STS/RequestSecurityToken')


def get_registry_rioserver() -> Any:
    """Return the RIOSERVER registry setting, or 'SecurityTokenService'."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOSERVER', default='SecurityTokenService')


def get_registry_rioport() -> Any:
    """Return the RIOPORT registry setting, or 'RequestSecurityToken'."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOPORT', default='RequestSecurityToken')
|
|
def get_registry_value(hkey: int, subkey: str, value: Optional[str]=None, default: Any=None) -> Any:
    """Read one value from the Windows registry, with a fallback.

    Args:
        hkey: an already-open key or predefined HKEY_* constant.
        subkey: path of the key to open beneath *hkey*.
        value: name of the value to query; it is passed to QueryValueEx
            unchanged, including the default ``None``.
        default: returned when the key or the value does not exist.

    Returns:
        The stored data (first element of the QueryValueEx result), or
        *default* if opening or querying raised FileNotFoundError. Other
        errors propagate.
    """
    try:
        with winreg.OpenKey(hkey, subkey) as handle:
            data, _kind = winreg.QueryValueEx(handle, value)
    except FileNotFoundError:
        return default
    return data
|
|
|
|
def get_iam_request(application: str, issuer: str) -> str:
    """Build the SOAP RequestSecurityToken envelope sent to the STS.

    Args:
        application: address placed in the AppliesTo endpoint reference.
        issuer: address placed in the Issuer element.

    Returns:
        The complete XML document as a string.

    Both values are XML-escaped before insertion, so characters such as
    ``&`` or ``<`` (for example in a URL query string or an executable name)
    no longer produce a malformed document. Values without such characters
    are inserted unchanged.
    """
    from xml.sax.saxutils import escape

    application = escape(application)
    issuer = escape(issuer)
    return f'''<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns="http://docs.oasis-open.org/ws-sx/ws-trust/200512">
<soapenv:Header/>
<soapenv:Body>
<ns:RequestSecurityToken>
<ns:Base>
<wss:TLS xmlns:wss="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"/>
</ns:Base>
<wsp:AppliesTo xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy">
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing">
<wsa:Address>{application}</wsa:Address>
</wsa:EndpointReference>
</wsp:AppliesTo>
<ns:Issuer>
<wsa:Address xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing">{issuer}</wsa:Address>
</ns:Issuer>
<ns:RequestType>http://schemas.xmlsoap.org/ws/2005/02/trust/Validate</ns:RequestType>
</ns:RequestSecurityToken>
</soapenv:Body>
</soapenv:Envelope>'''
|
|
|
|
def get_local_computer_name() -> str:
    """Return this machine's name as reported by ``socket.getfqdn()``."""
    from socket import getfqdn

    fqdn = getfqdn()
    return fqdn
|
|
|
|
def get_app_name() -> str:
    """Return the file-name part of ``sys.argv[0]`` (directories removed)."""
    import os
    import sys

    _directory, file_name = os.path.split(sys.argv[0])
    return file_name
|
|
|
|
def get_sso_token(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> Optional[str]:
    """Request an SSO token from the STS by running ``curl``.

    Args:
        iam: STS endpoint URL; defaults to ``get_registry_iam()``.
        ua: User-Agent header; defaults to DEFAULT_USER_AGENT.
        certificate: hex thumbprint of the client certificate in
            ``CurrentUser\\MY``. When ``None`` the user is asked to pick one
            via ``get_vista_certificate()``.
        issuer: issuer address; defaults to DEFAULT_ISSUER.
        hostname: host part of the AppliesTo address; defaults to
            ``get_local_computer_name()``.
        application: last path segment of the AppliesTo address; defaults to
            ``get_app_name()``.

    Returns:
        curl's standard output decoded as UTF-8, or ``None`` when no
        certificate was supplied and none was selected. curl's exit status
        is not inspected; its standard error is echoed to ``sys.stderr``.

    Raises:
        OSError: if the ``curl`` executable cannot be started.
    """
    import subprocess
    import sys

    if certificate is None:
        choice = get_vista_certificate()
        if not choice:
            # Nothing selected: this was an implicit fall-through before,
            # now spelled out and reflected in the return annotation.
            return None
        certificate = get_certificate_thumbprint(choice).hex()

    endpoint = iam or get_registry_iam()
    user_agent = ua or DEFAULT_USER_AGENT
    applies_to = f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}"
    payload = get_iam_request(applies_to, issuer or DEFAULT_ISSUER)
    command = [
        'curl', '-fsSL', '-X', 'POST', endpoint,
        '--ca-native',
        '--cert', 'CurrentUser\\MY\\' + certificate,
        '-A', user_agent,
        '-H', 'Content-Type: application/xml',
        '-H', 'Accept: application/xml',
        '-d', payload,
    ]
    res = subprocess.run(command, capture_output=True)
    # Diagnostics are best-effort: undecodable bytes in curl's error text
    # must not raise and discard a token that was received.
    print(res.stderr.decode('utf8', errors='replace'), end='', file=sys.stderr)
    return res.stdout.decode('utf8')
|
|
|
|
async def get_sso_token_async(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> Optional[str]:
    """Asynchronous counterpart of ``get_sso_token``.

    Runs the same ``curl`` command through ``asyncio`` and awaits its
    completion. Arguments and defaults are identical to ``get_sso_token``.

    Returns:
        curl's standard output decoded as UTF-8, or ``None`` when no
        certificate was supplied and none was selected. Previously a
        cancelled selection was not detected here and curl was started with
        an empty certificate name. curl's exit status is not inspected; its
        standard error is echoed to ``sys.stderr``.

    Raises:
        OSError: if the ``curl`` executable cannot be started.

    Note: the certificate selection itself is a synchronous call made
    before the first ``await``.
    """
    import asyncio
    import sys

    if certificate is None:
        choice = get_vista_certificate()
        if not choice:
            # Same outcome as the synchronous variant when nothing is chosen.
            return None
        certificate = get_certificate_thumbprint(choice).hex()

    endpoint = iam or get_registry_iam()
    user_agent = ua or DEFAULT_USER_AGENT
    applies_to = f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}"
    payload = get_iam_request(applies_to, issuer or DEFAULT_ISSUER)
    process = await asyncio.create_subprocess_exec(
        'curl', '-fsSL', '-X', 'POST', endpoint,
        '--ca-native',
        '--cert', 'CurrentUser\\MY\\' + certificate,
        '-A', user_agent,
        '-H', 'Content-Type: application/xml',
        '-H', 'Accept: application/xml',
        '-d', payload,
        stdin=None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    stdout_data, stderr_data = await process.communicate()
    # Diagnostics are best-effort: undecodable bytes in curl's error text
    # must not raise and discard a token that was received.
    print(stderr_data.decode('utf8', errors='replace'), end='', file=sys.stderr)
    return stdout_data.decode('utf8')
|
|
|
|
if __name__ == '__main__':
    # Script entry point: print the token on stdout, exit with status 1 on
    # failure. SystemExit is raised directly because the ``exit`` builtin is
    # injected by the ``site`` module and is not guaranteed to exist.
    try:
        token = get_sso_token()
        if token is None:
            # No certificate chosen: do not print the literal "None" and
            # report success.
            raise SystemExit(1)
        print(token)
    except OSError:
        raise SystemExit(1) from None
|