From 053053825c7d4f017822ed2be415d4a090e8206e Mon Sep 17 00:00:00 2001 From: inportb Date: Mon, 9 Dec 2024 22:48:05 -0500 Subject: [PATCH] Improved SSO workflow: use WS-Trust STS endpoint directly instead of relying on XUIAMSSOi.dll --- XWBSSOi.py | 201 +++++++++++++++++++++++++++++++++++++++++++++++++++++ auth.py | 17 ----- main.py | 4 +- rpc.py | 4 +- 4 files changed, 205 insertions(+), 21 deletions(-) create mode 100644 XWBSSOi.py delete mode 100644 auth.py diff --git a/XWBSSOi.py b/XWBSSOi.py new file mode 100644 index 0000000..3cb098d --- /dev/null +++ b/XWBSSOi.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 + +import ctypes +import ctypes.wintypes +import winreg +import contextlib +from typing import Any, Optional, Generator + +DEFAULT_USER_AGENT = 'Borland SOAP 1.2' +DEFAULT_ISSUER = 'https://ssoi.sts.va.gov/Issuer/smtoken/SAML2' + +HCERTSTORE = ctypes.c_void_p +PCERT_INFO = ctypes.c_void_p +HCRYPTPROV_LEGACY = ctypes.c_void_p + +CERT_STORE_PROV_MEMORY = b'Memory' +X509_ASN_ENCODING = 0x00000001 +PKCS_7_ASN_ENCODING = 0x00010000 +CERT_COMPARE_ANY = 0 +CERT_COMPARE_SHIFT = 16 +CERT_FIND_ANY = CERT_COMPARE_ANY< Generator[HCERTSTORE, None, None]: + res = CertOpenStore(lpszStoreProvider, dwEncodingType, hCryptProv, dwFlags, pvPara) + try: + yield res + finally: + CertCloseStore(res, 0) + +@contextlib.contextmanager +def ManagedCertOpenSystemStore(hProv: HCRYPTPROV_LEGACY, szSubsystemProtocol: ctypes.wintypes.LPCWSTR) -> Generator[HCERTSTORE, None, None]: + res = CertOpenSystemStoreW(hProv, szSubsystemProtocol) + try: + yield res + finally: + CertCloseStore(res, 0) + +def get_vista_certificate(show_cert_dialog: bool=True, hwnd: Optional[int]=0) -> PCCERT_CONTEXT: + with ManagedCertOpenSystemStore(0, 'MY') as store_system, ManagedCertOpenStore(CERT_STORE_PROV_MEMORY, 0, None, 0, None) as store_memory: + cert_selection = cert_iter = None + while cert_iter := CertEnumCertificatesInStore(store_system, cert_iter): + if cert_valid := CertFindCertificateInStore(store_system, X509_ASN_ENCODING|PKCS_7_ASN_ENCODING, 0, CERT_FIND_ANY, None, None): + name_bufsz = CertGetNameStringW(cert_iter, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, None, None, 0) + buf = ctypes.create_unicode_buffer(name_bufsz) + CertGetNameStringW(cert_iter, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, None, buf, name_bufsz) + name_string = buf.value + key_usage_bits = ctypes.wintypes.BYTE() + CertGetIntendedKeyUsage(X509_ASN_ENCODING|PKCS_7_ASN_ENCODING, cert_iter.contents.pCertInfo, ctypes.byref(key_usage_bits), ctypes.sizeof(key_usage_bits)) + valid_date = CertVerifyTimeValidity(None, cert_iter.contents.pCertInfo) + if ((key_usage_bits.value&CERT_DIGITAL_SIGNATURE_KEY_USAGE) == CERT_DIGITAL_SIGNATURE_KEY_USAGE) and (valid_date == 0) and ('Card Authentication' not in name_string) and ('0,' not in name_string) and ('Signature' not in name_string): + CertAddCertificateContextToStore(store_memory, cert_iter, CERT_STORE_ADD_ALWAYS, ctypes.byref(cert_valid)) + cert_selection = cert_valid + return CryptUIDlgSelectCertificateFromStore(store_memory, hwnd if hwnd is not None else GetConsoleWindow(), 'VistA Logon - Certificate Selection', 'Select a certificate for VistA authentication', 0, 0, None) if show_cert_dialog else cert_selection + +def get_certificate_thumbprint(certificate: PCCERT_CONTEXT) -> str: + bufsz = ctypes.wintypes.DWORD() + CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, None, ctypes.byref(bufsz)) + buffer = ctypes.create_string_buffer(bufsz.value) + CertGetCertificateContextProperty(certificate, CERT_HASH_PROP_ID, buffer, ctypes.byref(bufsz)) + return buffer.value + +def get_certificate_friendly_display_name(certificate: PCCERT_CONTEXT) -> str: + name_bufsz = CertGetNameStringW(certificate, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, None, None, 0) + buffer = ctypes.create_unicode_buffer(name_bufsz) + CertGetNameStringW(certificate, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, None, buffer, name_bufsz) + return buffer.value + +# WS-Trust STS endpoints from VDL documentation +# https://www.va.gov/vdl/documents/Infrastructure/KAAJEE/kaajee_ssowap_8_791_depg_r.pdf +# https://www.va.gov/vdl/documents/Financial_Admin/Bed_Management_Solution_(BMS)/bms_4_0_tm.pdf +# https://www.va.gov/vdl/documents/VistA_GUI_Hybrids/National_Utilization_Management_Integration_Archive/numi_server_setup_guide_v15_9.pdf +get_registry_iam = lambda: get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM', default='https://services.eauth.va.gov:9301/STS/RequestSecurityToken') +get_registry_iam_ad = lambda: get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\IAM_AD', default='https://services.eauth.va.gov:9201/STS/RequestSecurityToken') +get_registry_rioserver = lambda: get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOSERVER', default='SecurityTokenService') +get_registry_rioport = lambda: get_registry_value(winreg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\Vista\\Common\\RIOPORT', default='RequestSecurityToken') +def get_registry_value(hkey: int, subkey: str, value: Optional[str]=None, default: Any=None) -> Any: + try: + with winreg.OpenKey(hkey, subkey) as key: + return winreg.QueryValueEx(key, value)[0] + except FileNotFoundError: + return default + +def get_iam_request(application: str, issuer: str) -> str: + return f''' + + + + + + + + + + {application} + + + + {issuer} + + http://schemas.xmlsoap.org/ws/2005/02/trust/Validate + + +''' + +def get_local_computer_name() -> str: + import socket + return socket.getfqdn() + +def get_app_name() -> str: + import sys, os + return os.path.basename(sys.argv[0]) + +def get_sso_token(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> str: + import sys, subprocess + if certificate is None: + if choice := get_vista_certificate(): + certificate = get_certificate_thumbprint(choice).hex() + if certificate is not None: + res = subprocess.run(['curl', '-fsSL', '-X', 'POST', iam or get_registry_iam(), '--ca-native', '--cert', 'CurrentUser\\MY\\' + certificate, '-A', ua or DEFAULT_USER_AGENT, '-H', 'Content-Type: application/xml', '-H', 'Accept: application/xml', '-d', get_iam_request(f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}", issuer or DEFAULT_ISSUER)], capture_output=True) + print(res.stderr.decode('utf8'), end='', file=sys.stderr) + return res.stdout.decode('utf8') + +async def get_sso_token_async(iam: Optional[str]=None, ua: Optional[str]=None, certificate: Optional[str]=None, issuer: Optional[str]=None, hostname: Optional[str]=None, application: Optional[str]=None) -> str: + import sys, asyncio + if certificate is None: + certificate = get_certificate_thumbprint(get_vista_certificate()).hex() + res = await (await asyncio.create_subprocess_exec('curl', '-fsSL', '-X', 'POST', iam or get_registry_iam(), '--ca-native', '--cert', 'CurrentUser\\MY\\' + certificate, '-A', ua or DEFAULT_USER_AGENT, '-H', 'Content-Type: application/xml', '-H', 'Accept: application/xml', '-d', get_iam_request(f"https://{hostname or get_local_computer_name()}/Delphi_RPC_Broker/{application or get_app_name()}", issuer or DEFAULT_ISSUER), stdin=None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)).communicate() + print(res[1].decode('utf8'), end='', file=sys.stderr) + return res[0].decode('utf8') + +if __name__ == '__main__': + try: + print(get_sso_token()) + except OSError: + exit(1) diff --git a/auth.py b/auth.py deleted file mode 100644 index 83e92bb..0000000 --- a/auth.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 - -import ctypes - -# Load DLL -XUIAMSSOi = ctypes.WinDLL('C:\\Program Files (x86)\\Micro Focus\\Reflection\\XUIAMSSOi.dll') -XUIAMSSOi.MySsoTokenVBA.restype = ctypes.c_long -XUIAMSSOi.MySsoTokenVBA.argtypes = (ctypes.c_wchar_p, ctypes.c_long) - -# Authenticate against smartcard -def XUIAMSSOi_MySsoTokenVBA(bufsize=15000): - buf = ctypes.create_unicode_buffer(bufsize) - sz = XUIAMSSOi.MySsoTokenVBA(buf, bufsize) - if sz <= bufsize: - return buf.value.encode('utf-16')[2:].decode('latin-1') - else: - return XUIAMSSOi_MySsoTokenVBA(sz) diff --git a/main.py b/main.py index e5f374a..399023a 100644 --- a/main.py +++ b/main.py @@ -112,8 +112,8 @@ def application(): client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo) return jsonify_result(user, id=request.json.get('id')) else: - from auth import XUIAMSSOi_MySsoTokenVBA - if token := XUIAMSSOi_MySsoTokenVBA(): + import XWBSSOi + if token := XWBSSOi.get_sso_token(application='CPRSChart.exe'): user = client.authenticate(token) client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo) return jsonify_result(user, id=request.json.get('id')) diff --git a/rpc.py b/rpc.py index c438191..addb983 100644 --- a/rpc.py +++ b/rpc.py @@ -237,13 +237,13 @@ class ClientAsync(object): if __name__ == '__main__': import getpass, code - from auth import XUIAMSSOi_MySsoTokenVBA + import XWBSSOi client = ClientSync(host='test.northport.med.va.gov', port=19009) #client = ClientSync(host='vista.northport.med.va.gov', port=19209) threading.Thread(target=client.keepalive, daemon=True).start() print('\r\n'.join(client.XUS_INTRO_MSG())) - if token := XUIAMSSOi_MySsoTokenVBA(): + if token := XWBSSOi.get_sso_token(application='CPRSChart.exe'): print('authenticate', repr(client.authenticate(token))) else: print('authenticate', repr(client.authenticate(f"{getpass.getpass('ACCESS CODE: ')};{getpass.getpass('VERIFY CODE: ')}")))