First
This commit is contained in:
85
ext_order.py
Normal file
85
ext_order.py
Normal file
@@ -0,0 +1,85 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import re
|
||||
import datetime
|
||||
import util
|
||||
import autoproc
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
local_tzinfo = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
|
||||
def parse(raw, prev=None):
|
||||
assert (m := re.match(r'\b(?P<date_entered>\d{2}/\d{2}/\d{2}) (?P<status>[ a-z+]{3})(?:(?P<plus>\+)|[ ])(?P<text0>[^\r\n]{40}) (?P<requestor_name>[^\r\n]{,10})(?: (?P<date_start>\d{2}/\d{2}/\d{2})|[ ]{9})?(?: (?P<date_stop>\d{2}/\d{2})|[ ]{7})?\r\n (?P<time_entered>\d{2}:\d{2}) (?P<text1>[^\r\n]{40}) (?P<requestor_occupation>[^\r\n]{,10})[ ]{4}(?:(?P<time_start>\d{2}:\d{2})|[ ]{5})?[ ]{2}(?:(?P<time_stop>\d{2}:\d{2})|[ ]{5})?\r\n(?P<text>.*?)\r\n\r\n', raw, re.DOTALL))
|
||||
data = { 'body': raw.strip() }
|
||||
data.update((k, prev[k] if v == '"' and prev else v) for k, v in ((k, (v.strip() or None) if v is not None else v) for k, v in m.groupdict().items()))
|
||||
if data.get('date_entered'):
|
||||
if data.get('time_entered') and data['time_entered'] != '24:00':
|
||||
data['datetime_entered'] = datetime.datetime.strptime(data['date_entered'] + ' ' + data['time_entered'], '%m/%d/%y %H:%M').replace(tzinfo=local_tzinfo)
|
||||
else:
|
||||
data['datetime_entered'] = datetime.datetime.strptime(data['date_entered'], '%m/%d/%y').replace(tzinfo=local_tzinfo).date()
|
||||
if data.get('date_start'):
|
||||
if data.get('time_start') and data['time_start'] != '24:00':
|
||||
data['datetime_start'] = datetime.datetime.strptime(data['date_start'] + ' ' + data['time_start'], '%m/%d/%y %H:%M').replace(tzinfo=local_tzinfo)
|
||||
else:
|
||||
data['datetime_start'] = datetime.datetime.strptime(data['date_start'], '%m/%d/%y').replace(tzinfo=local_tzinfo).date()
|
||||
if data.get('date_stop'):
|
||||
reference = data.get('datetime_start') or datetime.datetime.now()
|
||||
if isinstance(reference, datetime.date):
|
||||
reference = datetime.datetime.combine(reference, datetime.time(0, 0), tzinfo=local_tzinfo)
|
||||
if data.get('time_stop') and data['time_stop'] != '24:00':
|
||||
data['datetime_stop'] = adjustyear(datetime.datetime.strptime(data['date_stop'] + ' ' + data['time_stop'], '%m/%d %H:%M').replace(tzinfo=local_tzinfo), reference)
|
||||
else:
|
||||
data['datetime_stop'] = adjustyear(datetime.datetime.strptime(data['date_stop'], '%m/%d').replace(tzinfo=local_tzinfo), reference).date()
|
||||
if text := m.groupdict().get('text'):
|
||||
assert (m := re.search(r'^[ ]{9}(?:Nrs:(?P<nrs>[^\r\n]{6})|[ ]{10})(?:Chrt:(?P<chrt>[^\r\n]{6})|[ ]{11})Typ:(?P<typ>[^\r\n]{20})Sgn:(?P<sgn>[^\r\n]+?)(?:\r\n|$)', text, re.MULTILINE))
|
||||
data.update((k, v.strip()) for k, v in m.groupdict().items() if v is not None)
|
||||
begin, end = m.span()
|
||||
data['text'] = ((data['text0'].strip() + '\r\n') if data.get('text0') else '') + ((data['text1'].strip() + '\r\n') if data.get('text1') else '') + re.sub(r'\r\n[ \t]+|[ \t]+\r\n', '\r\n', text[:begin].strip())
|
||||
if len(text) > end:
|
||||
data['footer'] = text[end:]
|
||||
text = text[:begin]
|
||||
else:
|
||||
data['text'] = ((data['text0'].strip() + '\r\n') if data.get('text0') else '') + (data['text1'].strip() if data.get('text1') else '')
|
||||
del data['text0'], data['text1']
|
||||
return data
|
||||
|
||||
def adjustyear(dt, after):
|
||||
if dt > after:
|
||||
return dt
|
||||
dt = dt.replace(year=after.year)
|
||||
return dt if dt > after else dt.replace(year=after.year + 1)
|
||||
|
||||
async def cmd_entries(proc, mrn, alpha, omega):
|
||||
"""Fetch orders"""
|
||||
async with proc.sendline, autoproc.expect_async(proc) as expect:
|
||||
proc.sendline('^Results Reporting Menu')
|
||||
if await expect.endswith('\r\n Press return to continue '): # No patients found
|
||||
proc.sendline()
|
||||
assert await expect.endswith('\r\nSelect Patient(s): ', '\r\nSelect Patient: ', timeout_settle=2)
|
||||
proc.sendline(mrn)
|
||||
assert await expect.endswith('\r\nSelect Item(s): ')
|
||||
proc.sendline('9') # Order Summary for Date/time Range
|
||||
assert await expect.endswith('\r\nStart Date [Time]: T// ')
|
||||
proc.sendline(util.vista_strftime(alpha))
|
||||
assert await expect.endswith(re.compile(r'\r\nEnding Date \[Time\] \(inclusive\): (.*?)// $'))
|
||||
proc.sendline(util.vista_strftime(omega))
|
||||
assert await expect.endswith('\r\nDEVICE: HOME// ')
|
||||
proc.sendline('HOME;90;1023')
|
||||
assert await expect.earliest(' HOME(CRT)\r\n')
|
||||
pages = []
|
||||
while True:
|
||||
match m_delimiter := await expect.endswith('\r\nPress RETURN to continue, \'^\' to exit: ', '\r\nSelect Clinician Menu Option: '):
|
||||
case autoproc.ExpectMatch(index=0, before=before):
|
||||
pages.append(before)
|
||||
proc.sendline()
|
||||
case autoproc.ExpectMatch(index=1):
|
||||
break
|
||||
case _: assert False
|
||||
proc.sendline('^Patient information AND OE/RR')
|
||||
assert await expect.endswith('\r\nSelect Patient Information and OE/RR Option: ', '\r\nSelect Patient Information and OE/RR <TEST ACCOUNT> Option: ')
|
||||
expect.clear()
|
||||
prev = None
|
||||
for m in re.finditer(r'\b\d{2}/\d{2}/\d{2}.*?\r\n\r\n', '\r\n'.join(pages).replace('\x1b[1m', '').replace('\x1b[m', ''), re.DOTALL):
|
||||
prev = parse(m.group(0), prev)
|
||||
yield prev
|
Reference in New Issue
Block a user