File size: 4,839 Bytes
e8e1967 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import sqlite3
import time
import atexit
class FileDict:
    """Dictionary-like string key/value store persisted in a SQLite table.

    Write operations (``d[key] = value``, ``del d[key]``, ``add_items``) are
    queued in an in-memory buffer and applied to the database in batches.  A
    batch is written when the buffer holds ``buffer_size`` operations, when at
    least ``buffer_idle_time`` seconds have elapsed since the last commit (the
    check runs on the next write), before every read, and on ``close()``.

    ``close()`` is registered with ``atexit`` so buffered writes are flushed
    at interpreter shutdown.  The instance can also be used as a context
    manager.

    NOTE: ``table`` is interpolated into the SQL text as an identifier, so it
    must come from trusted code, never from user input.
    """

    def __init__(self, file_path, buffer_size=100000, buffer_idle_time=5, table='filedict'):
        """Open (or create) the database and make sure the table exists.

        Args:
            file_path: Path handed to ``sqlite3.connect`` (``':memory:'`` works).
            buffer_size: Number of queued operations that triggers a commit.
            buffer_idle_time: Seconds since the last commit after which the
                next write triggers a commit.
            table: Name of the table holding the key/value pairs.
        """
        self.file_path = file_path
        self.table = table
        self.conn = sqlite3.connect(file_path, check_same_thread=False)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
        )
        self.buffer = []
        self.buffer_size = buffer_size
        self.last_commit_time = time.time()
        self.buffer_idle_time = buffer_idle_time
        self._closed = False
        atexit.register(self.close)

    def __enter__(self):
        """Return the store itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Flush and close the store when the ``with`` block ends."""
        self.close()

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present in the table.
        """
        # Flush first so a read always observes writes that are still queued.
        self._commit()
        cursor = self.conn.execute(
            'SELECT value FROM {} WHERE key = ?'.format(self.table), (key,)
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]

    def Tables(self):
        """Return the names of all tables in the underlying database."""
        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [row[0] for row in cursor.fetchall()]

    def __setitem__(self, key, value):
        """Queue an insert-or-replace of ``key`` with ``value``.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is an empty string.
        """
        self.check_key(key)
        # 'set' is applied as INSERT OR REPLACE, so it covers updates as well.
        self.buffer.append(('set', key, value))
        self._check_buffer()

    def __delitem__(self, key):
        """Queue the deletion of ``key``; a missing key is silently ignored."""
        self.buffer.append(('del', key))
        self._check_buffer()

    def __iter__(self):
        """Lazily yield every key in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row[0]
        finally:
            # Runs even when the consumer abandons the generator early.
            cursor.close()

    def items(self):
        """Lazily yield ``(key, value)`` tuples for every row in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row
        finally:
            cursor.close()

    def from_dict(self, dict):
        """Replace the whole table content with the entries of ``dict``.

        All keys are validated before anything is modified.  Writes that are
        still queued are discarded, because the table they target is dropped.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is an empty string.
        """
        self.check_dict(dict)
        # Queued operations refer to the old content; replaying them after the
        # rebuild would leak stale entries into the new table.
        self.buffer = []
        with self.conn:
            self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))
            self.conn.execute(
                'CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
            )
            self.conn.executemany(
                'INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table), dict.items()
            )
        self.last_commit_time = time.time()

    def add_items(self, items):
        """Queue an insert-or-replace for every entry of the mapping ``items``.

        Raises:
            TypeError: If a key is not a string.
            ValueError: If a key is an empty string.
        """
        for key, value in items.items():
            self[key] = value
        self._check_buffer()

    def _check_buffer(self):
        """Commit queued writes if the buffer is full or has been idle too long."""
        if not self.buffer:
            return
        idle_time = time.time() - self.last_commit_time
        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:
            self._commit()

    def _commit(self):
        """Apply all queued operations in one transaction.

        If any statement fails the transaction is rolled back, the buffer is
        left untouched and the exception propagates.
        """
        if not self.buffer:
            return
        set_sql = 'INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table)
        update_sql = 'UPDATE {} SET value = ? WHERE key = ?'.format(self.table)
        delete_sql = 'DELETE FROM {} WHERE key = ?'.format(self.table)
        # The connection context manager commits on success, rolls back on error.
        with self.conn:
            for op in self.buffer:
                kind = op[0]
                if kind == 'set':
                    self.conn.execute(set_sql, (op[1], op[2]))
                elif kind == 'update':
                    # No longer produced by this class; kept for buffers
                    # populated by external code.
                    self.conn.execute(update_sql, (op[2], op[1]))
                elif kind == 'del':
                    self.conn.execute(delete_sql, (op[1],))
        # Only forget the operations once they are safely committed.
        self.buffer = []
        self.last_commit_time = time.time()

    def check_dict(self, dictionary):
        """Validate every key of ``dictionary`` with ``check_key``."""
        for key in dictionary:
            self.check_key(key)

    def check_key(self, key):
        """Ensure ``key`` is a non-empty string.

        Raises:
            TypeError: If ``key`` is not a ``str``.
            ValueError: If ``key`` is the empty string.
        """
        if not isinstance(key, str):
            raise TypeError('Keys must be strings.')
        if not key:
            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False):
        """Lazily yield the keys matching ``pattern``.

        Args:
            pattern: SQL ``LIKE`` pattern when ``like`` is true, otherwise the
                exact key to match.
            like: Use ``LIKE`` matching (default) instead of equality.
            values: Currently ignored; accepted for backward compatibility.
        """
        self._commit()
        operator = 'LIKE' if like else '='
        cursor = self.conn.cursor()
        try:
            cursor.execute(f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,))
            for row in cursor:
                yield row[0]
        finally:
            cursor.close()

    def close(self):
        """Flush queued writes and close the connection.

        Safe to call more than once; the ``atexit`` hook relies on that when
        the store was already closed explicitly.
        """
        if getattr(self, '_closed', False):
            return
        self._closed = True
        try:
            self._commit()
            try:
                # Best-effort commit of any transaction opened outside the buffer.
                self.conn.commit()
            except sqlite3.Error:
                pass
        finally:
            self.conn.close()
|