#!/usr/bin/env python3 import math import random import socket import warnings from collections import namedtuple from contextlib import contextmanager from typing import Any, Union, Iterable, Generator, Tuple, Sequence, Callable, Optional # protocol token [XWB]VTER: [XWB] = NS broker [XWB], V = version 1, T = type 1, E = envelope size 3, R = XWBPRT 0 RPCRequest = namedtuple('RPCRequest', ('header', 'version', 'type', 'envelope', 'rt', 'command', 'broker', 'method', 'params', 'encoding')) bEOT = b'\x04' class RPCExc(RuntimeError): pass class RPCExcFormat(ValueError, RPCExc): pass class RPCExcServerError(RPCExc): pass class RPCExcInvalidResult(RPCExc): pass class RPCType(object): LITERAL = b'0' REFERENCE = b'1' LIST = b'2' GLOBAL = b'3' EMPTY = b'4' STREAM = b'5' def __init__(self, value: Any, magic: bytes=None): self.magic = magic self.value = value.value if isinstance(value, RPCType) else value class RPCDict(dict): def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST): dict.__init__(self, iterable) self.magic = magic class RPCList(list): def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST, start: int=1): list.__init__(self, iterable) self.magic = magic self.start = start def __add__(self, rhs: list) -> 'RPCList': return RPCList(list.__add__(self, rhs), magic=self.magic, start=self.start) class StatefulClient(object): def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768, encoding: str='latin-1'): self._rpc = client(sock, end=end, minsz=minsz, maxsz=maxsz) self._sock = sock self._context = 'XUS SIGNON' self._encoding = encoding def __call__(self, method: Union[str, bytes], *params: Any, throw: bool=True) -> Any: if isinstance(method, bytes): try: res = self._rpc(method) if res == b'\x00\x001': request = load(method, encoding=self._encoding) if request.method == 'XWB CREATE CONTEXT': self._context = decrypt(request.params[0]) except Exception as ex: warnings.warn(f"Error executing {method}: {ex}") return res if (fn := getattr(self, (method := method.strip(' _:')), None)): return fn(*params, context=context, restore=restore) if (method := method.replace('_', ' ').upper()) == 'XWB CREATE CONTEXT': res = r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding) if res == '1': self._context = decrypt(params[0]) return res return r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding) def setcontext(self, *contexts: str): if len(contexts) > 0 and self._context not in contexts: for context in contexts: if r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(context), encoding=self._encoding)), encoding=self._encoding) == '1': self._context = context return context @contextmanager def context(self): try: yield (ctx := self._context) finally: if self._context != ctx: res = r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(ctx), encoding=self._encoding)), encoding=self._encoding) if res != '1': raise RPCExcInvalidResult('XWB CREATE CONTEXT', ctx, res) self._context = ctx def load(data: bytes, encoding: str='latin-1') -> RPCRequest: if data.startswith(b'['): offset = data.index(b']') + 1 header = data[:offset] version = data[offset: offset + 1] type = data[offset + 1: offset + 2] envelope = int(chr(data[offset + 2])) rt = data[offset + 3: offset + 4] command = data[offset + 4] == 0x34 # ord(b'4') offset += 5 broker = None if not command: broker, offset = s_unpack(data, offset, encoding=encoding) method, offset = s_unpack(data, offset, encoding=encoding) params = None if offset < (sz := len(data)): if data[offset] != 0x35: raise RPCExcFormat(f"Char '5' expected at #{offset}, got: {chr(data[offset])}") # ord(b'5') offset += 1 params = [] while offset < sz and data[offset: offset + 2] != b'4f': value, offset = l_unpack(data, offset, envelope=envelope, encoding=encoding) params.append(value) params = tuple(params) return RPCRequest(header=header, version=version, type=type, envelope=envelope, rt=rt, command=command, broker=broker, method=method, params=params, encoding=encoding) raise RPCExcFormat(f"Invalid format {data}") def dump(method: Union[str, RPCRequest], *params: Any, header: bytes=b'[XWB]', version: bytes=b'1', type: bytes=b'1', envelope: int=0, rt: bytes=b'0', command: bool=False, broker: str='0', encoding: str='latin-1') -> bytes: if isinstance(method, RPCRequest): params = method.params header = method.header version = method.version type = method.type envelope = method.envelope rt = method.rt command = method.command broker = method.broker encoding = method.encoding method = method.method envelope = max(3, math.ceil(math.log10(max(1, max(l_pack_maxlen(arg, encoding=encoding) for arg in params)))) if envelope < 1 and params is not None and len(params) else envelope) return header + version + type + str(envelope).encode(encoding) + rt + (b'4' if command else (b'2' + s_pack(broker, encoding=encoding))) + s_pack(method, encoding=encoding) + ((b'5' + (b''.join(l_pack(param, envelope=envelope, encoding=encoding) for param in params) if len(params) > 0 else b'4f')) if params is not None else b'') def s_pack(value: str, encoding: str='latin-1') -> bytes: encoded = value.encode(encoding) if len(encoded) <= 255: return bytes((len(encoded),)) + encoded raise RPCExcFormat('cannot s-pack string longer than 255 bytes: ' + repr(value)) def l_pack(value: Any, envelope: int=3, wrapped: bool=True, magic: Optional[bytes]=None, encoding: str='latin-1') -> bytes: if isinstance(value, dict): bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in value.items()) return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare elif not isinstance(value, str) and hasattr(value, '__iter__'): bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in enumerate(value, start=getattr(value, 'start', 1))) return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare elif isinstance(value, RPCType): return l_pack(value.value, envelope=envelope, magic=value.magic, encoding=encoding) else: encoded = str(value).encode(encoding) if len(encoded) <= 10**envelope - 1: bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded return ((magic or b'0') + bare + b'f') if wrapped else bare raise RPCExcFormat(f'cannot l-pack string longer than {10**envelope - 1} bytes with an envelope of {envelope}: ' + repr(value)) def l_pack_maxlen(value: Any, encoding: str='latin-1') -> int: if isinstance(value, dict): return max(max(l_pack_maxlen(k, encoding=encoding) for k in value.keys()), max(l_pack_maxlen(v, encoding=encoding) for v in value.values())) if len(value) > 0 else 0 elif not isinstance(value, str) and hasattr(value, '__iter__'): return max(len(str(len(value))), max(l_pack_maxlen(v, encoding=encoding) for v in value)) if len(value) > 0 else 0 else: return len(str(value).encode(encoding)) def s_unpack(data: bytes, offset: int=0, encoding='latin-1') -> Tuple[str, int]: n = data[offset] return data[offset + 1: (end := offset + 1 + n)].decode(encoding), end def l_unpack(data: bytes, offset: int=0, envelope: int=3, bare: bool=False, encoding='latin-1') -> Tuple[Any, int]: if bare: n = int(data[offset: offset + envelope].decode(encoding)) return data[offset + envelope: (end := offset + envelope + n)].decode(encoding), end magic = data[offset: offset + 1] if magic == b'0': value, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding) if data[offset] == 0x66: # ord(b'f') return value, offset + 1 raise RPCExcFormat(f"Char 'f' expected, got: {chr(data[offset])}") elif magic in b'23': res = {} while True: key, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding) if data[offset] in b'012345': value, offset = l_unpack(data, offset, envelope=envelope, bare=True, encoding=encoding) res[key] = value else: warnings.warn(f"Invalid list in {data}") if (suffix := data[offset]) == 0x66: # ord(b'f') return dictlist(res, magic=magic), offset + 1 elif suffix != 0x74: # ord(b't') raise RPCExcFormat(f"Char 't' or 'f' expected, got: {chr(data[offset])}") raise RPCExcFormat(f"Unknown data type {magic}") def dictlist(value: dict, start: Optional[int]=None, magic: bytes=RPCType.LIST) -> Union[RPCDict, RPCList]: index = None try: for k in value.keys(): if index is not None: if int(k) - index == 1: index += 1 else: return RPCDict(value, magic=magic) else: index = start = int(k) return RPCList(value.values(), magic=magic) if start is None else RPCList(value.values(), magic=magic, start=start) except ValueError: return RPCDict(value, magic=magic) def r_unpack(data: bytes, throw: bool=True, encoding: str='latin-1') -> Any: if data[:2] == b'\x00\x00': if len(data) > 2 and data[2] == 0x18: # 0x18 is CAN if throw: raise RPCExcServerError(data[3:].decode(encoding)) return RPCExcServerError(data[3:].decode(encoding)) elif data[-1] == 0x1f: # 0x1f is US return r_unpack_table(data[2:-1].decode(encoding).split('\x1e')) # 0x1e is RS elif data[-2:] == b'\r\n': return tuple(data[2:-2].decode(encoding).split('\r\n')) else: return data[2:].decode(encoding) if len(data) > 2 else None if throw: raise RPCExcFormat(data) return RPCExcFormat(data) def r_unpack_table(rows: Sequence[str]) -> Union[Tuple[dict, ...], Tuple[tuple, ...]]: # table: ROW\x1eROW\x1eROW\x1eROW\x1eROW\x1e\x1f; row: COL^COL^COL^COL^COL; header field: [IT]\d{5}.+ if len(rows) > 0 and len(hdr := rows[0]) > 0 and hdr[0] in ('I', 'T') and hdr[1:6].isdecimal(): header = [field[6:] for field in rows[0].split('^')] return tuple(dict(zip(header, row.split('^'))) for row in rows[1:] if len(row) > 0) else: return tuple(tuple(row.split('^')) for row in rows if len(row) > 0) cipherpad = ( 'wkEo-ZJt!dG)49K{nX1BS$vH<&:Myf*>Ae0jQW=;|#PsO`\'%+rmb[gpqN,l6/hFC@DcUa ]z~R}"V\\iIxu?872.(TYL5_3', 'rKv`R;M/9BqAF%&tSs#Vh)dO1DZP> *fX\'u[.4lY=-mg_ci802N7LTG<]!CWo:3?{+,5Q}(@jaExn$~p\\IyHwzU"|k6Jeb', '\\pV(ZJk"WQmCn!Y,y@1d+~8s?[lNMxgHEt=uw|X:qSLjAI*}6zoF{T3#;ca)/h5%`P4$r]G\'9e2if_>UDKb7H=CT8S!', 'NZW:1}K$byP;jk)7\'`x90B|cq@iSsEnu,(l-hf.&Y_?J#R]+voQXU8mrV[!p4tg~OMez CAaGFD6H53%L/dT2<*>"{\\wI=', 'vCiJ[D_0xR32c*4.P"G{r7}E8wUgyudF+6-:B=$(sY,LkbHa#\'@Q', 'hvMX,\'4Ty;[a8/{6l~F_V"}qLI\\!@x(D7bRmUH]W15J%N0BYPkrs&9:$)Zj>u|zwQ=ieC-oGA.#?tfdcO3gp`S+En K2*<', 'jd!W5[];4\'?ghBzIFN}fAK"#`p_TqtD*1E37XGVs@0nmSe+Y6Qyo-aUu%i8c=H2vJ\\) R:MLb.9,wlO~P', '2ThtjEM+!=xXb)7,ZV{*ci3"8@_l-HS69L>]\\AUF/Q%:qD?1~m(yvO0e\'<#o$p4dnIzKP|`NrkaGg.ufCRB[; sJYwW}5&', 'vB\\5/zl-9y:Pj|=(R\'7QJI *&CTX"p0]_3.idcuOefVU#omwNZ`$Fs?L+1Sk<,b)hM4A6[Y%aDrg@~KqEW8t>H};n!2xG{', 'sFz0Bo@_HfnK>LR}qWXV+D6`Y28=4Cm~G/7-5A\\b9!a#rP.l&M$hc3ijQk;),TvUd<[:I"u1\'NZSOw]*gxtE{eJp|y (?%', 'M@,D}|LJyGO8`$*ZqH .j>c~hanG', 'xVa1\']_GU#zm+:5b@06O3Ap8=*7ZFY!H-uEQk; .q)i&rhd', 'I]Jz7AG@QX."%3Lq>METUo{Pp_ |a6<0dYVSv8:b)~W9NK`(r\'4fs&wim\\kReC2hg=HOj$1B*/nxt,;c#y+![?lFuZ-5D}', 'Rr(Ge6F Hx>q$m&C%M~Tn,:"o\'tX/*yP.{lZ!YkiVhuw_y|m};d)-7DZ"Fe/Y9 WidFN,1KsmwQ)GJM{I4:C%}#Ep(?HB/r;t.&U8o|l[\'Lg"2hRDyZ5`nbf]qjc0!zS-TkYO<_=76a\\X@$Pe3+xVvu', 'yYgjf"5VdHc#uA,W1i+v\'6|@pr{n;DJ!8(btPGaQM.LT3oe?NB/&9>Z`-}02*%x<7lsqz4OS ~E$\\R]KI[:UwC_=h)kXmF', '5:iar.{YU7mBZR@-K|2 "+~`M%8sq4JhPo<_X\\Sg3WC;Tuxz,fvEQ1p9=w}FAI&j/keD0c?)LN6OHV]lGy\'$*>nd[(tb!#' ) cipherpad_reversed = tuple({c: i for i, c in enumerate(m)} for m in cipherpad) def encrypt(plaintext: str) -> str: associator_idx = identifier_idx = random.randrange(l := len(cipherpad)) while identifier_idx == associator_idx: identifier_idx = random.randrange(l) associator = cipherpad_reversed[associator_idx] identifier = cipherpad[identifier_idx] return chr(associator_idx + 32) + ''.join(identifier[associator[i]] for i in plaintext) + chr(identifier_idx + 32) def encrypt_trivial(plaintext: str) -> str: return f' {plaintext} ' def decrypt(ciphertext: str) -> str: associator_idx = ord(ciphertext[-1]) - 32 identifier_idx = ord(ciphertext[0]) - 32 associator = cipherpad_reversed[associator_idx] identifier = cipherpad[identifier_idx] return ''.join(identifier[associator[i]] for i in ciphertext[1:-1]) class sockiter(object): def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768): self.gen = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz) self.sock = sock self.end = end def __call__(self, msg: bytes) -> int: return send_msg(self.sock, msg, end=self.end) def __iter__(self) -> Generator[bytes, None, None]: return self.gen def client(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Callable[[bytes], bytes]: def gen() -> Generator[bytes, Optional[bytes], None]: responses = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz) request = yield while True: send_msg(sock, request, end=end) request = yield next(responses) gen = gen() next(gen) return gen.send def send_msg(sock: socket.socket, msg: bytes, end: bytes=bEOT) -> int: return sock.send(msg + end) def recv_msg(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Generator[bytes, None, None]: buf = b'' bufsz = minsz while True: if len(data := sock.recv(bufsz)) > 0: buf += data while (idx := buf.find(end)) >= 0: if idx > 0: yield buf[:idx] bufsz = minsz elif bufsz < maxsz: bufsz = _x if (_x := bufsz << 1) < maxsz else maxsz buf = buf[idx + 1:] if __name__ == '__main__': import sys, shlex, getpass, XUIAMSSOi try: import readline except ModuleNotFoundError: pass host = 'localhost' port = 19000 if len(sys.argv) > 1: host = sys.argv[1] if len(sys.argv) > 2: port = int(sys.argv[2]) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) call = client(sock) print((x := f'VistA {host}:{port}'), '='*len(x), sep='\n') try: print('RPC> TCPConnect') print(res := r_unpack(call(dump('TCPConnect', sock.getsockname()[0], '0', socket.gethostname(), command=True)), throw=False)) if res == 'accept': print('RPC> XUS SIGNON SETUP') print(res := r_unpack(call(dump('XUS SIGNON SETUP', '', '1')), throw=False)) retry = True while retry: if token := XUIAMSSOi.MySsoTokenVBA(): print('RPC> XUS ESSO VALIDATE') print(res := r_unpack(call(dump('XUS ESSO VALIDATE', RPCList((token[i:i+200] for i in range(0, len(token), 200)), magic=RPCType.GLOBAL, start=0))), throw=False)) else: token = getpass.getpass('ACCESS CODE: ') + ';' + getpass.getpass('VERIFY CODE: ') print('RPC> XUS AV CODE') print(res := r_unpack(call(dump('XUS AV CODE', encrypt_trivial(token))), throw=False)) if res[0] != '0': retry = False while True: data = input('RPC> ').strip() if len(data) > 0: data = shlex.split(data) data[0] = data[0].replace('_', ' ').upper() print(r_unpack(call(dump(*data)), throw=False)) except (EOFError, KeyboardInterrupt): pass print('RPC> #BYE#') print(r_unpack(call(dump('#BYE#', command=True)), throw=False))