vistawire-py/vistawire.py

356 lines
16 KiB
Python
Raw Permalink Normal View History

2022-10-02 06:45:42 -04:00
#!/usr/bin/env python3
import math
import random
import socket
import warnings
from collections import namedtuple
from contextlib import contextmanager
from typing import Any, Union, Iterable, Generator, Tuple, Sequence, Callable, Optional
# Every request starts with the protocol token "[XWB]VTER":
#   [XWB] = NS broker header, V = version (1), T = type (1),
#   E = envelope size (3), R = XWBPRT (0).
RPCRequest = namedtuple(
    'RPCRequest',
    (
        'header',
        'version',
        'type',
        'envelope',
        'rt',
        'command',
        'broker',
        'method',
        'params',
        'encoding',
    ),
)
# Default message terminator appended by send_msg() and searched for by recv_msg().
bEOT = b'\x04'
class RPCExc(RuntimeError):
    """Common base class of the errors defined by this module."""


class RPCExcFormat(ValueError, RPCExc):
    """Data could not be encoded to, or decoded from, the wire format."""


class RPCExcServerError(RPCExc):
    """The reply carried an error message (see r_unpack)."""


class RPCExcInvalidResult(RPCExc):
    """An RPC returned a result other than the one required to proceed."""
class RPCType(object):
    """Pair a parameter value with the one-byte type marker to encode it with.

    The class attributes are the type markers used as ``magic`` values
    elsewhere in this module.
    """

    LITERAL = b'0'
    REFERENCE = b'1'
    LIST = b'2'
    GLOBAL = b'3'
    EMPTY = b'4'
    STREAM = b'5'

    def __init__(self, value: Any, magic: bytes=None):
        """Store *magic* and *value*; an RPCType passed as *value* is unwrapped first."""
        if isinstance(value, RPCType):
            # Never nest wrappers: keep only the innermost payload.
            value = value.value
        self.magic = magic
        self.value = value
class RPCDict(dict):
    """A dict that also carries the type marker (``magic``) it is encoded with."""

    def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST):
        """Fill the dict from *iterable* and remember *magic*."""
        super().__init__(iterable)
        self.magic = magic
class RPCList(list):
    """A list that carries its type marker and the index of its first element.

    ``start`` is the number given to the first item when the list is
    encoded as key/value pairs (see l_pack).
    """

    def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST, start: int=1):
        """Fill the list from *iterable* and remember *magic* and *start*."""
        super().__init__(iterable)
        self.magic = magic
        self.start = start

    def __add__(self, rhs: list) -> 'RPCList':
        """Concatenate with *rhs*, keeping this list's ``magic`` and ``start``."""
        combined = super().__add__(rhs)
        return RPCList(combined, magic=self.magic, start=self.start)
class StatefulClient(object):
    """Callable RPC client that remembers which broker context is active.

    Wraps the request/response callable built by ``client()`` and updates
    ``self._context`` whenever an ``XWB CREATE CONTEXT`` call succeeds.
    """

    def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768, encoding: str='latin-1'):
        """Bind to *sock*; the tracked context starts out as 'XUS SIGNON'."""
        self._rpc = client(sock, end=end, minsz=minsz, maxsz=maxsz)
        self._sock = sock
        self._context = 'XUS SIGNON'
        self._encoding = encoding

    def __call__(self, method: Union[str, bytes], *params: Any, throw: bool=True) -> Any:
        """Execute an RPC and return its result.

        * ``bytes`` *method*: an already encoded request. It is sent as is and
          the raw reply is returned. If the reply is ``b'\\x00\\x001'`` and the
          request turns out to be ``XWB CREATE CONTEXT``, the tracked context
          is updated; failures while inspecting the request only emit a warning.
        * ``str`` *method*: leading/trailing spaces, underscores and colons are
          stripped. If the result names a callable attribute of this object
          (e.g. ``setcontext``), that attribute is called with *params*.
          Otherwise underscores become spaces, the name is upper-cased, and the
          request is encoded with ``dump`` and the reply decoded with
          ``r_unpack`` (*throw* is forwarded to it).
        """
        if isinstance(method, bytes):
            # Transport errors propagate; only the best-effort context
            # bookkeeping below is allowed to fail with a warning.
            res = self._rpc(method)
            if res == b'\x00\x001':
                try:
                    request = load(method, encoding=self._encoding)
                    if request.method == 'XWB CREATE CONTEXT':
                        self._context = decrypt(request.params[0])
                except Exception as ex:
                    warnings.warn(f"Error executing {method}: {ex}")
            return res
        fn = getattr(self, (method := method.strip(' _:')), None)
        if callable(fn):
            return fn(*params)
        if (method := method.replace('_', ' ').upper()) == 'XWB CREATE CONTEXT':
            res = r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding)
            if res == '1':
                self._context = decrypt(params[0])
            return res
        return r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding)

    def setcontext(self, *contexts: str):
        """Switch to the first of *contexts* that the server accepts.

        Nothing is sent when no context is given or the tracked context is
        already one of *contexts*. Returns the context switched to, or
        ``None`` when no switch took place.
        """
        if len(contexts) > 0 and self._context not in contexts:
            for context in contexts:
                if r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(context), encoding=self._encoding)), encoding=self._encoding) == '1':
                    self._context = context
                    return context

    @contextmanager
    def context(self):
        """Context manager that restores the tracked context on exit.

        Yields the context active on entry. If it differs on exit, the
        original one is re-created on the server.

        Raises:
            RPCExcInvalidResult: the server did not answer '1' to the restore.
        """
        try:
            yield (ctx := self._context)
        finally:
            if self._context != ctx:
                res = r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(ctx), encoding=self._encoding)), encoding=self._encoding)
                if res != '1':
                    raise RPCExcInvalidResult('XWB CREATE CONTEXT', ctx, res)
                self._context = ctx
def load(data: bytes, encoding: str='latin-1') -> RPCRequest:
    """Decode an encoded request (the inverse of ``dump``) into an RPCRequest.

    Layout read here: a bracketed header, one byte each for version, type,
    envelope size and rt, a command marker (``4`` = command, anything else is
    followed by an s-packed broker string), the s-packed method name and,
    optionally, ``5`` followed by l-packed parameters or ``4f`` for none.

    Raises:
        RPCExcFormat: *data* does not start with ``[`` or the parameter
            section does not start with ``5``.
    """
    if not data.startswith(b'['):
        raise RPCExcFormat(f"Invalid format {data}")
    pos = data.index(b']') + 1
    header = data[:pos]
    version = data[pos: pos + 1]
    kind = data[pos + 1: pos + 2]
    envelope = int(chr(data[pos + 2]))
    rt = data[pos + 3: pos + 4]
    command = data[pos + 4] == 0x34  # ord(b'4')
    pos += 5
    if command:
        broker = None
    else:
        broker, pos = s_unpack(data, pos, encoding=encoding)
    method, pos = s_unpack(data, pos, encoding=encoding)
    total = len(data)
    if pos >= total:
        # Nothing after the method name: the request has no parameter section.
        params = None
    else:
        if data[pos] != 0x35:  # ord(b'5')
            raise RPCExcFormat(f"Char '5' expected at #{pos}, got: {chr(data[pos])}")
        pos += 1
        collected = []
        # b'4f' marks an empty parameter list.
        while pos < total and data[pos: pos + 2] != b'4f':
            item, pos = l_unpack(data, pos, envelope=envelope, encoding=encoding)
            collected.append(item)
        params = tuple(collected)
    return RPCRequest(header=header, version=version, type=kind, envelope=envelope, rt=rt, command=command, broker=broker, method=method, params=params, encoding=encoding)
def dump(method: Union[str, RPCRequest], *params: Any, header: bytes=b'[XWB]', version: bytes=b'1', type: bytes=b'1', envelope: int=0, rt: bytes=b'0', command: bool=False, broker: str='0', encoding: str='latin-1') -> bytes:
    """Encode a request for the wire.

    Args:
        method: RPC name, or an RPCRequest whose fields then replace every
            other argument (including *params*).
        *params: parameters, each encoded with ``l_pack``.
        header, version, type, rt: raw bytes copied into the request prefix.
        envelope: number of digits in every l-packed length prefix. Values
            below 1 mean "choose automatically"; the result is never below 3.
        command: emit the ``4`` command marker instead of ``2`` + broker.
        broker: s-packed after the ``2`` marker for non-command requests.
        encoding: text encoding for all strings.

    Returns:
        The encoded request, without the message terminator.
    """
    if isinstance(method, RPCRequest):
        params = method.params
        header = method.header
        version = method.version
        type = method.type
        envelope = method.envelope
        rt = method.rt
        command = method.command
        broker = method.broker
        encoding = method.encoding
        method = method.method
    if envelope < 1 and params:
        longest = max(l_pack_maxlen(arg, encoding=encoding) for arg in params)
        # Count decimal digits exactly: ceil(log10(n)) is one short whenever
        # n is a power of ten (1000 needs 4 digits, not 3).
        envelope = len(str(longest))
    envelope = max(3, envelope)
    parts = [header, version, type, str(envelope).encode(encoding), rt]
    if command:
        parts.append(b'4')
    else:
        parts.append(b'2')
        parts.append(s_pack(broker, encoding=encoding))
    parts.append(s_pack(method, encoding=encoding))
    if params is not None:
        parts.append(b'5')
        if len(params) > 0:
            parts.extend(l_pack(param, envelope=envelope, encoding=encoding) for param in params)
        else:
            parts.append(b'4f')  # explicit "no parameters" marker
    return b''.join(parts)
def s_pack(value: str, encoding: str='latin-1') -> bytes:
    """Encode *value* as one length byte followed by the encoded text.

    Raises:
        RPCExcFormat: the encoded text is longer than 255 bytes.
    """
    raw = value.encode(encoding)
    size = len(raw)
    if size > 255:
        raise RPCExcFormat('cannot s-pack string longer than 255 bytes: ' + repr(value))
    return bytes([size]) + raw
def l_pack(value: Any, envelope: int=3, wrapped: bool=True, magic: Optional[bytes]=None, encoding: str='latin-1') -> bytes:
    """Encode *value* with zero-padded decimal length prefixes.

    * dict: key/value pairs, each l-packed bare, joined with ``t``.
    * other non-string iterable: like a dict keyed by position, counting
      from the iterable's ``start`` attribute (default 1).
    * RPCType: its wrapped value, using the wrapper's ``magic``.
    * anything else: ``str(value)`` prefixed with its encoded length.

    Args:
        envelope: number of digits in every length prefix.
        wrapped: when true, prepend the type marker and append ``f``;
            when false, return the bare body only.
        magic: type marker overriding the default (``0`` for scalars, the
            container's ``magic`` attribute or ``2`` for collections).
        encoding: text encoding.

    Raises:
        RPCExcFormat: a scalar is too long for *envelope* digits.
    """
    if not isinstance(value, str):
        if isinstance(value, dict):
            pairs = value.items()
        elif hasattr(value, '__iter__'):
            pairs = enumerate(value, start=getattr(value, 'start', 1))
        elif isinstance(value, RPCType):
            # Forward `wrapped` too: a wrapper nested inside a collection must
            # be emitted bare, otherwise the surrounding list is corrupted.
            return l_pack(value.value, envelope=envelope, wrapped=wrapped, magic=value.magic, encoding=encoding)
        else:
            pairs = None
        if pairs is not None:
            bare = b't'.join(
                l_pack(k, envelope=envelope, wrapped=False, encoding=encoding)
                + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding)
                for k, v in pairs
            )
            return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare
    encoded = str(value).encode(encoding)
    limit = 10**envelope - 1
    if len(encoded) > limit:
        raise RPCExcFormat(f'cannot l-pack string longer than {limit} bytes with an envelope of {envelope}: ' + repr(value))
    bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded
    return ((magic or b'0') + bare + b'f') if wrapped else bare
def l_pack_maxlen(value: Any, encoding: str='latin-1') -> int:
    """Return the largest length that ``l_pack`` must write for *value*.

    ``dump`` uses this to size the envelope (digits per length prefix).

    * str: its encoded length.
    * dict: the longest key or value (0 when empty).
    * other sized iterable: the longest item, but at least the number of
      digits of the item count, since positions are packed as keys
      (0 when empty).
    * RPCType: measured through to the wrapped value, as ``l_pack`` encodes it.
    * anything else: the encoded length of ``str(value)``.
    """
    if isinstance(value, str):
        return len(str(value).encode(encoding))
    if isinstance(value, dict):
        if len(value) == 0:
            return 0
        longest_key = max(l_pack_maxlen(k, encoding=encoding) for k in value.keys())
        longest_value = max(l_pack_maxlen(v, encoding=encoding) for v in value.values())
        return max(longest_key, longest_value)
    if hasattr(value, '__iter__'):
        if len(value) == 0:
            return 0
        return max(len(str(len(value))), max(l_pack_maxlen(v, encoding=encoding) for v in value))
    if isinstance(value, RPCType):
        # Measure the payload, not the wrapper object's repr.
        return l_pack_maxlen(value.value, encoding=encoding)
    return len(str(value).encode(encoding))
def s_unpack(data: bytes, offset: int=0, encoding='latin-1') -> Tuple[str, int]:
    """Decode an s-packed string (one length byte, then the text) at *offset*.

    Returns:
        ``(text, next_offset)`` where *next_offset* points just past the string.
    """
    start = offset + 1
    stop = start + data[offset]
    return data[start:stop].decode(encoding), stop
def l_unpack(data: bytes, offset: int=0, envelope: int=3, bare: bool=False, encoding='latin-1') -> Tuple[Any, int]:
    """Decode one l-packed item at *offset* (the inverse of ``l_pack``).

    Args:
        envelope: number of digits in every length prefix.
        bare: when true, read only ``<length><text>`` with no type marker
            and no trailing ``f``.

    Returns:
        ``(value, next_offset)``. Type ``0`` yields a str; types ``2`` and
        ``3`` yield whatever ``dictlist`` builds from the decoded pairs.

    Raises:
        RPCExcFormat: unknown type marker, or a missing ``t``/``f`` separator.
    """
    if bare:
        n = int(data[offset: offset + envelope].decode(encoding))
        end = offset + envelope + n
        return data[offset + envelope: end].decode(encoding), end
    magic = data[offset: offset + 1]
    if magic == b'0':
        value, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding)
        if data[offset] == 0x66:  # ord(b'f')
            return value, offset + 1
        raise RPCExcFormat(f"Char 'f' expected, got: {chr(data[offset])}")
    elif magic in (b'2', b'3'):
        res = {}
        if data[offset + 1: offset + 2] == b'f':
            # Empty collection, exactly what l_pack emits for [] and {}.
            return dictlist(res, magic=magic), offset + 2
        while True:
            # offset + 1 skips the type marker (first pass) or the 't' separator.
            key, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding)
            # A value is present when a length prefix follows, i.e. any digit
            # (a prefix may start with 6-9, e.g. 750-byte values).
            if 0x30 <= data[offset] <= 0x39:
                value, offset = l_unpack(data, offset, envelope=envelope, bare=True, encoding=encoding)
                res[key] = value
            else:
                warnings.warn(f"Invalid list in {data}")
            if (suffix := data[offset]) == 0x66:  # ord(b'f')
                return dictlist(res, magic=magic), offset + 1
            elif suffix != 0x74:  # ord(b't')
                raise RPCExcFormat(f"Char 't' or 'f' expected, got: {chr(data[offset])}")
    raise RPCExcFormat(f"Unknown data type {magic}")
def dictlist(value: dict, start: Optional[int]=None, magic: bytes=RPCType.LIST) -> Union[RPCDict, RPCList]:
    """Turn a decoded mapping into an RPCList when its keys are consecutive integers.

    Keys are examined in iteration order. If each one converts to an int and
    is exactly one greater than the previous, the values become an RPCList
    whose ``start`` is the first key. Otherwise an RPCDict is returned.
    *start* is only used when *value* is empty.
    """
    previous = None
    try:
        for key in value.keys():
            number = int(key)
            if previous is None:
                start = number
            elif number - previous != 1:
                return RPCDict(value, magic=magic)
            previous = number
    except ValueError:
        # A key that is not an integer: keep the mapping as it is.
        return RPCDict(value, magic=magic)
    if start is None:
        return RPCList(value.values(), magic=magic)
    return RPCList(value.values(), magic=magic, start=start)
def r_unpack(data: bytes, throw: bool=True, encoding: str='latin-1') -> Any:
    """Decode a reply.

    A reply must start with two NUL bytes. What follows decides the result:

    * a CAN byte (0x18): an RPCExcServerError carrying the rest as its message;
    * a trailing US byte (0x1f): a table, see ``r_unpack_table``;
    * a trailing CR LF: a tuple of lines;
    * otherwise: the text itself, or ``None`` when there is none.

    With *throw* false, errors are returned instead of raised.

    Raises:
        RPCExcServerError: the reply carries an error message.
        RPCExcFormat: the reply does not start with two NUL bytes.
    """
    if data[:2] != b'\x00\x00':
        if throw:
            raise RPCExcFormat(data)
        return RPCExcFormat(data)
    if len(data) > 2 and data[2] == 0x18:  # 0x18 is CAN
        message = data[3:].decode(encoding)
        if throw:
            raise RPCExcServerError(message)
        return RPCExcServerError(message)
    if data[-1] == 0x1f:  # 0x1f is US
        rows = data[2:-1].decode(encoding).split('\x1e')  # 0x1e is RS
        return r_unpack_table(rows)
    if data[-2:] == b'\r\n':
        lines = data[2:-2].decode(encoding).split('\r\n')
        return tuple(lines)
    if len(data) > 2:
        return data[2:].decode(encoding)
    return None
def r_unpack_table(rows: Sequence[str]) -> Union[Tuple[dict, ...], Tuple[tuple, ...]]:
    """Split table rows into columns on ``^``; empty rows are dropped.

    table: ROW\\x1eROW\\x1eROW\\x1e\\x1f; row: COL^COL^COL; header field: [IT]\\d{5}.+

    When the first row starts like a header field, the text after the first
    six characters of each of its fields becomes a column name and every
    following row is returned as a dict. Otherwise every row is a tuple.
    """
    first = rows[0] if len(rows) > 0 else ''
    has_header = len(first) > 0 and first[0] in ('I', 'T') and first[1:6].isdecimal()
    if not has_header:
        return tuple(tuple(row.split('^')) for row in rows if len(row) > 0)
    names = [field[6:] for field in first.split('^')]
    return tuple(dict(zip(names, row.split('^'))) for row in rows[1:] if len(row) > 0)
# Table of 20 pad strings used by encrypt() and decrypt(). A pad is selected
# by index; that index (+32, as a character) is stored in the first/last
# position of every ciphertext. The strings are data: do not reformat them.
cipherpad = (
'wkEo-ZJt!dG)49K{nX1BS$vH<&:Myf*>Ae0jQW=;|#PsO`\'%+rmb[gpqN,l6/hFC@DcUa ]z~R}"V\\iIxu?872.(TYL5_3',
'rKv`R;M/9BqAF%&tSs#Vh)dO1DZP> *fX\'u[.4lY=-mg_ci802N7LTG<]!CWo:3?{+,5Q}(@jaExn$~p\\IyHwzU"|k6Jeb',
'\\pV(ZJk"WQmCn!Y,y@1d+~8s?[lNMxgHEt=uw|X:qSLjAI*}6zoF{T3#;ca)/h5%`P4$r]G\'9e2if_>UDKb7<v0&- RBO.',
'depjt3g4W)qD0V~NJar\\B "?OYhcu[<Ms%Z`RIL_6:]AX-zG.#}$@vk7/5x&*m;(yb2Fn+l\'PwUof1K{9,|EQi>H=CT8S!',
'NZW:1}K$byP;jk)7\'`x90B|cq@iSsEnu,(l-hf.&Y_?J#R]+voQXU8mrV[!p4tg~OMez CAaGFD6H53%L/dT2<*>"{\\wI=',
'vCiJ<oZ9|phXVNn)m K`t/SI%]A5qOWe\\&?;jT~M!fz1l>[D_0xR32c*4.P"G{r7}E8wUgyudF+6-:B=$(sY,LkbHa#\'@Q',
'hvMX,\'4Ty;[a8/{6l~F_V"}qLI\\!@x(D7bRmUH]W15J%N0BYPkrs&9:$)Zj>u|zwQ=ieC-oGA.#?tfdcO3gp`S+En K2*<',
'jd!W5[];4\'<C$/&x|rZ(k{>?ghBzIFN}fAK"#`p_TqtD*1E37XGVs@0nmSe+Y6Qyo-aUu%i8c=H2vJ\\) R:MLb.9,wlO~P',
'2ThtjEM+!=xXb)7,ZV{*ci3"8@_l-HS69L>]\\AUF/Q%:qD?1~m(yvO0e\'<#o$p4dnIzKP|`NrkaGg.ufCRB[; sJYwW}5&',
'vB\\5/zl-9y:Pj|=(R\'7QJI *&CTX"p0]_3.idcuOefVU#omwNZ`$Fs?L+1Sk<,b)hM4A6[Y%aDrg@~KqEW8t>H};n!2xG{',
'sFz0Bo@_HfnK>LR}qWXV+D6`Y28=4Cm~G/7-5A\\b9!a#rP.l&M$hc3ijQk;),TvUd<[:I"u1\'NZSOw]*gxtE{eJp|y (?%',
'M@,D}|LJyGO8`$*ZqH .j>c~h<d=fimszv[#-53F!+a;NC\'6T91IV?(0x&/{B)w"]Q\\YUWprk4:ol%g2nE7teRKbAPuS_X',
'.mjY#_0*H<B=Q+FML6]s;r2:e8R}[ic&KA 1w{)vV5d,$u"~xD/Pg?IyfthO@CzWp%!`N4Z\'3-(o|J9XUE7k\\TlqSb>anG',
'xVa1\']_GU<X`|\\NgM?LS9{"jT%s$}y[nvtlefB2RKJW~(/cIDCPow4,>#zm+:5b@06O3Ap8=*7ZFY!H-uEQk; .q)i&rhd',
'I]Jz7AG@QX."%3Lq>METUo{Pp_ |a6<0dYVSv8:b)~W9NK`(r\'4fs&wim\\kReC2hg=HOj$1B*/nxt,;c#y+![?lFuZ-5D}',
'Rr(Ge6F Hx>q$m&C%M~Tn,:"o\'tX/*yP.{lZ!YkiVhuw_<KE5a[;}W0gjsz3]@7cI2\\QN?f#4p|vb1OUBD9)=-LJA+d`S8',
'I~k>y|m};d)-7DZ"Fe/Y<B:xwojR,Vh]O0Sc[`$sg8GXE!1&Qrzp._W%TNK(=J 3i*2abuHA4C\'?Mv\\Pq{n#56LftUl@9+',
'~A*>9 WidFN,1KsmwQ)GJM{I4:C%}#Ep(?HB/r;t.&U8o|l[\'Lg"2hRDyZ5`nbf]qjc0!zS-TkYO<_=76a\\X@$Pe3+xVvu',
'yYgjf"5VdHc#uA,W1i+v\'6|@pr{n;DJ!8(btPGaQM.LT3oe?NB/&9>Z`-}02*%x<7lsqz4OS ~E$\\R]KI[:UwC_=h)kXmF',
'5:iar.{YU7mBZR@-K|2 "+~`M%8sq4JhPo<_X\\Sg3WC;Tuxz,fvEQ1p9=w}FAI&j/keD0c?)LN6OHV]lGy\'$*>nd[(tb!#'
)
# One dict per pad, mapping each character of that pad to its position in it.
cipherpad_reversed = tuple({c: i for i, c in enumerate(m)} for m in cipherpad)
def encrypt(plaintext: str) -> str:
    """Obfuscate *plaintext* by substitution between two randomly chosen pads.

    Two different pad indices are drawn. Every character is looked up in the
    first ("associator") pad and replaced by the character at the same
    position in the second ("identifier") pad. The result is framed by the
    two indices, each stored as ``chr(index + 32)``.

    Raises:
        KeyError: *plaintext* contains a character that is not in the pad.
    """
    pad_count = len(cipherpad)
    associator_idx = random.randrange(pad_count)
    identifier_idx = associator_idx
    # Redraw until the two pads differ.
    while identifier_idx == associator_idx:
        identifier_idx = random.randrange(pad_count)
    positions = cipherpad_reversed[associator_idx]
    target = cipherpad[identifier_idx]
    body = ''.join(target[positions[ch]] for ch in plaintext)
    return chr(associator_idx + 32) + body + chr(identifier_idx + 32)
def encrypt_trivial(plaintext: str) -> str:
    """Frame *plaintext* with one space on each side, leaving the text unchanged.

    A space is ``chr(0 + 32)``, so ``decrypt`` reads both pad indices as 0 and
    maps every character onto itself.
    """
    return ' {} '.format(plaintext)
def decrypt(ciphertext: str) -> str:
    """Reverse ``encrypt``.

    The last character selects the pad each ciphertext character is looked up
    in, the first character selects the pad the plaintext character is taken
    from (both as ``ord(char) - 32``).
    """
    lookup_idx = ord(ciphertext[-1]) - 32
    output_idx = ord(ciphertext[0]) - 32
    positions = cipherpad_reversed[lookup_idx]
    target = cipherpad[output_idx]
    body = ciphertext[1:-1]
    return ''.join(target[positions[ch]] for ch in body)
class sockiter(object):
    """Socket wrapper: call it to send a message, iterate it to receive messages."""

    def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768):
        """Remember *sock* and the terminator, and create the receive generator."""
        self.sock = sock
        self.end = end
        self.gen = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz)

    def __call__(self, msg: bytes) -> int:
        """Send *msg* followed by the terminator; returns what send_msg returns."""
        return send_msg(self.sock, msg, end=self.end)

    def __iter__(self) -> Generator[bytes, None, None]:
        """Return the single shared receive generator."""
        return self.gen
def client(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Callable[[bytes], bytes]:
    """Build a ``request -> response`` callable on top of *sock*.

    Each call sends one message and returns the next message received.
    """
    def exchange() -> Generator[bytes, Optional[bytes], None]:
        incoming = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz)
        outgoing = yield
        while True:
            send_msg(sock, outgoing, end=end)
            outgoing = yield next(incoming)

    coroutine = exchange()
    # Advance to the first bare `yield` so that send() can deliver a request.
    next(coroutine)
    return coroutine.send
def send_msg(sock: socket.socket, msg: bytes, end: bytes=bEOT) -> int:
    """Send *msg* followed by the terminator *end*.

    Uses ``sendall`` so the whole message is transmitted; a plain ``send``
    may write only part of the buffer and silently truncate the request.

    Returns:
        The number of bytes sent (message plus terminator).
    """
    payload = msg + end
    sock.sendall(payload)
    return len(payload)
def recv_msg(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Generator[bytes, None, None]:
    """Yield the messages received on *sock*, split on the terminator *end*.

    Empty messages (two terminators in a row) are not yielded.

    Args:
        minsz: receive size used initially and after every yielded message.
        maxsz: upper bound for the receive size.

    Raises:
        ConnectionError: the peer closed the connection.
    """
    buf = b''
    bufsz = minsz
    skip = len(end)
    while True:
        data = sock.recv(bufsz)
        if not data:
            # recv() returning b'' means end of stream; looping on it would
            # spin forever without ever producing another message.
            raise ConnectionError('connection closed by peer')
        buf += data
        while (idx := buf.find(end)) >= 0:
            if idx > 0:
                yield buf[:idx]
                bufsz = minsz
            elif bufsz < maxsz:
                # NOTE(review): the receive size only grows when an empty
                # message is seen - confirm that this is the intended trigger.
                bufsz = min(bufsz << 1, maxsz)
            # Drop the message and the whole terminator, whatever its length.
            buf = buf[idx + skip:]
def _main() -> None:
    """Interactive console: ``vistawire.py [host [port]]``.

    Connects (default localhost:19000), performs TCPConnect and sign-on, then
    reads RPC calls from the prompt until EOF or Ctrl-C and finally sends
    ``#BYE#``. Every request and reply is printed.
    """
    import getpass
    import shlex
    import sys
    try:
        import readline  # noqa: F401 - only imported for line editing in input()
    except ModuleNotFoundError:
        pass
    try:
        import XUIAMSSOi
    except ModuleNotFoundError:
        # Optional single sign-on helper; without it the access/verify
        # code prompt is used.
        XUIAMSSOi = None
    host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 19000
    # The with-block closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        call = client(sock)
        print((x := f'VistA {host}:{port}'), '='*len(x), sep='\n')
        try:
            print('RPC> TCPConnect')
            print(res := r_unpack(call(dump('TCPConnect', sock.getsockname()[0], '0', socket.gethostname(), command=True)), throw=False))
            if res == 'accept':
                print('RPC> XUS SIGNON SETUP')
                print(res := r_unpack(call(dump('XUS SIGNON SETUP', '', '1')), throw=False))
                retry = True
                while retry:
                    if token := (XUIAMSSOi.MySsoTokenVBA() if XUIAMSSOi is not None else None):
                        print('RPC> XUS ESSO VALIDATE')
                        print(res := r_unpack(call(dump('XUS ESSO VALIDATE', RPCList((token[i:i+200] for i in range(0, len(token), 200)), magic=RPCType.GLOBAL, start=0))), throw=False))
                    else:
                        token = getpass.getpass('ACCESS CODE: ') + ';' + getpass.getpass('VERIFY CODE: ')
                        print('RPC> XUS AV CODE')
                        print(res := r_unpack(call(dump('XUS AV CODE', encrypt_trivial(token))), throw=False))
                    # With throw=False the reply may be an exception object or
                    # None; neither can be indexed, and both mean "try again".
                    if not isinstance(res, Exception) and res and res[0] != '0':
                        retry = False
                while True:
                    data = input('RPC> ').strip()
                    if len(data) > 0:
                        try:
                            data = shlex.split(data)
                        except ValueError as ex:
                            # e.g. unbalanced quotes: report and keep prompting
                            print(ex)
                            continue
                        data[0] = data[0].replace('_', ' ').upper()
                        print(r_unpack(call(dump(*data)), throw=False))
        except (EOFError, KeyboardInterrupt):
            pass
        print('RPC> #BYE#')
        print(r_unpack(call(dump('#BYE#', command=True)), throw=False))


if __name__ == '__main__':
    _main()