nuVistA/main.py
2022-10-01 00:38:59 -04:00

163 lines
6.5 KiB
Python

#!/usr/bin/env python3
import json
import secrets
import string
from flask import Flask, request, send_from_directory
from flask.json import jsonify
from flask.json.provider import DefaultJSONProvider
import rpc
import util
import logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
class JSONProviderX(DefaultJSONProvider):
@staticmethod
def default(obj):
return json.dumps(obj, sort_keys=True, default=str)
class CacheProxyRPC(util.CacheProxy):
def __init__(self, obj, persistent=None, volatile=None, prefix=''):
util.CacheProxy.__init__(self, obj)
if persistent is None:
persistent = util.Store().memo
if volatile is None:
volatile = util.Store().memo
self._cache(('__call__', 'close', 'authenticate', 'keepalive', 'XWB_CREATE_CONTEXT', 'XWB_IM_HERE'), None)
self._cache(('SDEC_RESOURCE', 'ORWLRR_ALLTESTS_ALL', 'ORWORDG_ALLTREE', 'ORWORDG_REVSTS'), persistent, prefix=prefix, ttl=float('inf'))
self._cache(('XWB_GET_BROKER_INFO', 'XUS_INTRO_MSG'), volatile, prefix=prefix, ttl=float('inf'))
self._cache(None, volatile, prefix=prefix, ttl=float('-inf'))
def _cache_persistent(self, persistent=None, prefix=''):
if persistent is None:
persistent = util.Store().memo
self._cache(('SDEC_RESOURCE', 'ORWLRR_ALLTESTS_ALL'), persistent, prefix=prefix, ttl=float('inf'))
def application():
app = Flask(__name__)
app.json = JSONProviderX(app)
app.secret = secret = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
clients = {}
@app.get('/')
def cb_index():
return send_from_directory('./htdocs', 'index.html')
@app.post('/v1/vista')
def cb_connect():
params = request.json['params']
try:
if params.get('secret') == secret:
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
while cid in clients:
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
clients[cid] = client = CacheProxyRPC(rpc.ClientSync(host=params.get('host', 'test.northport.med.va.gov'), port=int(params.get('port', 19009))))
return jsonify({ 'result': cid, 'error': None, 'id': request.json.get('id') })
else:
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>/close')
def cb_close(cid):
try:
client = clients[cid]
res = client.close()
del clients[cid]
return jsonify({ 'result': res, 'error': None, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>/serverinfo')
def cb_serverinfo(cid):
try:
client = clients[cid]
return jsonify({ 'result': client._obj._server, 'error': None, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>/userinfo')
def cb_userinfo(cid):
try:
client = clients[cid]
return jsonify({ 'result': client._obj._user, 'error': None, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>/authenticate')
def cb_authenticate(cid):
params = request.json['params']
try:
client = clients[cid]
if 'avcode' in params:
user = client.authenticate(params['avcode'])
client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo)
return jsonify({ 'result': user, 'error': None, 'id': request.json.get('id') })
else:
from auth import XUIAMSSOi_MySsoTokenVBA
if token := XUIAMSSOi_MySsoTokenVBA():
user = client.authenticate(token)
client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo)
return jsonify({ 'result': user, 'error': None, 'id': request.json.get('id') })
else:
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>')
def cb_call1(cid):
try:
client = clients[cid]
data = request.json
if 'context' in data:
return jsonify({ 'result': getattr(client, data['method'].upper())(*data.get('params', ()), context=data['context']), 'error': None, 'id': data.get('id') })
else:
return jsonify({ 'result': getattr(client, data['method'].upper())(*data.get('params', ())), 'error': None, 'id': data.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.post('/v1/vista/<cid>/<method>')
def cb_call2(cid, method):
try:
client = clients[cid]
data = request.json
return jsonify({ 'result': getattr(client, method.upper())(*data.get('params', ())), 'error': None, 'id': data.get('id') })
except Exception as ex:
logger.exception(request.url)
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
@app.get('/<path:path>')
def cb_static(path):
return send_from_directory('./htdocs', path if '.' in path.rsplit('/', 1)[-1] else 'index.html')
return app
def get_port():
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
sock.close()
return port
if __name__ == '__main__':
import webbrowser
app = application()
port = get_port()
print(f'http://localhost:{port}/#{app.secret}')
webbrowser.open(f'http://localhost:{port}/#{app.secret}')
app.run(port=port)