184 lines
6.6 KiB
Python
184 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import secrets
|
|
import string
|
|
from flask import Flask, request, send_from_directory
|
|
from flask.json import jsonify
|
|
from flask.json.provider import DefaultJSONProvider
|
|
|
|
import rpc
|
|
import util
|
|
import protoswitch
|
|
|
|
import logging
|
|
|
|
# Module-level logger: records at WARNING and above are written through a
# dedicated stream handler using a fixed-width, timestamped layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
)
logger.addHandler(handler)
|
|
|
|
class JSONProviderX(DefaultJSONProvider):
    """JSON provider whose ``default`` hook renders an object via ``json.dumps``.

    The hook returns the JSON text produced by ``json.dumps`` with sorted keys,
    using ``str`` for anything ``json.dumps`` itself cannot serialize.
    """

    @staticmethod
    def default(obj):
        """Return ``obj`` rendered as JSON text (sorted keys, ``str`` fallback)."""
        # NOTE(review): the return value is itself a string, so the caller will
        # presumably encode it again as a JSON string -- confirm this is intended.
        rendered = json.dumps(obj, default=str, sort_keys=True)
        return rendered
|
|
|
|
class CacheProxyRPC(util.CacheProxy):
    """``util.CacheProxy`` preconfigured with per-RPC cache registrations.

    The exact semantics of ``_cache`` live in ``util.CacheProxy``; this class
    only decides which names are registered with which store and TTL.
    """

    def __init__(self, obj, persistent=None, volatile=None, prefix=''):
        """Wrap ``obj`` and register the cache configuration.

        ``volatile`` and ``persistent`` are stores to register names against;
        when either is ``None`` a fresh ``util.Store().memo`` is used instead.
        ``prefix`` is forwarded unchanged to every store-backed registration.
        """
        util.CacheProxy.__init__(self, obj)
        volatile_store = util.Store().memo if volatile is None else volatile

        # Names registered with no store at all (presumably never cached:
        # session control and state-changing RPCs -- confirm in util.CacheProxy).
        storeless_names = (
            '__call__', 'close', 'authenticate', 'keepalive',
            'XWB_CREATE_CONTEXT', 'TIU_TEMPLATE_SET_ITEMS',
            'TIU_TEMPLATE_CREATE/MODIFY', 'TIU_TEMPLATE_DELETE',
            'TIU_TEMPLATE_LOCK', 'TIU_TEMPLATE_UNLOCK', 'ORWDX_SAVE',
            'ORWDXM1_BLDQRSP',
        )
        # Names registered against the volatile store with an infinite TTL.
        unbounded_ttl_names = ('XWB_GET_BROKER_INFO', 'XUS_INTRO_MSG')

        self._cache(storeless_names, None)
        self._cache(unbounded_ttl_names, volatile_store, prefix=prefix, ttl=float('inf'))
        # Registration with names=None and a TTL of -inf (presumably the
        # catch-all default for every other name -- confirm in util.CacheProxy).
        self._cache(None, volatile_store, prefix=prefix, ttl=float('-inf'))
        self._cache_persistent(persistent=persistent, prefix=prefix)

    def _cache_persistent(self, persistent=None, prefix=''):
        """Register the fixed set of names against the ``persistent`` store.

        Uses a fresh ``util.Store().memo`` when ``persistent`` is ``None``.
        All names are registered with an infinite TTL.
        """
        persistent_store = util.Store().memo if persistent is None else persistent
        persistent_names = (
            'SDEC_RESOURCE', 'ORWU1_NEWLOC', 'ORWLRR_ALLTESTS_ALL',
            'ORWORDG_ALLTREE', 'ORWORDG_REVSTS', 'ORWDX_DGNM', 'ORWDX_ORDITM',
        )
        self._cache(persistent_names, persistent_store, prefix=prefix, ttl=float('inf'))
|
|
|
|
def jsonify_result(value, id=None):
    """Return a JSON success envelope for ``value``.

    The envelope has the keys ``result``, ``error`` (always ``None``) and
    ``id``. When ``value`` is a ``util.Cached`` instance, ``result`` is its
    ``_base`` attribute and an extra ``ts`` key carries its ``_ts`` attribute.

    ``id`` is the request id to echo back. Previously this argument was
    accepted but ignored; it is now honoured, and the id is read from the
    current request's JSON body only when ``id`` is ``None`` (which keeps the
    old behaviour for callers that do not pass it).
    """
    if id is None:
        id = request.json.get('id')
    if isinstance(value, util.Cached):
        return jsonify({ 'result': value._base, 'error': None, 'id': id, 'ts': value._ts })
    return jsonify({ 'result': value, 'error': None, 'id': id })
|
|
|
|
def jsonify_error(ex, id=None):
    """Return a JSON error envelope describing the exception ``ex``.

    ``result`` is ``None``; ``error`` carries the exception's class name and
    its ``args``; ``id`` is echoed back as given.
    """
    error_info = { 'type': type(ex).__name__, 'args': ex.args }
    envelope = { 'result': None, 'error': error_info, 'id': id }
    return jsonify(envelope)
|
|
|
|
def application():
    """Build and return the Flask application.

    A random 64-character secret is generated per application and exposed as
    ``app.secret``; a caller must present it to ``POST /v1/vista`` to open a
    client. Open clients are kept in a dict local to this application, keyed
    by a random 64-character client id (``cid``).

    Routes:
        GET  /                              -> htdocs/index.html
        POST /v1/vista                      -> open a client, returns its cid
        POST /v1/vista/<cid>/close          -> close and forget the client
        POST /v1/vista/<cid>/serverinfo     -> the wrapped client's ``_server``
        POST /v1/vista/<cid>/userinfo       -> the wrapped client's ``_user``
        POST /v1/vista/<cid>/authenticate   -> authenticate (avcode or SSO token)
        POST /v1/vista/<cid>                -> call the method named in the body
        POST /v1/vista/<cid>/<method>       -> call the method named in the URL
        GET  /<path>                        -> static file, else index.html

    Every POST handler logs any exception and reports it via ``jsonify_error``.
    """
    app = Flask(__name__)
    app.json = JSONProviderX(app)

    token_alphabet = string.ascii_lowercase + string.digits

    def new_token():
        # 64 random characters drawn from lowercase letters and digits.
        return ''.join(secrets.choice(token_alphabet) for _ in range(64))

    app.secret = secret = new_token()
    clients = {}

    def secret_matches(candidate):
        # Constant-time comparison so response timing does not leak the secret.
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        if not isinstance(candidate, str):
            return False
        return secrets.compare_digest(candidate.encode('utf-8'), secret.encode('utf-8'))

    def unauthorized():
        return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })

    def call_method(cid, method=None):
        # Shared body of cb_call1/cb_call2. When `method` is None the method
        # name is taken from the request body's 'method' key.
        client = clients[cid]
        data = request.json
        name = data['method'] if method is None else method
        kw = {}
        if 'context' in data:
            kw['context'] = data['context']
        thunk = getattr(client, name.upper())
        # Cache-control options are forwarded only to thunks flagged `cached`.
        if getattr(thunk, 'cached', False):
            if 'ttl' in data:
                kw['_cache_ttl'] = data['ttl']
            if 'stale' in data:
                kw['_cache_stale'] = data['stale']
        return jsonify_result(thunk(*data.get('params', ()), **kw), id=data.get('id'))

    @app.get('/')
    def cb_index():
        return send_from_directory('./htdocs', 'index.html')

    @app.post('/v1/vista')
    def cb_connect():
        try:
            # Read inside the try so a missing 'params' key is reported through
            # the JSON error envelope like any other failure.
            params = request.json['params']
            if not secret_matches(params.get('secret')):
                return unauthorized()
            cid = new_token()
            while cid in clients:
                cid = new_token()
            clients[cid] = CacheProxyRPC(rpc.ClientSync(host=params.get('host', 'test.northport.med.va.gov'), port=int(params.get('port', 19009))))
            return jsonify_result(cid, id=request.json.get('id'))
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>/close')
    def cb_close(cid):
        try:
            client = clients[cid]
            res = client.close()
            del clients[cid]
            return jsonify_result(res, id=request.json.get('id'))
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>/serverinfo')
    def cb_serverinfo(cid):
        try:
            client = clients[cid]
            return jsonify_result(client._obj._server, id=request.json.get('id'))
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>/userinfo')
    def cb_userinfo(cid):
        try:
            client = clients[cid]
            return jsonify_result(client._obj._user, id=request.json.get('id'))
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>/authenticate')
    def cb_authenticate(cid):
        try:
            params = request.json['params']
            client = clients[cid]
            if 'avcode' in params:
                credential = params['avcode']
            else:
                # No avcode supplied: fall back to the token from the auth module.
                from auth import XUIAMSSOi_MySsoTokenVBA
                credential = XUIAMSSOi_MySsoTokenVBA()
                if not credential:
                    return unauthorized()
            user = client.authenticate(credential)
            # After login, re-register the persistent names against an on-disk
            # store whose filename is built from server volume, uci and user[0].
            client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo)
            return jsonify_result(user, id=request.json.get('id'))
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>')
    def cb_call1(cid):
        try:
            return call_method(cid)
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.post('/v1/vista/<cid>/<method>')
    def cb_call2(cid, method):
        try:
            return call_method(cid, method)
        except Exception as ex:
            logger.exception(request.url)
            return jsonify_error(ex, id=request.json.get('id'))

    @app.get('/<path:path>')
    def cb_static(path):
        # Paths whose last segment has no '.' are answered with index.html.
        return send_from_directory('./htdocs', path if '.' in path.rsplit('/', 1)[-1] else 'index.html')

    return app
|
|
|
|
def get_port():
    """Return a TCP port number that was free on localhost at call time.

    Binds a throwaway socket to port 0 so the OS picks an unused port, reads
    the chosen port back, and closes the socket. The socket is managed by a
    ``with`` block so it is closed even if ``bind`` or ``getsockname`` raises.

    The port is released before returning, so another process could claim it
    before the caller binds to it.
    """
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('localhost', 0))
        return sock.getsockname()[1]
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the app, pick a free port, announce the URL
    # (the application secret travels in the URL fragment), open it in the
    # default browser, then serve.
    import webbrowser

    app = application()
    port = get_port()

    url = f'http://localhost:{port}/#{app.secret}'
    print(url)
    webbrowser.open(url)

    app.run(port=port, request_handler=protoswitch.SwitchingRequestHandler)
|