#!/usr/bin/env python3
"""Local web front end for an instrumented SSH session.

The script spawns an ``ssh`` subprocess through ``autoproc``, exposes the
``ext_*`` command helpers as HTTP endpoints of a Quart application, and
relays lines typed on stdin to the remote session.
"""

import asyncio
import configparser
import functools
import getpass
import logging
import pickle
import re
import sqlite3
import sys
import time
from typing import AsyncGenerator, Generator, Optional, Union

import autoproc
import ext_session
import ext_discovery
import ext_scheduling
import ext_patient
import ext_measurement
import ext_lab
import ext_note
import ext_order
import ext_rcrs

logger = logging.getLogger(__name__)


class Config(configparser.ConfigParser):
    """ConfigParser bound to a fixed list of files.

    Values are read from every file in ``filenames``; writes made through
    :meth:`set` are persisted to the *last* file only.
    """

    def __init__(self, *filenames, **kw):
        configparser.ConfigParser.__init__(self, **kw)
        self.filenames = filenames
        self.reread()

    def reread(self, *args, **kw) -> 'Config':
        """Re-read all bound files and return ``self`` for chaining."""
        configparser.ConfigParser.read(self, self.filenames, *args, **kw)
        return self

    def set(self, section: str, key: str, value: str) -> 'Config':
        """Persist ``value`` under ``[section] key`` in the last bound file.

        The last file is loaded on its own, updated, and rewritten so that
        values inherited from earlier files are not copied into it. The
        rewritten file is then re-read into this instance.
        """
        target_path = self.filenames[-1]
        target = configparser.ConfigParser()
        target.read(target_path)
        if not target.has_section(section):
            target.add_section(section)
        target[section][key] = value
        with open(target_path, 'w+') as f:
            target.write(f)
        # Must happen after the file is closed so the new content is flushed.
        configparser.ConfigParser.read(self, target_path)
        return self


async def stdin_reader(opts: dict) -> AsyncGenerator[str, None]:
    """Yield lines typed on stdin without blocking the event loop.

    When ``opts['stdin.echo']`` is true (the default) lines are read with
    ``sys.stdin.readline`` and stripped of trailing CR/LF; otherwise they are
    read with ``getpass.getpass`` so the characters are not echoed. The
    option is re-checked before every read. The generator finishes at end of
    input.
    """
    try:
        loop = asyncio.get_running_loop()
        while True:
            if opts.get('stdin.echo', True):
                # input with line editor
                line = await loop.run_in_executor(None, sys.stdin.readline)
                if not line:
                    # readline() returns '' only at EOF (a blank line is
                    # '\n'); stop instead of yielding '' forever.
                    return
                yield line.rstrip('\r\n')
            else:
                # input hiding characters
                try:
                    secret = await loop.run_in_executor(None, getpass.getpass, '')
                except EOFError:
                    return
                yield secret
    except ConnectionResetError:
        logger.info('ConnectionResetError: stdin_reader')
    except KeyboardInterrupt:
        logger.info('KeyboardInterrupt: stdin_reader')


async def log_writer(proc, buffer):
    """Append everything published for ``proc`` to ``buffer`` until cancelled."""
    with autoproc.subscribe(proc) as pipequeue:
        while True:
            buffer.write(await pipequeue.get())


def memoize(database: Union[str, sqlite3.Connection]=':memory:', table: str='tempmemo', prefix: Optional[str]=None, ttl: float=86400, persist=False, cast=None):
    """Return a decorator caching call results in an SQLite table.

    Args:
        database: An open connection, or a path/URI passed to
            ``sqlite3.connect``.
        table: Table holding the cache. It is interpolated into the SQL text,
            so it must be a trusted identifier.
        prefix: Cache-key namespace; defaults to the wrapped function's name.
        ttl: Lifetime of an entry in seconds.
        persist: Create a regular table instead of a ``TEMPORARY`` one.
        cast: Optional callable applied to every returned value (both cache
            hits and fresh results) so the return type does not depend on
            whether the cache was hit.

    Both plain and coroutine functions are supported. The key is built from
    ``repr`` of the positional and keyword arguments, and results must be
    picklable.
    """
    db = database if isinstance(database, sqlite3.Connection) else sqlite3.connect(database)
    scope = '' if persist else 'TEMPORARY '
    db.execute(f'CREATE {scope}TABLE IF NOT EXISTS {table} (uid TEXT PRIMARY KEY, result BLOB, expiry FLOAT)')
    db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{table}_uid ON {table} (uid)')
    db.execute(f'CREATE INDEX IF NOT EXISTS idx_{table}_expiry ON {table} (expiry)')
    select_sql = f'SELECT result FROM {table} WHERE uid=? AND expiry>? LIMIT 1'
    upsert_sql = f'INSERT OR REPLACE INTO {table} (uid, result, expiry) VALUES (?, ?, ?)'

    def memoizer(func):
        def make_uid(args, kw):
            return f'{prefix or func.__name__}:{repr(args)}{repr(kw)}'.encode('utf-8')

        def fetch(uid):
            """Return ``(True, value)`` for a live entry, else ``(False, None)``."""
            row = db.execute(select_sql, (uid, time.time())).fetchone()
            if row is None:
                return False, None
            # SECURITY: unpickling executes arbitrary code if the cache
            # database can be written by an untrusted party.
            return True, pickle.loads(row[0])

        def store(uid, result):
            with db:  # commits on success, rolls back on error
                db.execute(upsert_sql, (uid, pickle.dumps(result), time.time() + ttl))

        def convert(value):
            return cast(value) if cast else value

        if asyncio.iscoroutinefunction(func):
            @functools.wraps(func)
            async def wrapper(*args, **kw):
                uid = make_uid(args, kw)
                found, cached = fetch(uid)
                if found:
                    return convert(cached)
                result = await func(*args, **kw)
                store(uid, result)
                return convert(result)
        else:
            @functools.wraps(func)
            def wrapper(*args, **kw):
                uid = make_uid(args, kw)
                found, cached = fetch(uid)
                if found:
                    return convert(cached)
                result = func(*args, **kw)
                store(uid, result)
                return convert(result)
        return wrapper

    return memoizer


def application(proc, config):
    """Build the Quart application serving the API and the static front end.

    Args:
        proc: The instrumented subprocess the ``ext_*`` commands are run on.
        config: The :class:`Config` exposed through ``/api/config``.

    Returns:
        The configured ``Quart`` application (not yet running).
    """
    from io import StringIO
    from quart import Quart, request, send_from_directory

    app = Quart(__name__)
    db = sqlite3.connect('./cache.db')
    # Session log served by /api/log.txt; it is never truncated.
    buffer = StringIO()
    proc.create_task(log_writer(proc, buffer), name='@task:log')

    def normalize_query(query):
        """Drop '^' characters and collapse whitespace runs in a lookup query."""
        return re.sub(r'\s+', ' ', query.replace('^', '').strip())

    def date_range(alpha, omega):
        """Parse the two URL date components with ``util.vista_strptime``."""
        import util  # imported lazily, only when a date-ranged endpoint is hit
        return util.vista_strptime(alpha), util.vista_strptime(omega)

    def plain_text(body):
        return body, 200, {'Content-type': 'text/plain'}

    # NOTE: @app.route is applied last, so the memoized wrapper is what gets
    # registered; memoize keeps the handler's __name__ for the endpoint name.

    @app.route('/api/clinic/list')
    @memoize(db, table='memo', prefix='clinics', ttl=30*86400, persist=True)
    async def http_api_clinic_list():
        return [item async for item in ext_discovery.cmd_listclinics(proc)]

    @app.route('/api/appointments/<clinics>/<date>')
    @memoize(db)
    async def http_api_appointments(clinics, date):
        clinics = clinics.strip()
        if len(clinics) == 0:
            return []
        # '|' stands in for '/' inside a URL path segment.
        return [item async for item in ext_scheduling.cmd_appointments(proc, clinics=clinics.replace('|', '/'), date=date.replace('|', '/'))]

    @app.route('/api/lookup/<query>')
    @memoize(db)
    async def http_api_lookup(query):
        query = normalize_query(query)
        if len(query) <= 1:
            return []
        return await ext_patient.cmd_lookup_patient(proc, query)

    @app.route('/api/lookup/<query>/<ordinal>')
    @app.route('/api/lookup/<query>/<ordinal>/<force>')
    @memoize(db, cast=tuple)
    async def http_api_lookup_ordinal(query, ordinal, force=False):
        query = normalize_query(query)
        if len(query) > 1:
            # Any non-empty <force> path segment counts as true.
            body = await ext_patient.cmd_lookup_patient_ordinal(proc, query, ordinal, bool(force))
        else:
            body = ''
        return plain_text(body)

    @app.route('/api/measurements/<mrn>/<alpha>/<omega>')
    @memoize(db)
    async def http_api_measurements(mrn, alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_measurement.cmd_entries(proc, mrn, start, end)]

    @app.route('/api/labs/<mrn>/<alpha>/<omega>')
    @memoize(db)
    async def http_api_labs(mrn, alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_lab.cmd_reports(proc, mrn, start, end)]

    @app.route('/api/notes/<mrn>/<alpha>/<omega>')
    @memoize(db)
    async def http_api_notes(mrn, alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_note.cmd_reports(proc, mrn, start, end)]

    @app.route('/api/orders/<mrn>/<alpha>/<omega>')
    @memoize(db)
    async def http_api_orders(mrn, alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_order.cmd_entries(proc, mrn, start, end)]

    @app.route('/api/rcrs/patients/<alpha>/<omega>')
    @memoize(db)
    async def http_api_rcrs_patients(alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_rcrs.cmd_patients(proc, start, end)]

    @app.route('/api/rcrs/tumors/<alpha>/<omega>')
    @memoize(db)
    async def http_api_rcrs_tumors(alpha, omega):
        start, end = date_range(alpha, omega)
        return [item async for item in ext_rcrs.cmd_tumors(proc, start, end)]

    @app.route('/api/log.txt')
    async def http_api_log():
        return plain_text(buffer.getvalue())

    @app.route('/api/config/<section>/<key>', methods=('GET',))
    async def get_api_config_value(section, key):
        # A missing section/key is reported as an empty value, not an error.
        try:
            value = config[section][key] or ''
        except KeyError:
            value = ''
        return plain_text(value)

    @app.route('/api/config/<section>/<key>', methods=('PUT',))
    async def put_api_config_value(section, key):
        config.set(section, key, (await request.get_data()).decode('utf-8'))
        return {}

    @app.route('/favicon.png')
    async def http_favicon():
        return await send_from_directory('./frontend/build', 'favicon.png')

    @app.route('/_app/<path:path>')
    async def http_static(path):
        return await send_from_directory('./frontend/build/_app', path)

    @app.route('/')
    @app.route('/<path:path>')
    async def http_spa(path='index.html'):
        # Every other path gets the SPA fallback page; ``path`` is unused.
        return await send_from_directory('./frontend/build', '200.html')

    return app


async def frontend_build():
    """Run ``npm run build -- --watch`` for the front end, restarting on exit.

    Cancellation is absorbed: the running builder is terminated and the
    coroutine returns normally, so awaiting the cancelled task does not raise.

    Raises:
        FileNotFoundError: If ``npm`` is not on ``PATH``.
    """
    import shutil

    npm = shutil.which('npm')
    if npm is None:
        raise FileNotFoundError('npm executable not found on PATH')

    proc = None
    try:
        while True:
            proc = await asyncio.create_subprocess_exec(npm, 'run', '--prefix', 'frontend', 'build', '--', '--watch', stdin=None)
            await proc.wait()
            logger.warning('Frontend builder exited: %s', proc.returncode)
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        if proc is not None and proc.returncode is None:
            logger.warning('Terminating frontend builder...')
            if sys.platform == 'win32':
                # taskkill /t also kills the children spawned by npm.
                killer = await asyncio.create_subprocess_exec('taskkill', '/pid', str(proc.pid), '/t', '/f', stdin=None)
                await killer.wait()
            else:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    pass  # exited between the check and the signal
            await proc.wait()
            proc._transport.close()  # https://github.com/python/cpython/issues/88050


def get_port():
    """Return a TCP port on localhost that was free at the time of the call.

    The probing socket is closed before returning, so another process may
    claim the port before the caller binds it.
    """
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('localhost', 0))
        return sock.getsockname()[1]


async def main_async(opts: dict, dev=True):
    """Start the SSH session and the web application, then relay stdin.

    Args:
        opts: Options passed to :func:`stdin_reader`.
        dev: When true, run the front-end watcher and wait for its first
            build before continuing.
    """
    import os
    import webbrowser

    config = Config('./config.ini')
    if dev:
        builder = asyncio.create_task(frontend_build())
        await asyncio.sleep(1)
        while not os.path.isfile('./frontend/build/200.html'):
            if builder.done():
                # The watcher only stops on failure; surface it instead of
                # waiting forever for a build that will never appear.
                builder.result()
                raise RuntimeError('Frontend builder stopped before producing a build')
            logger.warning('Waiting for frontend build...')
            await asyncio.sleep(5)
    try:
        # SECURITY: StrictHostKeyChecking=no disables host key verification.
        proc = await autoproc.create_instrumented_subprocess_exec('ssh', '-T', '-oStrictHostKeyChecking=no', 'nopvista@vista.northport.med.va.gov', stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, stdin_endl=b'\r')
        # Keep references so the background tasks are not garbage collected.
        proc_waiter = asyncio.create_task(proc.wait())
        # NOTE(review): the original indentation was lost; the stdin relay is
        # assumed to belong to the successful-login branch. Confirm.
        if await ext_session.task_smartcard(proc, config):
            port = get_port()
            server = asyncio.create_task(application(proc, config).run_task(port=port))
            webbrowser.open(f'http://localhost:{port}/')
            async for data in stdin_reader(opts):
                if proc.returncode is None and data is not None:
                    proc.sendline(data)
                else:
                    return
    finally:
        if dev:
            builder.cancel()
            await builder


if __name__ == '__main__':
    asyncio.run(main_async({}))