vistassh-py/util.py
2024-03-02 00:45:21 -05:00

122 lines
7.0 KiB
Python

#!/usr/bin/env python3
import re
import datetime
import itertools
from collections import defaultdict
from typing import Any, Union, AsyncGenerator, Iterable
class BucketCache(object):
    """Cache data in buckets, with customizable bucket keys and fetch callback.

    Values are stored in ``self.data``, a ``defaultdict(list)`` keyed by an
    integer bucket index.  Subclasses provide four static hooks:

    * ``bucketkey(value)`` -- extract the ordering key from a value
    * ``bucket(key)``      -- map a key to its integer bucket index
    * ``unbucket(idx)``    -- map a bucket index back to a key
    * ``fetch(start, stop, ctx)`` -- async generator yielding fresh values

    The base implementations of all four raise ``NotImplementedError``.
    """

    def __init__(self, data: dict = None):
        """Create a cache, optionally seeded from an existing ``{index: [values]}`` mapping."""
        self.data = defaultdict(list, {} if data is None else data)

    def add(self, value) -> None:
        """Add one value"""
        self.data[self.bucket(self.bucketkey(value))].append(value)

    def update(self, iterable: Iterable) -> None:
        """Add multiple values"""
        for value in iterable:
            # Add each element individually (not the iterable itself).
            self.add(value)

    async def retrieve(self, key_start, key_stop, ctx=None) -> AsyncGenerator[Any, None]:
        """Retrieve range, calling fetch callback for fresh data if needed.

        Buckets between ``bucket(key_start)`` and ``bucket(key_stop)`` that are
        absent from ``self.data`` are fetched (one ``fetch`` call per run of
        consecutive missing indices).  Afterwards every cached value whose
        bucket key lies in ``[key_start, key_stop]`` is yielded.
        """
        bkt_start = self.bucket(key_start)
        bkt_stop = self.bucket(key_stop)
        # Kept lazy on purpose: membership is evaluated as groupby advances.
        missing = (idx for idx in range(bkt_start, bkt_stop + 1) if idx not in self.data)
        # "index minus running counter" is constant within a run of consecutive
        # integers, so groupby splits the missing indices into contiguous ranges.
        for _, run in itertools.groupby(missing, key=(lambda n, c=itertools.count(): n - next(c))):
            run = tuple(run)
            async for value in self.fetch(self.unbucket(run[0]), self.unbucket(run[-1]), ctx):
                # Only store values that fall inside the requested bucket range.
                if bkt_start <= self.bucket(self.bucketkey(value)) <= bkt_stop:
                    self.add(value)
        for idx in range(bkt_start, bkt_stop + 1):
            # Indexing the defaultdict creates an empty bucket if none exists,
            # which marks the bucket as present for later calls.
            for value in self.data[idx]:
                if key_start <= self.bucketkey(value) <= key_stop:
                    yield value

    @staticmethod
    def bucketkey(value):
        """Get bucket key from value"""
        raise NotImplementedError

    @staticmethod
    def bucket(key) -> int:
        """Get bucket index from bucket key"""
        raise NotImplementedError

    @staticmethod
    def unbucket(idx: int):
        """Get bucket key from bucket index"""
        raise NotImplementedError

    @staticmethod
    async def fetch(start, stop, ctx=None) -> AsyncGenerator[Any, None]:
        """Fetch fresh values for the key range ``start``..``stop``; consumed with ``async for``."""
        raise NotImplementedError
class TimeSeriesCache(BucketCache):
    """Cache time series data in daily buckets"""

    @staticmethod
    def bucket(key: Union[datetime.datetime, datetime.date]) -> int:
        """Return the number of days from 1970-01-01 to ``key``'s calendar date.

        Accepts either a ``datetime.datetime`` (its date part is used) or a
        plain ``datetime.date``.
        """
        day = key.date() if isinstance(key, datetime.datetime) else key
        return (day - datetime.date(1970, 1, 1)).days

    @staticmethod
    def unbucket(idx: int) -> datetime.date:
        """Return the calendar date that lies ``idx`` days after 1970-01-01."""
        return datetime.date(1970, 1, 1) + datetime.timedelta(days=idx)
# Building blocks for VistA-style date/time input.  In every pattern, group 1 is
# the named group itself and the numbered sub-groups that follow are the fields
# in the order given by the trailing comment.
re_dt_fileman = r'(?P<dt_fileman>(\d{3})(\d{2})(\d{2})\.(\d{2})(\d{2})(\d{2}))' # George Timson's format
re_dt_today = r'(?P<dt_today>T)' # today
re_dt_now = r'(?P<dt_now>N)' # now
# NOTE: the year alternation tries 4 digits before 2.  With the 2-digit branch
# first, "3/2/2024" matched as year "20" followed by time "24".
re_dt_mdy = r'(?P<dt_mdy>(\d{1,2})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # m/d/yy, m/d/yyyy
re_dt_ymd = r'(?P<dt_ymd>(\d{4})[^\w@?]+(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # yyyy/m/d
re_dt_yyyymmdd = r'(?P<dt_yyyymmdd>(\d{4})(\d{2})(\d{2}))' # yyyymmdd
re_dt_Mdy = r'(?P<dt_Mdy>([A-Z]{3,})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # M/d/yy, M/d/yyyy
# Day, month name and year are captured as groups 2, 3 and 4 respectively.
re_dt_dMy = r'(?P<dt_dMy>(\d{1,2})[^\w@?]+([A-Z]{3,})[^\w@?]+(\d{4}|\d{2})\s*)' # d/M/yy, d/M/yyyy
re_dt_md = r'(?P<dt_md>(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # m/d
re_dt_offset = r'(?P<offset>([-+]\d+)(H|W|M)?)' # +#U
re_dt_time = r'(?:@?(?P<time>(\d{1,2}):?(\d{1,2})))' # time
re_dt_ext = r'(?P<ext>[<>~])' # (nonstandard extension)
# Full grammar: either a FileMan timestamp, or [date][offset][time][ext].
# Both alternatives sit inside one group so that ^ and $ anchor each of them;
# otherwise "|" would bind ^ to the first alternative and $ to the second only.
rx_dt = re.compile(
    f'^(?:{re_dt_fileman}'
    f'|(?:(?:{re_dt_today}|{re_dt_now}|{re_dt_mdy}|{re_dt_ymd}|{re_dt_yyyymmdd}|{re_dt_Mdy}|{re_dt_dMy}|{re_dt_md})?'
    f'{re_dt_offset}?{re_dt_time}?{re_dt_ext}?))$',
    re.IGNORECASE,
)
def vista_strptime(s: str) -> datetime.datetime:
    """Parse VistA-style datetime strings into Python datetime.datetime objects.

    The input is stripped and lower-cased, then matched against ``rx_dt``:

    * a FileMan timestamp (``YYYMMDD.HHMMSS``, year counted from 1700), or
    * an optional date (``T``, ``N``, m/d/y, y/m/d, yyyymmdd, Mon d y,
      d Mon y, m/d), followed by an optional signed offset (unit ``H``, ``W``
      or ``M``; days when omitted), an optional time, and an optional
      ``<`` / ``>`` / ``~`` extension.

    When no date part is given, today's date at midnight is the starting
    point.  ``<`` snaps to the start of the day, ``>`` to the last microsecond
    of the day, and ``~`` subtracts one microsecond.

    Raises:
        ValueError: if the string is not recognised, combines a time with
            ``N``, combines an ``H`` offset with a time or ``T``, names an
            unknown month, yields an impossible calendar date, or (for the
            non-FileMan forms) resolves to a year before 1800.
    """
    if not (m := rx_dt.match(s.strip().lower())):
        raise ValueError('invalid date/time')
    m = m.groupdict()

    if m['dt_fileman']:
        m1 = re.match(re_dt_fileman, m['dt_fileman'])
        midnight = datetime.datetime(year=1700 + int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
        # Keep the time of day instead of discarding it.  Added as a timedelta
        # so an hour field of 24 rolls over to the next day rather than raising.
        return midnight + datetime.timedelta(hours=int(m1.group(5)), minutes=int(m1.group(6)), seconds=int(m1.group(7)))

    # Default starting point: today at 00:00.
    date = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    try:
        if m['dt_today']:
            pass
        elif m['dt_now']:
            date = datetime.datetime.now()
        elif m['dt_mdy']:
            m1 = re.match(re_dt_mdy, m['dt_mdy'], flags=re.I)
            date = date.replace(month=int(m1.group(2)), day=int(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
        elif m['dt_ymd']:
            m1 = re.match(re_dt_ymd, m['dt_ymd'], flags=re.I)
            date = date.replace(year=int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
        elif m['dt_yyyymmdd']:
            m1 = re.match(re_dt_yyyymmdd, m['dt_yyyymmdd'], flags=re.I)
            date = date.replace(year=int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
        elif m['dt_Mdy']:
            m1 = re.match(re_dt_Mdy, m['dt_Mdy'], flags=re.I)
            date = date.replace(month=vista_strptime_month(m1.group(2)), day=int(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
        elif m['dt_dMy']:
            m1 = re.match(re_dt_dMy, m['dt_dMy'], flags=re.I)
            date = date.replace(day=int(m1.group(2)), month=vista_strptime_month(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
        elif m['dt_md']:
            m1 = re.match(re_dt_md, m['dt_md'], flags=re.I)
            date = date.replace(month=int(m1.group(2)), day=int(m1.group(3)))
    except KeyError:
        # An unrecognised month name is bad input like any other; report it
        # with the same exception type as every other parse failure.
        raise ValueError('invalid date/time') from None

    if m['time']:
        if m['dt_now']:
            raise ValueError('cannot specify time with N or H offset')
        m1 = re.match(re_dt_time, m['time'], flags=re.I)
        date = date.replace(hour=int(m1.group(2)), minute=int(m1.group(3)))

    if m['offset']:
        m1 = re.match(re_dt_offset, m['offset'], flags=re.I)
        offset_unit = m1.group(3)
        if offset_unit == 'h' and (m['time'] or m['dt_today']):
            raise ValueError('cannot specify time or T with H offset')
        # No unit suffix means the offset is in days.
        date = vista_strptime_offset(date, int(m1.group(2)), offset_unit or 'd')

    if m['ext']:
        if m['ext'] == '<':
            date = date.replace(hour=0, minute=0, second=0, microsecond=0)
        elif m['ext'] == '>':
            date = date.replace(hour=23, minute=59, second=59, microsecond=999999)
        elif m['ext'] == '~':
            date = date - datetime.timedelta(microseconds=1)

    if date.year < 1800:
        raise ValueError('cannot specify year before 1800')
    return date
def vista_strptime_year(y: int, today: datetime.datetime) -> int:
    """Promote years to 4 digits.

    Years of 1000 or more are returned unchanged.  Anything smaller is placed
    in the 2000s when that lands earlier than 20 years after ``today.year``,
    and in the 1900s otherwise.
    """
    if y >= 1000:
        return y
    in_2000s = y + 2000
    if in_2000s < today.year + 20:
        return in_2000s
    return y + 1900
def vista_strptime_month(m: str, mapping: dict={'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}) -> int:
    """Convert en-US month names to integers.

    Only the first three characters of ``m`` are looked up in ``mapping``; the
    lookup is case-sensitive, so with the default table ``m`` must already be
    lower case.  Raises ``KeyError`` when the prefix is not in ``mapping``.
    """
    abbreviation = m[:3]
    return mapping[abbreviation]
def vista_strptime_offset(base: datetime.datetime, offset: int, suffix: str, mapping: dict={'h': 'hours', 'd': 'days', 'w': 'weeks', 'm': 'months'}) -> datetime.datetime:
    """Apply datetime offset.

    Args:
        base: starting point.
        offset: signed number of units to add.
        suffix: unit code; ``'m'`` means calendar months, any other code is
            looked up in ``mapping`` and passed to ``datetime.timedelta``.
        mapping: unit code -> ``timedelta`` keyword name.

    Returns:
        ``base`` shifted by ``offset`` units.  For month offsets the day of
        month is clamped to the last day of the target month (Jan 31 + 1 month
        gives the last day of February).

    Raises:
        KeyError: if ``suffix`` is not ``'m'`` and is missing from ``mapping``.
        ValueError: if the resulting year is outside the range ``datetime``
            supports.
    """
    if suffix != 'm':
        return base + datetime.timedelta(**{mapping[suffix]: offset})

    import calendar  # local import: only the month branch needs it

    # Work on a zero-based absolute month count so divmod handles both
    # year rollover and negative offsets (floor division) correctly.
    year, month_index = divmod(base.year * 12 + (base.month - 1) + offset, 12)
    month = month_index + 1
    day = min(base.day, calendar.monthrange(year, month)[1])
    return base.replace(year=year, month=month, day=day)
def vista_strftime(date: Union[datetime.datetime, datetime.date]) -> str:
    """Convert Python datetime objects into conventional FileMan/Timson format.

    A plain ``datetime.date`` becomes ``YYYMMDD`` (year counted from 1700).
    A ``datetime.datetime`` becomes ``YYYMMDD.HHMMSS``.
    """
    stamp = f'{date.year - 1700:03}{date.month:02}{date.day:02}'
    # datetime.datetime subclasses datetime.date, so it must be excluded
    # explicitly or the time of day would never be emitted.
    if isinstance(date, datetime.date) and not isinstance(date, datetime.datetime):
        return stamp
    return f'{stamp}.{date.hour:02}{date.minute:02}{date.second:02}'