commit e5a2fb87e1b0fedf256b86ce570f5ac9b23c9007 Author: Jiang Yio Date: Sun Oct 2 06:45:42 2022 -0400 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..55be276 --- /dev/null +++ b/.gitignore @@ -0,0 +1,154 @@ +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintainted in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + diff --git a/README.md b/README.md new file mode 100644 index 0000000..bb3e7f8 --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +# vistawire-py + +VistA RPC wire protocol implementation in Python. + +## Test client + +```shell +vistawire.py host port +``` \ No newline at end of file diff --git a/XUIAMSSOi.py b/XUIAMSSOi.py new file mode 100644 index 0000000..5d669cf --- /dev/null +++ b/XUIAMSSOi.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +import ctypes + +# Load DLL +XUIAMSSOi = ctypes.WinDLL('C:\\Program Files (x86)\\Micro Focus\\Reflection\\XUIAMSSOi.dll') +XUIAMSSOi.MySsoTokenVBA.restype = ctypes.c_long +XUIAMSSOi.MySsoTokenVBA.argtypes = (ctypes.c_wchar_p, ctypes.c_long) + +# Authenticate against smartcard +def MySsoTokenVBA(bufsize: int=15000) -> str: + buf = ctypes.create_unicode_buffer(bufsize) + sz = XUIAMSSOi.MySsoTokenVBA(buf, bufsize) + if sz <= bufsize: + return buf.value.encode('utf-16')[2:].decode('latin-1') + else: + return MySsoTokenVBA(sz) diff --git a/vistawire.py b/vistawire.py new file mode 100644 index 0000000..a763abf --- /dev/null +++ b/vistawire.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 + +import math +import random +import socket +import warnings +from collections import namedtuple +from contextlib import contextmanager + +from typing import Any, Union, Iterable, Generator, Tuple, Sequence, Callable, Optional + +# protocol token [XWB]VTER: [XWB] = NS broker [XWB], V = version 1, T = type 1, E = envelope size 3, R = XWBPRT 0 +RPCRequest = namedtuple('RPCRequest', ('header', 'version', 'type', 'envelope', 'rt', 'command', 'broker', 'method', 'params', 'encoding')) + +bEOT = b'\x04' + +class RPCExc(RuntimeError): pass +class RPCExcFormat(ValueError, RPCExc): pass +class RPCExcServerError(RPCExc): pass +class RPCExcInvalidResult(RPCExc): pass + +class RPCType(object): + LITERAL = b'0' + REFERENCE = b'1' + LIST = b'2' + GLOBAL = b'3' + EMPTY = b'4' + STREAM = b'5' + def __init__(self, value: Any, magic: bytes=None): + self.magic = magic + self.value = value.value if isinstance(value, RPCType) else value + +class RPCDict(dict): + def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST): + dict.__init__(self, iterable) + self.magic = magic + +class RPCList(list): + def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST, start: int=1): + list.__init__(self, iterable) + self.magic = magic + self.start = start + def __add__(self, rhs: list) -> 'RPCList': + return RPCList(list.__add__(self, rhs), magic=self.magic, start=self.start) + +class StatefulClient(object): + def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768, encoding: str='latin-1'): + self._rpc = client(sock, end=end, minsz=minsz, maxsz=maxsz) + self._sock = sock + self._context = 'XUS SIGNON' + self._encoding = encoding + def __call__(self, method: Union[str, bytes], *params: Any, throw: bool=True) -> Any: + if isinstance(method, bytes): + try: + res = self._rpc(method) + if res == b'\x00\x001': + request = load(method, encoding=self._encoding) + if request.method == 'XWB CREATE CONTEXT': + self._context = decrypt(request.params[0]) + except Exception as ex: + warnings.warn(f"Error executing {method}: {ex}") + return res + if (fn := getattr(self, (method := method.strip(' _:')), None)): + return fn(*params, context=context, restore=restore) + if (method := method.replace('_', ' ').upper()) == 'XWB CREATE CONTEXT': + res = r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding) + if res == '1': + self._context = decrypt(params[0]) + return res + return r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding) + def setcontext(self, *contexts: str): + if len(contexts) > 0 and self._context not in contexts: + for context in contexts: + if r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(context), encoding=self._encoding)), encoding=self._encoding) == '1': + self._context = context + return context + @contextmanager + def context(self): + try: + yield (ctx := self._context) + finally: + if self._context != ctx: + res = r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(ctx), encoding=self._encoding)), encoding=self._encoding) + if res != '1': + raise RPCExcInvalidResult('XWB CREATE CONTEXT', ctx, res) + self._context = ctx + +def load(data: bytes, encoding: str='latin-1') -> RPCRequest: + if data.startswith(b'['): + offset = data.index(b']') + 1 + header = data[:offset] + version = data[offset: offset + 1] + type = data[offset + 1: offset + 2] + envelope = int(chr(data[offset + 2])) + rt = data[offset + 3: offset + 4] + command = data[offset + 4] == 0x34 # ord(b'4') + offset += 5 + broker = None + if not command: + broker, offset = s_unpack(data, offset, encoding=encoding) + method, offset = s_unpack(data, offset, encoding=encoding) + params = None + if offset < (sz := len(data)): + if data[offset] != 0x35: + raise RPCExcFormat(f"Char '5' expected at #{offset}, got: {chr(data[offset])}") # ord(b'5') + offset += 1 + params = [] + while offset < sz and data[offset: offset + 2] != b'4f': + value, offset = l_unpack(data, offset, envelope=envelope, encoding=encoding) + params.append(value) + params = tuple(params) + return RPCRequest(header=header, version=version, type=type, envelope=envelope, rt=rt, command=command, broker=broker, method=method, params=params, encoding=encoding) + raise RPCExcFormat(f"Invalid format {data}") + +def dump(method: Union[str, RPCRequest], *params: Any, header: bytes=b'[XWB]', version: bytes=b'1', type: bytes=b'1', envelope: int=0, rt: bytes=b'0', command: bool=False, broker: str='0', encoding: str='latin-1') -> bytes: + if isinstance(method, RPCRequest): + params = method.params + header = method.header + version = method.version + type = method.type + envelope = method.envelope + rt = method.rt + command = method.command + broker = method.broker + encoding = method.encoding + method = method.method + envelope = max(3, math.ceil(math.log10(max(1, max(l_pack_maxlen(arg, encoding=encoding) for arg in params)))) if envelope < 1 and params is not None and len(params) else envelope) + return header + version + type + str(envelope).encode(encoding) + rt + (b'4' if command else (b'2' + s_pack(broker, encoding=encoding))) + s_pack(method, encoding=encoding) + ((b'5' + (b''.join(l_pack(param, envelope=envelope, encoding=encoding) for param in params) if len(params) > 0 else b'4f')) if params is not None else b'') + +def s_pack(value: str, encoding: str='latin-1') -> bytes: + encoded = value.encode(encoding) + if len(encoded) <= 255: + return bytes((len(encoded),)) + encoded + raise RPCExcFormat('cannot s-pack string longer than 255 bytes: ' + repr(value)) + +def l_pack(value: Any, envelope: int=3, wrapped: bool=True, magic: Optional[bytes]=None, encoding: str='latin-1') -> bytes: + if isinstance(value, dict): + bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in value.items()) + return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare + elif not isinstance(value, str) and hasattr(value, '__iter__'): + bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in enumerate(value, start=getattr(value, 'start', 1))) + return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare + elif isinstance(value, RPCType): + return l_pack(value.value, envelope=envelope, magic=value.magic, encoding=encoding) + else: + encoded = str(value).encode(encoding) + if len(encoded) <= 10**envelope - 1: + bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded + return ((magic or b'0') + bare + b'f') if wrapped else bare + raise RPCExcFormat(f'cannot l-pack string longer than {10**envelope - 1} bytes with an envelope of {envelope}: ' + repr(value)) + +def l_pack_maxlen(value: Any, encoding: str='latin-1') -> int: + if isinstance(value, dict): + return max(max(l_pack_maxlen(k, encoding=encoding) for k in value.keys()), max(l_pack_maxlen(v, encoding=encoding) for v in value.values())) if len(value) > 0 else 0 + elif not isinstance(value, str) and hasattr(value, '__iter__'): + return max(len(str(len(value))), max(l_pack_maxlen(v, encoding=encoding) for v in value)) if len(value) > 0 else 0 + else: + return len(str(value).encode(encoding)) + +def s_unpack(data: bytes, offset: int=0, encoding='latin-1') -> Tuple[str, int]: + n = data[offset] + return data[offset + 1: (end := offset + 1 + n)].decode(encoding), end + +def l_unpack(data: bytes, offset: int=0, envelope: int=3, bare: bool=False, encoding='latin-1') -> Tuple[Any, int]: + if bare: + n = int(data[offset: offset + envelope].decode(encoding)) + return data[offset + envelope: (end := offset + envelope + n)].decode(encoding), end + magic = data[offset: offset + 1] + if magic == b'0': + value, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding) + if data[offset] == 0x66: # ord(b'f') + return value, offset + 1 + raise RPCExcFormat(f"Char 'f' expected, got: {chr(data[offset])}") + elif magic in b'23': + res = {} + while True: + key, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding) + if data[offset] in b'012345': + value, offset = l_unpack(data, offset, envelope=envelope, bare=True, encoding=encoding) + res[key] = value + else: + warnings.warn(f"Invalid list in {data}") + if (suffix := data[offset]) == 0x66: # ord(b'f') + return dictlist(res, magic=magic), offset + 1 + elif suffix != 0x74: # ord(b't') + raise RPCExcFormat(f"Char 't' or 'f' expected, got: {chr(data[offset])}") + raise RPCExcFormat(f"Unknown data type {magic}") + +def dictlist(value: dict, start: Optional[int]=None, magic: bytes=RPCType.LIST) -> Union[RPCDict, RPCList]: + index = None + try: + for k in value.keys(): + if index is not None: + if int(k) - index == 1: + index += 1 + else: + return RPCDict(value, magic=magic) + else: + index = start = int(k) + return RPCList(value.values(), magic=magic) if start is None else RPCList(value.values(), magic=magic, start=start) + except ValueError: + return RPCDict(value, magic=magic) + +def r_unpack(data: bytes, throw: bool=True, encoding: str='latin-1') -> Any: + if data[:2] == b'\x00\x00': + if len(data) > 2 and data[2] == 0x18: # 0x18 is CAN + if throw: + raise RPCExcServerError(data[3:].decode(encoding)) + return RPCExcServerError(data[3:].decode(encoding)) + elif data[-1] == 0x1f: # 0x1f is US + return r_unpack_table(data[2:-1].decode(encoding).split('\x1e')) # 0x1e is RS + elif data[-2:] == b'\r\n': + return tuple(data[2:-2].decode(encoding).split('\r\n')) + else: + return data[2:].decode(encoding) if len(data) > 2 else None + if throw: + raise RPCExcFormat(data) + return RPCExcFormat(data) + +def r_unpack_table(rows: Sequence[str]) -> Union[Tuple[dict, ...], Tuple[tuple, ...]]: + # table: ROW\x1eROW\x1eROW\x1eROW\x1eROW\x1e\x1f; row: COL^COL^COL^COL^COL; header field: [IT]\d{5}.+ + if len(rows) > 0 and len(hdr := rows[0]) > 0 and hdr[0] in ('I', 'T') and hdr[1:6].isdecimal(): + header = [field[6:] for field in rows[0].split('^')] + return tuple(dict(zip(header, row.split('^'))) for row in rows[1:] if len(row) > 0) + else: + return tuple(tuple(row.split('^')) for row in rows if len(row) > 0) + +cipherpad = ( + 'wkEo-ZJt!dG)49K{nX1BS$vH<&:Myf*>Ae0jQW=;|#PsO`\'%+rmb[gpqN,l6/hFC@DcUa ]z~R}"V\\iIxu?872.(TYL5_3', + 'rKv`R;M/9BqAF%&tSs#Vh)dO1DZP> *fX\'u[.4lY=-mg_ci802N7LTG<]!CWo:3?{+,5Q}(@jaExn$~p\\IyHwzU"|k6Jeb', + '\\pV(ZJk"WQmCn!Y,y@1d+~8s?[lNMxgHEt=uw|X:qSLjAI*}6zoF{T3#;ca)/h5%`P4$r]G\'9e2if_>UDKb7H=CT8S!', + 'NZW:1}K$byP;jk)7\'`x90B|cq@iSsEnu,(l-hf.&Y_?J#R]+voQXU8mrV[!p4tg~OMez CAaGFD6H53%L/dT2<*>"{\\wI=', + 'vCiJ[D_0xR32c*4.P"G{r7}E8wUgyudF+6-:B=$(sY,LkbHa#\'@Q', + 'hvMX,\'4Ty;[a8/{6l~F_V"}qLI\\!@x(D7bRmUH]W15J%N0BYPkrs&9:$)Zj>u|zwQ=ieC-oGA.#?tfdcO3gp`S+En K2*<', + 'jd!W5[];4\'?ghBzIFN}fAK"#`p_TqtD*1E37XGVs@0nmSe+Y6Qyo-aUu%i8c=H2vJ\\) R:MLb.9,wlO~P', + '2ThtjEM+!=xXb)7,ZV{*ci3"8@_l-HS69L>]\\AUF/Q%:qD?1~m(yvO0e\'<#o$p4dnIzKP|`NrkaGg.ufCRB[; sJYwW}5&', + 'vB\\5/zl-9y:Pj|=(R\'7QJI *&CTX"p0]_3.idcuOefVU#omwNZ`$Fs?L+1Sk<,b)hM4A6[Y%aDrg@~KqEW8t>H};n!2xG{', + 'sFz0Bo@_HfnK>LR}qWXV+D6`Y28=4Cm~G/7-5A\\b9!a#rP.l&M$hc3ijQk;),TvUd<[:I"u1\'NZSOw]*gxtE{eJp|y (?%', + 'M@,D}|LJyGO8`$*ZqH .j>c~hanG', + 'xVa1\']_GU#zm+:5b@06O3Ap8=*7ZFY!H-uEQk; .q)i&rhd', + 'I]Jz7AG@QX."%3Lq>METUo{Pp_ |a6<0dYVSv8:b)~W9NK`(r\'4fs&wim\\kReC2hg=HOj$1B*/nxt,;c#y+![?lFuZ-5D}', + 'Rr(Ge6F Hx>q$m&C%M~Tn,:"o\'tX/*yP.{lZ!YkiVhuw_y|m};d)-7DZ"Fe/Y9 WidFN,1KsmwQ)GJM{I4:C%}#Ep(?HB/r;t.&U8o|l[\'Lg"2hRDyZ5`nbf]qjc0!zS-TkYO<_=76a\\X@$Pe3+xVvu', + 'yYgjf"5VdHc#uA,W1i+v\'6|@pr{n;DJ!8(btPGaQM.LT3oe?NB/&9>Z`-}02*%x<7lsqz4OS ~E$\\R]KI[:UwC_=h)kXmF', + '5:iar.{YU7mBZR@-K|2 "+~`M%8sq4JhPo<_X\\Sg3WC;Tuxz,fvEQ1p9=w}FAI&j/keD0c?)LN6OHV]lGy\'$*>nd[(tb!#' +) +cipherpad_reversed = tuple({c: i for i, c in enumerate(m)} for m in cipherpad) + +def encrypt(plaintext: str) -> str: + associator_idx = identifier_idx = random.randrange(l := len(cipherpad)) + while identifier_idx == associator_idx: + identifier_idx = random.randrange(l) + associator = cipherpad_reversed[associator_idx] + identifier = cipherpad[identifier_idx] + return chr(associator_idx + 32) + ''.join(identifier[associator[i]] for i in plaintext) + chr(identifier_idx + 32) + +def encrypt_trivial(plaintext: str) -> str: + return f' {plaintext} ' + +def decrypt(ciphertext: str) -> str: + associator_idx = ord(ciphertext[-1]) - 32 + identifier_idx = ord(ciphertext[0]) - 32 + associator = cipherpad_reversed[associator_idx] + identifier = cipherpad[identifier_idx] + return ''.join(identifier[associator[i]] for i in ciphertext[1:-1]) + +class sockiter(object): + def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768): + self.gen = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz) + self.sock = sock + self.end = end + def __call__(self, msg: bytes) -> int: + return send_msg(self.sock, msg, end=self.end) + def __iter__(self) -> Generator[bytes, None, None]: + return self.gen + +def client(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Callable[[bytes], bytes]: + def gen() -> Generator[bytes, Optional[bytes], None]: + responses = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz) + request = yield + while True: + send_msg(sock, request, end=end) + request = yield next(responses) + gen = gen() + next(gen) + return gen.send + +def send_msg(sock: socket.socket, msg: bytes, end: bytes=bEOT) -> int: + return sock.send(msg + end) + +def recv_msg(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Generator[bytes, None, None]: + buf = b'' + bufsz = minsz + while True: + if len(data := sock.recv(bufsz)) > 0: + buf += data + while (idx := buf.find(end)) >= 0: + if idx > 0: + yield buf[:idx] + bufsz = minsz + elif bufsz < maxsz: + bufsz = _x if (_x := bufsz << 1) < maxsz else maxsz + buf = buf[idx + 1:] + +if __name__ == '__main__': + import sys, shlex, getpass, XUIAMSSOi + try: + import readline + except ModuleNotFoundError: + pass + + host = 'localhost' + port = 19000 + + if len(sys.argv) > 1: + host = sys.argv[1] + if len(sys.argv) > 2: + port = int(sys.argv[2]) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + call = client(sock) + print((x := f'VistA {host}:{port}'), '='*len(x), sep='\n') + + try: + print('RPC> TCPConnect') + print(res := r_unpack(call(dump('TCPConnect', sock.getsockname()[0], '0', socket.gethostname(), command=True)), throw=False)) + if res == 'accept': + print('RPC> XUS SIGNON SETUP') + print(res := r_unpack(call(dump('XUS SIGNON SETUP', '', '1')), throw=False)) + retry = True + while retry: + if token := XUIAMSSOi.MySsoTokenVBA(): + print('RPC> XUS ESSO VALIDATE') + print(res := r_unpack(call(dump('XUS ESSO VALIDATE', RPCList((token[i:i+200] for i in range(0, len(token), 200)), magic=RPCType.GLOBAL, start=0))), throw=False)) + else: + token = getpass.getpass('ACCESS CODE: ') + ';' + getpass.getpass('VERIFY CODE: ') + print('RPC> XUS AV CODE') + print(res := r_unpack(call(dump('XUS AV CODE', encrypt_trivial(token))), throw=False)) + if res[0] != '0': + retry = False + while True: + data = input('RPC> ').strip() + if len(data) > 0: + data = shlex.split(data) + data[0] = data[0].replace('_', ' ').upper() + print(r_unpack(call(dump(*data)), throw=False)) + except (EOFError, KeyboardInterrupt): + pass + + print('RPC> #BYE#') + print(r_unpack(call(dump('#BYE#', command=True)), throw=False))