nuVistA/main.py
2023-04-29 18:34:37 -04:00

184 lines
6.6 KiB
Python

#!/usr/bin/env python3
import json
import secrets
import string
from flask import Flask, request, send_from_directory
from flask.json import jsonify
from flask.json.provider import DefaultJSONProvider
import rpc
import util
import protoswitch
import logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
class JSONProviderX(DefaultJSONProvider):
@staticmethod
def default(obj):
return json.dumps(obj, sort_keys=True, default=str)
class CacheProxyRPC(util.CacheProxy):
def __init__(self, obj, persistent=None, volatile=None, prefix=''):
util.CacheProxy.__init__(self, obj)
if volatile is None:
volatile = util.Store().memo
self._cache(('__call__', 'close', 'authenticate', 'keepalive', 'XWB_CREATE_CONTEXT', 'TIU_TEMPLATE_SET ITEMS', 'TIU_TEMPLATE_CREATE/MODIFY', 'TIU_TEMPLATE_DELETE', 'TIU_TEMPLATE_LOCK', 'TIU_TEMPLATE_UNLOCK', 'ORWDX_SAVE', 'ORWDXM1_BLDQRSP'), None)
self._cache(('XWB_GET_BROKER_INFO', 'XUS_INTRO_MSG'), volatile, prefix=prefix, ttl=float('inf'))
self._cache(None, volatile, prefix=prefix, ttl=float('-inf'))
self._cache_persistent(persistent=persistent, prefix=prefix)
def _cache_persistent(self, persistent=None, prefix=''):
if persistent is None:
persistent = util.Store().memo
self._cache(('SDEC_RESOURCE', 'ORWU1_NEWLOC', 'ORWLRR_ALLTESTS_ALL', 'ORWORDG_ALLTREE', 'ORWORDG_REVSTS', 'ORWDX_DGNM', 'ORWDX_ORDITM'), persistent, prefix=prefix, ttl=float('inf'))
def jsonify_result(value, id=None):
return jsonify({ 'result': value._base, 'error': None, 'id': request.json.get('id'), 'ts': value._ts } if isinstance(value, util.Cached) else { 'result': value, 'error': None, 'id': request.json.get('id') })
def jsonify_error(ex, id=None):
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': id })
def application():
app = Flask(__name__)
app.json = JSONProviderX(app)
app.secret = secret = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
clients = {}
@app.get('/')
def cb_index():
return send_from_directory('./htdocs', 'index.html')
@app.post('/v1/vista')
def cb_connect():
params = request.json['params']
try:
if params.get('secret') == secret:
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
while cid in clients:
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
clients[cid] = client = CacheProxyRPC(rpc.ClientSync(host=params.get('host', 'test.northport.med.va.gov'), port=int(params.get('port', 19009))))
return jsonify_result(cid, id=request.json.get('id'))
else:
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>/close')
def cb_close(cid):
try:
client = clients[cid]
res = client.close()
del clients[cid]
return jsonify_result(res, id=request.json.get('id'))
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>/serverinfo')
def cb_serverinfo(cid):
try:
client = clients[cid]
return jsonify_result(client._obj._server, id=request.json.get('id'))
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>/userinfo')
def cb_userinfo(cid):
try:
client = clients[cid]
return jsonify_result(client._obj._user, id=request.json.get('id'))
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>/authenticate')
def cb_authenticate(cid):
params = request.json['params']
try:
client = clients[cid]
if 'avcode' in params:
user = client.authenticate(params['avcode'])
client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo)
return jsonify_result(user, id=request.json.get('id'))
else:
from auth import XUIAMSSOi_MySsoTokenVBA
if token := XUIAMSSOi_MySsoTokenVBA():
user = client.authenticate(token)
client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo)
return jsonify_result(user, id=request.json.get('id'))
else:
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>')
def cb_call1(cid):
try:
client = clients[cid]
data = request.json
kw = {}
if 'context' in data:
kw['context'] = data['context']
thunk = getattr(client, data['method'].upper())
if getattr(thunk, 'cached', False):
if 'ttl' in data:
kw['_cache_ttl'] = data['ttl']
if 'stale' in data:
kw['_cache_stale'] = data['stale']
return jsonify_result(thunk(*data.get('params', ()), **kw), id=data.get('id'))
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.post('/v1/vista/<cid>/<method>')
def cb_call2(cid, method):
try:
client = clients[cid]
data = request.json
kw = {}
if 'context' in data:
kw['context'] = data['context']
thunk = getattr(client, method.upper())
if getattr(thunk, 'cached', False):
if 'ttl' in data:
kw['_cache_ttl'] = data['ttl']
if 'stale' in data:
kw['_cache_stale'] = data['stale']
return jsonify_result(thunk(*data.get('params', ()), **kw), id=data.get('id'))
except Exception as ex:
logger.exception(request.url)
return jsonify_error(ex, id=request.json.get('id'))
@app.get('/<path:path>')
def cb_static(path):
return send_from_directory('./htdocs', path if '.' in path.rsplit('/', 1)[-1] else 'index.html')
return app
def get_port():
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
sock.close()
return port
if __name__ == '__main__':
import webbrowser
app = application()
port = get_port()
print(f'http://localhost:{port}/#{app.secret}')
webbrowser.open(f'http://localhost:{port}/#{app.secret}')
app.run(port=port, request_handler=protoswitch.SwitchingRequestHandler)