vistassh-py/ext_order.py

92 lines
5.0 KiB
Python
Raw Normal View History

2024-03-02 00:34:29 -05:00
#!/usr/bin/env python3
import re
import datetime
import util
import autoproc
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Time zone of the host, captured once when the module is imported.
# astimezone() without an argument produces a fixed-offset datetime.timezone,
# so the UTC offset in effect at import time is the one attached to every
# timestamp built with this value, whatever calendar date is being parsed.
local_tzinfo = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
# Fixed-width layout of one order entry: two header rows followed by free text
# that runs up to the blank line terminating the entry.
_ORDER_ENTRY_RE = re.compile(
    # Row 1: entry date, status, optional '+', first text line, requestor
    # name, optional start date (mm/dd/yy) and stop date (mm/dd, no year).
    r'\b(?P<date_entered>\d{2}/\d{2}/\d{2}) (?P<status>[ a-z+]{3})(?:(?P<plus>\+)|[ ])(?P<text0>[^\r\n]{40}) (?P<requestor_name>[^\r\n]{,10})(?: (?P<date_start>\d{2}/\d{2}/\d{2})|[ ]{9})?(?: (?P<date_stop>\d{2}/\d{2})|[ ]{7})?\r\n'
    # Row 2: entry time, second text line, requestor occupation, optional
    # start and stop times.
    r' (?P<time_entered>\d{2}:\d{2}) (?P<text1>[^\r\n]{40}) (?P<requestor_occupation>[^\r\n]{,10})[ ]{4}(?:(?P<time_start>\d{2}:\d{2})|[ ]{5})?[ ]{2}(?:(?P<time_stop>\d{2}:\d{2})|[ ]{5})?\r\n'
    # Everything else, up to the first blank line.
    r'(?P<text>.*?)\r\n\r\n',
    re.DOTALL)

# The "Nrs: / Chrt: / Typ: / Sgn:" line found inside the free text.
_ORDER_SIGNATURE_RE = re.compile(
    r'^[ ]{9}(?:Nrs:(?P<nrs>[^\r\n]{6})|[ ]{10})(?:Chrt:(?P<chrt>[^\r\n]{6})|[ ]{11})Typ:(?P<typ>[^\r\n]{20})Sgn:(?P<sgn>[^\r\n]+?)(?:\r\n|$)',
    re.MULTILINE)

# The stop column carries no year.  It is parsed against a leap year so that
# "02/29" is accepted (strptime's implicit default, 1900, is not a leap year);
# 1904 also precedes every year that '%y' can produce (1969-2068), so the
# placeholder can never compare as later than the reference date.
_STOP_PLACEHOLDER_YEAR = 1904


def _entry_timestamp(date_text, time_text):
    """Build the value for a ``mm/dd/yy`` date column and its ``HH:MM`` time column.

    Returns a datetime carrying ``local_tzinfo`` when a usable time is
    present, and a plain ``datetime.date`` when the time is missing or is the
    literal ``24:00`` (which ``%H`` cannot parse).
    """
    if time_text and time_text != '24:00':
        stamp = datetime.datetime.strptime(date_text + ' ' + time_text, '%m/%d/%y %H:%M')
        return stamp.replace(tzinfo=local_tzinfo)
    return datetime.datetime.strptime(date_text, '%m/%d/%y').date()


def parse(raw, prev=None):
    """Parse the text of one order entry into a dict.

    Args:
        raw: Text of a single entry, starting at its ``mm/dd/yy`` date and
            ending with the blank line (``\\r\\n\\r\\n``) that terminates it.
        prev: Dict returned for the preceding entry, or None.  A column whose
            stripped content is a lone ``"`` (ditto mark) takes its value
            from ``prev`` when ``prev`` has that key.

    Returns:
        A dict with ``body`` (``raw`` stripped), one key per named header
        column (stripped, ``None`` when blank or absent), ``datetime_entered``
        / ``datetime_start`` / ``datetime_stop`` when the corresponding date
        column is present (an aware datetime, or a date when no usable time
        is given), ``text`` (the two header text lines plus the free text
        before the signature line), ``nrs`` / ``chrt`` / ``typ`` / ``sgn``
        when the signature line provides them, and ``footer`` for any text
        after the signature line.

    Raises:
        AssertionError: ``raw`` does not match the entry layout, or the entry
            has free text without a signature line.
        ValueError: A date or time column holds an invalid value.
    """
    header = _ORDER_ENTRY_RE.match(raw)
    if header is None:
        # Raised explicitly instead of via ``assert`` so the match is still
        # performed (and checked) when Python runs with -O.
        raise AssertionError('order entry does not match the expected column layout')

    data = {'body': raw.strip()}
    for key, value in header.groupdict().items():
        if value is not None:
            value = value.strip() or None
        # Ditto mark: inherit from the previous entry.  Keys that the previous
        # result no longer carries (text0/text1) keep the literal mark rather
        # than raising KeyError.
        if value == '"' and prev and key in prev:
            value = prev[key]
        data[key] = value

    for column in ('entered', 'start'):
        if data.get('date_' + column):
            data['datetime_' + column] = _entry_timestamp(data['date_' + column], data.get('time_' + column))

    if data.get('date_stop'):
        # The year of the stop date is inferred relative to midnight (local
        # time zone) of the start day, or of today when there is no start.
        # combine() uses only the date part of its first argument, so a start
        # datetime contributes its day, not its time of day.
        anchor_day = data.get('datetime_start') or datetime.date.today()
        reference = datetime.datetime.combine(anchor_day, datetime.time(0, 0), tzinfo=local_tzinfo)
        stop_time = data.get('time_stop')
        timed = bool(stop_time) and stop_time != '24:00'
        stamp = '%s/%d %s' % (data['date_stop'], _STOP_PLACEHOLDER_YEAR, stop_time if timed else '00:00')
        stop = datetime.datetime.strptime(stamp, '%m/%d/%Y %H:%M').replace(tzinfo=local_tzinfo)
        if timed:
            data['datetime_stop'] = adjustyear(stop, reference)
        else:
            # A date-only stop that falls on the reference day itself belongs
            # to that day, not to the same day one year later, so the
            # strictly-after comparison is made against the instant just
            # before the reference midnight.
            data['datetime_stop'] = adjustyear(stop, reference - datetime.timedelta(microseconds=1)).date()

    first_line, second_line = data.pop('text0'), data.pop('text1')
    leading = (first_line + '\r\n') if first_line else ''
    free_text = header.group('text')
    if free_text:
        signature = _ORDER_SIGNATURE_RE.search(free_text)
        if signature is None:
            raise AssertionError('order entry text has no Typ:/Sgn: line')
        data.update((k, v.strip()) for k, v in signature.groupdict().items() if v is not None)
        begin, end = signature.span()
        # Drop the indentation and trailing blanks of each continuation line.
        detail = re.sub(r'\r\n[ \t]+|[ \t]+\r\n', '\r\n', free_text[:begin].strip())
        data['text'] = leading + ((second_line + '\r\n') if second_line else '') + detail
        if len(free_text) > end:
            data['footer'] = free_text[end:]
    else:
        data['text'] = leading + (second_line or '')
    return data
def adjustyear(dt, after):
    """Move *dt* to the earliest year that places it strictly later than *after*.

    Args:
        dt: A datetime whose month, day and time are meaningful but whose year
            is a placeholder.
        after: The datetime that the result must exceed.  Must be comparable
            with *dt* (both naive or both aware).

    Returns:
        *dt* unchanged if it is already later than *after*; otherwise *dt*
        with its year replaced by the first year, counting up from
        ``after.year``, for which the date exists and is later than *after*.

    Raises:
        ValueError: No such year can be represented.
    """
    if dt > after:
        return dt
    # February 29 does not exist in every year, so replace() can fail.  Leap
    # years are at most eight years apart (e.g. 2096 -> 2104), so nine
    # consecutive candidates are enough to reach a valid, later one.
    for year in range(after.year, after.year + 9):
        try:
            candidate = dt.replace(year=year)
        except ValueError:
            continue
        if candidate > after:
            return candidate
    raise ValueError('no representable year places %r after %r' % (dt, after))
async def cmd_entries(proc, mrn, alpha, omega):
"""Fetch orders"""
async with proc.sendline, autoproc.expect_async(proc) as expect:
proc.sendline('^Results Reporting Menu')
if await expect.endswith('\r\n Press return to continue '): # No patients found
proc.sendline()
assert await expect.endswith('\r\nSelect Patient(s): ', '\r\nSelect Patient: ', timeout_settle=2)
proc.sendline(mrn)
assert await expect.endswith('\r\nSelect Item(s): ')
proc.sendline('9') # Order Summary for Date/time Range
assert await expect.endswith('\r\nStart Date [Time]: T// ')
proc.sendline(util.vista_strftime(alpha))
assert await expect.endswith(re.compile(r'\r\nEnding Date \[Time\] \(inclusive\): (.*?)// $'))
proc.sendline(util.vista_strftime(omega))
assert await expect.endswith('\r\nDEVICE: HOME// ')
proc.sendline('HOME;90;1023')
assert await expect.earliest(' HOME(CRT)\r\n')
pages = []
while True:
match m_delimiter := await expect.endswith('\r\nPress RETURN to continue, \'^\' to exit: ', '\r\nSelect Clinician Menu Option: '):
case autoproc.ExpectMatch(index=0, before=before):
pages.append(before)
proc.sendline()
case autoproc.ExpectMatch(index=1):
break
case _: assert False
proc.sendline('^Patient information AND OE/RR')
2025-01-14 21:56:51 -05:00
async for prompt, response in expect.promptmatches((
(re.compile(r' Press \'RETURN\' to continue, \'\^\' to stop: $'), None),
('Select Patient Information and OE/RR Option: ', None, True),
('Select Patient Information and OE/RR <TEST ACCOUNT> Option: ', None, True),
), throw=True):
if prompt.index == 0:
proc.sendline(response)
2024-03-02 00:34:29 -05:00
expect.clear()
prev = None
for m in re.finditer(r'\b\d{2}/\d{2}/\d{2}.*?\r\n\r\n', '\r\n'.join(pages).replace('\x1b[1m', '').replace('\x1b[m', ''), re.DOTALL):
prev = parse(m.group(0), prev)
yield prev