First
This commit is contained in:
147
main.py
Normal file
147
main.py
Normal file
@@ -0,0 +1,147 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
import secrets
|
||||
import string
|
||||
from flask import Flask, request, send_from_directory
|
||||
from flask.json import jsonify
|
||||
from flask.json.provider import DefaultJSONProvider
|
||||
|
||||
import rpc
|
||||
import util
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.WARNING)
|
||||
|
||||
class JSONProviderX(DefaultJSONProvider):
|
||||
@staticmethod
|
||||
def default(obj):
|
||||
return json.dumps(obj, sort_keys=True, default=str)
|
||||
|
||||
class CacheProxyRPC(util.CacheProxy):
|
||||
def __init__(self, obj, persistent=None, volatile=None, prefix=''):
|
||||
util.CacheProxy.__init__(self, obj)
|
||||
if persistent is None:
|
||||
persistent = util.Store().memo
|
||||
if volatile is None:
|
||||
volatile = util.Store().memo
|
||||
self._cache(('__call__', 'close', 'authenticate', 'keepalive', 'XWB_CREATE_CONTEXT', 'XWB_IM_HERE'), None)
|
||||
self._cache(('SDEC_RESOURCE', 'ORWLRR_ALLTESTS_ALL'), persistent, prefix=prefix, ttl=float('inf'))
|
||||
self._cache(('XWB_GET_BROKER_INFO', 'XUS_INTRO_MSG'), volatile, prefix=prefix, ttl=float('inf'))
|
||||
self._cache(None, volatile, prefix=prefix, ttl=float('-inf'))
|
||||
def _cache_persistent(self, persistent=None, prefix=''):
|
||||
if persistent is None:
|
||||
persistent = util.Store().memo
|
||||
self._cache(('SDEC_RESOURCE', 'ORWLRR_ALLTESTS_ALL'), persistent, prefix=prefix, ttl=float('inf'))
|
||||
|
||||
def application():
|
||||
app = Flask(__name__)
|
||||
app.json = JSONProviderX(app)
|
||||
|
||||
app.secret = secret = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
|
||||
clients = {}
|
||||
|
||||
@app.get('/')
|
||||
def cb_index():
|
||||
return send_from_directory('./htdocs', 'index.html')
|
||||
|
||||
@app.post('/v1/vista')
|
||||
def cb_connect():
|
||||
params = request.json['params']
|
||||
if params.get('secret') == secret:
|
||||
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
|
||||
while cid in clients:
|
||||
cid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(64))
|
||||
clients[cid] = client = CacheProxyRPC(rpc.ClientSync(host=params.get('host', 'test.northport.med.va.gov'), port=int(params.get('port', 19009))))
|
||||
return jsonify({ 'result': cid, 'error': None, 'id': request.json.get('id') })
|
||||
else:
|
||||
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
|
||||
|
||||
@app.post('/v1/vista/<cid>/serverinfo')
|
||||
def cb_serverinfo(cid):
|
||||
try:
|
||||
client = clients[cid]
|
||||
return jsonify({ 'result': client._obj._server._asdict() if client._obj._server else None, 'error': None, 'id': request.json.get('id') })
|
||||
except Exception as ex:
|
||||
logger.exception(request.url)
|
||||
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
|
||||
|
||||
@app.post('/v1/vista/<cid>/userinfo')
|
||||
def cb_userinfo(cid):
|
||||
try:
|
||||
client = clients[cid]
|
||||
return jsonify({ 'result': client._obj._user, 'error': None, 'id': request.json.get('id') })
|
||||
except Exception as ex:
|
||||
logger.exception(request.url)
|
||||
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
|
||||
|
||||
@app.post('/v1/vista/<cid>/authenticate')
|
||||
def cb_authenticate(cid):
|
||||
params = request.json['params']
|
||||
try:
|
||||
client = clients[cid]
|
||||
if 'avcode' in params:
|
||||
user = client.authenticate(params['avcode'])
|
||||
client._cache_persistent(persistent=util.Store(f'cache.{client._server.volume.lower()}.{client._server.uci.lower()}.{user[0]}.db', journal_mode='WAL').memo)
|
||||
return jsonify({ 'result': user, 'error': None, 'id': request.json.get('id') })
|
||||
else:
|
||||
from auth import XUIAMSSOi_MySsoTokenVBA
|
||||
if token := XUIAMSSOi_MySsoTokenVBA():
|
||||
user = client.authenticate(token)
|
||||
client._cache_persistent(persistent=util.Store(f'cache.{client._server.volume.lower()}.{client._server.uci.lower()}.{user[0]}.db', journal_mode='WAL').memo)
|
||||
return jsonify({ 'result': user, 'error': None, 'id': request.json.get('id') })
|
||||
else:
|
||||
return jsonify({ 'result': None, 'error': { 'type': 'Unauthorized', 'args': [] }, 'id': request.json.get('id') })
|
||||
except Exception as ex:
|
||||
logger.exception(request.url)
|
||||
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
|
||||
|
||||
@app.post('/v1/vista/<cid>')
|
||||
def cb_call1(cid):
|
||||
try:
|
||||
client = clients[cid]
|
||||
data = request.json
|
||||
if 'context' in data:
|
||||
return jsonify({ 'result': getattr(client, data['method'].upper())(*data.get('params', ()), context=data['context']), 'error': None, 'id': data.get('id') })
|
||||
else:
|
||||
return jsonify({ 'result': getattr(client, data['method'].upper())(*data.get('params', ())), 'error': None, 'id': data.get('id') })
|
||||
except Exception as ex:
|
||||
logger.exception(request.url)
|
||||
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
|
||||
|
||||
@app.post('/v1/vista/<cid>/<method>')
|
||||
def cb_call2(cid, method):
|
||||
try:
|
||||
client = clients[cid]
|
||||
data = request.json
|
||||
return jsonify({ 'result': getattr(client, method.upper())(*data.get('params', ())), 'error': None, 'id': data.get('id') })
|
||||
except Exception as ex:
|
||||
logger.exception(request.url)
|
||||
return jsonify({ 'result': None, 'error': { 'type': ex.__class__.__name__, 'args': ex.args }, 'id': request.json.get('id') })
|
||||
|
||||
@app.get('/<path:path>')
|
||||
def cb_static(path):
|
||||
return send_from_directory('./htdocs', path if '.' in path.rsplit('/', 1)[-1] else 'index.html')
|
||||
|
||||
return app
|
||||
|
||||
def get_port():
|
||||
import socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(('localhost', 0))
|
||||
port = sock.getsockname()[1]
|
||||
sock.close()
|
||||
return port
|
||||
|
||||
if __name__ == '__main__':
|
||||
import webbrowser
|
||||
app = application()
|
||||
port = get_port()
|
||||
print(f'http://localhost:{port}/#{app.secret}')
|
||||
webbrowser.open(f'http://localhost:{port}/#{app.secret}')
|
||||
app.run(port=port)
|
Reference in New Issue
Block a user