#!/usr/bin/env python3 import json import secrets import string import time from flask import Flask, request, send_from_directory from flask.json import jsonify from flask.json.provider import DefaultJSONProvider import rpc import util import protoswitch import logging logger = logging.getLogger(__name__) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')) logger.addHandler(handler) logger.setLevel(logging.WARNING) class JSONProviderX(DefaultJSONProvider): @staticmethod def default(obj): return json.dumps(obj, sort_keys=True, default=str) class CacheProxyRPC(util.CacheProxy): def __init__(self, obj, persistent=None, volatile=None, prefix=''): util.CacheProxy.__init__(self, obj) if volatile is None: volatile = util.Store().memo self._cache(('__call__', 'close', 'authenticate', 'keepalive', 'XWB_CREATE_CONTEXT', 'TIU_TEMPLATE_SET_ITEMS', 'TIU_TEMPLATE_CREATE/MODIFY', 'TIU_TEMPLATE_DELETE', 'TIU_TEMPLATE_LOCK', 'TIU_TEMPLATE_UNLOCK', 'ORWDX_SAVE', 'ORWDXM1_BLDQRSP'), None) self._cache(('XWB_GET_BROKER_INFO', 'XUS_INTRO_MSG'), volatile, prefix=prefix, ttl=float('inf')) self._cache(None, volatile, prefix=prefix, ttl=float('-inf')) self._cache_persistent(persistent=persistent, prefix=prefix) def _cache_persistent(self, persistent=None, prefix=''): if persistent is None: persistent = util.Store().memo self._cache(('SDEC_RESOURCE', 'ORWU1_NEWLOC', 'ORWLRR_ALLTESTS_ALL', 'ORWORDG_ALLTREE', 'ORWORDG_REVSTS', 'ORWDX_DGNM', 'ORWDX_ORDITM'), persistent, prefix=prefix, ttl=float('inf')) def jsonify_result(value, id=None): return jsonify({ 'result': value._base, 'error': None, 'id': request.json.get('id'), 'ts': value._ts, 'cached': True } if isinstance(value, util.Cached) else { 'result': value, 'error': None, 'id': request.json.get('id'), 'ts': time.time() }) def jsonify_error(ex, id=None): return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': id }) def application(): app = Flask(__name__) app.json = JSONProviderX(app) app.secret = secret = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64)) clients = {} @app.get('/') def cb_index(): return send_from_directory('./htdocs', 'index.html') @app.post('/v1/vista') def cb_connect(): params = request.json['params'] try: if params.get('secret') == secret: cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64)) while cid in clients: cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64)) clients[cid] = client = CacheProxyRPC(rpc.ClientSync(host=params.get('host', 'test.northport.med.va.gov'), port=int(params.get('port', 19009)))) return jsonify_result(cid, id=request.json.get('id')) else: return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') }) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista//close') def cb_close(cid): try: client = clients[cid] res = client.close() del clients[cid] return jsonify_result(res, id=request.json.get('id')) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista//serverinfo') def cb_serverinfo(cid): try: client = clients[cid] return jsonify_result(client._obj._server, id=request.json.get('id')) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista//userinfo') def cb_userinfo(cid): try: client = clients[cid] return jsonify_result(client._obj._user, id=request.json.get('id')) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista//authenticate') def cb_authenticate(cid): params = request.json['params'] try: client = clients[cid] if 'avcode' in params: user = client.authenticate(params['avcode']) client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo) return jsonify_result(user, id=request.json.get('id')) else: from auth import XUIAMSSOi_MySsoTokenVBA if token := XUIAMSSOi_MySsoTokenVBA(): user = client.authenticate(token) client._cache_persistent(persistent=util.Store(f'cache.{client._server["volume"].lower()}.{client._server["uci"].lower()}.{user[0]}.db', journal_mode='WAL').memo) return jsonify_result(user, id=request.json.get('id')) else: return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') }) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista/') def cb_call1(cid): try: client = clients[cid] data = request.json kw = {} if 'context' in data: kw['context'] = data['context'] thunk = getattr(client, data['method'].upper()) if getattr(thunk, 'cached', False): if 'ttl' in data: kw['_cache_ttl'] = data['ttl'] if 'stale' in data: kw['_cache_stale'] = data['stale'] return jsonify_result(thunk(*data.get('params', ()), **kw), id=data.get('id')) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.post('/v1/vista//') def cb_call2(cid, method): try: client = clients[cid] data = request.json kw = {} if 'context' in data: kw['context'] = data['context'] thunk = getattr(client, method.upper()) if getattr(thunk, 'cached', False): if 'ttl' in data: kw['_cache_ttl'] = data['ttl'] if 'stale' in data: kw['_cache_stale'] = data['stale'] return jsonify_result(thunk(*data.get('params', ()), **kw), id=data.get('id')) except Exception as ex: logger.exception(request.url) return jsonify_error(ex, id=request.json.get('id')) @app.get('/') def cb_static(path): return send_from_directory('./htdocs', path if '.' in path.rsplit('/', 1)[-1] else 'index.html') return app def get_port(): import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost', 0)) port = sock.getsockname()[1] sock.close() return port if __name__ == '__main__': import webbrowser app = application() port = get_port() print(f'http://localhost:{port}/#{app.secret}') webbrowser.open(f'http://localhost:{port}/#{app.secret}') app.run(port=port, request_handler=protoswitch.SwitchingRequestHandler)