#!/usr/bin/env python3 import re import time import datetime import itertools import sqlite3 import asyncio import threading from weakref import WeakValueDictionary try: from cPickle import dumps, loads except ImportError: from pickle import dumps, loads from typing import Any, Union, AsyncGenerator, Iterable, Tuple, Callable class Cached(object): def __init__(self, base, ts=None): self._base = base self._ts = ts def __getattr__(self, key): return getattr(self._base, key) def __getitem__(self, key): return getitem(self._base, key) class Store(object): def __init__(self, database: Union[sqlite3.Connection, str]=':memory:', synchronous: bool=None, journal_mode: bool=None, default_factory: Union[Callable, None]=None): self._db = database if isinstance(database, sqlite3.Connection) else sqlite3.connect(database, check_same_thread=False) if synchronous: self._db.execute(f'PRAGMA synchronous = {synchronous}') if journal_mode: self._db.execute(f'PRAGMA journal_mode = {journal_mode}') self._mappings = WeakValueDictionary() self._default_factory = default_factory self.execute = self._db.execute self.commit = self._db.commit self.__enter__ = self._db.__enter__ self.__exit__ = self._db.__exit__ def __getitem__(self, key: str) -> 'Mapping': if key not in self._mappings: self._mappings[key] = res = Mapping(database=self, table=key) return self._mappings[key] def __delitem__(self, key: str): with self._db: self._db.execute(f'DROP TABLE "{key}"') del self._mappings[key] __getattr__ = __getitem__; __delattr__ = __delitem__ class Mapping(object): def __init__(self, database: Union[Store, sqlite3.Connection, str]=':memory:', table: str='store'): self._store = database if isinstance(database, Store) else Store(database) self._tbl = table self.commit = self._store.commit with self._store._db: self._store.execute(f'CREATE TABLE IF NOT EXISTS "{self._tbl}" (key TEXT PRIMARY KEY, value BLOB, ts FLOAT)') self._store.execute(f'CREATE INDEX IF NOT EXISTS "{self._tbl}_ts" ON "{self._tbl}" (ts)') def __enter__(self): return self._store.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): return self._store.__exit__(exc_type, exc_val, exc_tb) def count(self, ttl: float=float('inf'), now: float=0) -> int: return self._store.execute(f'SELECT COUNT(*) FROM "{self._tbl}" WHERE ts > ?', ((now or time.time()) - ttl,)).fetchone()[0] def has(self, key: str, ttl: float=float('inf'), now: float=0) -> bool: for row in self._store.execute(f'SELECT 1 FROM "{self._tbl}" WHERE key = ? AND ts > ? LIMIT 1', (key, (now or time.time()) - ttl)): return True return False def get(self, key: Union[str, slice], ttl: float=float('inf'), now: float=0, **kw) -> Any: if isinstance(key, slice): key, ttl, now = key.start, key.stop, key.step for row in self._store.execute(f'SELECT value, ts FROM "{self._tbl}" WHERE key = ? AND ts > ? LIMIT 1', (key, (now or time.time()) - ttl)): return Cached(loads(row[0]), row[1]) if 'default' in kw: return kw['default'] elif self._store._default_factory is not None: return self.set(key, self._store._default_factory(), now=(now or time.time())) raise KeyError(key) def set(self, key: str, value: Any, now: float=0, commit: bool=False) -> Any: self._store.execute(f'REPLACE INTO "{self._tbl}" (key, value, ts) VALUES (?, ?, ?)', (key, dumps(value), now or time.time())) if commit: self._store.commit() return value def remove(self, key: str, commit: bool=False) -> None: self._store.execute(f'DELETE FROM "{self._tbl}" WHERE key = ?', (key,)) if commit: self._store.commit() def keys(self, ttl: float=float('inf'), now: float=0) -> Iterable[str]: return (row[0] for row in self._store.execute(f'SELECT key FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', ((now or time.time()) - ttl,))) def values(self, ttl: float=float('inf'), now: float=0) -> Iterable: return (loads(row[0]) for row in self._store.execute(f'SELECT value FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', ((now or time.time()) - ttl,))) def items(self, ttl: float=float('inf'), now: float=0) -> Iterable[Tuple[str, Any]]: return ((row[0], loads(row[1])) for row in self._store.execute(f'SELECT key, value FROM "{self._tbl}" WHERE ts > ? ORDER BY rowid', ((now or time.time()) - ttl,))) def clear(self, ttl: float=0, now: float=0, commit: bool=False) -> None: self._store.execute(f'DELETE FROM "{self._tbl}" WHERE ts <= ?', ((now or time.time()) - ttl,)) if ttl > 0 else self._store.execute(f'DELETE FROM "{self._tbl}"') if commit: self._store.commit() __len__ = count; __contains__ = has; __getitem__ = get; __setitem__ = set; __delitem__ = remove; __iter__ = keys class CacheProxy(object): def __init__(self, obj): self._obj = obj self._mapping = {} def _cache(self, key, cache, prefix='', ttl=0): if key is None or isinstance(key, str): self._mapping[key] = (cache, prefix, ttl) if key: try: delattr(self, key) except AttributeError: pass else: for k in key: self._cache(k, cache, prefix, ttl) return self def __getattr__(self, key): if key in self._mapping: cache, prefix, ttl = self._mapping[key] elif None in self._mapping: cache, prefix, ttl = self._mapping[None] else: return getattr(self._obj, key) if cache is None: return getattr(self._obj, key) if asyncio.iscoroutinefunction(value := getattr(self._obj, key)): lock = asyncio.Lock() async def fetch(*args, _cache_key, **kw): async with lock: with cache: res = cache[_cache_key] = await value(*args, **kw) return res async def thunk(*args, _cache_ttl: float=ttl, _cache_stale: bool=False, **kw): _cache_key = prefix + key + repr(args) + repr(kw) try: return cache[_cache_key:_cache_ttl] except KeyError: kw['_cache_key'] = _cache_key if _cache_stale and cache.has(_cache_key): asyncio.ensure_future(fetch(*args, **kw)) return cache[_cache_key] return await fetch(*args, **kw) elif callable(value): lock = threading.Lock() def fetch(*args, _cache_key, **kw): with lock, cache: res = cache[_cache_key] = value(*args, **kw) return res def thunk(*args, _cache_ttl: float=ttl, _cache_stale: bool=False, **kw): _cache_key = prefix + key + repr(args) + repr(kw) try: return cache[_cache_key:_cache_ttl] except KeyError: kw['_cache_key'] = _cache_key if _cache_stale and cache.has(_cache_key): threading.Thread(target=fetch, args=args, kwargs=kw).start() return cache[_cache_key] return fetch(*args, **kw) else: return value thunk.cached = True setattr(self, key, thunk) return thunk class SyncProxy(object): def __init__(self, obj, loop=None): self._obj = obj self._loop = loop or asyncio.get_event_loop() def __getattr__(self, key): if asyncio.iscoroutinefunction(value := getattr(self._obj, key)): setattr(self, key, (thunk := lambda *args, **kw: asyncio.run_coroutine_threadsafe(value(*args, **kw), loop=self._loop).result())) return thunk elif callable(value): setattr(self, key, value) return value else: return value re_dt_fileman = r'(?P(\d{3})(\d{2})(\d{2})(?:\.(\d{2})?(\d{2})?(\d{2})?)?)' # George Timson's format re_dt_today = r'(?PTODAY|T)' # today re_dt_now = r'(?PNOW|N)' # now re_dt_mdy = r'(?P(\d{1,2})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # m/d/yy, m/d/yyyy re_dt_ymd = r'(?P(\d{4})[^\w@?]+(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # yyyy/m/d re_dt_yyyymmdd = r'(?P(\d{4})(\d{2})(\d{2}))' # yyyymmdd re_dt_Mdy = r'(?P([A-Z]{3,})[^\w@?]+(\d{1,2})[^\w@?]+(\d{4}|\d{2})\s*)' # M/d/yy, M/d/yyyy re_dt_dMy = r'(?P(\d{1,2})[^\w@?]+([A-Z]{3,})[^\w@?]+(\d{4}|\d{2})\s*)' # d/M/yy, d/M/yyyy re_dt_md = r'(?P(\d{1,2})[^\w@?]+(\d{1,2})\s*)' # m/d re_dt_offset = r'(?P([-+]\d+)(H|W|M)?)' # +#U re_dt_time = r'(?:@?(?P