Простой ORM для sqlite3

в 13:00, , рубрики: python, ruvds_статьи, ооп, Программирование

Простой ORM для sqlite3 - 1


ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.

Не бывало ли вам интересно, как работает изнутри такая идейно простая концепция? Благодаря чему достигается удобство работы? Сегодня мы напишем ORM самостоятельно и узнаем, какие инструменты python нам для этого понадобятся.

Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.

▍ Базовые типы данных

В sqlite3 существуют: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER.

CREATE TABLE "example" (
    "Field1"    INTEGER NOT NULL,
    "Field2"    TEXT UNIQUE,
    "Field3"    BLOB,
    "Field4"    REAL DEFAULT 123,
    "Field5"    NUMERIC
);

У каждого из них есть параметры NULL, UNIQUE, DEFAULT, так что первым делом пишем класс, который будут наследовать все остальные:

class BaseType:
  field_type: str #название типа данных поля, например, "INTEGER"

  def __init__(self, unique: bool = False, null: bool = True, default: int = None):
      self.unique = unique
      self.null = null
      self.default = default

На основе него прописываем остальные базовые классы:

class IntegerField(BaseType):
  field_type = 'INTEGER'


class TextField(BaseType):
  field_type = 'TEXT'


class BlobField(BaseType):
  field_type = 'BLOB'


class RealField(BaseType):
  field_type = 'REAL'


class NumericField(BaseType):
  field_type = 'NUMERIC'

▍ Пользовательские модели

Я хочу, чтобы пользовательские модели выглядел максимально просто, например, так:

class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()

Для этого реализуем родительский класс Model. Он должен задавать объекты с заданными переменными. Для этого пишем инициализатор:

class Model:
  def __init__(self, *args, **kwargs):
      fields = [el for el in vars(self.__class__) if not el.startswith("__")] #поля, которые мы создали в модели (в данном случае name, width, height)
      for i, value in enumerate(args):
          setattr(self, fields[i], value)#задаем переменные переданные с помощью args

      for field, value in kwargs.items():#задаем переменные переданные с помощью kwargs
          setattr(self, field, value)

Все методы, которые, мы напишем для класса Model будут работать с любым объектом, который мы зададим.

Про args и kwargs

1. *args:
— *args позволяет передавать произвольное количество позиционных аргументов в функцию.
— Аргументы, переданные как *args, собираются в кортеж (tuple) внутри функции.
— Вы можете использовать любое имя вместо «args», но общепринято использовать именно «args».
Пример:

def print_args(*args):
for arg in args:
    print(arg)

print_args(1, 2, 3)  # Выводит: 1, 2, 3

2. **kwargs:
— **kwargs позволяет передавать произвольное количество именованных аргументов (ключ-значение) в функцию.
— Аргументы, переданные как **kwargs, собираются в словарь (dictionary) внутри функции.
— Вы можете использовать любое имя вместо «kwargs», но общепринято использовать именно «kwargs».
Пример:

def print_kwargs(**kwargs):
for key, value in kwargs.items():
    print(f"{key}: {value}")

print_kwargs(name="John", age=30, city="New York")  # Выводит: name: John, age: 30, city: New York

Используя *args и **kwargs, вы можете создавать более гибкие функции, которые могут обрабатывать разные наборы аргументов.

Давайте также создадим сразу метод json(), чтобы возвращать объект в виде словаря, он понадобится нам, например, для api или удобного вывода в консоль.

def json(self):
  attributes = {}
  for key, value in vars(self).items():
      if not key.startswith("__") and not callable(value):#проверка на системные методы и поля
          attributes[key] = value

  return attributes

В данный момент мы уже можем пользоваться моделями, но пока что без базы данных:

class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()


print(Box('Box 1', 1, 1).json())#выведет {'name': 'Box 1', 'width': 1, 'height': 1}

▍ Добавляем sqlite3

Для использования базы данных я хочу, чтобы каждый объект имел поле objects — менеджер объектов, через который мы и будем обращаться к sqlite.
Например, так:

Box.objects.add(Box('BOX 1', 1, 1))
Box.objects.get(name='BOX 1')
Box.objects.filter(width=1, height=1)
Box.objects.delete(name="BOX 1")

Мы хотим, чтобы каждый объект имел это поле, но при этом поведение различалось в зависимости от модели объекта. Для решения этого создадим прокси-класс, который определяет модель и возвращает нужный менеджер.

class ProxyObjects:
  def __get__(self, instance, owner):
      return Object(owner)

Про __get__

__get__ — это метод в Python, который используется для определения поведения при доступе к атрибуту объекта. Он является частью протокола дескрипторов в Python и позволяет объектам контролировать доступ к своим атрибутам.

__get__ определяется внутри класса, который также может иметь методы __set__ и __delete__, если необходимо управлять операциями присваивания и удаления атрибутов.

Пример использования __get__:

class MyDescriptor:
def __get__(self, instance, owner):
    if instance is None:
        # Если доступ к дескриптору осуществляется через класс, а не через экземпляр,
        # то instance будет равен None, и мы можем вернуть сам дескриптор или другое значение.
        return self
    else:
        # В этом случае instance - это экземпляр объекта, owner - это класс, к которому относится атрибут.
        # Мы можем вернуть значение атрибута или выполнить другие действия при доступе к нему.
        return instance._value

class MyClass:
def __init__(self, value):
    self._value = value

# Используем дескриптор MyDescriptor для атрибута 'my_attribute'
my_attribute = MyDescriptor()

# Создаём экземпляр класса
obj = MyClass(42)

# Доступ к атрибуту 'my_attribute' будет вызывать метод __get__ дескриптора MyDescriptor
print(obj.my_attribute)  # Выведет: 42

В этом примере MyDescriptor является дескриптором, который определяет поведение при доступе к атрибуту my_attribute класса MyClass. Метод __get__ определяет, что происходит при чтении значения этого атрибута через экземпляр obj.

Первое, что нужно сделать для работы с базой данных — создать таблицу. Для этого пишем метод:

import sqlite3  # Необходимо импортировать библиотеку для работы с SQLite

class Object:
def __init__(self, object_type: type):
    # Конструктор класса принимает тип объекта (класс) и сохраняет его в атрибуте object_type.
    self.object_type = object_type

def __createTable__(self):
    # Метод для создания таблицы в базе данных, основанной на атрибутах класса object_type.

    # Устанавливаем соединение с базой данных
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Создаём список custom_fields для хранения определений полей таблицы.
    custom_fields = []

    # Проходимся по атрибутам класса object_type и извлекаем информацию о полях.
    for key, value in vars(self.object_type).items():
        if not key.startswith("__") and not callable(value):
            field_name = key
            field_type = value.field_type
            is_unique = value.unique
            is_null = value.null
            default_value = value.default

            # Создаём строку с определением поля и добавляем её в список custom_fields.
            field_declaration = [f'"{field_name}" {field_type}']

            if is_unique:
                field_declaration.append('UNIQUE')
            if not is_null:
                field_declaration.append('NOT NULL')
            if default_value is not None:
                field_declaration.append(f'DEFAULT {default_value}')

            custom_fields.append(' '.join(field_declaration))

    # Создаём SQL-запрос для создания таблицы с определёнными полями.
    create_table_sql = f'''
    CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
        {", ".join(custom_fields)}
    );
    '''

    # Выполняем SQL-запрос.
    cursor.execute(create_table_sql)

    # Фиксируем изменения и закрываем соединение с базой данных.
    conn.commit()
    conn.close()

Создавать таблицу нужно для каждой пользовательской модели — для этого удобно использовать простой декоратор для класса:

def simple_orm(class_: type):
  EXTERN_TYPES[class_.__name__] = class_ # Сохраняем в словарь моделей
  class_.objects.__createTable__() # Создаём таблицу в бд

  return class_

Про декораторы

Декоратор в Python — это функция, которая принимает другую функцию и добавляет к её поведению какое-то дополнительное функциональное или метаинформационное украшение, не изменяя саму функцию. Декораторы позволяют изменять или расширять поведение функций или методов, не модифицируя их код. Они являются мощным инструментом для реализации множества различных задач в Python.

Декораторы используются с помощью символа "@" перед определением функции, которую они декорируют. Вот пример использования декоратора:

def my_decorator(func):
def wrapper():
    print("Что-то происходит перед вызовом функции")
    func()
    print("Что-то происходит после вызова функции")
return wrapper

@my_decorator
def say_hello():
print("Привет, мир!")

say_hello()

В этом примере my_decorator — это декоратор, который добавляет вывод текста до и после вызова функции say_hello. Затем декоратор @my_decorator применяется к функции say_hello, и при вызове say_hello() будет выполнено дополнительное действие, предусмотренное декоратором.

Декораторы часто используются для следующих задач:

  1. Логирования: Запись логов для функций или методов.
  2. Аутентификации: Проверка прав доступа перед вызовом функции.
  3. Кэширования: Сохранение результатов функции для ускорения будущих вызовов с теми же аргументами.
  4. Измерения времени выполнения: Оценка производительности функции.
  5. Модификации поведения: Изменение или расширение функциональности функции.

Python предоставляет множество встроенных декораторов, таких как @staticmethod, @classmethod, @property и другие, а также вы можете создавать свои собственные декораторы в соответствии с вашими потребностями.

Теперь при инициализации модели в бд создаётся таблица с соответствующими полями:

Простой ORM для sqlite3 - 2

▍ JsonField

В sqlite3 нет JsonField по умолчанию, так что мы реализуем его на основе текста. Для начала добавляем его в базовые типы:

class JsonField(BaseType):
  field_type = 'JSON'

Json, по сути, ведёт себя, как текст, за исключением того, что при создании и изменении надо использовать json.dumps(), а при получении — json.loads().

▍ ForeignKey

В реляционных базах данных, «foreign key» (внешний ключ) — это структурный элемент, который используется для установления связей между двумя таблицами. Внешний ключ представляет собой один или несколько столбцов в одной таблице, которые связаны с первичным ключом (обычно) в другой таблице. Эта связь позволяет базе данных поддерживать целостность данных и обеспечивать связи между данными в разных таблицах.

ForeignKey должен возвращать объект заданного типа по значению заданного поля.

class ForeignKey(BaseType):
  field_type = 'FOREIGN_KEY'

  def __init__(self, object_class: type, foreign_field: str, unique: bool = False,
                null: bool = True, default=None):
      self.object_class = object_class,
      self.foreign_field = foreign_field,
      self.unique = unique
      self.null = null
      self.default = default

Я реализую его просто, как json объект с параметрами type, key, value, например:

{"type": "Box", "key": "name", "value": "BOX 1"}.

Теперь мы с помощью него можем делать так:

@simple_orm
class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()


@simple_orm
class Circle(models.Model):
  box = ForeignKey(object_class=Box, foreign_field='name')
  name = TextField()
  radius = IntegerField()
  data = JsonField()


box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json()) #{'box': <__main__.Box object at 0x7f7637f6d850>, 'name': 'CIRCLE 1', 'radius': 5, 'data': {'data': 5}}

▍ Добавляем CRUD

Полный код Object

class Object:
  def __init__(self, object_type):
      self.object_type = object_type

  def add(self, obj):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      d = copy.copy(obj.__dict__)

      object_type_name = self.object_type.__name__
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              if type(value) in BASIC_TYPES:
                  continue
              if type(value) == JsonField:
                  d[key] = json.dumps(d[key])
              if type(value) == ForeignKey:
                  d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})

      insert_sql = f'INSERT INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'

      values = tuple(d.values())
      cursor.execute(insert_sql, values)
      conn.commit()
      conn.close()
      return obj

  def save(self, obj):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      d = copy.copy(obj.__dict__)

      object_type_name = self.object_type.__name__
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              if type(value) in BASIC_TYPES:
                  continue
              if type(value) == JsonField:
                  d[key] = json.dumps(d[key])
              if type(value) == ForeignKey:
                  d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})

      upsert_sql = f'INSERT OR REPLACE INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'

      values = tuple(d.values())
      cursor.execute(upsert_sql, values)
      conn.commit()
      conn.close()
      return obj

  def get(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'

      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(select_by_attrs_sql, values)
      row = cursor.fetchone()
      conn.close()

      if row:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0], EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)

          return obj
      else:
          return None

  def delete(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      delete_by_attrs_sql = f'DELETE FROM {object_type_name} WHERE {where_clause};'
      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(delete_by_attrs_sql, values)
      conn.commit()
      conn.close()

  def filter(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'

      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(select_by_attrs_sql, values)
      rows = cursor.fetchall()
      conn.close()

      objects = []
      for row in rows:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0],
                          EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)

          objects.append(obj)

      return objects

  def all(self):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__
      select_all_sql = f'SELECT * FROM {object_type_name};'

      cursor.execute(select_all_sql)
      rows = cursor.fetchall()
      conn.close()

      objects = []
      for row in rows:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0],
                          EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)
          objects.append(obj)

      return objects

  def __createTable__(self):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()
      custom_fields = []
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              field_name = key
              field_type = value.field_type
              is_unique = value.unique
              is_null = value.null
              default_value = value.default

              if value.field_type == 'FOREIGN_KEY':
                  field_type = "TEXT"
              if value.field_type == 'JSON':
                  field_type = 'TEXT'

              field_declaration = [f'"{field_name}" {field_type}']

              if is_unique:
                  field_declaration.append('UNIQUE')
              if not is_null:
                  field_declaration.append('NOT NULL')
              if default_value is not None:
                  field_declaration.append(f'DEFAULT {default_value}')

              custom_fields.append(' '.join(field_declaration))

      create_table_sql = f'''
      CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
          {", ".join(custom_fields)}
      );
      '''
      cursor.execute(create_table_sql)

      conn.commit()
      conn.close()

▍ Список объектов

Сейчас функции all() и filter() возвращают list состоящий из объектов. Это неудобно, ведь нельзя, например, удалить все объекты. Исправим это, добавив класс ListOfObjects:

class ListOfObjects:
  def __init__(self, objects):
      self.objects = objects

  def filter(self, **kwargs):
      filtered_objects = []
      for obj in self.objects:
          if all(getattr(obj, attr, None) == value for attr, value in kwargs.items()):
              filtered_objects.append(obj)
      return ListOfObjects(filtered_objects)

  def delete(self):
      for obj in self.objects:
          obj.delete()

  def json(self):
      object_dicts = [obj.json() for obj in self.objects]
      return object_dicts

▍ Примеры

models.py

import simple_orm.models as models
from simple_orm.models import IntegerField, TextField, ForeignKey, JsonField
from simple_orm.models import simple_orm
from pathlib import Path
import sys

PATH = Path(__file__).absolute().parent.parent
sys.path.append(str(PATH))


@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()


@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()

main.py

from models import Box, Circle

box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json())

print(Box.objects.filter(width=1, height=1).json())
print(Circle.objects.get(name="CIRCLE 1").json())
Box.objects.delete(name="BOX 1")
print(Box.objects.all().json())

▍ Заключение

В процессе создания своего ORM мы использовали много сложных инструментов языка python, которые помогли написать короткий и красивый код, решающий довольно сложную задачу.
Да, он не идеальный, вы можете предложить его улучшения в комментариях.

Полный код есть на https://github.com/leo-need-more-coffee/simple-orm
Библиотека для python: https://pypi.org/project/sqlite3-simple-orm
Скачать с помощью pip: pip install sqlite3-simple-orm

Автор: Лень К.

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js