nuVistA/XWBSSOi.py

202 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
import ctypes
import ctypes.wintypes
import winreg
import contextlib
from typing import Any, Optional, Generator
# Fallbacks used by get_sso_token()/get_sso_token_async() when the caller
# passes no `ua` / `issuer` argument.
DEFAULT_USER_AGENT = 'Borland SOAP 1.2'
DEFAULT_ISSUER = 'https://ssoi.sts.va.gov/Issuer/smtoken/SAML2'
# Handle / pointer types that this module treats as untyped pointers.
HCERTSTORE = ctypes.c_void_p
PCERT_INFO = ctypes.c_void_p
HCRYPTPROV_LEGACY = ctypes.c_void_p
# Store provider name handed to CertOpenStore for the scratch store.
CERT_STORE_PROV_MEMORY = b'Memory'
# Encoding-type flags; OR'ed together wherever an encoding type is required.
# NOTE(review): numeric values here and below presumably mirror wincrypt.h - verify against the SDK headers.
X509_ASN_ENCODING = 0x00000001
PKCS_7_ASN_ENCODING = 0x00010000
# Find-type passed to CertFindCertificateInStore, built from compare code and shift.
CERT_COMPARE_ANY = 0
CERT_COMPARE_SHIFT = 16
CERT_FIND_ANY = CERT_COMPARE_ANY<<CERT_COMPARE_SHIFT
# Name type passed to CertGetNameStringW.
CERT_NAME_FRIENDLY_DISPLAY_TYPE = 5
# Bit mask tested against the byte filled in by CertGetIntendedKeyUsage.
CERT_DIGITAL_SIGNATURE_KEY_USAGE = 0x80
# Disposition passed to CertAddCertificateContextToStore.
CERT_STORE_ADD_ALWAYS = 4
# Property id passed to CertGetCertificateContextProperty by get_certificate_thumbprint().
CERT_HASH_PROP_ID = CERT_SHA1_HASH_PROP_ID = 3
class CERT_CONTEXT(ctypes.Structure):
    """ctypes layout of a certificate context as returned by the crypt32 calls below.

    Only `pCertInfo` is dereferenced by this module (see get_vista_certificate);
    the remaining fields are declared so the structure layout is complete.
    """
    _fields_ = [
        # Encoding type of the certificate.
        ('dwCertEncodingType', ctypes.wintypes.DWORD),
        # Pointer to the encoded certificate bytes and their length.
        ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
        ('cbCertEncoded', ctypes.wintypes.DWORD),
        # Opaque pointer handed to CertGetIntendedKeyUsage / CertVerifyTimeValidity.
        ('pCertInfo', PCERT_INFO),
        # Handle of the store the context belongs to.
        ('hCertStore', HCERTSTORE),
    ]
# Pointer-to-context type used as argument/return type by the bindings below.
PCCERT_CONTEXT = ctypes.POINTER(CERT_CONTEXT)
# crypt32 bindings. Every function gets an explicit restype/argtypes pair;
# without a restype ctypes would assume a C int return value.
crypt32 = ctypes.WinDLL('crypt32')
# Opens a store from a named provider (used for the in-memory scratch store).
CertOpenStore = crypt32.CertOpenStore
CertOpenStore.restype = HCERTSTORE
CertOpenStore.argtypes = (ctypes.wintypes.LPCSTR, ctypes.wintypes.DWORD, HCRYPTPROV_LEGACY, ctypes.wintypes.DWORD, ctypes.c_void_p)
# Opens a system store by name (this module opens 'MY').
CertOpenSystemStoreW = crypt32.CertOpenSystemStoreW
CertOpenSystemStoreW.restype = HCERTSTORE
CertOpenSystemStoreW.argtypes = (HCRYPTPROV_LEGACY, ctypes.wintypes.LPCWSTR)
# Closes a store handle; called by both Managed* context managers.
CertCloseStore = crypt32.CertCloseStore
CertCloseStore.restype = ctypes.wintypes.BOOL
CertCloseStore.argtypes = (HCERTSTORE, ctypes.wintypes.DWORD)
# Iterator-style enumeration: takes the previous context, returns the next one.
CertEnumCertificatesInStore = crypt32.CertEnumCertificatesInStore
CertEnumCertificatesInStore.restype = PCCERT_CONTEXT
CertEnumCertificatesInStore.argtypes = (HCERTSTORE, PCCERT_CONTEXT)
CertFindCertificateInStore = crypt32.CertFindCertificateInStore
CertFindCertificateInStore.restype = PCCERT_CONTEXT
CertFindCertificateInStore.argtypes = (HCERTSTORE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p, PCCERT_CONTEXT)
# The last argument is an out-parameter receiving the context created in the target store.
CertAddCertificateContextToStore = crypt32.CertAddCertificateContextToStore
CertAddCertificateContextToStore.restype = ctypes.wintypes.BOOL
CertAddCertificateContextToStore.argtypes = (HCERTSTORE, PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.POINTER(PCCERT_CONTEXT))
# Called twice by its users: once with a NULL buffer to get the size, once to fill the buffer.
CertGetNameStringW = crypt32.CertGetNameStringW
CertGetNameStringW.restype = ctypes.wintypes.DWORD
CertGetNameStringW.argtypes = (PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p, ctypes.wintypes.LPWSTR, ctypes.wintypes.DWORD)
CertGetIntendedKeyUsage = crypt32.CertGetIntendedKeyUsage
CertGetIntendedKeyUsage.restype = ctypes.wintypes.BOOL
CertGetIntendedKeyUsage.argtypes = (ctypes.wintypes.DWORD, PCERT_INFO, ctypes.POINTER(ctypes.wintypes.BYTE), ctypes.wintypes.DWORD)
# Same two-call size/fill protocol as CertGetNameStringW, with the size passed by reference.
CertGetCertificateContextProperty = crypt32.CertGetCertificateContextProperty
CertGetCertificateContextProperty.restype = ctypes.wintypes.BOOL
CertGetCertificateContextProperty.argtypes = (PCCERT_CONTEXT, ctypes.wintypes.DWORD, ctypes.c_void_p, ctypes.POINTER(ctypes.wintypes.DWORD))
# get_vista_certificate() treats a return value of 0 as "currently valid".
CertVerifyTimeValidity = crypt32.CertVerifyTimeValidity
CertVerifyTimeValidity.restype = ctypes.wintypes.LONG
CertVerifyTimeValidity.argtypes = (ctypes.wintypes.LPFILETIME, PCERT_INFO)
# cryptui binding for the certificate picker dialog. get_vista_certificate()
# passes a store, a parent window handle, a title and a prompt string, and
# receives the selected certificate context (or NULL) back.
cryptui = ctypes.WinDLL('cryptui')
CryptUIDlgSelectCertificateFromStore = cryptui.CryptUIDlgSelectCertificateFromStore
CryptUIDlgSelectCertificateFromStore.restype = PCCERT_CONTEXT
CryptUIDlgSelectCertificateFromStore.argtypes = (HCERTSTORE, ctypes.wintypes.HWND, ctypes.wintypes.LPCWSTR, ctypes.wintypes.LPCWSTR, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_void_p)
# Load a private kernel32 instance, as is done for crypt32/cryptui above.
# `ctypes.windll.kernel32` is a process-wide cached object, so setting
# restype/argtypes on its functions would silently change the prototype seen
# by every other module in the process that uses the same cached library.
_kernel32 = ctypes.WinDLL('kernel32')
GetConsoleWindow = _kernel32.GetConsoleWindow
GetConsoleWindow.restype = ctypes.wintypes.HWND
# Takes no arguments; declaring that makes accidental arguments a TypeError.
GetConsoleWindow.argtypes = ()
@contextlib.contextmanager
def ManagedCertOpenStore(lpszStoreProvider: ctypes.wintypes.LPCSTR, dwEncodingType: ctypes.wintypes.DWORD, hCryptProv: HCRYPTPROV_LEGACY, dwFlags: ctypes.wintypes.DWORD, pvPara: ctypes.c_void_p) -> Generator[HCERTSTORE, None, None]:
    """Context manager around CertOpenStore / CertCloseStore.

    All arguments are forwarded unchanged to CertOpenStore. The handle it
    returns is yielded as-is (it is not checked for NULL) and is passed to
    CertCloseStore with flags 0 when the `with` body exits, whether normally
    or through an exception.
    """
    store_handle = CertOpenStore(lpszStoreProvider, dwEncodingType, hCryptProv, dwFlags, pvPara)
    try:
        yield store_handle
    finally:
        # Always release the handle, even if the body raised.
        CertCloseStore(store_handle, 0)
@contextlib.contextmanager
def ManagedCertOpenSystemStore(hProv: HCRYPTPROV_LEGACY, szSubsystemProtocol: ctypes.wintypes.LPCWSTR) -> Generator[HCERTSTORE, None, None]:
    """Context manager around CertOpenSystemStoreW / CertCloseStore.

    Both arguments are forwarded unchanged to CertOpenSystemStoreW. The
    handle it returns is yielded as-is (it is not checked for NULL) and is
    passed to CertCloseStore with flags 0 when the `with` body exits, whether
    normally or through an exception.
    """
    system_store = CertOpenSystemStoreW(hProv, szSubsystemProtocol)
    try:
        yield system_store
    finally:
        # Always release the handle, even if the body raised.
        CertCloseStore(system_store, 0)
def get_vista_certificate(show_cert_dialog: bool=True, hwnd: Optional[int]=0) -> PCCERT_CONTEXT:
    """Pick a certificate from the 'MY' system store for VistA logon.

    Every certificate in the store is examined, and those that pass all of
    the following checks are copied into a temporary in-memory store:

    * the digital-signature bit is set in the intended key usage,
    * CertVerifyTimeValidity reports 0 for the current time,
    * the friendly display name contains none of 'Card Authentication',
      '0,' or 'Signature'.

    Args:
        show_cert_dialog: When true, show the CryptUI selection dialog over
            the filtered certificates and return the user's choice. When
            false, return the last certificate that passed the filter.
        hwnd: Parent window handle for the dialog. None means "use the
            console window" (GetConsoleWindow()).

    Returns:
        A PCCERT_CONTEXT. With the dialog it is whatever the dialog returns
        (a NULL, i.e. falsy, pointer is possible). Without the dialog it is
        None when no certificate passed the filter.
    """
    encoding = X509_ASN_ENCODING | PKCS_7_ASN_ENCODING
    excluded_markers = ('Card Authentication', '0,', 'Signature')
    with ManagedCertOpenSystemStore(0, 'MY') as store_system, ManagedCertOpenStore(CERT_STORE_PROV_MEMORY, 0, None, 0, None) as store_memory:
        cert_selection = cert_iter = None
        # Passing the previous context back in advances the enumeration.
        while cert_iter := CertEnumCertificatesInStore(store_system, cert_iter):
            cert_info = cert_iter.contents.pCertInfo
            key_usage_bits = ctypes.wintypes.BYTE()
            CertGetIntendedKeyUsage(encoding, cert_info, ctypes.byref(key_usage_bits), ctypes.sizeof(key_usage_bits))
            if not key_usage_bits.value & CERT_DIGITAL_SIGNATURE_KEY_USAGE:
                continue
            if CertVerifyTimeValidity(None, cert_info) != 0:
                continue
            name_string = get_certificate_friendly_display_name(cert_iter)
            if any(marker in name_string for marker in excluded_markers):
                continue
            # A fresh NULL pointer is enough as the out-parameter. The previous
            # code fetched a real context with CertFindCertificateInStore on
            # every iteration just to have a slot to overwrite, and never
            # released that context.
            cert_added = PCCERT_CONTEXT()
            if CertAddCertificateContextToStore(store_memory, cert_iter, CERT_STORE_ADD_ALWAYS, ctypes.byref(cert_added)):
                cert_selection = cert_added
        if not show_cert_dialog:
            return cert_selection
        parent_window = hwnd if hwnd is not None else GetConsoleWindow()
        # The dialog must run while the in-memory store is still open.
        return CryptUIDlgSelectCertificateFromStore(store_memory, parent_window, 'VistA Logon - Certificate Selection', 'Select a certificate for VistA authentication', 0, 0, None)
def get_certificate_thumbprint(certificate: PCCERT_CONTEXT) -> bytes:
    """Return the raw hash property (CERT_HASH_PROP_ID) of a certificate.

    The property is fetched with two calls: the first, with a NULL buffer,
    reports the required size; the second fills a buffer of that size.

    Args:
        certificate: Certificate context to query.

    Returns:
        The hash as bytes (callers turn it into text with `.hex()`). Empty
        if the size query reported zero bytes.
    """
    size = ctypes.wintypes.DWORD()
    CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, None, ctypes.byref(size))
    buffer = ctypes.create_string_buffer(size.value)
    CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, buffer, ctypes.byref(size))
    # Use .raw, not .value: .value treats the buffer as a NUL-terminated C
    # string and would cut the hash short at the first 0x00 byte.
    return buffer.raw[:size.value]
def get_certificate_friendly_display_name(certificate: PCCERT_CONTEXT) -> str:
    """Return the friendly display name of a certificate.

    CertGetNameStringW is called once with no buffer to learn the required
    length, then again with a buffer of that length.

    Args:
        certificate: Certificate context to query.

    Returns:
        The name as a Python string (buffer contents up to the terminator).
    """
    name_type = CERT_NAME_FRIENDLY_DISPLAY_TYPE
    required_chars = CertGetNameStringW(certificate, name_type, 0, None, None, 0)
    name_buffer = ctypes.create_unicode_buffer(required_chars)
    CertGetNameStringW(certificate, name_type, 0, None, name_buffer, required_chars)
    return name_buffer.value
# WS-Trust STS endpoints from VDL documentation
# https://www.va.gov/vdl/documents/Infrastructure/KAAJEE/kaajee_ssowap_8_791_depg_r.pdf
# https://www.va.gov/vdl/documents/Financial_Admin/Bed_Management_Solution_(BMS)/bms_4_0_tm.pdf
# https://www.va.gov/vdl/documents/VistA_GUI_Hybrids/National_Utilization_Management_Integration_Archive/numi_server_setup_guide_v15_9.pdf
def get_registry_iam():
    """Return the value stored under HKLM 'SOFTWARE\\Vista\\Common\\IAM', or the built-in STS URL (port 9301) if absent."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM', default='https://services.eauth.va.gov:9301/STS/RequestSecurityToken')


def get_registry_iam_ad():
    """Return the value stored under HKLM 'SOFTWARE\\Vista\\Common\\IAM_AD', or the built-in STS URL (port 9201) if absent."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM_AD', default='https://services.eauth.va.gov:9201/STS/RequestSecurityToken')


def get_registry_rioserver():
    """Return the value stored under HKLM 'SOFTWARE\\Vista\\Common\\RIOSERVER', or 'SecurityTokenService' if absent."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOSERVER', default='SecurityTokenService')


def get_registry_rioport():
    """Return the value stored under HKLM 'SOFTWARE\\Vista\\Common\\RIOPORT', or 'RequestSecurityToken' if absent."""
    return get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOPORT', default='RequestSecurityToken')
def get_registry_value(hkey: int, subkey: str, value: Optional[str]=None, default: Any=None) -> Any:
    """Read one registry value, falling back to `default` when it is missing.

    Args:
        hkey: Already-open key or predefined HKEY_* constant.
        subkey: Path of the subkey to open beneath `hkey`.
        value: Name of the value to read; passed straight to
            winreg.QueryValueEx (None is forwarded unchanged).
        default: Returned when opening the key or querying the value raises
            FileNotFoundError.

    Returns:
        The data part of the queried value, or `default`.
    """
    try:
        with winreg.OpenKey(hkey, subkey) as opened_key:
            data, _value_type = winreg.QueryValueEx(opened_key, value)
    except FileNotFoundError:
        # Missing key or missing value: both surface as FileNotFoundError.
        return default
    return data
def get_iam_request(application: str, issuer: str) -> str:
    """Build the SOAP RequestSecurityToken envelope sent to the STS.

    Args:
        application: Address placed in the AppliesTo endpoint reference.
        issuer: Address placed in the Issuer element.

    Returns:
        The complete XML document as a string. `application` and `issuer`
        are XML-escaped, so characters such as '&' or '<' (which can occur in
        a script name or a URL query string) no longer produce a malformed
        request.
    """
    from xml.sax.saxutils import escape
    # Escape '&', '<' and '>' so that the values are safe as element text.
    application_text = escape(str(application))
    issuer_text = escape(str(issuer))
    return f'''<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns="http://docs.oasis-open.org/ws-sx/ws-trust/200512">
<soapenv:Header/>
<soapenv:Body>
<ns:RequestSecurityToken>
<ns:Base>
<wss:TLS xmlns:wss="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"/>
</ns:Base>
<wsp:AppliesTo xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy">
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing">
<wsa:Address>{application_text}</wsa:Address>
</wsa:EndpointReference>
</wsp:AppliesTo>
<ns:Issuer>
<wsa:Address xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing">{issuer_text}</wsa:Address>
</ns:Issuer>
<ns:RequestType>http://schemas.xmlsoap.org/ws/2005/02/trust/Validate</ns:RequestType>
</ns:RequestSecurityToken>
</soapenv:Body>
</soapenv:Envelope>'''
def get_local_computer_name() -> str:
    """Return this machine's fully qualified domain name, as reported by socket.getfqdn()."""
    from socket import getfqdn
    return getfqdn()
def get_app_name() -> str:
    """Return the file-name part of sys.argv[0], i.e. the running script's name without its directory."""
    import os.path
    import sys
    script_path = sys.argv[0]
    _directory, file_name = os.path.split(script_path)
    return file_name
def get_sso_token(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> str:
    """Request an SSO token from the STS by running curl with a client certificate.

    Args:
        iam: STS endpoint URL; defaults to get_registry_iam().
        ua: User-Agent header; defaults to DEFAULT_USER_AGENT.
        certificate: Hex thumbprint of the client certificate in
            CurrentUser\\MY. When None, get_vista_certificate() is used to
            pick one.
        issuer: Issuer address for the request; defaults to DEFAULT_ISSUER.
        hostname: Host part of the AppliesTo address; defaults to
            get_local_computer_name().
        application: Last path segment of the AppliesTo address; defaults to
            get_app_name().

    Returns:
        curl's standard output decoded as UTF-8. curl's standard error is
        echoed to this process's stderr. If no certificate was given and
        none was picked, nothing is run and None is returned.
    """
    import subprocess
    import sys
    if certificate is None:
        picked = get_vista_certificate()
        if picked:
            certificate = get_certificate_thumbprint(picked).hex()
    if certificate is None:
        # No certificate available: nothing to send.
        return None
    endpoint = iam or get_registry_iam()
    client_cert = 'CurrentUser\\MY\\' + certificate
    agent = ua or DEFAULT_USER_AGENT
    applies_to = f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}"
    payload = get_iam_request(applies_to, issuer or DEFAULT_ISSUER)
    command = [
        'curl', '-fsSL', '-X', 'POST', endpoint,
        '--ca-native',
        '--cert', client_cert,
        '-A', agent,
        '-H', 'Content-Type: application/xml',
        '-H', 'Accept: application/xml',
        '-d', payload,
    ]
    completed = subprocess.run(command, capture_output=True)
    print(completed.stderr.decode('utf8'), end='', file=sys.stderr)
    return completed.stdout.decode('utf8')
async def get_sso_token_async(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> Optional[str]:
    """Asynchronous counterpart of get_sso_token(); runs curl via asyncio.

    Args:
        iam: STS endpoint URL; defaults to get_registry_iam().
        ua: User-Agent header; defaults to DEFAULT_USER_AGENT.
        certificate: Hex thumbprint of the client certificate in
            CurrentUser\\MY. When None, get_vista_certificate() is used to
            pick one (that call is synchronous and may show a dialog).
        issuer: Issuer address for the request; defaults to DEFAULT_ISSUER.
        hostname: Host part of the AppliesTo address; defaults to
            get_local_computer_name().
        application: Last path segment of the AppliesTo address; defaults to
            get_app_name().

    Returns:
        curl's standard output decoded as UTF-8. curl's standard error is
        echoed to this process's stderr. If no certificate was given and
        none was picked, nothing is run and None is returned, matching
        get_sso_token().
    """
    import asyncio
    import sys
    if certificate is None:
        # Guard against a NULL selection (dialog cancelled or nothing
        # eligible). Without it an empty thumbprint was handed to curl.
        if choice := get_vista_certificate():
            certificate = get_certificate_thumbprint(choice).hex()
    if certificate is None:
        return None
    endpoint = iam or get_registry_iam()
    client_cert = 'CurrentUser\\MY\\' + certificate
    agent = ua or DEFAULT_USER_AGENT
    applies_to = f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}"
    payload = get_iam_request(applies_to, issuer or DEFAULT_ISSUER)
    process = await asyncio.create_subprocess_exec(
        'curl', '-fsSL', '-X', 'POST', endpoint,
        '--ca-native',
        '--cert', client_cert,
        '-A', agent,
        '-H', 'Content-Type: application/xml',
        '-H', 'Accept: application/xml',
        '-d', payload,
        stdin=None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    stdout_data, stderr_data = await process.communicate()
    print(stderr_data.decode('utf8'), end='', file=sys.stderr)
    return stdout_data.decode('utf8')
# Script entry point: print the token obtained with all defaults.
if __name__ == '__main__':
    try:
        print(get_sso_token())
    except OSError:
        # Exit with status 1 on OS-level failures. SystemExit is raised
        # directly because the exit() helper is injected by the `site` module
        # and is missing under `python -S` or in frozen executables.
        raise SystemExit(1)