Initial commit
This commit is contained in:
commit
e5a2fb87e1
154
.gitignore
vendored
Normal file
154
.gitignore
vendored
Normal file
@ -0,0 +1,154 @@
|
||||
# ---> Python
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintainted in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
9
README.md
Normal file
9
README.md
Normal file
@ -0,0 +1,9 @@
|
||||
# vistawire-py
|
||||
|
||||
VistA RPC wire protocol implementation in Python.
|
||||
|
||||
## Test client
|
||||
|
||||
```shell
|
||||
vistawire.py host port
|
||||
```
|
17
XUIAMSSOi.py
Normal file
17
XUIAMSSOi.py
Normal file
@ -0,0 +1,17 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import ctypes
|
||||
|
||||
# Load DLL
|
||||
XUIAMSSOi = ctypes.WinDLL('C:\\Program Files (x86)\\Micro Focus\\Reflection\\XUIAMSSOi.dll')
|
||||
XUIAMSSOi.MySsoTokenVBA.restype = ctypes.c_long
|
||||
XUIAMSSOi.MySsoTokenVBA.argtypes = (ctypes.c_wchar_p, ctypes.c_long)
|
||||
|
||||
# Authenticate against smartcard
|
||||
def MySsoTokenVBA(bufsize: int=15000) -> str:
|
||||
buf = ctypes.create_unicode_buffer(bufsize)
|
||||
sz = XUIAMSSOi.MySsoTokenVBA(buf, bufsize)
|
||||
if sz <= bufsize:
|
||||
return buf.value.encode('utf-16')[2:].decode('latin-1')
|
||||
else:
|
||||
return MySsoTokenVBA(sz)
|
355
vistawire.py
Normal file
355
vistawire.py
Normal file
@ -0,0 +1,355 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import math
|
||||
import random
|
||||
import socket
|
||||
import warnings
|
||||
from collections import namedtuple
|
||||
from contextlib import contextmanager
|
||||
|
||||
from typing import Any, Union, Iterable, Generator, Tuple, Sequence, Callable, Optional
|
||||
|
||||
# protocol token [XWB]VTER: [XWB] = NS broker [XWB], V = version 1, T = type 1, E = envelope size 3, R = XWBPRT 0
|
||||
RPCRequest = namedtuple('RPCRequest', ('header', 'version', 'type', 'envelope', 'rt', 'command', 'broker', 'method', 'params', 'encoding'))
|
||||
|
||||
bEOT = b'\x04'
|
||||
|
||||
class RPCExc(RuntimeError): pass
|
||||
class RPCExcFormat(ValueError, RPCExc): pass
|
||||
class RPCExcServerError(RPCExc): pass
|
||||
class RPCExcInvalidResult(RPCExc): pass
|
||||
|
||||
class RPCType(object):
|
||||
LITERAL = b'0'
|
||||
REFERENCE = b'1'
|
||||
LIST = b'2'
|
||||
GLOBAL = b'3'
|
||||
EMPTY = b'4'
|
||||
STREAM = b'5'
|
||||
def __init__(self, value: Any, magic: bytes=None):
|
||||
self.magic = magic
|
||||
self.value = value.value if isinstance(value, RPCType) else value
|
||||
|
||||
class RPCDict(dict):
|
||||
def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST):
|
||||
dict.__init__(self, iterable)
|
||||
self.magic = magic
|
||||
|
||||
class RPCList(list):
|
||||
def __init__(self, iterable: Iterable, magic: bytes=RPCType.LIST, start: int=1):
|
||||
list.__init__(self, iterable)
|
||||
self.magic = magic
|
||||
self.start = start
|
||||
def __add__(self, rhs: list) -> 'RPCList':
|
||||
return RPCList(list.__add__(self, rhs), magic=self.magic, start=self.start)
|
||||
|
||||
class StatefulClient(object):
|
||||
def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768, encoding: str='latin-1'):
|
||||
self._rpc = client(sock, end=end, minsz=minsz, maxsz=maxsz)
|
||||
self._sock = sock
|
||||
self._context = 'XUS SIGNON'
|
||||
self._encoding = encoding
|
||||
def __call__(self, method: Union[str, bytes], *params: Any, throw: bool=True) -> Any:
|
||||
if isinstance(method, bytes):
|
||||
try:
|
||||
res = self._rpc(method)
|
||||
if res == b'\x00\x001':
|
||||
request = load(method, encoding=self._encoding)
|
||||
if request.method == 'XWB CREATE CONTEXT':
|
||||
self._context = decrypt(request.params[0])
|
||||
except Exception as ex:
|
||||
warnings.warn(f"Error executing {method}: {ex}")
|
||||
return res
|
||||
if (fn := getattr(self, (method := method.strip(' _:')), None)):
|
||||
return fn(*params, context=context, restore=restore)
|
||||
if (method := method.replace('_', ' ').upper()) == 'XWB CREATE CONTEXT':
|
||||
res = r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding)
|
||||
if res == '1':
|
||||
self._context = decrypt(params[0])
|
||||
return res
|
||||
return r_unpack(self._rpc(dump(method, *params, encoding=self._encoding)), throw=throw, encoding=self._encoding)
|
||||
def setcontext(self, *contexts: str):
|
||||
if len(contexts) > 0 and self._context not in contexts:
|
||||
for context in contexts:
|
||||
if r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(context), encoding=self._encoding)), encoding=self._encoding) == '1':
|
||||
self._context = context
|
||||
return context
|
||||
@contextmanager
|
||||
def context(self):
|
||||
try:
|
||||
yield (ctx := self._context)
|
||||
finally:
|
||||
if self._context != ctx:
|
||||
res = r_unpack(self._rpc(dump('XWB CREATE CONTEXT', encrypt_trivial(ctx), encoding=self._encoding)), encoding=self._encoding)
|
||||
if res != '1':
|
||||
raise RPCExcInvalidResult('XWB CREATE CONTEXT', ctx, res)
|
||||
self._context = ctx
|
||||
|
||||
def load(data: bytes, encoding: str='latin-1') -> RPCRequest:
|
||||
if data.startswith(b'['):
|
||||
offset = data.index(b']') + 1
|
||||
header = data[:offset]
|
||||
version = data[offset: offset + 1]
|
||||
type = data[offset + 1: offset + 2]
|
||||
envelope = int(chr(data[offset + 2]))
|
||||
rt = data[offset + 3: offset + 4]
|
||||
command = data[offset + 4] == 0x34 # ord(b'4')
|
||||
offset += 5
|
||||
broker = None
|
||||
if not command:
|
||||
broker, offset = s_unpack(data, offset, encoding=encoding)
|
||||
method, offset = s_unpack(data, offset, encoding=encoding)
|
||||
params = None
|
||||
if offset < (sz := len(data)):
|
||||
if data[offset] != 0x35:
|
||||
raise RPCExcFormat(f"Char '5' expected at #{offset}, got: {chr(data[offset])}") # ord(b'5')
|
||||
offset += 1
|
||||
params = []
|
||||
while offset < sz and data[offset: offset + 2] != b'4f':
|
||||
value, offset = l_unpack(data, offset, envelope=envelope, encoding=encoding)
|
||||
params.append(value)
|
||||
params = tuple(params)
|
||||
return RPCRequest(header=header, version=version, type=type, envelope=envelope, rt=rt, command=command, broker=broker, method=method, params=params, encoding=encoding)
|
||||
raise RPCExcFormat(f"Invalid format {data}")
|
||||
|
||||
def dump(method: Union[str, RPCRequest], *params: Any, header: bytes=b'[XWB]', version: bytes=b'1', type: bytes=b'1', envelope: int=0, rt: bytes=b'0', command: bool=False, broker: str='0', encoding: str='latin-1') -> bytes:
|
||||
if isinstance(method, RPCRequest):
|
||||
params = method.params
|
||||
header = method.header
|
||||
version = method.version
|
||||
type = method.type
|
||||
envelope = method.envelope
|
||||
rt = method.rt
|
||||
command = method.command
|
||||
broker = method.broker
|
||||
encoding = method.encoding
|
||||
method = method.method
|
||||
envelope = max(3, math.ceil(math.log10(max(1, max(l_pack_maxlen(arg, encoding=encoding) for arg in params)))) if envelope < 1 and params is not None and len(params) else envelope)
|
||||
return header + version + type + str(envelope).encode(encoding) + rt + (b'4' if command else (b'2' + s_pack(broker, encoding=encoding))) + s_pack(method, encoding=encoding) + ((b'5' + (b''.join(l_pack(param, envelope=envelope, encoding=encoding) for param in params) if len(params) > 0 else b'4f')) if params is not None else b'')
|
||||
|
||||
def s_pack(value: str, encoding: str='latin-1') -> bytes:
|
||||
encoded = value.encode(encoding)
|
||||
if len(encoded) <= 255:
|
||||
return bytes((len(encoded),)) + encoded
|
||||
raise RPCExcFormat('cannot s-pack string longer than 255 bytes: ' + repr(value))
|
||||
|
||||
def l_pack(value: Any, envelope: int=3, wrapped: bool=True, magic: Optional[bytes]=None, encoding: str='latin-1') -> bytes:
|
||||
if isinstance(value, dict):
|
||||
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in value.items())
|
||||
return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare
|
||||
elif not isinstance(value, str) and hasattr(value, '__iter__'):
|
||||
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in enumerate(value, start=getattr(value, 'start', 1)))
|
||||
return ((magic or getattr(value, 'magic', b'2')) + bare + b'f') if wrapped else bare
|
||||
elif isinstance(value, RPCType):
|
||||
return l_pack(value.value, envelope=envelope, magic=value.magic, encoding=encoding)
|
||||
else:
|
||||
encoded = str(value).encode(encoding)
|
||||
if len(encoded) <= 10**envelope - 1:
|
||||
bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded
|
||||
return ((magic or b'0') + bare + b'f') if wrapped else bare
|
||||
raise RPCExcFormat(f'cannot l-pack string longer than {10**envelope - 1} bytes with an envelope of {envelope}: ' + repr(value))
|
||||
|
||||
def l_pack_maxlen(value: Any, encoding: str='latin-1') -> int:
|
||||
if isinstance(value, dict):
|
||||
return max(max(l_pack_maxlen(k, encoding=encoding) for k in value.keys()), max(l_pack_maxlen(v, encoding=encoding) for v in value.values())) if len(value) > 0 else 0
|
||||
elif not isinstance(value, str) and hasattr(value, '__iter__'):
|
||||
return max(len(str(len(value))), max(l_pack_maxlen(v, encoding=encoding) for v in value)) if len(value) > 0 else 0
|
||||
else:
|
||||
return len(str(value).encode(encoding))
|
||||
|
||||
def s_unpack(data: bytes, offset: int=0, encoding='latin-1') -> Tuple[str, int]:
|
||||
n = data[offset]
|
||||
return data[offset + 1: (end := offset + 1 + n)].decode(encoding), end
|
||||
|
||||
def l_unpack(data: bytes, offset: int=0, envelope: int=3, bare: bool=False, encoding='latin-1') -> Tuple[Any, int]:
|
||||
if bare:
|
||||
n = int(data[offset: offset + envelope].decode(encoding))
|
||||
return data[offset + envelope: (end := offset + envelope + n)].decode(encoding), end
|
||||
magic = data[offset: offset + 1]
|
||||
if magic == b'0':
|
||||
value, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding)
|
||||
if data[offset] == 0x66: # ord(b'f')
|
||||
return value, offset + 1
|
||||
raise RPCExcFormat(f"Char 'f' expected, got: {chr(data[offset])}")
|
||||
elif magic in b'23':
|
||||
res = {}
|
||||
while True:
|
||||
key, offset = l_unpack(data, offset + 1, envelope=envelope, bare=True, encoding=encoding)
|
||||
if data[offset] in b'012345':
|
||||
value, offset = l_unpack(data, offset, envelope=envelope, bare=True, encoding=encoding)
|
||||
res[key] = value
|
||||
else:
|
||||
warnings.warn(f"Invalid list in {data}")
|
||||
if (suffix := data[offset]) == 0x66: # ord(b'f')
|
||||
return dictlist(res, magic=magic), offset + 1
|
||||
elif suffix != 0x74: # ord(b't')
|
||||
raise RPCExcFormat(f"Char 't' or 'f' expected, got: {chr(data[offset])}")
|
||||
raise RPCExcFormat(f"Unknown data type {magic}")
|
||||
|
||||
def dictlist(value: dict, start: Optional[int]=None, magic: bytes=RPCType.LIST) -> Union[RPCDict, RPCList]:
|
||||
index = None
|
||||
try:
|
||||
for k in value.keys():
|
||||
if index is not None:
|
||||
if int(k) - index == 1:
|
||||
index += 1
|
||||
else:
|
||||
return RPCDict(value, magic=magic)
|
||||
else:
|
||||
index = start = int(k)
|
||||
return RPCList(value.values(), magic=magic) if start is None else RPCList(value.values(), magic=magic, start=start)
|
||||
except ValueError:
|
||||
return RPCDict(value, magic=magic)
|
||||
|
||||
def r_unpack(data: bytes, throw: bool=True, encoding: str='latin-1') -> Any:
|
||||
if data[:2] == b'\x00\x00':
|
||||
if len(data) > 2 and data[2] == 0x18: # 0x18 is CAN
|
||||
if throw:
|
||||
raise RPCExcServerError(data[3:].decode(encoding))
|
||||
return RPCExcServerError(data[3:].decode(encoding))
|
||||
elif data[-1] == 0x1f: # 0x1f is US
|
||||
return r_unpack_table(data[2:-1].decode(encoding).split('\x1e')) # 0x1e is RS
|
||||
elif data[-2:] == b'\r\n':
|
||||
return tuple(data[2:-2].decode(encoding).split('\r\n'))
|
||||
else:
|
||||
return data[2:].decode(encoding) if len(data) > 2 else None
|
||||
if throw:
|
||||
raise RPCExcFormat(data)
|
||||
return RPCExcFormat(data)
|
||||
|
||||
def r_unpack_table(rows: Sequence[str]) -> Union[Tuple[dict, ...], Tuple[tuple, ...]]:
|
||||
# table: ROW\x1eROW\x1eROW\x1eROW\x1eROW\x1e\x1f; row: COL^COL^COL^COL^COL; header field: [IT]\d{5}.+
|
||||
if len(rows) > 0 and len(hdr := rows[0]) > 0 and hdr[0] in ('I', 'T') and hdr[1:6].isdecimal():
|
||||
header = [field[6:] for field in rows[0].split('^')]
|
||||
return tuple(dict(zip(header, row.split('^'))) for row in rows[1:] if len(row) > 0)
|
||||
else:
|
||||
return tuple(tuple(row.split('^')) for row in rows if len(row) > 0)
|
||||
|
||||
cipherpad = (
|
||||
'wkEo-ZJt!dG)49K{nX1BS$vH<&:Myf*>Ae0jQW=;|#PsO`\'%+rmb[gpqN,l6/hFC@DcUa ]z~R}"V\\iIxu?872.(TYL5_3',
|
||||
'rKv`R;M/9BqAF%&tSs#Vh)dO1DZP> *fX\'u[.4lY=-mg_ci802N7LTG<]!CWo:3?{+,5Q}(@jaExn$~p\\IyHwzU"|k6Jeb',
|
||||
'\\pV(ZJk"WQmCn!Y,y@1d+~8s?[lNMxgHEt=uw|X:qSLjAI*}6zoF{T3#;ca)/h5%`P4$r]G\'9e2if_>UDKb7<v0&- RBO.',
|
||||
'depjt3g4W)qD0V~NJar\\B "?OYhcu[<Ms%Z`RIL_6:]AX-zG.#}$@vk7/5x&*m;(yb2Fn+l\'PwUof1K{9,|EQi>H=CT8S!',
|
||||
'NZW:1}K$byP;jk)7\'`x90B|cq@iSsEnu,(l-hf.&Y_?J#R]+voQXU8mrV[!p4tg~OMez CAaGFD6H53%L/dT2<*>"{\\wI=',
|
||||
'vCiJ<oZ9|phXVNn)m K`t/SI%]A5qOWe\\&?;jT~M!fz1l>[D_0xR32c*4.P"G{r7}E8wUgyudF+6-:B=$(sY,LkbHa#\'@Q',
|
||||
'hvMX,\'4Ty;[a8/{6l~F_V"}qLI\\!@x(D7bRmUH]W15J%N0BYPkrs&9:$)Zj>u|zwQ=ieC-oGA.#?tfdcO3gp`S+En K2*<',
|
||||
'jd!W5[];4\'<C$/&x|rZ(k{>?ghBzIFN}fAK"#`p_TqtD*1E37XGVs@0nmSe+Y6Qyo-aUu%i8c=H2vJ\\) R:MLb.9,wlO~P',
|
||||
'2ThtjEM+!=xXb)7,ZV{*ci3"8@_l-HS69L>]\\AUF/Q%:qD?1~m(yvO0e\'<#o$p4dnIzKP|`NrkaGg.ufCRB[; sJYwW}5&',
|
||||
'vB\\5/zl-9y:Pj|=(R\'7QJI *&CTX"p0]_3.idcuOefVU#omwNZ`$Fs?L+1Sk<,b)hM4A6[Y%aDrg@~KqEW8t>H};n!2xG{',
|
||||
'sFz0Bo@_HfnK>LR}qWXV+D6`Y28=4Cm~G/7-5A\\b9!a#rP.l&M$hc3ijQk;),TvUd<[:I"u1\'NZSOw]*gxtE{eJp|y (?%',
|
||||
'M@,D}|LJyGO8`$*ZqH .j>c~h<d=fimszv[#-53F!+a;NC\'6T91IV?(0x&/{B)w"]Q\\YUWprk4:ol%g2nE7teRKbAPuS_X',
|
||||
'.mjY#_0*H<B=Q+FML6]s;r2:e8R}[ic&KA 1w{)vV5d,$u"~xD/Pg?IyfthO@CzWp%!`N4Z\'3-(o|J9XUE7k\\TlqSb>anG',
|
||||
'xVa1\']_GU<X`|\\NgM?LS9{"jT%s$}y[nvtlefB2RKJW~(/cIDCPow4,>#zm+:5b@06O3Ap8=*7ZFY!H-uEQk; .q)i&rhd',
|
||||
'I]Jz7AG@QX."%3Lq>METUo{Pp_ |a6<0dYVSv8:b)~W9NK`(r\'4fs&wim\\kReC2hg=HOj$1B*/nxt,;c#y+![?lFuZ-5D}',
|
||||
'Rr(Ge6F Hx>q$m&C%M~Tn,:"o\'tX/*yP.{lZ!YkiVhuw_<KE5a[;}W0gjsz3]@7cI2\\QN?f#4p|vb1OUBD9)=-LJA+d`S8',
|
||||
'I~k>y|m};d)-7DZ"Fe/Y<B:xwojR,Vh]O0Sc[`$sg8GXE!1&Qrzp._W%TNK(=J 3i*2abuHA4C\'?Mv\\Pq{n#56LftUl@9+',
|
||||
'~A*>9 WidFN,1KsmwQ)GJM{I4:C%}#Ep(?HB/r;t.&U8o|l[\'Lg"2hRDyZ5`nbf]qjc0!zS-TkYO<_=76a\\X@$Pe3+xVvu',
|
||||
'yYgjf"5VdHc#uA,W1i+v\'6|@pr{n;DJ!8(btPGaQM.LT3oe?NB/&9>Z`-}02*%x<7lsqz4OS ~E$\\R]KI[:UwC_=h)kXmF',
|
||||
'5:iar.{YU7mBZR@-K|2 "+~`M%8sq4JhPo<_X\\Sg3WC;Tuxz,fvEQ1p9=w}FAI&j/keD0c?)LN6OHV]lGy\'$*>nd[(tb!#'
|
||||
)
|
||||
cipherpad_reversed = tuple({c: i for i, c in enumerate(m)} for m in cipherpad)
|
||||
|
||||
def encrypt(plaintext: str) -> str:
|
||||
associator_idx = identifier_idx = random.randrange(l := len(cipherpad))
|
||||
while identifier_idx == associator_idx:
|
||||
identifier_idx = random.randrange(l)
|
||||
associator = cipherpad_reversed[associator_idx]
|
||||
identifier = cipherpad[identifier_idx]
|
||||
return chr(associator_idx + 32) + ''.join(identifier[associator[i]] for i in plaintext) + chr(identifier_idx + 32)
|
||||
|
||||
def encrypt_trivial(plaintext: str) -> str:
|
||||
return f' {plaintext} '
|
||||
|
||||
def decrypt(ciphertext: str) -> str:
|
||||
associator_idx = ord(ciphertext[-1]) - 32
|
||||
identifier_idx = ord(ciphertext[0]) - 32
|
||||
associator = cipherpad_reversed[associator_idx]
|
||||
identifier = cipherpad[identifier_idx]
|
||||
return ''.join(identifier[associator[i]] for i in ciphertext[1:-1])
|
||||
|
||||
class sockiter(object):
|
||||
def __init__(self, sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768):
|
||||
self.gen = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz)
|
||||
self.sock = sock
|
||||
self.end = end
|
||||
def __call__(self, msg: bytes) -> int:
|
||||
return send_msg(self.sock, msg, end=self.end)
|
||||
def __iter__(self) -> Generator[bytes, None, None]:
|
||||
return self.gen
|
||||
|
||||
def client(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Callable[[bytes], bytes]:
|
||||
def gen() -> Generator[bytes, Optional[bytes], None]:
|
||||
responses = recv_msg(sock, end=end, minsz=minsz, maxsz=maxsz)
|
||||
request = yield
|
||||
while True:
|
||||
send_msg(sock, request, end=end)
|
||||
request = yield next(responses)
|
||||
gen = gen()
|
||||
next(gen)
|
||||
return gen.send
|
||||
|
||||
def send_msg(sock: socket.socket, msg: bytes, end: bytes=bEOT) -> int:
|
||||
return sock.send(msg + end)
|
||||
|
||||
def recv_msg(sock: socket.socket, end: bytes=bEOT, minsz: int=1024, maxsz: int=32768) -> Generator[bytes, None, None]:
|
||||
buf = b''
|
||||
bufsz = minsz
|
||||
while True:
|
||||
if len(data := sock.recv(bufsz)) > 0:
|
||||
buf += data
|
||||
while (idx := buf.find(end)) >= 0:
|
||||
if idx > 0:
|
||||
yield buf[:idx]
|
||||
bufsz = minsz
|
||||
elif bufsz < maxsz:
|
||||
bufsz = _x if (_x := bufsz << 1) < maxsz else maxsz
|
||||
buf = buf[idx + 1:]
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys, shlex, getpass, XUIAMSSOi
|
||||
try:
|
||||
import readline
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
host = 'localhost'
|
||||
port = 19000
|
||||
|
||||
if len(sys.argv) > 1:
|
||||
host = sys.argv[1]
|
||||
if len(sys.argv) > 2:
|
||||
port = int(sys.argv[2])
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((host, port))
|
||||
call = client(sock)
|
||||
print((x := f'VistA {host}:{port}'), '='*len(x), sep='\n')
|
||||
|
||||
try:
|
||||
print('RPC> TCPConnect')
|
||||
print(res := r_unpack(call(dump('TCPConnect', sock.getsockname()[0], '0', socket.gethostname(), command=True)), throw=False))
|
||||
if res == 'accept':
|
||||
print('RPC> XUS SIGNON SETUP')
|
||||
print(res := r_unpack(call(dump('XUS SIGNON SETUP', '', '1')), throw=False))
|
||||
retry = True
|
||||
while retry:
|
||||
if token := XUIAMSSOi.MySsoTokenVBA():
|
||||
print('RPC> XUS ESSO VALIDATE')
|
||||
print(res := r_unpack(call(dump('XUS ESSO VALIDATE', RPCList((token[i:i+200] for i in range(0, len(token), 200)), magic=RPCType.GLOBAL, start=0))), throw=False))
|
||||
else:
|
||||
token = getpass.getpass('ACCESS CODE: ') + ';' + getpass.getpass('VERIFY CODE: ')
|
||||
print('RPC> XUS AV CODE')
|
||||
print(res := r_unpack(call(dump('XUS AV CODE', encrypt_trivial(token))), throw=False))
|
||||
if res[0] != '0':
|
||||
retry = False
|
||||
while True:
|
||||
data = input('RPC> ').strip()
|
||||
if len(data) > 0:
|
||||
data = shlex.split(data)
|
||||
data[0] = data[0].replace('_', ' ').upper()
|
||||
print(r_unpack(call(dump(*data)), throw=False))
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
pass
|
||||
|
||||
print('RPC> #BYE#')
|
||||
print(r_unpack(call(dump('#BYE#', command=True)), throw=False))
|
Loading…
x
Reference in New Issue
Block a user