184 lines
10 KiB
Python
184 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import re
|
|
import uuid
|
|
import time
|
|
import json
|
|
import sqlite3
|
|
|
|
from typing import Optional, Union, Sequence
|
|
|
|
class KVStore(object):
    """Provide a generic key-value store backed by SQLite.

    This base class only owns the database connection. Concrete stores
    override ``put`` and ``get``; the base versions behave like an empty,
    read-only store.
    """

    def __init__(self, database: Union[str, sqlite3.Connection] = ':memory:'):
        """Adopt an existing connection or open a new one.

        Args:
            database: A ``sqlite3.Connection`` to reuse as-is, or a value
                passed to ``sqlite3.connect``. Defaults to ``':memory:'``.
        """
        if isinstance(database, sqlite3.Connection):
            self.db = database
        else:
            self.db = sqlite3.connect(database)

    def put(self, obj, **kw):
        """Insert an object (not implemented by the base class).

        Keyword arguments are accepted so that callers which pass them
        (for example ``schema=...``) get ``NotImplementedError`` rather than
        a ``TypeError`` about an unexpected argument.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        The base store holds nothing, so every lookup is a miss.

        Args:
            uid: Identifier of the requested object.
            **kw: ``default`` - value returned instead of raising.

        Raises:
            KeyError: With ``uid`` as its argument when no ``default`` is given.
        """
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)
|
|
|
|
class KVStoreView(object):
    """Hold a reference to a view.

    A view pairs a store with one schema name: objects written through it
    are tagged with that schema, and objects read through it are rejected
    when they carry a different schema.
    """

    def __init__(self, store: 'KVStore', view_name: str, schema_name: str, attributes: Sequence[str]):
        """Bind the view to its store.

        Args:
            store: Store that performs the actual reads and writes.
            view_name: Name of the SQL view, forwarded to ``store.view_delete``.
            schema_name: Schema tag applied on ``put`` and checked on ``get``.
            attributes: Attribute names the view was created with.
        """
        self.store = store
        self.view_name = view_name
        self.schema_name = schema_name
        # Kept so the view can report which attributes it was built from.
        self.attributes = tuple(attributes)

    def put(self, obj, **kw):
        """Insert an object, tagging it with this view's schema."""
        return self.store.put(obj, schema=self.schema_name, **kw)

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Args:
            uid: Identifier of the requested object.
            **kw: Forwarded to ``store.get``; ``default`` is the value
                returned when the object is missing or belongs to a
                different schema.

        Returns:
            The stored object, or the default.

        Raises:
            KeyError: If the object carries another schema and no default
                was supplied (a missing object raises whatever the store
                raises).
        """
        res = self.store.get(uid, **kw)
        # Only dict results can carry a schema tag; a default such as None
        # is passed through untouched.
        if isinstance(res, dict) and 'schema' in res and res['schema'] != self.schema_name:
            if 'default' in kw:
                return kw['default']
            raise KeyError(uid)
        return res

    def delete(self, where: str, params: Sequence):
        """Remove underlying data.

        Args:
            where: SQL condition evaluated against the view.
            params: Values bound to the placeholders in ``where``.
        """
        return self.store.view_delete(self.view_name, where, params)
|
|
|
|
class KVStoreEAV(KVStore):
    """Provide a key-value store backed by SQLite with EAV pattern.

    Each object is spread over one row per attribute
    (entity uid, attribute name, value), plus one extra row holding a JSON
    snapshot of the whole object under the data attribute name.
    """

    def __init__(self, database: Union[str, sqlite3.Connection]=':memory:', table_name: str='eavstore'):
        """Open the store and create its table and indexes if missing.

        Args:
            database: Connection or path, handled by ``KVStore.__init__``.
            table_name: Name of the backing table.
        """
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.entity_column_name = 'uid'
        self.attribute_column_name = '__attribute__'
        self.value_column_name = '__value__'
        self.data_attribute_name = '__data__'

        tbl = self.table_name_q
        ent = self.entity_column_name
        att = self.attribute_column_name
        val = self.value_column_name

        # NOTE(review): the value column is declared INTEGER although put()
        # also stores text in it (e.g. the JSON snapshot) - confirm that this
        # column type is intended.
        self.db.execute(f'CREATE TABLE IF NOT EXISTS {tbl} ({ent} TEXT, {att} TEXT, {val} INTEGER)')

        # (index kind, name suffix, indexed columns), created in this order.
        index_specs = (
            ('INDEX', 'entity', (ent,)),
            ('UNIQUE INDEX', 'attribute', (ent, att)),
            ('UNIQUE INDEX', 'value', (ent, att, val)),
        )
        for kind, suffix, columns in index_specs:
            index_name = quote_identifier(f'idx_{table_name}_{suffix}')
            column_list = ', '.join(columns)
            self.db.execute(f'CREATE {kind} IF NOT EXISTS {index_name} on {tbl} ({column_list})')

    def put(self, obj, **kw):
        """Insert an object.

        ``obj`` is modified in place: a ``uid`` and ``ts`` are filled in when
        absent and ``schema`` is set from the ``schema`` keyword. Rows of an
        existing object with the same ``uid`` are deleted first.

        Returns:
            The same ``obj`` that was passed in.
        """
        tbl = self.table_name_q
        ent = self.entity_column_name
        insert_sql = f'INSERT INTO {tbl} ({ent}, {self.attribute_column_name}, {self.value_column_name}) VALUES (?, ?, ?)'

        if 'uid' not in obj:
            obj['uid'] = uuid.uuid4().hex
        else:
            # Re-putting a known uid replaces all of its attribute rows.
            self.db.execute(f'DELETE FROM {tbl} WHERE {ent}=?', (obj['uid'],))
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']

        uid = obj['uid']
        attribute_rows = [(uid, name, value) for name, value in obj.items() if name != 'uid']
        self.db.executemany(insert_sql, attribute_rows)

        # Add the JSON snapshot row unless the caller supplied one as a
        # regular attribute (it was then written by the statement above).
        if self.data_attribute_name not in obj:
            snapshot = json.dumps(obj, default=str, separators=(',', ':'))
            self.db.execute(insert_sql, (uid, self.data_attribute_name, snapshot))
        return obj

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Returns:
            A dict of every stored attribute of ``uid`` plus ``'uid'``, or
            ``kw['default']`` when nothing is stored and a default was given.

        Raises:
            KeyError: If nothing is stored for ``uid`` and no default was given.
        """
        query = f'SELECT {self.attribute_column_name}, {self.value_column_name} FROM {self.table_name_q} WHERE {self.entity_column_name}=?'
        found = dict(self.db.execute(query, (uid,)))
        if found:
            found['uid'] = uid
            return found
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool=False) -> KVStoreView:
        """Create a view over the key-value schema, optionally removing existing underlying data.

        Any existing view of that name is dropped and recreated. With
        ``clear`` set, every entity visible through the new view with the
        given schema is deleted from the backing table.
        """
        view_q = quote_identifier(view_name)
        ent = self.entity_column_name
        self.db.execute(f'DROP VIEW IF EXISTS {view_q}')
        create_sql = eav_query_ensure_view(
            view_name,
            self.table_name,
            ent,
            self.attribute_column_name,
            self.value_column_name,
            schema_name,
            attributes,
        )
        self.db.execute(create_sql)
        if clear:
            self.db.execute(
                f'DELETE FROM {self.table_name_q} WHERE {ent} IN (SELECT {ent} FROM {view_q} WHERE schema=?)',
                (schema_name,),
            )
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove underlying data.

        Deletes every entity whose row in ``view_name`` satisfies ``where``.
        ``where`` is spliced into the statement verbatim; only ``params`` are
        bound as values.
        """
        ent = self.entity_column_name
        view_q = quote_identifier(view_name)
        statement = f'DELETE FROM {self.table_name_q} WHERE {ent} IN (SELECT {ent} FROM {view_q} WHERE {where})'
        self.db.execute(statement, params)
|
|
|
|
class KVStoreJSON(KVStore):
    """Provide a key-value store backed by SQLite with JSON rows.

    Each object is one row: its uid plus the whole object serialized as
    JSON. Expression indexes cover the ``ts`` and ``schema`` members.
    """

    def __init__(self, database: Union[str, sqlite3.Connection]=':memory:', table_name: str='jsonstore'):
        """Open the store and create its table and indexes if missing.

        Args:
            database: Connection or path, handled by ``KVStore.__init__``.
            table_name: Name of the backing table.
        """
        KVStore.__init__(self, database)
        self.table_name = table_name
        self.table_name_q = quote_identifier(table_name)
        self.uid_column_name = 'uid'
        self.data_column_name = '__data__'

        tbl = self.table_name_q
        uid_col = self.uid_column_name
        data_col = self.data_column_name

        self.db.execute(f'CREATE TABLE IF NOT EXISTS {tbl} ({uid_col} TEXT, {data_col} JSON)')
        self.db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS {quote_identifier(f"idx_{table_name}_uid")} ON {tbl} ({uid_col})')
        # JSON paths are string literals, so they are single-quoted; a
        # double-quoted token is an identifier in SQL and is accepted as a
        # string by SQLite only through a legacy fallback.
        for member in ('ts', 'schema'):
            index_name = quote_identifier(f'idx_{table_name}_{member}')
            self.db.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {tbl} (JSON_EXTRACT({data_col}, '$.{member}'))")

    def put(self, obj, **kw):
        """Insert an object, replacing any stored object with the same uid.

        The caller's ``obj`` is left untouched; a copy is completed with
        ``uid`` and ``ts`` when absent and with ``schema`` from the
        ``schema`` keyword, then stored.

        Returns:
            The completed copy that was stored.
        """
        obj = obj.copy()
        if 'uid' not in obj:
            obj['uid'] = uuid.uuid4().hex
        if 'ts' not in obj:
            obj['ts'] = time.time()
        if 'schema' in kw:
            obj['schema'] = kw['schema']
        payload = json.dumps(obj, default=str, separators=(',', ':'))
        # OR REPLACE relies on the unique uid index: putting a known uid
        # overwrites the stored row instead of failing with IntegrityError,
        # matching the delete-then-insert behaviour of KVStoreEAV.put.
        self.db.execute(
            f'INSERT OR REPLACE INTO {self.table_name_q} ({self.uid_column_name}, {self.data_column_name}) VALUES (?, JSON(?))',
            (obj['uid'], payload),
        )
        return obj

    def get(self, uid, **kw):
        """Retrieve an object, with optional default.

        Returns:
            The decoded JSON object stored for ``uid``, or ``kw['default']``
            when there is none and a default was given.

        Raises:
            KeyError: If nothing is stored for ``uid`` and no default was given.
        """
        row = self.db.execute(
            f'SELECT {self.data_column_name} FROM {self.table_name_q} WHERE {self.uid_column_name}=? LIMIT 1',
            (uid,),
        ).fetchone()
        if row is not None:
            return json.loads(row[0])
        if 'default' in kw:
            return kw['default']
        raise KeyError(uid)

    def ensure_view(self, view_name: str, schema_name: str, attributes: Sequence[str], clear: bool=False) -> KVStoreView:
        """Create a view over the key-value schema, optionally removing existing underlying data.

        Any existing view of that name is dropped and recreated. With
        ``clear`` set, every stored object whose ``schema`` member equals
        ``schema_name`` is deleted.
        """
        self.db.execute(f'DROP VIEW IF EXISTS {quote_identifier(view_name)}')
        self.db.execute(json_query_ensure_view(view_name, self.table_name, self.uid_column_name, self.data_column_name, schema_name, attributes))
        if clear:
            self.db.execute(
                f"DELETE FROM {self.table_name_q} WHERE JSON_EXTRACT({self.data_column_name}, '$.schema')=?",
                (schema_name,),
            )
        return KVStoreView(self, view_name, schema_name, attributes)

    def view_delete(self, view_name: str, where: str, params: Sequence):
        """Remove underlying data.

        Deletes every row whose uid appears in ``view_name`` under the
        ``where`` condition. ``where`` is spliced into the statement
        verbatim; only ``params`` are bound as values.
        """
        uid_col = self.uid_column_name
        self.db.execute(
            f'DELETE FROM {self.table_name_q} WHERE {uid_col} IN (SELECT {uid_col} FROM {quote_identifier(view_name)} WHERE ' + where + ')',
            params,
        )
|
|
|
|
def quote_string(s: str) -> str:
    """Make string safe for SQLite by rendering it as a string literal.

    The value is wrapped in single quotes and embedded single quotes are
    doubled. Single quotes are used because double quotes delimit
    identifiers in SQL: a double-quoted value that happens to match a
    column name would be read as that column rather than as text.

    Args:
        s: Raw text to embed in a statement.

    Returns:
        The text as a single-quoted SQL literal.
    """
    return "'" + s.replace("'", "''") + "'"
|
|
|
|
# Keywords listed in the SQLite documentation. Quoting a word that is not
# actually reserved is harmless, so the list errs on the inclusive side.
_SQLITE_KEYWORDS = frozenset("""
    ABORT ACTION ADD AFTER ALL ALTER ALWAYS ANALYZE AND AS ASC ATTACH
    AUTOINCREMENT BEFORE BEGIN BETWEEN BY CASCADE CASE CAST CHECK COLLATE
    COLUMN COMMIT CONFLICT CONSTRAINT CREATE CROSS CURRENT CURRENT_DATE
    CURRENT_TIME CURRENT_TIMESTAMP DATABASE DEFAULT DEFERRABLE DEFERRED
    DELETE DESC DETACH DISTINCT DO DROP EACH ELSE END ESCAPE EXCEPT EXCLUDE
    EXCLUSIVE EXISTS EXPLAIN FAIL FILTER FIRST FOLLOWING FOR FOREIGN FROM
    FULL GENERATED GLOB GROUP GROUPS HAVING IF IGNORE IMMEDIATE IN INDEX
    INDEXED INITIALLY INNER INSERT INSTEAD INTERSECT INTO IS ISNULL JOIN KEY
    LAST LEFT LIKE LIMIT MATCH MATERIALIZED NATURAL NO NOT NOTHING NOTNULL
    NULL NULLS OF OFFSET ON OR ORDER OTHERS OUTER OVER PARTITION PLAN PRAGMA
    PRECEDING PRIMARY QUERY RAISE RANGE RECURSIVE REFERENCES REGEXP REINDEX
    RELEASE RENAME REPLACE RESTRICT RETURNING RIGHT ROLLBACK ROW ROWS
    SAVEPOINT SELECT SET TABLE TEMP TEMPORARY THEN TIES TO TRANSACTION
    TRIGGER UNBOUNDED UNION UNIQUE UPDATE USING VACUUM VALUES VIEW VIRTUAL
    WHEN WHERE WINDOW WITH WITHOUT
""".split())

# A letter followed by letters, digits or underscores.
_SIMPLE_IDENTIFIER = re.compile(r'[A-Za-z][0-9A-Za-z_]*')


def quote_identifier(s: str) -> str:
    """Make string safe as SQLite identifier.

    Plain names (a letter followed by letters, digits or underscores) that
    are not SQL keywords are returned unchanged. Everything else is wrapped
    in double quotes with embedded double quotes doubled. That includes
    keywords such as ``order`` or ``group``, the empty string, and names
    ending in a newline.

    Args:
        s: Table, column, view or index name.

    Returns:
        The name, quoted when required.
    """
    # fullmatch (rather than match with '$') so a trailing newline cannot
    # slip through unquoted.
    if _SIMPLE_IDENTIFIER.fullmatch(s) and s.upper() not in _SQLITE_KEYWORDS:
        return s
    return '"' + s.replace('"', '""') + '"'
|
|
|
|
def eav_query_pivot(table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, attributes: Sequence[str]) -> str:
    """Group EAV rows into traditional rows.

    Builds a SELECT that yields one row per entity, with one output column
    per requested attribute. Each column is picked out of the entity's
    attribute rows with ``MAX(CASE WHEN ... END)``. The entity column is
    always the first output column, so its first occurrence in
    ``attributes`` is dropped.

    Returns:
        The SELECT statement as text (no trailing semicolon).
    """
    entity_q = quote_identifier(entity_column_name)
    attribute_q = quote_identifier(attribute_column_name)
    value_q = quote_identifier(value_column_name)

    if entity_column_name in attributes:
        # Work on a copy so the caller's sequence is not modified.
        attributes = list(attributes)
        attributes.remove(entity_column_name)

    pieces = [f'SELECT {entity_q}\n']
    pieces.extend(
        f', MAX(CASE WHEN {attribute_q}={quote_string(name)} THEN {value_q} END) {quote_identifier(name)}\n'
        for name in attributes
    )
    pieces.append(f'FROM {quote_identifier(table_name)} GROUP BY {entity_q}')
    return ''.join(pieces)
|
|
|
|
def eav_query_ensure_view(view_name: str, table_name: str, entity_column_name: str, attribute_column_name: str, value_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL to create a view over the grouped EAV data.

    The view is a temporary view whose columns are the entity column
    followed by ``attributes``. ``schema`` is appended when missing and the
    first occurrence of the entity column is removed. Only entities whose
    ``schema`` equals ``schema_name`` are kept.

    Returns:
        The CREATE VIEW statement as text.
    """
    if 'schema' not in attributes:
        # The HAVING filter below needs the schema column to be selected.
        attributes = tuple(attributes) + ('schema',)
    if entity_column_name in attributes:
        attributes = list(attributes)
        attributes.remove(entity_column_name)

    column_list = ', '.join(quote_identifier(name) for name in attributes)
    header = f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} ({quote_identifier(entity_column_name)}, {column_list}) AS\n'
    select = eav_query_pivot(table_name, entity_column_name, attribute_column_name, value_column_name, attributes)
    schema_filter = f'\nHAVING schema={quote_string(schema_name)}'
    return header + select + schema_filter
|
|
|
|
def json_query_pivot(table_name: str, uid_column_name: str, data_column_name: str, attributes: Sequence[str]) -> str:
    """Expand JSON rows into traditional rows.

    Builds a SELECT that returns the uid column followed by one
    ``JSON_EXTRACT`` column per attribute, each aliased to the attribute
    name. The uid column is always first, so its first occurrence in
    ``attributes`` is dropped. With no attributes left the statement
    selects the uid column alone.

    Returns:
        The SELECT statement as text (no trailing semicolon).
    """
    if uid_column_name in attributes:
        # Work on a copy so the caller's sequence is not modified.
        attributes = list(attributes)
        attributes.remove(uid_column_name)

    data_q = quote_identifier(data_column_name)
    columns = [quote_identifier(uid_column_name)]
    for attr in attributes:
        # The JSON path is a value, not a name, so it goes through the
        # string-literal helper rather than the identifier helper.
        path = quote_string('$.' + attr)
        columns.append(f'JSON_EXTRACT({data_q}, {path}) AS {quote_identifier(attr)}')

    # Joining the full column list avoids a dangling comma when there are
    # no attribute columns.
    return 'SELECT ' + ', '.join(columns) + f' FROM {quote_identifier(table_name)}'
|
|
|
|
def json_query_ensure_view(view_name: str, table_name: str, uid_column_name: str, data_column_name: str, schema_name: str, attributes: Sequence[str]) -> str:
    """Generate SQL to create a view over the expanded JSON data.

    The view is a temporary view whose columns are the uid column followed
    by ``attributes``. ``schema`` is appended when missing and the first
    occurrence of the uid column is removed. Only rows whose JSON
    ``schema`` member equals ``schema_name`` are kept.

    Returns:
        The CREATE VIEW statement as text.
    """
    if 'schema' not in attributes:
        attributes = tuple(attributes) + ('schema',)
    if uid_column_name in attributes:
        attributes = list(attributes)
        attributes.remove(uid_column_name)

    column_list = ', '.join(quote_identifier(attr) for attr in attributes)
    header = f'CREATE TEMPORARY VIEW IF NOT EXISTS {quote_identifier(view_name)} ({quote_identifier(uid_column_name)}, {column_list}) AS\n'
    select = json_query_pivot(table_name, uid_column_name, data_column_name, attributes)
    # Both the JSON path and the schema name are values, so both go through
    # the string-literal helper.
    schema_path = quote_string('$.schema')
    schema_filter = f'\nWHERE JSON_EXTRACT({quote_identifier(data_column_name)}, {schema_path})={quote_string(schema_name)}'
    return header + select + schema_filter
|