nuVistA/util.py
2023-04-29 18:34:37 -04:00

249 lines
12 KiB
Python

#!/usr/bin/env python3
import re
import time
import datetime
import itertools
import sqlite3
import asyncio
import threading
from weakref import WeakValueDictionary
try:
from cPickle import dumps, loads
except ImportError:
from pickle import dumps, loads
from typing import Any, Union, AsyncGenerator, Iterable, Tuple, Callable
class Cached(object):
    """Wrap a value loaded from the cache together with its storage timestamp.

    Attribute lookups that fail on the wrapper itself and all subscript
    operations are forwarded to the wrapped object, so the wrapper can be
    used in place of the value for those operations.

    Attributes:
        _base: the wrapped value.
        _ts: timestamp recorded with the value (``None`` when not supplied).
    """

    def __init__(self, base, ts=None):
        self._base = base
        self._ts = ts

    def __getattr__(self, key):
        # __getattr__ only runs when normal lookup fails.  If the wrapper's
        # own fields are missing (instance created without __init__, e.g. by
        # copy/pickle machinery), forwarding would look up self._base again
        # and recurse without bound, so fail cleanly instead.
        if key in ('_base', '_ts'):
            raise AttributeError(key)
        return getattr(self._base, key)

    def __getitem__(self, key):
        # Plain subscription; the previous call to an undefined ``getitem``
        # helper raised NameError on every use.
        return self._base[key]
class Store(object):
    """Wrapper around one SQLite connection that hands out per-table mappings.

    A table is reached with item access (``store['name']``) or attribute
    access (``store.name``); both return a ``Mapping`` bound to that table.
    Deleting the item or attribute drops the table.

    Args:
        database: an open ``sqlite3.Connection`` or a path / ``':memory:'``
            to connect to (opened with ``check_same_thread=False``).
        synchronous: if truthy, value interpolated into ``PRAGMA synchronous``.
        journal_mode: if truthy, value interpolated into ``PRAGMA journal_mode``.
        default_factory: optional zero-argument callable stored as
            ``_default_factory`` for mappings to consult on a missing key.

    NOTE: table names and PRAGMA values are interpolated into SQL text
    unescaped, so they must come from trusted code, not from user input.
    """

    def __init__(self, database: Union[sqlite3.Connection, str]=':memory:', synchronous: Union[str, int, None]=None, journal_mode: Union[str, None]=None, default_factory: Union[Callable, None]=None):
        self._db = database if isinstance(database, sqlite3.Connection) else sqlite3.connect(database, check_same_thread=False)
        if synchronous:
            self._db.execute(f'PRAGMA synchronous = {synchronous}')
        if journal_mode:
            self._db.execute(f'PRAGMA journal_mode = {journal_mode}')
        # Weak values: a Mapping is dropped from the registry once nobody
        # else holds it, and is simply re-created on the next access.
        self._mappings = WeakValueDictionary()
        self._default_factory = default_factory
        self.execute = self._db.execute
        self.commit = self._db.commit

    # Context-manager hooks are real methods (not instance attributes) because
    # the ``with`` statement looks them up on the type; both explicit calls
    # (``store.__enter__()``) and ``with store:`` now work.
    def __enter__(self):
        return self._db.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._db.__exit__(exc_type, exc_val, exc_tb)

    def __getitem__(self, key: str) -> 'Mapping':
        """Return the mapping for table ``key``, creating it on first use."""
        try:
            return self._mappings[key]
        except KeyError:
            # ``res`` keeps a strong reference until we return, so the weak
            # registry entry cannot vanish in between.
            self._mappings[key] = res = Mapping(database=self, table=key)
            return res

    def __delitem__(self, key: str):
        """Drop table ``key`` and forget its mapping, if one is still alive."""
        with self._db:
            self._db.execute(f'DROP TABLE "{key}"')
        # The registry is weak, so the entry may already be gone (or the table
        # may have been created by other means); that is not an error.
        self._mappings.pop(key, None)

    def __getattr__(self, key: str) -> 'Mapping':
        """Attribute-style table access: ``store.name`` is ``store['name']``."""
        # Only reached when normal lookup fails.  Never treat protocol probes
        # (``__deepcopy__``, ``__fspath__``, ...) as table names, and never
        # recurse through ``self._mappings`` on a half-initialised instance.
        if '_mappings' not in self.__dict__ or (key.startswith('__') and key.endswith('__')):
            raise AttributeError(key)
        return self[key]

    __delattr__ = __delitem__
class Mapping(object):
    """Dict-like view of one SQLite table holding pickled, timestamped values.

    Each row is ``(key TEXT PRIMARY KEY, value BLOB, ts FLOAT)``.  Read
    methods take ``ttl`` (maximum age in seconds, default: unlimited) and
    ``now`` (reference time; a falsy value means ``time.time()``), and only
    see rows with ``ts > now - ttl``.  Writes are not committed unless
    ``commit=True`` is passed or the mapping is used as a context manager.

    Subscript access accepts a slice as shorthand: ``m[key:ttl:now]``.

    NOTE(security): values are deserialised with pickle, so the database
    file must be trusted -- unpickling attacker-controlled data can execute
    arbitrary code.
    """

    def __init__(self, database: Union[Store, sqlite3.Connection, str]=':memory:', table: str='store'):
        self._store = database if isinstance(database, Store) else Store(database)
        self._tbl = table
        self.commit = self._store.commit
        with self._store._db:
            self._store.execute(f'CREATE TABLE IF NOT EXISTS "{self._tbl}" (key TEXT PRIMARY KEY, value BLOB, ts FLOAT)')
            self._store.execute(f'CREATE INDEX IF NOT EXISTS "{self._tbl}_ts" ON "{self._tbl}" (ts)')

    def __enter__(self):
        # Delegates to the store; the value bound by ``with ... as`` is
        # whatever the store's __enter__ returns, not this mapping.
        return self._store.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._store.__exit__(exc_type, exc_val, exc_tb)

    def count(self, ttl: float=float('inf'), now: float=0) -> int:
        """Return the number of rows younger than ``ttl``."""
        cutoff = (now or time.time()) - ttl
        return self._store.execute(f'SELECT COUNT(*) FROM "{self._tbl}" WHERE ts > ?', (cutoff,)).fetchone()[0]

    def has(self, key: str, ttl: float=float('inf'), now: float=0) -> bool:
        """Return True if ``key`` exists and is younger than ``ttl``."""
        cutoff = (now or time.time()) - ttl
        row = self._store.execute(f'SELECT 1 FROM "{self._tbl}" WHERE key = ? AND ts > ? LIMIT 1', (key, cutoff)).fetchone()
        return row is not None

    def get(self, key: Union[str, slice], ttl: float=float('inf'), now: float=0, **kw) -> Any:
        """Return the value for ``key`` wrapped in ``Cached`` (value + timestamp).

        On a miss: return ``kw['default']`` if given; otherwise, if the store
        has a default factory, store and return its (unwrapped) result;
        otherwise raise ``KeyError``.
        """
        if isinstance(key, slice):
            key, ttl, now = key.start, key.stop, key.step
            if ttl is None:
                # ``m[key:]`` / ``m[key::now]``: no stop given means no age
                # limit (previously a TypeError from ``float - None``).
                ttl = float('inf')
        cutoff = (now or time.time()) - ttl
        row = self._store.execute(f'SELECT value, ts FROM "{self._tbl}" WHERE key = ? AND ts > ? LIMIT 1', (key, cutoff)).fetchone()
        if row is not None:
            return Cached(loads(row[0]), row[1])
        if 'default' in kw:
            return kw['default']
        if self._store._default_factory is not None:
            return self.set(key, self._store._default_factory(), now=(now or time.time()))
        raise KeyError(key)

    def set(self, key: str, value: Any, now: float=0, commit: bool=False) -> Any:
        """Insert or replace ``key`` with pickled ``value``; return ``value``."""
        self._store.execute(f'REPLACE INTO "{self._tbl}" (key, value, ts) VALUES (?, ?, ?)', (key, dumps(value), now or time.time()))
        if commit:
            self._store.commit()
        return value

    def remove(self, key: str, commit: bool=False) -> None:
        """Delete ``key``; deleting a missing key is a no-op."""
        self._store.execute(f'DELETE FROM "{self._tbl}" WHERE key = ?', (key,))
        if commit:
            self._store.commit()

    def keys(self, ttl: float=float('inf'), now: float=0) -> Iterable[str]:
        """Lazily yield keys younger than ``ttl`` in rowid order."""
        cutoff = (now or time.time()) - ttl
        return (row[0] for row in self._store.execute(f'SELECT key FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', (cutoff,)))

    def values(self, ttl: float=float('inf'), now: float=0) -> Iterable:
        """Lazily yield unpickled values younger than ``ttl`` in rowid order."""
        cutoff = (now or time.time()) - ttl
        return (loads(row[0]) for row in self._store.execute(f'SELECT value FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', (cutoff,)))

    def items(self, ttl: float=float('inf'), now: float=0) -> Iterable[Tuple[str, Any]]:
        """Lazily yield ``(key, value)`` pairs younger than ``ttl`` in rowid order."""
        cutoff = (now or time.time()) - ttl
        return ((row[0], loads(row[1])) for row in self._store.execute(f'SELECT key, value FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', (cutoff,)))

    def clear(self, ttl: float=0, now: float=0, commit: bool=False) -> None:
        """Delete rows at least ``ttl`` seconds old, or every row if ``ttl <= 0``."""
        if ttl > 0:
            self._store.execute(f'DELETE FROM "{self._tbl}" WHERE ts <= ?', ((now or time.time()) - ttl,))
        else:
            self._store.execute(f'DELETE FROM "{self._tbl}"')
        if commit:
            self._store.commit()

    # Container protocol aliases.
    __len__ = count
    __contains__ = has
    __getitem__ = get
    __setitem__ = set
    __delitem__ = remove
    __iter__ = keys
class CacheProxy(object):
    """Proxy that memoises selected methods of ``obj`` in a cache mapping.

    Configure with ``_cache(name_or_names, cache, prefix, ttl)``.  The cache
    object must support ``cache[key:ttl]`` (fresh lookup, ``KeyError`` on
    miss), ``cache[key]``, ``cache[key] = value``, ``cache.has(key)`` and the
    context-manager protocol.

    Every cached method accepts two extra keyword arguments:
        _cache_ttl: maximum acceptable age for this call (default: configured ttl).
        _cache_stale: when True (default) and only an expired entry exists,
            return it immediately and refresh in the background (a task for
            coroutine functions, a thread otherwise).
    """

    def __init__(self, obj):
        self._obj = obj
        self._mapping = {}

    def _cache(self, key, cache, prefix='', ttl=0):
        """Register caching for attribute ``key`` (a name, an iterable of names,
        or ``None`` as the fallback for all unlisted names); returns ``self``.
        Passing ``cache=None`` disables caching for the name."""
        if key is None or isinstance(key, str):
            self._mapping[key] = (cache, prefix, ttl)
            if key:
                # Discard a wrapper memoised under the previous configuration.
                try:
                    delattr(self, key)
                except AttributeError:
                    pass
        else:
            for k in key:
                self._cache(k, cache, prefix, ttl)
        return self

    def __getattr__(self, key):
        # Only reached when normal lookup fails; if our own fields are absent
        # (instance built without __init__) fail cleanly rather than recurse.
        if key in ('_obj', '_mapping'):
            raise AttributeError(key)
        if key in self._mapping:
            cache, prefix, ttl = self._mapping[key]
        elif None in self._mapping:
            cache, prefix, ttl = self._mapping[None]
        else:
            return getattr(self._obj, key)
        if cache is None:
            return getattr(self._obj, key)
        if asyncio.iscoroutinefunction(value := getattr(self._obj, key)):
            lock = asyncio.Lock()
            # The event loop keeps only weak references to tasks; hold
            # background refreshes here so they cannot be garbage-collected
            # before they finish.
            refreshing = set()
            async def fetch(*args, _cache_key, **kw):
                async with lock:
                    with cache:
                        res = cache[_cache_key] = await value(*args, **kw)
                        return res
            async def thunk(*args, _cache_ttl: float=ttl, _cache_stale: bool=True, **kw):
                _cache_key = prefix + key + repr(args) + repr(kw)
                try:
                    return cache[_cache_key:_cache_ttl]
                except KeyError:
                    kw['_cache_key'] = _cache_key
                    if _cache_stale and cache.has(_cache_key):
                        task = asyncio.ensure_future(fetch(*args, **kw))
                        refreshing.add(task)
                        task.add_done_callback(refreshing.discard)
                        return cache[_cache_key]
                    return await fetch(*args, **kw)
        elif callable(value):
            lock = threading.Lock()
            def fetch(*args, _cache_key, **kw):
                with lock, cache:
                    res = cache[_cache_key] = value(*args, **kw)
                    return res
            def thunk(*args, _cache_ttl: float=ttl, _cache_stale: bool=True, **kw):
                _cache_key = prefix + key + repr(args) + repr(kw)
                try:
                    return cache[_cache_key:_cache_ttl]
                except KeyError:
                    kw['_cache_key'] = _cache_key
                    if _cache_stale and cache.has(_cache_key):
                        threading.Thread(target=fetch, args=args, kwargs=kw).start()
                        return cache[_cache_key]
                    return fetch(*args, **kw)
        else:
            # Plain data attribute: nothing to cache.
            return value
        thunk.cached = True
        # Memoise the wrapper on the instance so __getattr__ is bypassed next time.
        setattr(self, key, thunk)
        return thunk
class SyncProxy(object):
    """Expose the coroutine methods of ``obj`` as blocking calls.

    Coroutine functions are submitted to ``loop`` with
    ``asyncio.run_coroutine_threadsafe`` and the caller blocks on the result.
    Other callables are returned unchanged; both kinds are memoised on the
    proxy after first access.  Non-callable attributes are passed through
    without memoisation.
    """

    def __init__(self, obj, loop=None):
        self._obj = obj
        self._loop = loop or asyncio.get_event_loop()

    def __getattr__(self, key):
        target = getattr(self._obj, key)
        if asyncio.iscoroutinefunction(target):
            def blocking(*args, **kw):
                # Schedule on the proxy's loop and wait for completion.
                future = asyncio.run_coroutine_threadsafe(target(*args, **kw), loop=self._loop)
                return future.result()
            setattr(self, key, blocking)
            return blocking
        if callable(target):
            setattr(self, key, target)
        return target
# Building blocks for rx_dt.  Inside each pattern, group 1 is the named group
# and the numbered sub-groups follow; vista_strptime relies on that layout.
# FileMan internal format YYYMMDD[.HHMMSS]: the time is a decimal fraction, so
# trailing zeros may be absent (".1" is 10:00, ".083" is 08:30).
re_dt_fileman = r'(?P<dt_fileman>(\d{3})(\d{2})(\d{2})(?:\.(\d{0,6}))?)' # George Timson's format
re_dt_today = r'(?P<dt_today>TODAY|T)' # today
re_dt_now = r'(?P<dt_now>NOW|N)' # now
re_dt_mdy = r'(?P<dt_mdy>(\d{1,2})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # m/d/yy, m/d/yyyy
re_dt_ymd = r'(?P<dt_ymd>(\d{4})[^\w@?]+(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # yyyy/m/d
re_dt_yyyymmdd = r'(?P<dt_yyyymmdd>(\d{4})(\d{2})(\d{2}))' # yyyymmdd
re_dt_Mdy = r'(?P<dt_Mdy>([A-Z]{3,})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # M/d/yy, M/d/yyyy
re_dt_dMy = r'(?P<dt_dMy>(\d{1,2})[^\w@?]+([A-Z]{3,})[^\w@?]+(\d{4}|\d{2})\s*)' # d/M/yy, d/M/yyyy
re_dt_md = r'(?P<dt_md>(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # m/d
re_dt_offset = r'(?P<offset>([-+]\d+)(H|W|M)?)' # +#U
re_dt_time = r'(?:@?(?P<time>(\d{1,2}):?(\d{1,2})))' # time
re_dt_ext = r'(?P<ext>[<>])' # (nonstandard extension)
# Both anchors wrap the whole alternation.  Previously "^" bound only to the
# FileMan branch and "$" only to the other one, so any input starting with
# seven digits (e.g. "20230429") was consumed as a FileMan prefix and trailing
# garbage after a FileMan value was silently ignored.
rx_dt = re.compile(f'^(?:{re_dt_fileman}|(?:(?:{re_dt_today}|{re_dt_now}|{re_dt_mdy}|{re_dt_ymd}|{re_dt_yyyymmdd}|{re_dt_Mdy}|{re_dt_dMy}|{re_dt_md})?{re_dt_offset}?{re_dt_time}?{re_dt_ext}?))$', re.IGNORECASE)
def vista_strptime(s: str) -> datetime.datetime:
    """Parse VistA-style datetime strings into Python datetime.datetime objects

    Accepted (case-insensitive, surrounding whitespace ignored):
      * FileMan internal ``YYYMMDD[.HHMMSS]`` (year is offset from 1700;
        ``.24`` is taken as midnight ending that day, i.e. 00:00 next day);
      * ``T``/``TODAY``, ``N``/``NOW``, ``m/d/yy[yy]``, ``yyyy/m/d``,
        ``yyyymmdd``, ``Mon d yy[yy]``, ``d Mon yy[yy]``, ``m/d`` -- each
        optionally followed by an offset (``+3``, ``-2W``, ``+1M``, ``+4H``),
        a time (``@hh:mm`` / ``hhmm``) and ``<`` / ``>`` (snap to start /
        end of day).  An omitted date means today; an empty string therefore
        parses as today at midnight.

    Returns a naive datetime in local time.

    Raises:
        ValueError: unparseable input, out-of-range fields, an unknown month
            name, an illegal combination (NOW with a time; H offset with a
            time or TODAY), or a resulting year before 1800.
    """
    match = rx_dt.match(s.strip().lower())
    if match is None:
        raise ValueError(f'invalid date/time {s}')
    m = match.groupdict()

    if m['dt_fileman']:
        m1 = re.match(re_dt_fileman, m['dt_fileman'])
        # Right-pad the fractional part to HHMMSS to restore dropped zeros.
        hhmmss = (m1.group(5) or '').ljust(6, '0')
        hour, minute, second = int(hhmmss[0:2]), int(hhmmss[2:4]), int(hhmmss[4:6])
        day = datetime.datetime(year=1700 + int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
        if (hour, minute, second) == (24, 0, 0):
            return day + datetime.timedelta(days=1)
        return day.replace(hour=hour, minute=minute, second=second)

    def month_number(name: str) -> int:
        # Report unknown month names like every other parse failure.
        try:
            return vista_strptime_month(name)
        except KeyError:
            raise ValueError(f'invalid date/time {s}') from None

    now = datetime.datetime.now()
    date = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if m['dt_today']:
        pass
    elif m['dt_now']:
        date = now
    elif m['dt_mdy']:
        m1 = re.match(re_dt_mdy, m['dt_mdy'], flags=re.I)
        date = date.replace(month=int(m1.group(2)), day=int(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
    elif m['dt_ymd']:
        m1 = re.match(re_dt_ymd, m['dt_ymd'], flags=re.I)
        date = date.replace(year=int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
    elif m['dt_yyyymmdd']:
        m1 = re.match(re_dt_yyyymmdd, m['dt_yyyymmdd'], flags=re.I)
        date = date.replace(year=int(m1.group(2)), month=int(m1.group(3)), day=int(m1.group(4)))
    elif m['dt_Mdy']:
        m1 = re.match(re_dt_Mdy, m['dt_Mdy'], flags=re.I)
        date = date.replace(month=month_number(m1.group(2)), day=int(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
    elif m['dt_dMy']:
        m1 = re.match(re_dt_dMy, m['dt_dMy'], flags=re.I)
        date = date.replace(day=int(m1.group(2)), month=month_number(m1.group(3)), year=vista_strptime_year(int(m1.group(4)), date))
    elif m['dt_md']:
        m1 = re.match(re_dt_md, m['dt_md'], flags=re.I)
        date = date.replace(month=int(m1.group(2)), day=int(m1.group(3)))

    if m['time']:
        if m['dt_now']:
            raise ValueError('cannot specify NOW with time')
        m1 = re.match(re_dt_time, m['time'], flags=re.I)
        date = date.replace(hour=int(m1.group(2)), minute=int(m1.group(3)))
    if m['offset']:
        m1 = re.match(re_dt_offset, m['offset'], flags=re.I)
        # Input was lower-cased above, so the unit is 'h', 'w', 'm' or None.
        if (offset_unit := m1.group(3)) == 'h' and (m['time'] or m['dt_today']):
            raise ValueError('cannot specify time or TODAY with H offset')
        date = vista_strptime_offset(date, int(m1.group(2)), offset_unit or 'd')
    if m['ext'] == '<':
        date = date.replace(hour=0, minute=0, second=0, microsecond=0)
    elif m['ext'] == '>':
        date = date.replace(hour=23, minute=59, second=59, microsecond=999999)
    if date.year < 1800:
        raise ValueError('cannot specify year before 1800')
    return date
def vista_strptime_year(y: int, today: datetime.datetime) -> int:
    """Promote years to 4 digits

    Values of 1000 or more are returned unchanged.  Smaller values are placed
    in the 2000s when that lands earlier than 20 years after ``today.year``,
    and in the 1900s otherwise.
    """
    if y >= 1000:
        return y
    in_2000s = y + 2000
    if in_2000s < today.year + 20:
        return in_2000s
    return y + 1900
def vista_strptime_month(m: str, mapping: dict={'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}) -> int:
    """Convert en-US month names to integers

    Only the first three characters of ``m`` are significant, so full names
    and longer abbreviations work.  Matching is case-insensitive: the exact
    prefix is tried first (so a caller-supplied ``mapping`` with its own key
    casing keeps working), then its lower-cased form.

    The default ``mapping`` is shared between calls and must not be mutated.

    Raises:
        KeyError: the prefix is not a known month.
    """
    abbr = m[:3]
    if abbr in mapping:
        return mapping[abbr]
    return mapping[abbr.lower()]
def vista_strptime_offset(base: datetime.datetime, offset: int, suffix: str, mapping: dict={'h': 'hours', 'd': 'days', 'w': 'weeks', 'm': 'months'}) -> datetime.datetime:
    """Apply datetime offset

    Args:
        base: starting datetime.
        offset: signed number of units to add.
        suffix: unit code -- 'h', 'd', 'w' (exact timedelta arithmetic) or
            'm' (calendar months).
        mapping: unit code -> ``datetime.timedelta`` keyword; not consulted
            for 'm'.  The default dict is shared and must not be mutated.

    Month offsets carry into the year in both directions.  If the day of
    month does not exist in the target month it is clamped to that month's
    last day (Jan 31 + 1M -> Feb 28/29).
    NOTE(review): clamping is a pragmatic choice; confirm against FileMan's
    own behaviour if exact parity matters.

    Raises:
        KeyError: unknown ``suffix``.
        ValueError / OverflowError: result outside the range datetime supports.
    """
    if suffix != 'm':
        return base + datetime.timedelta(**{mapping[suffix]: offset})
    import calendar  # local import: only the month branch needs it
    # Work with a zero-based month so divmod yields the year carry directly;
    # floor division makes negative offsets borrow from the year correctly.
    year_carry, month_index = divmod(base.month - 1 + offset, 12)
    year, month = base.year + year_carry, month_index + 1
    day = min(base.day, calendar.monthrange(year, month)[1])
    return base.replace(year=year, month=month, day=day)
def vista_strftime(date: Union[datetime.datetime, datetime.date]) -> str:
    """Convert Python datetime.datetime objects into conventional FileMan/Timson format

    Returns ``YYYMMDD`` for a plain ``datetime.date`` and ``YYYMMDD.HHMMSS``
    otherwise, where ``YYY`` is the year minus 1700.
    """
    stamp = f'{date.year - 1700:03}{date.month:02}{date.day:02}'
    # datetime.datetime subclasses datetime.date, so testing for date alone
    # would classify every datetime as date-only and drop its time part.
    if isinstance(date, datetime.date) and not isinstance(date, datetime.datetime):
        return stamp
    return f'{stamp}.{date.hour:02}{date.minute:02}{date.second:02}'
def vista_datefloat(date: Union[datetime.datetime, datetime.date]) -> float:
    """Convert Python datetime.datetime objects into floating point FileMan/Timson format

    The integer part is ``YYYMMDD`` (year minus 1700).  For a datetime the
    fraction encodes ``HHMMSS`` followed by six microsecond digits; a plain
    date yields the integer part alone (as an ``int``).
    """
    res = 10000*(date.year - 1700) + 100*date.month + date.day
    if not isinstance(date, datetime.datetime):
        return res
    clock = date.time()
    # Terms are added left to right in this fixed order so rounding is stable.
    return res + clock.hour/100 + clock.minute/10000 + clock.second/1000000 + clock.microsecond/1000000000000
def validate(s):
    """Raise ValueError if ``s`` contains a disallowed character.

    Allowed characters have code points 32 through 126 inclusive, except
    94 (``^``).  Returns ``None`` when every character is allowed.
    """
    for c in s:
        code = ord(c)
        if code < 32 or code > 126 or code == 94:
            raise ValueError(f'{repr(c)} not allowed in string')