vistassh-py/kvstore.py

184 lines
10 KiB
Python
Raw Normal View History

2024-03-02 00:34:29 -05:00
#!/usr/bin/env python3
import re
import uuid
import time
import json
import sqlite3
from typing import Optional, Union, Sequence
class KVStore(object):
    """Provide a generic key-value store backed by SQLite.

    This base class only owns the SQLite connection; the storage layout and
    the put/get/view_delete operations are implemented by subclasses
    (KVStoreEAV, KVStoreJSON).
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:'):
        """Open or adopt the backing database.

        :param database: an existing ``sqlite3.Connection`` to reuse, or a
            value passed to ``sqlite3.connect`` (default: in-memory database)
        """
        self.db = database if isinstance(database, sqlite3.Connection) else sqlite3.connect(database)

    def put(self, obj, **kw):
        """Insert an object; implemented by subclasses.

        Accepts ``**kw`` (e.g. ``schema=``) like the subclasses do, so callers
        such as KVStoreView get NotImplementedError rather than a TypeError.
        """
        raise NotImplementedError

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        The base store holds no objects: returns ``kw['default']`` when given,
        otherwise raises ``KeyError(uid)`` (same exception as the subclasses).
        """
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove data selected through a view; implemented by subclasses."""
        raise NotImplementedError
class KVStoreView(object):
    """Hold a reference to a view over one schema of a key-value store.

    Objects written through the view are tagged with ``schema_name``; objects
    read through it that carry a different ``schema`` are treated as missing.
    """

    def __init__(self, store: 'KVStore', view_name: str, schema_name: str, attributes: Sequence[str]):
        """
        :param store: backing store; must provide ``put``, ``get`` (accepting
            ``default=``) and ``view_delete``
        :param view_name: name of the SQL view that was created for this schema
        :param schema_name: value written to, and checked against, each object's ``schema`` key
        :param attributes: attribute names the view exposes as columns
        """
        self.store = store
        self.view_name = view_name
        self.schema_name = schema_name
        # Kept (previously discarded) so callers can see which columns the view exposes.
        self.attributes = tuple(attributes)

    def put(self, obj, **kw):
        """Insert an object tagged with this view's schema"""
        return self.store.put(obj, schema=self.schema_name, **kw)

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Returns the stored object when it has no ``schema`` or this view's
        schema. A missing object, or one belonging to another schema, yields
        ``kw['default']`` when given, otherwise raises ``KeyError(uid)``.
        """
        # Private sentinel so a caller-supplied default (even None or a dict)
        # is never mistaken for a stored object and schema-checked.
        missing = object()
        default = kw.pop('default', missing)
        res = self.store.get(uid, default=missing, **kw)
        wrong_schema = isinstance(res, dict) and 'schema' in res and res['schema'] != self.schema_name
        if res is missing or wrong_schema:
            if default is missing:
                raise KeyError(uid)
            return default
        return res

    def delete(self, where: str, params: Sequence):
        """Remove underlying data for view rows matching ``where``.

        ``where`` is spliced into the SQL verbatim; pass values through
        ``params`` placeholders, never by string formatting.
        """
        return self.store.view_delete(self.view_name, where, params)
class KVStoreEAV(KVStore):
    """Provide a key-value store backed by SQLite with EAV pattern.

    Each object is stored as one (entity, attribute, value) row per key other
    than ``uid`` (the entity is the object's uid), plus one row whose
    attribute is ``__data__`` and whose value is a JSON snapshot of the whole
    object.
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:', table_name: str = 'eavstore'):
        """Open the store and create its table and indexes if missing.

        :param database: connection to reuse, or value passed to ``sqlite3.connect``
        :param table_name: name of the EAV table
        """
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.entity_column_name = 'uid'
        self.attribute_column_name = '__attribute__'
        self.value_column_name = '__value__'
        self.data_attribute_name = '__data__'
        # NOTE(review): the value column is declared INTEGER, which gives it
        # INTEGER affinity in SQLite; numeric-looking text values may come back
        # as numbers. Confirm this is intended.
        self.db.execute(f'CREATE TABLE IF NOT EXISTS {self.table_name_q} ({self.entity_column_name} TEXT, {self.attribute_column_name} TEXT, {self.value_column_name} INTEGER)')
        self.db.execute(f'CREATE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_entity")} on {self.table_name_q} ({self.entity_column_name})')
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_attribute")} on {self.table_name_q} ({self.entity_column_name}, {self.attribute_column_name})')
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_value")} on {self.table_name_q} ({self.entity_column_name}, {self.attribute_column_name}, {self.value_column_name})')

    def put(self, obj, **kw):
        """Insert (or replace) an object and return the stored copy.

        The caller's mapping is not modified. A missing ``uid`` is generated and
        a missing ``ts`` is set to the current time. ``schema`` is overwritten
        from ``kw['schema']`` when given. Existing rows for the same uid are
        deleted first, so re-putting an object replaces it.
        """
        # Copy the mapping, as KVStoreJSON.put does, rather than mutating the
        # caller's dict. Also drop any incoming __data__ (e.g. from get()): it is
        # a snapshot of the old state and is always regenerated below.
        obj = {k: v for k, v in obj.items() if k != self.data_attribute_name}
        if 'uid' in obj:
            self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name}=?', (obj['uid'],))
        else:
            obj['uid'] = uuid.uuid4().hex
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']
        uid = obj['uid']
        insert_sql = f'INSERT INTO {self.table_name_q} ({self.entity_column_name}, {self.attribute_column_name}, {self.value_column_name}) VALUES (?, ?, ?)'
        self.db.executemany(insert_sql, ((uid, k, v) for k, v in obj.items() if k != 'uid'))
        self.db.execute(insert_sql, (uid, self.data_attribute_name, json.dumps(obj, default=str, separators=(',', ':'))))
        return obj

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Rebuilds a dict from every attribute row of ``uid``, including the
        ``__data__`` snapshot row, and sets ``uid``. If no rows exist, returns
        ``kw['default']`` when given, otherwise raises ``KeyError(uid)``.
        """
        obj = {k: v for k, v in self.db.execute(f'SELECT {self.attribute_column_name}, {self.value_column_name} FROM {self.table_name_q} WHERE {self.entity_column_name}=?', (uid,))}
        if len(obj) > 0:
            obj['uid'] = uid
            return obj
        elif 'default' in kw:
            return kw['default']
        else:
            raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool = False) -> KVStoreView:
        """(Re)create a temporary pivot view over one schema.

        Optionally deletes every entity currently visible through the view.
        Returns a KVStoreView bound to the new view.
        """
        self.db.execute(f'DROP VIEW IF EXISTS {quote_identifier(view_name)}')
        self.db.execute(eav_query_ensure_view(view_name, self.table_name, self.entity_column_name, self.attribute_column_name, self.value_column_name, schema_name, attributes))
        if clear:
            self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name} IN (SELECT {self.entity_column_name} FROM {quote_identifier(view_name)} WHERE schema=?)', (schema_name,))
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove all rows of the entities selected from ``view_name`` by ``where``.

        ``where`` is spliced into the SQL verbatim (trusted text only); pass
        values through ``params`` placeholders.
        """
        self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.entity_column_name} IN (SELECT {self.entity_column_name} FROM {quote_identifier(view_name)} WHERE ' + where + ')', params)
class KVStoreJSON(KVStore):
    """Provide a key-value store backed by SQLite with JSON rows.

    Each object is stored as a single row: its uid plus the whole object
    serialized as JSON. Expression indexes on ``$.ts`` and ``$.schema`` are
    created alongside a unique index on the uid column.
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:', table_name: str = 'jsonstore'):
        """Open the store and create its table and indexes if missing.

        :param database: connection to reuse, or value passed to ``sqlite3.connect``
        :param table_name: name of the JSON table
        """
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.uid_column_name = 'uid'
        self.data_column_name = '__data__'
        uid_index_q = quote_identifier(f'idx_{table_name}_uid')
        ts_index_q = quote_identifier(f'idx_{table_name}_ts')
        schema_index_q = quote_identifier(f'idx_{table_name}_schema')
        self.db.execute(f'CREATE TABLE IF NOT EXISTS {self.table_name_q} ({self.uid_column_name} TEXT, {self.data_column_name} JSON)')
        # The unique uid index is what lets put() upsert via INSERT OR REPLACE.
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {uid_index_q} ON {self.table_name_q} ({self.uid_column_name})')
        # JSON paths are SQL string literals, hence single quotes. Double quotes
        # denote identifiers and only work through SQLite's double-quoted-string
        # fallback, which builds may disable.
        self.db.execute(f"CREATE INDEX IF NOT EXISTS {ts_index_q} ON {self.table_name_q} (JSON_EXTRACT({self.data_column_name}, '$.ts'))")
        self.db.execute(f"CREATE INDEX IF NOT EXISTS {schema_index_q} ON {self.table_name_q} (JSON_EXTRACT({self.data_column_name}, '$.schema'))")

    def put(self, obj, **kw):
        """Insert or replace an object and return the stored copy.

        The caller's mapping is copied, not modified. A missing ``uid`` is
        generated and a missing ``ts`` is set to the current time. ``schema``
        is overwritten from ``kw['schema']`` when given.
        """
        obj = obj.copy()
        if 'uid' not in obj:
            obj['uid'] = uuid.uuid4().hex
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']
        # OR REPLACE: re-putting an existing uid (e.g. an object obtained from
        # get()) overwrites the row, as KVStoreEAV.put does, instead of failing
        # on the unique uid index.
        self.db.execute(
            f'INSERT OR REPLACE INTO {self.table_name_q} ({self.uid_column_name}, {self.data_column_name}) VALUES (?, JSON(?))',
            (obj['uid'], json.dumps(obj, default=str, separators=(',', ':'))),
        )
        return obj

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Returns the decoded JSON object stored for ``uid``. If absent, returns
        ``kw['default']`` when given, otherwise raises ``KeyError(uid)``.
        """
        for row in self.db.execute(f'SELECT {self.data_column_name} FROM {self.table_name_q} WHERE {self.uid_column_name}=? LIMIT 1', (uid,)):
            return json.loads(row[0])
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool = False) -> KVStoreView:
        """(Re)create a temporary view over one schema.

        Optionally deletes every stored object whose ``$.schema`` equals
        ``schema_name``. Returns a KVStoreView bound to the new view.
        """
        self.db.execute(f'DROP VIEW IF EXISTS {quote_identifier(view_name)}')
        self.db.execute(json_query_ensure_view(view_name, self.table_name, self.uid_column_name, self.data_column_name, schema_name, attributes))
        if clear:
            self.db.execute(f"DELETE FROM {self.table_name_q} WHERE JSON_EXTRACT({self.data_column_name}, '$.schema')=?", (schema_name,))
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove the stored objects whose uid is selected from ``view_name`` by ``where``.

        ``where`` is spliced into the SQL verbatim (trusted text only); pass
        values through ``params`` placeholders.
        """
        self.db.execute(f'DELETE FROM {self.table_name_q} WHERE {self.uid_column_name} IN (SELECT {self.uid_column_name} FROM {quote_identifier(view_name)} WHERE ' + where + ')', params)
def quote_string(s: str) -> str:
    """Make string safe for SQLite as a string literal.

    SQL string literals are delimited by single quotes, with embedded single
    quotes doubled. Double quotes delimit identifiers. SQLite only reads a
    double-quoted token as a string through a legacy fallback, which fails
    when the text names a column or the fallback is disabled.
    """
    return "'" + s.replace("'", "''") + "'"
def quote_identifier(s: str) -> str:
    """Make string safe as SQLite identifier.

    The name is always wrapped in double quotes, with embedded double quotes
    doubled. Quoting unconditionally also covers names that look like plain
    identifiers but are SQL keywords (e.g. ``order``, ``group``). Those would
    otherwise be emitted bare and break the generated statement.
    """
    return '"' + s.replace('"', '""') + '"'
def eav_query_pivot(table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, attributes: Sequence[str]) -> str:
    """Build a SELECT that pivots EAV rows into one row per entity.

    The entity column comes first. Each requested attribute then becomes a
    column holding ``MAX`` of the values stored under that attribute name
    within the entity's group (NULL when the entity lacks it). If the entity
    column is also listed as an attribute, its first occurrence is skipped.
    """
    entity_q = quote_identifier(entity_column_name)
    attribute_q = quote_identifier(attribute_column_name)
    value_q = quote_identifier(value_column_name)
    pivoted = attributes
    if entity_column_name in pivoted:
        pivoted = list(pivoted)
        pivoted.remove(entity_column_name)
    parts = [f'SELECT {entity_q}\n']
    parts.extend(
        f', MAX(CASE WHEN {attribute_q}={quote_string(name)} THEN {value_q} END) {quote_identifier(name)}\n'
        for name in pivoted
    )
    parts.append(f'FROM {quote_identifier(table_name)} GROUP BY {entity_q}')
    return ''.join(parts)
def eav_query_ensure_view(view_name: str, table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL creating a temporary view over the pivoted EAV data.

    The view exposes the entity column plus one column per attribute. A
    ``schema`` column is appended when not requested. Only entities whose
    ``schema`` equals ``schema_name`` are kept (via HAVING on the grouped
    pivot).
    """
    columns = attributes if 'schema' in attributes else tuple(attributes) + ('schema',)
    if entity_column_name in columns:
        columns = list(columns)
        columns.remove(entity_column_name)
    column_list = ', '.join(quote_identifier(col) for col in columns)
    select_sql = eav_query_pivot(table_name, entity_column_name, attribute_column_name, value_column_name, columns)
    return (
        f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} '
        f'({quote_identifier(entity_column_name)}, {column_list}) AS\n'
        f'{select_sql}\n'
        f'HAVING schema={quote_string(schema_name)}'
    )
def json_query_pivot(table_name: str, uid_column_name: str, data_column_name: str, attributes: Sequence[str]) -> str:
    """Expand JSON rows into traditional rows.

    Builds ``SELECT uid, JSON_EXTRACT(data, '$.a') AS a, ... FROM table``. The
    uid column always comes first. If it is also listed as an attribute, its
    first occurrence is skipped. With no attributes, only the uid column is
    selected (previously this produced ``SELECT uid,  FROM``, which is invalid
    SQL).
    """
    if uid_column_name in attributes:
        attributes = list(attributes)
        attributes.remove(uid_column_name)
    data_q = quote_identifier(data_column_name)
    columns = [quote_identifier(uid_column_name)]
    # The JSON path is a string *literal* argument, so quote it with
    # quote_string rather than as an identifier.
    columns.extend(
        f'JSON_EXTRACT({data_q}, {quote_string("$." + attr)}) AS {quote_identifier(attr)}'
        for attr in attributes
    )
    return f'SELECT {", ".join(columns)} FROM {quote_identifier(table_name)}'
def json_query_ensure_view(view_name: str, table_name: str, uid_column_name: str, data_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL to create a temporary view over the expanded JSON data.

    The view exposes the uid column plus one column per attribute. ``schema``
    is appended when not requested. Only rows whose ``$.schema`` equals
    ``schema_name`` are included.
    """
    if 'schema' not in attributes:
        attributes = tuple(attributes) + ('schema',)
    if uid_column_name in attributes:
        attributes = list(attributes)
        attributes.remove(uid_column_name)
    # Both the JSON path and the schema name are string literals, so both go
    # through quote_string. The path was previously quoted as an identifier.
    schema_path = quote_string('$.schema')
    return f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} ({quote_identifier(uid_column_name)}, ' +\
        ', '.join(quote_identifier(attr) for attr in attributes) + ') AS\n' +\
        json_query_pivot(table_name, uid_column_name, data_column_name, attributes) +\
        f'\nWHERE JSON_EXTRACT({quote_identifier(data_column_name)}, {schema_path})={quote_string(schema_name)}'