Source code for sofos.database

"""Module Database"""
import os
import datetime
import hashlib
from . import dbf as dbi
IGNORE = 'models'


[docs]class Database(): """Database Class""" def __init__(self, models, dbf=None): """Initialize :param models: user models module :param dbf: database file """ self.models = models self.inte_dic = {} self.dbf = dbf if self.set_database(dbf) else None @property def is_connected(self): return True if self.dbf else False
[docs] def integrity_dict(self): """Integrity dictionary :return: Dictionary {parent1: {child1: fld1, child2: fld2, ...}, ...} """ if self.inte_dic != {}: return self.inte_dic for table_name, table_object in self.table_objects().items(): for fld_name, fld_obj in table_object.field_objects().items(): if fld_obj.fkey: parent = fld_obj.ftable.table_name() child = table_name field = fld_name self.inte_dic[parent] = self.inte_dic.get(parent, {}) self.inte_dic[parent][child] = field return self.inte_dic
[docs] def integrity(self, parent_table, id_value): idict = self.integrity_dict() if parent_table not in idict: return False else: for child, field in idict[parent_table].items(): if dbi.ref_exists(self.dbf, child, field, id_value): return True return False
[docs] def set_database(self, dbf): """Set database if compatible :param dbf: Database file name :return: True or False. """ if not dbf: return False if self.is_database_compatible(dbf): self.dbf = dbf return True return False
[docs] def create_database(self, dbf, init_db_file=None): """Create tables from model definitions :param dbf: Database filename :param models: The models module to use (Normally models.py in your application's root) :param init_db_file: The init_db_file.sql file to use if present """ msg = '' success, info = dbi.script( dbf, self.sql_database_create(), create=True) if not success: return success, info else: self.dbf = dbf msg = 'Database %s created successfuly' % (dbf) success2 = True if init_db_file and os.path.isfile(init_db_file): with open(init_db_file) as file: success2, _ = dbi.script(dbf, file.read()) if success2: msg += '\ninitial data inserted successfuly' else: msg += '\nproblem inserting initial data' return success, msg
[docs] def sql_database_create(self): """Create sql for table creation according to your model settings :param models: normally models.py from your project folder :return: create sql """ classes = [cls for cls in dir(self.models) if (cls[0] != '_' and cls != IGNORE)] tsql = 'BEGIN TRANSACTION;\n\n' for cls in classes: aaa = getattr(self.models, cls) tsql += aaa.sql_create() tsql += self.create_z_table() return tsql + 'COMMIT;'
[docs] def backup_database(self): """Backup database with timestamp""" timestamp = datetime.datetime.now().isoformat() tsr = timestamp.replace('-', '').replace('T', '').replace(':', '')[:12] filename = '%s.%s.sql' % (self.dbf, tsr) if self.dbf: return dbi.backup( self.dbf, filename, overwrite=True, inserts_only=True) else: return False, 'Error during backup procedure'
[docs] def restore_database(self): """Not implemented yet""" return False, 'Not Implemented yet'
[docs] def table_objects(self): """models: models.py from our project folder :return: Dictionary return dictionary format:: {table_name1: table_object1, ...} """ tables = [tbl for tbl in dir(self.models) if (tbl[0] != '_' and tbl != IGNORE)] table_dict = {} for tbl in tables: model = getattr(self.models, tbl) table_object = getattr(self.models, tbl) # injecting database file name to table model here table_object.__dbf__ = self.dbf table_object.__database__ = self table_dict[model.table_name()] = table_object return table_dict
[docs] def table_object(self, table_name): """Returns table object by table name :param table_name: table name """ tableobject = self.table_objects().get(table_name, None) return tableobject
[docs] def table_names(self): """Returns table names""" tnames = [] for table_object in self.table_objects(): tnames.append(table_object) return tnames
[docs] def table_labels(self, as_dict=False): """Returns table labels""" tlabels = {} if as_dict else [] for tname, tobject in self.table_objects().items(): if as_dict: tlabels[tname] = tobject.table_label() else: tlabels.append(tobject.table_label()) return tlabels
[docs] def is_database_compatible(self, dbf): """This function checks the databases creation md5 against current models md5 in order to evaluate equality of the two schemas :param dbf: Database file :param models: normally models.py from your project folder :return: True if database schema is the same with model schema """ md5_from_models = self.calc_md5() sql = "SELECT val FROM z WHERE key='md5'" try: success, md5_dict = dbi.read(dbf, sql, 'one') except TypeError: return False if success: return md5_from_models == md5_dict['val'] else: return False
[docs] def calc_md5(self): """Calculates the md5 of the models schema :param models: normally models.py from your project folder :return: md5 value """ tables = [cls for cls in dir(self.models) if (cls[0] != '_' and cls != IGNORE)] tdic = {} for cls in tables: aaa = getattr(self.models, cls) tdic[aaa.table_name()] = aaa arr = [] for key in tdic: arr.append(key) arr += tdic[key].field_names() arr.sort() md5 = hashlib.md5() md5.update(str(arr).encode()) return md5.hexdigest()
[docs] def create_z_table(self): """Create a metadata keys/values table and insert at least the md5 of the models schema. :param models: normally models.py from your project folder """ md5 = self.calc_md5() sql = ("CREATE TABLE IF NOT EXISTS z (" "key TEXT NOT NULL PRIMARY KEY, " "val TEXT);\n" "INSERT INTO z VALUES ('md5', '%s');\n" % md5) return sql