#!/usr/bin/env python3
import re
import uuid
import time
import json
import sqlite3
from typing import Optional, Union, Sequence


class KVStore(object):
    """Provide a generic key-value store backed by SQLite.

    Subclasses implement ``put``/``get`` plus the view helpers
    ``ensure_view``/``view_delete`` that :class:`KVStoreView` relies on.

    Note: no method in this module commits. Unless the connection is in
    autocommit mode, writes to a file database stay in an open transaction
    until the caller runs ``store.db.commit()``.
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:'):
        """Wrap an existing connection, or open one for the given path."""
        self.db = database if isinstance(database, sqlite3.Connection) else sqlite3.connect(database)

    def put(self, obj, **kw):
        """Insert an object; implemented by subclasses."""
        raise NotImplementedError

    def get(self, uid, **kw):
        """Retrieve an object; the base store is always empty.

        Returns ``kw['default']`` when given, otherwise raises KeyError(uid).
        """
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool = False) -> 'KVStoreView':
        """Create a view over one schema; implemented by subclasses."""
        raise NotImplementedError

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove data selected through a view; implemented by subclasses."""
        raise NotImplementedError


class KVStoreView(object):
    """Hold a reference to a view over the objects of one schema."""

    def __init__(self, store: KVStore, view_name: str, schema_name: str, attributes: Sequence[str]):
        self.store = store
        self.view_name = view_name
        self.schema_name = schema_name
        # Attribute list the view was created with (kept for introspection)
        self.attributes = tuple(attributes)

    def put(self, obj, **kw):
        """Insert an object, tagging it with this view's schema."""
        return self.store.put(obj, schema=self.schema_name, **kw)

    def get(self, uid, **kw):
        """Retrieve an object belonging to this view's schema.

        An object stored with a different ``schema`` is treated as missing;
        an object without a ``schema`` key is returned as-is. For a missing
        object, ``kw['default']`` is returned when given, otherwise
        KeyError(uid) is raised.
        """
        has_default = 'default' in kw
        default = kw.pop('default', None)
        try:
            res = self.store.get(uid, **kw)
        except KeyError:
            if has_default:
                return default
            raise
        if isinstance(res, dict) and 'schema' in res and res['schema'] != self.schema_name:
            if has_default:
                return default
            raise KeyError(uid)
        return res

    def delete(self, where: str, params: Sequence):
        """Remove underlying data for view rows matching ``where``.

        ``where`` is raw SQL spliced into the statement: pass values through
        ``params`` and never build ``where`` from untrusted input.
        """
        return self.store.view_delete(self.view_name, where, params)


class KVStoreEAV(KVStore):
    """Provide a key-value store backed by SQLite with the EAV pattern.

    Each object is stored as one (uid, attribute, value) row per key, plus an
    extra ``__data__`` attribute holding the whole object serialized as JSON.
    Values must be types sqlite3 can bind directly (str, int, float, bytes,
    None).
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:', table_name: str = 'eavstore'):
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.entity_column_name = 'uid'
        self.attribute_column_name = '__attribute__'
        self.value_column_name = '__value__'
        self.data_attribute_name = '__data__'
        entity = self.entity_column_name
        attribute = self.attribute_column_name
        value = self.value_column_name
        # NOTE(review): INTEGER affinity converts numeric-looking text (e.g. '42')
        # to a number on insert; kept as-is for compatibility with existing tables.
        self.db.execute(f'CREATE TABLE IF NOT EXISTS {self.table_name_q} ({entity} TEXT, {attribute} TEXT, {value} INTEGER)')
        self.db.execute(f'CREATE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_entity")} on {self.table_name_q} ({entity})')
        # At most one value per (entity, attribute)
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_attribute")} on {self.table_name_q} ({entity}, {attribute})')
        # Presumably a covering index so the pivot query can read values from the index alone
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_value")} on {self.table_name_q} ({entity}, {attribute}, {value})')

    def put(self, obj, **kw):
        """Insert or replace an object and return the stored copy.

        The caller's dict is left untouched (as in KVStoreJSON.put). A ``uid``
        is generated when missing, ``ts`` defaults to the current time, and
        ``kw['schema']`` overrides ``schema``. Putting an existing ``uid``
        replaces all of its previous attributes.
        """
        obj = obj.copy()
        if 'uid' in obj:
            # Replace semantics: drop every attribute of the previous version
            self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name}=?', (obj['uid'],))
        else:
            obj['uid'] = uuid.uuid4().hex
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']
        uid = obj['uid']
        insert_sql = (f'INSERT INTO {self.table_name_q} ({self.entity_column_name}, {self.attribute_column_name}, '
                      f'{self.value_column_name}) VALUES (?, ?, ?)')
        # uid lives in the entity column, so it is not stored as an attribute row
        self.db.executemany(insert_sql, ((uid, k, v) for k, v in obj.items() if k != 'uid'))
        if self.data_attribute_name not in obj:
            # NOTE(review): an object read back via get() already carries __data__,
            # so re-putting it keeps that (possibly stale) snapshot.
            self.db.execute(insert_sql, (uid, self.data_attribute_name, json.dumps(obj, default=str, separators=(',', ':'))))
        return obj

    def get(self, uid, **kw):
        """Retrieve an object as a dict of all its attributes (including ``__data__``).

        Returns ``kw['default']`` when no rows exist for ``uid`` and a default
        was given; otherwise raises KeyError(uid).
        """
        rows = self.db.execute(f'SELECT {self.attribute_column_name}, {self.value_column_name} FROM {self.table_name_q} WHERE {self.entity_column_name}=?', (uid,))
        obj = dict(rows)
        if obj:
            obj['uid'] = uid
            return obj
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool = False) -> KVStoreView:
        """(Re)create a TEMP view pivoting one schema's attributes into columns.

        With ``clear=True`` every entity currently visible in the view (i.e.
        with that schema) is deleted from the underlying table.
        """
        view_q = quote_identifier(view_name)
        self.db.execute(f'DROP VIEW IF EXISTS {view_q}')
        self.db.execute(eav_query_ensure_view(view_name, self.table_name, self.entity_column_name, self.attribute_column_name, self.value_column_name, schema_name, attributes))
        if clear:
            self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name} IN (SELECT {self.entity_column_name} FROM {view_q} WHERE schema=?)', (schema_name,))
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove all rows of the entities selected by ``where`` on the view.

        ``where`` is raw SQL (trusted input only); bind values via ``params``.
        """
        self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name} IN (SELECT {self.entity_column_name} FROM {quote_identifier(view_name)} WHERE ' + where + ')', params)


class KVStoreJSON(KVStore):
    """Provide a key-value store backed by SQLite with one JSON row per object."""

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:', table_name: str = 'jsonstore'):
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.uid_column_name = 'uid'
        self.data_column_name = '__data__'
        uid_col = self.uid_column_name
        data_col = self.data_column_name
        self.db.execute(f'CREATE TABLE IF NOT EXISTS {self.table_name_q} ({uid_col} TEXT, {data_col} JSON)')
        # Unique uid: lets put() replace an existing object via INSERT OR REPLACE
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_uid")} ON {self.table_name_q} ({uid_col})')
        # Expression indexes on commonly filtered JSON fields; paths are string
        # literals (single-quoted), not identifiers
        for key in ('ts', 'schema'):
            index_q = quote_identifier(f'idx_{table_name}_{key}')
            path_q = quote_string('$.' + key)
            self.db.execute(f'CREATE INDEX IF NOT EXISTS {index_q} ON {self.table_name_q} (JSON_EXTRACT({data_col}, {path_q}))')

    def put(self, obj, **kw):
        """Insert or replace an object and return the stored copy.

        The caller's dict is left untouched. A ``uid`` is generated when
        missing, ``ts`` defaults to the current time, and ``kw['schema']``
        overrides ``schema``. An existing ``uid`` is replaced, matching
        KVStoreEAV.put.
        """
        obj = obj.copy()
        if 'uid' not in obj:
            obj['uid'] = uuid.uuid4().hex
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']
        self.db.execute(f'INSERT OR REPLACE INTO {self.table_name_q} ({self.uid_column_name}, {self.data_column_name}) VALUES (?, JSON(?))', (obj['uid'], json.dumps(obj, default=str, separators=(',', ':'))))
        return obj

    def get(self, uid, **kw):
        """Retrieve an object decoded from its JSON row.

        Returns ``kw['default']`` when absent and a default was given;
        otherwise raises KeyError(uid).
        """
        for row in self.db.execute(f'SELECT {self.data_column_name} FROM {self.table_name_q} WHERE {self.uid_column_name}=? LIMIT 1', (uid,)):
            return json.loads(row[0])
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool = False) -> KVStoreView:
        """(Re)create a TEMP view exposing one schema's JSON fields as columns.

        With ``clear=True`` every stored object with that schema is deleted.
        """
        self.db.execute(f'DROP VIEW IF EXISTS {quote_identifier(view_name)}')
        self.db.execute(json_query_ensure_view(view_name, self.table_name, self.uid_column_name, self.data_column_name, schema_name, attributes))
        if clear:
            self.db.execute(f'DELETE FROM {self.table_name_q} WHERE JSON_EXTRACT({self.data_column_name}, {quote_string("$.schema")})=?', (schema_name,))
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove the objects selected by ``where`` on the view.

        ``where`` is raw SQL (trusted input only); bind values via ``params``.
        """
        self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.uid_column_name} IN (SELECT {self.uid_column_name} FROM {quote_identifier(view_name)} WHERE ' + where + ')', params)


def quote_string(s: str) -> str:
    """Make string safe as an SQLite string literal.

    Single quotes are used: double quotes denote identifiers in SQL, and
    SQLite only falls back to reading them as strings when no column or
    alias of that name exists (and not at all when DQS is disabled).
    """
    return "'" + s.replace("'", "''") + "'"


def quote_identifier(s: str) -> str:
    """Make string safe as an SQLite identifier.

    Always quotes, so reserved words (e.g. ``order``, ``from``) and names
    containing special characters are accepted.
    """
    return '"' + s.replace('"', '""') + '"'


def eav_query_pivot(table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, attributes: Sequence[str]) -> str:
    """Group EAV rows into traditional rows: one output column per attribute."""
    attribute_q = quote_identifier(attribute_column_name)
    value_q = quote_identifier(value_column_name)
    entity_q = quote_identifier(entity_column_name)
    columns = [entity_q]
    # MAX() selects the single value stored for each (entity, attribute) pair
    columns.extend(
        f'MAX(CASE WHEN {attribute_q}={quote_string(attr)} THEN {value_q} END) {quote_identifier(attr)}'
        for attr in attributes if attr != entity_column_name
    )
    return 'SELECT ' + '\n, '.join(columns) + f'\nFROM {quote_identifier(table_name)} GROUP BY {entity_q}'


def eav_query_ensure_view(view_name: str, table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL creating a TEMP view over the grouped EAV rows of one schema.

    A ``schema`` column is always included because the HAVING filter uses it.
    """
    attributes = list(attributes)
    if 'schema' not in attributes:
        attributes.append('schema')
    attributes = [attr for attr in attributes if attr != entity_column_name]
    column_list = ', '.join(quote_identifier(col) for col in [entity_column_name, *attributes])
    return (f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} ({column_list}) AS\n'
            + eav_query_pivot(table_name, entity_column_name, attribute_column_name, value_column_name, attributes)
            # `schema` resolves to the pivoted column alias
            + f'\nHAVING schema={quote_string(schema_name)}')


def json_query_pivot(table_name: str, uid_column_name: str, data_column_name: str, attributes: Sequence[str]) -> str:
    """Expand JSON rows into traditional rows: one output column per top-level key."""
    data_q = quote_identifier(data_column_name)
    columns = [quote_identifier(uid_column_name)]
    columns.extend(
        f'JSON_EXTRACT({data_q}, {quote_string("$." + attr)}) AS {quote_identifier(attr)}'
        for attr in attributes if attr != uid_column_name
    )
    return 'SELECT ' + ', '.join(columns) + f' FROM {quote_identifier(table_name)}'


def json_query_ensure_view(view_name: str, table_name: str, uid_column_name: str, data_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL creating a TEMP view over the expanded JSON rows of one schema.

    A ``schema`` column is always included in the view.
    """
    attributes = list(attributes)
    if 'schema' not in attributes:
        attributes.append('schema')
    attributes = [attr for attr in attributes if attr != uid_column_name]
    column_list = ', '.join(quote_identifier(col) for col in [uid_column_name, *attributes])
    return (f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} ({column_list}) AS\n'
            + json_query_pivot(table_name, uid_column_name, data_column_name, attributes)
            + f'\nWHERE JSON_EXTRACT({quote_identifier(data_column_name)}, {quote_string("$.schema")})={quote_string(schema_name)}')