vistassh-py/ext_scheduling.py

114 lines
5.4 KiB
Python

#!/usr/bin/env python3
"""Appointments"""
import re
import datetime
import autoproc
import logging
logger = logging.getLogger(__name__)
local_tzinfo = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
async def cmd_appointments(proc, clinics='NPT-HEM/ONC ATTENDING', date='T', storeview=None):
"""List appointments"""
async with proc.sendline, autoproc.expect_async(proc) as expect:
proc.sendline('^Appointment List')
assert await expect.endswith('\r\nSelect division: ALL// ')
proc.sendline() # default ALL
assert await expect.endswith('\r\nCount, Non Count, or Both: C//')
proc.sendline('Both')
assert await expect.endswith('\r\nSelect clinic: ALL// ')
for line in clinics.strip().split('^'):
proc.sendline(line)
match await expect.endswith('\x07 ??\r\nSelect clinic: ALL// ', '\r\nSelect another clinic: ', '\r\nSelect clinic: ALL// ', re.compile(r'\r\nCHOOSE \d+-\d+: $'), '\r\n ...OK? Yes// '):
case autoproc.ExpectMatch(index=0):
raise RuntimeError
case autoproc.ExpectMatch(index=3):
proc.sendline('1') # choose first option
case autoproc.ExpectMatch(index=4):
proc.sendline() # accept confirmation
proc.sendline() # finish clinic selection with blank entry
assert await expect.endswith('\r\nFor date: ')
proc.sendline(date)
if storeview:
assert (m := await expect.endswith(re.compile(r'\((.*?)\)', flags=re.MULTILINE|re.DOTALL)))
date = datetime.datetime.strptime(m.group(1), '%b %d, %Y').date()
storeview.delete('date_scheduled=?', (str(date),))
assert await expect.endswith(re.compile(r'Include Primary Care assignment information in the output\? NO// ', flags=re.MULTILINE|re.DOTALL))
proc.sendline() # default NO
assert await expect.endswith('\r\nNumber of copies: 1// ')
proc.sendline() # default 1
assert await expect.endswith('\r\nDEVICE: HOME// ')
proc.sendline() # default HOME
assert await expect.earliest('Right Margin: 80// ')
proc.sendline() # default 80 (maximum 256)
async for clinic in vista_appointment_clinics(proc, expect):
body = re.split(r'(?:^|\n)([ \d]\d:\d{2} [AP]M)\n\n', clinic['body'])
for i in range(1, len(body), 2):
item = clinic.copy()
del item['body']
item['time_scheduled'] = datetime.datetime.combine(item['date_scheduled'], datetime.datetime.strptime(body[i].strip(), '%I:%M %p').time()).replace(tzinfo=local_tzinfo)
detail = re.sub(r'\r\n(\s{0,9}\S)', r'\1', body[i + 1]) # collapse hard wrap
name, detail = detail.split('\n', 1)
item['flag'] = name[:10].strip()
assert (match := re.search(r'^(?P<patient_name>\w.*?)\s+(?P<patient_last4>\d{4}).*?$', name[10:]))
item.update(match.groupdict())
match = re.search(r'^\s{41}\b(?P<location_ward>.*?)\n\s{41}\b(?P<location_bed>.*?)$', detail, re.MULTILINE)
if match:
item.update(match.groupdict())
match = re.search(r'^\s{10}Phone #: (?P<patient_phone>.*)$', detail, re.MULTILINE)
if match:
item.update(match.groupdict())
item['comment'] = '\r\n'.join(m.group(1) for m in re.finditer(r'^\s{15}(\w.*?)$', detail, re.MULTILINE))
yield item
proc.sendline('^Patient information AND OE/RR')
async for prompt, response in expect.promptmatches((
(re.compile(r' Press \'RETURN\' to continue, \'\^\' to stop: $'), None),
('Select Patient Information and OE/RR Option: ', None, True),
('Select Patient Information and OE/RR <TEST ACCOUNT> Option: ', None, True),
), throw=True):
if prompt.index == 0:
proc.sendline(response)
expect.clear()
async def vista_appointment_clinics(proc, expect):
"""List appointments by clinic, collecting all pages"""
item = None
while True:
m = await expect.earliest('\x07')
if m:
try:
head, body = re.split(r'\n_{10,}\n', m.before.replace('\r\n', '\n'))
except ValueError:
logger.warning('invalid page %r'%m.before)
continue
if body.strip() not in {'No activity found for this clinic date!', 'Clinic cancelled for this date!'}:
assert (m1 := re.search(r'^Date printed: (?P<time_printed>.*?)\s+Page: (?P<print_page>\d+)$', head, re.MULTILINE))
assert (m2 := re.search(r'Appointments for (?P<clinic>.+) clinic on (?P<day_scheduled>[A-Z]+) (?P<date_scheduled>(?:[A-Z]+) (?:[0-9]+), (?:[0-9]+))', head))
if int(m1.group('print_page')) > 1:
# next page of same report
assert datetime.datetime.strptime(m1.group('time_printed'), '%b %d,%Y@%H:%M') == item['time_printed']
assert m2.group('clinic') == item['clinic']
assert m2.group('day_scheduled') == item['day_scheduled']
assert datetime.datetime.strptime(m2.group('date_scheduled'), '%b %d, %Y').date() == item['date_scheduled']
item['body'] += '\r\n' + body.rstrip('\r\n') # concatenate report bodies
else:
# new report
if item is not None:
yield item
item = {
'time_printed': datetime.datetime.strptime(m1.group('time_printed'), '%b %d,%Y@%H:%M'),
'clinic': m2.group('clinic'),
'day_scheduled': m2.group('day_scheduled'),
'date_scheduled': datetime.datetime.strptime(m2.group('date_scheduled'), '%b %d, %Y').date(),
'body': body.strip('\r\n')
}
m1 = re.search(r'Appointments for (?P<clinic>.+) clinic on (?P<day>[A-Z]+) (?P<date>(?:[A-Z]+) (?:[0-9]+), (?:[0-9]+))', head)
assert await expect.endswith('\r\nPress return to continue or "^" to escape ')
proc.sendline()
else: # leftovers
if item is not None:
yield item
break