ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.
Не бывало ли вам интересно, как работает изнутри такая идейно простая концепция? Благодаря чему достигается удобство работы? Сегодня мы напишем ORM самостоятельно и узнаем, какие инструменты python нам для этого понадобятся.
Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.
▍ Базовые типы данных
В sqlite3 существуют: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER.
CREATE TABLE "example" (
"Field1" INTEGER NOT NULL,
"Field2" TEXT UNIQUE,
"Field3" BLOB,
"Field4" REAL DEFAULT 123,
"Field5" NUMERIC
);
У каждого из них есть параметры NULL, UNIQUE, DEFAULT, так что первым делом пишем класс, который будут наследовать все остальные:
class BaseType:
field_type: str #название типа данных поля, например, "INTEGER"
def __init__(self, unique: bool = False, null: bool = True, default: int = None):
self.unique = unique
self.null = null
self.default = default
На основе него прописываем остальные базовые классы:
class IntegerField(BaseType):
field_type = 'INTEGER'
class TextField(BaseType):
field_type = 'TEXT'
class BlobField(BaseType):
field_type = 'BLOB'
class RealField(BaseType):
field_type = 'REAL'
class NumericField(BaseType):
field_type = 'NUMERIC'
▍ Пользовательские модели
Я хочу, чтобы пользовательские модели выглядел максимально просто, например, так:
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
Для этого реализуем родительский класс Model. Он должен задавать объекты с заданными переменными. Для этого пишем инициализатор:
class Model:
def __init__(self, *args, **kwargs):
fields = [el for el in vars(self.__class__) if not el.startswith("__")] #поля, которые мы создали в модели (в данном случае name, width, height)
for i, value in enumerate(args):
setattr(self, fields[i], value)#задаем переменные переданные с помощью args
for field, value in kwargs.items():#задаем переменные переданные с помощью kwargs
setattr(self, field, value)
Все методы, которые, мы напишем для класса Model будут работать с любым объектом, который мы зададим.
— *args позволяет передавать произвольное количество позиционных аргументов в функцию.
— Аргументы, переданные как *args, собираются в кортеж (tuple) внутри функции.
— Вы можете использовать любое имя вместо «args», но общепринято использовать именно «args».
Пример:
def print_args(*args):
for arg in args:
print(arg)
print_args(1, 2, 3) # Выводит: 1, 2, 3
2. **kwargs:
— **kwargs позволяет передавать произвольное количество именованных аргументов (ключ-значение) в функцию.
— Аргументы, переданные как **kwargs, собираются в словарь (dictionary) внутри функции.
— Вы можете использовать любое имя вместо «kwargs», но общепринято использовать именно «kwargs».
Пример:
def print_kwargs(**kwargs):
for key, value in kwargs.items():
print(f"{key}: {value}")
print_kwargs(name="John", age=30, city="New York") # Выводит: name: John, age: 30, city: New York
Используя *args и **kwargs, вы можете создавать более гибкие функции, которые могут обрабатывать разные наборы аргументов.
Давайте также создадим сразу метод json(), чтобы возвращать объект в виде словаря, он понадобится нам, например, для api или удобного вывода в консоль.
def json(self):
attributes = {}
for key, value in vars(self).items():
if not key.startswith("__") and not callable(value):#проверка на системные методы и поля
attributes[key] = value
return attributes
В данный момент мы уже можем пользоваться моделями, но пока что без базы данных:
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
print(Box('Box 1', 1, 1).json())#выведет {'name': 'Box 1', 'width': 1, 'height': 1}
▍ Добавляем sqlite3
Для использования базы данных я хочу, чтобы каждый объект имел поле objects — менеджер объектов, через который мы и будем обращаться к sqlite.
Например, так:
Box.objects.add(Box('BOX 1', 1, 1))
Box.objects.get(name='BOX 1')
Box.objects.filter(width=1, height=1)
Box.objects.delete(name="BOX 1")
Мы хотим, чтобы каждый объект имел это поле, но при этом поведение различалось в зависимости от модели объекта. Для решения этого создадим прокси-класс, который определяет модель и возвращает нужный менеджер.
class ProxyObjects:
def __get__(self, instance, owner):
return Object(owner)
__get__ определяется внутри класса, который также может иметь методы __set__ и __delete__, если необходимо управлять операциями присваивания и удаления атрибутов.
Пример использования __get__:
class MyDescriptor:
def __get__(self, instance, owner):
if instance is None:
# Если доступ к дескриптору осуществляется через класс, а не через экземпляр,
# то instance будет равен None, и мы можем вернуть сам дескриптор или другое значение.
return self
else:
# В этом случае instance - это экземпляр объекта, owner - это класс, к которому относится атрибут.
# Мы можем вернуть значение атрибута или выполнить другие действия при доступе к нему.
return instance._value
class MyClass:
def __init__(self, value):
self._value = value
# Используем дескриптор MyDescriptor для атрибута 'my_attribute'
my_attribute = MyDescriptor()
# Создаём экземпляр класса
obj = MyClass(42)
# Доступ к атрибуту 'my_attribute' будет вызывать метод __get__ дескриптора MyDescriptor
print(obj.my_attribute) # Выведет: 42
В этом примере MyDescriptor является дескриптором, который определяет поведение при доступе к атрибуту my_attribute класса MyClass. Метод __get__ определяет, что происходит при чтении значения этого атрибута через экземпляр obj.
Первое, что нужно сделать для работы с базой данных — создать таблицу. Для этого пишем метод:
import sqlite3 # Необходимо импортировать библиотеку для работы с SQLite
class Object:
def __init__(self, object_type: type):
# Конструктор класса принимает тип объекта (класс) и сохраняет его в атрибуте object_type.
self.object_type = object_type
def __createTable__(self):
# Метод для создания таблицы в базе данных, основанной на атрибутах класса object_type.
# Устанавливаем соединение с базой данных
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Создаём список custom_fields для хранения определений полей таблицы.
custom_fields = []
# Проходимся по атрибутам класса object_type и извлекаем информацию о полях.
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
field_name = key
field_type = value.field_type
is_unique = value.unique
is_null = value.null
default_value = value.default
# Создаём строку с определением поля и добавляем её в список custom_fields.
field_declaration = [f'"{field_name}" {field_type}']
if is_unique:
field_declaration.append('UNIQUE')
if not is_null:
field_declaration.append('NOT NULL')
if default_value is not None:
field_declaration.append(f'DEFAULT {default_value}')
custom_fields.append(' '.join(field_declaration))
# Создаём SQL-запрос для создания таблицы с определёнными полями.
create_table_sql = f'''
CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
{", ".join(custom_fields)}
);
'''
# Выполняем SQL-запрос.
cursor.execute(create_table_sql)
# Фиксируем изменения и закрываем соединение с базой данных.
conn.commit()
conn.close()
Создавать таблицу нужно для каждой пользовательской модели — для этого удобно использовать простой декоратор для класса:
def simple_orm(class_: type):
EXTERN_TYPES[class_.__name__] = class_ # Сохраняем в словарь моделей
class_.objects.__createTable__() # Создаём таблицу в бд
return class_
Декораторы используются с помощью символа "@" перед определением функции, которую они декорируют. Вот пример использования декоратора:
def my_decorator(func):
def wrapper():
print("Что-то происходит перед вызовом функции")
func()
print("Что-то происходит после вызова функции")
return wrapper
@my_decorator
def say_hello():
print("Привет, мир!")
say_hello()
В этом примере my_decorator — это декоратор, который добавляет вывод текста до и после вызова функции say_hello. Затем декоратор @my_decorator применяется к функции say_hello, и при вызове say_hello() будет выполнено дополнительное действие, предусмотренное декоратором.
Декораторы часто используются для следующих задач:
- Логирования: Запись логов для функций или методов.
- Аутентификации: Проверка прав доступа перед вызовом функции.
- Кэширования: Сохранение результатов функции для ускорения будущих вызовов с теми же аргументами.
- Измерения времени выполнения: Оценка производительности функции.
- Модификации поведения: Изменение или расширение функциональности функции.
Python предоставляет множество встроенных декораторов, таких как @staticmethod, @classmethod, @property и другие, а также вы можете создавать свои собственные декораторы в соответствии с вашими потребностями.
Теперь при инициализации модели в бд создаётся таблица с соответствующими полями:
▍ JsonField
В sqlite3 нет JsonField по умолчанию, так что мы реализуем его на основе текста. Для начала добавляем его в базовые типы:
class JsonField(BaseType):
field_type = 'JSON'
Json, по сути, ведёт себя, как текст, за исключением того, что при создании и изменении надо использовать json.dumps(), а при получении — json.loads().
▍ ForeignKey
В реляционных базах данных, «foreign key» (внешний ключ) — это структурный элемент, который используется для установления связей между двумя таблицами. Внешний ключ представляет собой один или несколько столбцов в одной таблице, которые связаны с первичным ключом (обычно) в другой таблице. Эта связь позволяет базе данных поддерживать целостность данных и обеспечивать связи между данными в разных таблицах.
ForeignKey должен возвращать объект заданного типа по значению заданного поля.
class ForeignKey(BaseType):
field_type = 'FOREIGN_KEY'
def __init__(self, object_class: type, foreign_field: str, unique: bool = False,
null: bool = True, default=None):
self.object_class = object_class,
self.foreign_field = foreign_field,
self.unique = unique
self.null = null
self.default = default
Я реализую его просто, как json объект с параметрами type, key, value, например:
{"type": "Box", "key": "name", "value": "BOX 1"}.
Теперь мы с помощью него можем делать так:
@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()
box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json()) #{'box': <__main__.Box object at 0x7f7637f6d850>, 'name': 'CIRCLE 1', 'radius': 5, 'data': {'data': 5}}
▍ Добавляем CRUD
class Object:
def __init__(self, object_type):
self.object_type = object_type
def add(self, obj):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
d = copy.copy(obj.__dict__)
object_type_name = self.object_type.__name__
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
if type(value) in BASIC_TYPES:
continue
if type(value) == JsonField:
d[key] = json.dumps(d[key])
if type(value) == ForeignKey:
d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})
insert_sql = f'INSERT INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'
values = tuple(d.values())
cursor.execute(insert_sql, values)
conn.commit()
conn.close()
return obj
def save(self, obj):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
d = copy.copy(obj.__dict__)
object_type_name = self.object_type.__name__
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
if type(value) in BASIC_TYPES:
continue
if type(value) == JsonField:
d[key] = json.dumps(d[key])
if type(value) == ForeignKey:
d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})
upsert_sql = f'INSERT OR REPLACE INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'
values = tuple(d.values())
cursor.execute(upsert_sql, values)
conn.commit()
conn.close()
return obj
def get(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(select_by_attrs_sql, values)
row = cursor.fetchone()
conn.close()
if row:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0], EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
return obj
else:
return None
def delete(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
delete_by_attrs_sql = f'DELETE FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(delete_by_attrs_sql, values)
conn.commit()
conn.close()
def filter(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(select_by_attrs_sql, values)
rows = cursor.fetchall()
conn.close()
objects = []
for row in rows:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0],
EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
objects.append(obj)
return objects
def all(self):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
select_all_sql = f'SELECT * FROM {object_type_name};'
cursor.execute(select_all_sql)
rows = cursor.fetchall()
conn.close()
objects = []
for row in rows:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0],
EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
objects.append(obj)
return objects
def __createTable__(self):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
custom_fields = []
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
field_name = key
field_type = value.field_type
is_unique = value.unique
is_null = value.null
default_value = value.default
if value.field_type == 'FOREIGN_KEY':
field_type = "TEXT"
if value.field_type == 'JSON':
field_type = 'TEXT'
field_declaration = [f'"{field_name}" {field_type}']
if is_unique:
field_declaration.append('UNIQUE')
if not is_null:
field_declaration.append('NOT NULL')
if default_value is not None:
field_declaration.append(f'DEFAULT {default_value}')
custom_fields.append(' '.join(field_declaration))
create_table_sql = f'''
CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
{", ".join(custom_fields)}
);
'''
cursor.execute(create_table_sql)
conn.commit()
conn.close()
▍ Список объектов
Сейчас функции all() и filter() возвращают list состоящий из объектов. Это неудобно, ведь нельзя, например, удалить все объекты. Исправим это, добавив класс ListOfObjects:
class ListOfObjects:
def __init__(self, objects):
self.objects = objects
def filter(self, **kwargs):
filtered_objects = []
for obj in self.objects:
if all(getattr(obj, attr, None) == value for attr, value in kwargs.items()):
filtered_objects.append(obj)
return ListOfObjects(filtered_objects)
def delete(self):
for obj in self.objects:
obj.delete()
def json(self):
object_dicts = [obj.json() for obj in self.objects]
return object_dicts
▍ Примеры
models.py
import simple_orm.models as models
from simple_orm.models import IntegerField, TextField, ForeignKey, JsonField
from simple_orm.models import simple_orm
from pathlib import Path
import sys
PATH = Path(__file__).absolute().parent.parent
sys.path.append(str(PATH))
@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()
main.py
from models import Box, Circle
box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json())
print(Box.objects.filter(width=1, height=1).json())
print(Circle.objects.get(name="CIRCLE 1").json())
Box.objects.delete(name="BOX 1")
print(Box.objects.all().json())
▍ Заключение
В процессе создания своего ORM мы использовали много сложных инструментов языка python, которые помогли написать короткий и красивый код, решающий довольно сложную задачу.
Да, он не идеальный, вы можете предложить его улучшения в комментариях.
Полный код есть на https://github.com/leo-need-more-coffee/simple-orm
Библиотека для python: https://pypi.org/project/sqlite3-simple-orm
Скачать с помощью pip: pip install sqlite3-simple-orm
Автор: Лень К.