P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети

в 9:00, , рубрики: Holepunch, IP, mikrotik, nat, network, overlay, p2p-сети, Peer-to-Peer, python, ruvds_статьи, socket

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 1


Многие, кто работают с интернет-сокетами в любой сфере IT, задаются вопросом о пробросе портов. Связано это с тем, что практически во всех домашних/общественных/корпоративных роутерах реализован механизм NAT, который перекрывает прямой доступ к устройствам в этих подсетях извне, общаясь с внешним интернетом от их имени.

У NAT есть киллер-фича — он представляет собой идеальный фаервол: атаки извне не могут использовать порты локальных устройств напрямую, следовательно, это решает проблему атак на сетевую уязвимость ОС.

Но, это доставляет и неудобства, например, если ты захочешь подключиться или хотя бы увидеть устройство за NAT в благих целях, то ты чисто теоретически не сможешь это сделать — у него относительно тебя нет IP-адреса.

Разнообразные сервисы работают на серверах, т. е. имеют некую ноду, которая имеет белый адрес в интернете (находится не за NAT). Все пользователи же подключаются к этому единому серверу. В таком случае проблема «невидимости» пользователей отпадает. Однако чисто серверное взаимодействие ограничивает скорость участников, так ещё и не отказоустойчиво. Если сервер упадёт, то все клиенты отправятся за ним (считаем, что это одноклеточный сервис не на всяких там kubernetes).

Как вы уже могли были догадаться, даже в реалиях, когда практически все устройства находятся за NATами, P2P реален. Когда вы являетесь участником bittorrent-раздачи, трансфер больших данных осуществляется напрямую. Как это работает? Поиск ответа на этот вопрос завёл меня в глубокие дебри, разгребая которые я написал оверлейную p2p-сеть, где трекерами являются сами её участники. Интересно? Тогда добро пожаловать под кат.

Есть концепт, использующийся во многих готовых решениях, называющийся NAT hole punching. Он позволяет общаться двум устройствам, находящимся за NATами, зная лишь внешние адреса NAT друг друга. Но ведь за одним преобразователем может быть огромное количество устройств — как же тогда это возможно?

Дисклеймер

В дальнейшем я буду намеренно оперировать немного искажёнными понятиями для простоты восприятия их человеком совершенно незнакомым с сетевой моделью OSI, транспортными протоколами и их особенностями.

К моему удивлению, простым поиском я не нашел практически никаких исчерпывающих статей или готовых скриптов, которые бы объясняли hole punching, тем более в ru сегменте. Если же вы любите зрить в корень вопроса или считаете себя уже достаточно опытным в сетевых технологиях, могу посоветовать этот материал (en) — на него опирался и я сам, когда изучал данный вопрос. Если вас действительно заинтересует этот стек после прочтения моей статьи и данного материала, то могу предложить ещё и эту весьма грамотную статью: How NAT traversal works.

P.S. Если же вы не уверены в своем техническом английском, то представляю вам этот перевод, который я предложил написать своему более умелому в технической адаптации языка другу.

Эта статья, в свою очередь, призвана доказать, что с практической точки зрения с NAT hole punching всё не так запутано и сложно, как может показаться при поиске информации на эту тему.
Будет реализован алгоритм работы P2P сети для неограниченного количества участников, которая позволит им общаться в формате форума.

Статья получилась немного перегруженной вложенностями (спойлерами). В противном случае это был бы отвратительный моно-лонгрид, кои я всей душой осуждаю. Теперь вы знаете, на что идёте. Удачи!

Оглавление

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 2

Матчасть NAT

Если вы не нуждаетесь в подобном и подробном объяснении, листайте сразу до следующего заголовка.

NAT (от англ. Network Address Translation — «преобразование сетевых адресов») — это механизм в сетях TCP/IP, позволяющий преобразовывать IP-адреса транзитных пакетов.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 3

Простыми словами — NAT это такая программа в твоем роутере, которая позволяет тебе, будучи в его подсети, общаться с внешним миром от имени роутера. Так как у роутера есть внешний адрес, уникальный для интернета, а у тебя его нет.

Так выглядит твой обычный запрос до DNS Google:

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 4

Однако пообщаться так с компьютером друга, находящимся за своим NAT, не выйдет. Его роутер просто не может знать, кому ты адресовал пакет в его подсети, следовательно, и передавать он его никому не станет. Само собой, это можно исправить. Друг может «пробросить порт» на своем роутере. Он зайдёт на веб-майку роутера по адресу, например 192.168.0.1, и потыкает там кнопки по гайду для какой-нибудь игры. Это создаст для его NAT новое правило о соединении конкретного порта («логической дырки») снаружи роутера с портом конкретного компьютера за ним.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 5

Увы, это не всегда возможно. Наш условный друг может быть в кафе/общаге/на работе (не иметь административного доступа к настройкам роутера). Для этого был придуман UPnP.

UPnP-пакет — специальный запрос к роутеру, который просит его пробросить порт самостоятельно. Однако роутер должен ожидать такой запрос и быть правильно настроен. Но технология эта не прижилась в массах. Постоянно включенный UPnP на роутере даёт пользователям возможность сделать сеть уязвимой, так что он отключен по умолчанию практически на всех маршрутизаторах с завода -> он не является универсальным решением.

Скорее всего, UPnP отключен на вашем роутере, что не мешает работать торрент-клиентам.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 6Для решения этой задачи нужно присмотреться к самому NAT.
Вот, он замаскировал твой пакет под пакет твоего роутера и отправил его наружу, верно?

Но когда ты делаешь запрос к Google (пару иллюстраций назад), твой компьютер ведь получает ответ от него — значит NAT может работать и в обратную сторону?

Дело в том, что NAT оставляет лазейку для получения ответа.

Лазейкой или дыркой я далее буду называть сессию NAT для примитивности объяснения.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 7По сути это временный проброс порта до твоего компьютера, однако с ограничением: проброс сработает только для пакетов, отправленных с того же адреса, на который был отправлен твой пакет-инициатор проброса.

Другими словами, в нашем примере пробросом может пользоваться только Google, для которого ты сам по сути открыл эту дыру, отправив ему исходящий пакет.

Это и есть сессия NAT. На самом деле, много описанное мной здесь будет актуально для большинства существующих на данный момент спецификаций трансляции, но исключения всё же имеются. Вдаваться в подробности я не стану, скажу лишь, что описанный далее способ будет точно актуален для большинства провайдеров (и даже операторов мобильной связи!).

Кстати, NAT может быть в подсети другого NAT и подобная вложенность может содержать сколь угодно уровней (см. CG-NAT).

Однако это тоже не должно стать проблемой для описанного далее метода.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 8

Пробиваем дыры в NAT

В нашем примере Google имел статичный белый адрес с заранее проброшенным портом. Но как заставить таким образом соединиться два устройства за NATами?

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 9

Попробуем что-то сделать, основываясь на имеющихся у нас данных…

  1. Ты отправляешь пакет на белый адрес роутера друга, он транслируется NAT и исходит из порта на твоем роутере №X (ты его не знаешь!).
  2. Пакет разбивается о железный файрвол роутера друга снаружи.
  3. Туннель временно поднят на твоём внешнем порту X, но его номер никто не знает.

Ситуация кажется тупиковой, но есть ещё одна интересная способность сессии, созданной в NAT — если сейчас сессия настроена, например, на приём от google.com во внешний порт роутера X, то ты можешь послать изнутри пакет, используя эту же сессию на другой адрес (допустим, 123.123.123.123). Сессия изменит настройки и станет принимать пакеты от него, при этом приходить они будут всё ещё в старый порт на локальном компьютере.

Представим себе некий адрес 1.2.3.4, видимый из интернета, и готовый принимать подключения.

  1. Ты отправляешь пакет на 1.2.3.4, создавая тем самым сессию, открытую на внешнем порту X.
  2. 1.2.3.4 видит твой внешний порт X, когда принимает пакет (друг не видел, ведь пакет не дошёл).
  3. Владелец 1.2.3.4 неким образом сообщает тебе твой внешний порт данной сессии X.
  4. Ты отправляешь пакет на адрес друга, используя тот же локальный порт на ПК, что и для пакета к 1.2.3.4. Это заставляет пакет идти через уже имеющуюся сессию NAT.
  5. Твоя сессия NAT теперь будет принимать пакеты не от 1.2.3.4, а от твоего друга.
  6. Пакет разбивается о железный файрвол роутера друга снаружи.
  7. Кажется, что всё как в прошлой попытке, однако сейчас мы знаем X за счёт помощи от 1.2.3.4,
  8. Друг делает эти же шаги, но уже обращаясь к нашему адресу в порт X, который будет открыт вместе с сессией NAT ещё несколько секунд и настроен на приём пакетов от друга (в шаге 5).
  9. Пакет друга доходит до нашего локального компьютера!
  10. Мы видим внешний порт друга Y (аналогично X) в заголовке полученного пакета и теперь можем общаться в обе стороны сквозь трансляторы. Победа!

Осталось найти готового к благим делам 1.2.3.4, и, удивительно, он существует. И зовут этих добряков STUN-серверами.

STUN-сервер - это сервер, который отвечает на все входящие пакеты адресом и портом X, из которого они были отправлены. Это позволяет определить свой внешний IP-адрес и внешний порт X своей сессии NAT.

Их есть великое множество в открытом доступе, и составить запрос к любому из них не составляет проблемы — это мы ещё реализуем. Тут можно найти актуальный список открытых url, на которых подняты STUN: https://gist.github.com/mondain/b0ec1cf5f60ae726202e.

Мы будем использовать stun.ekiga.net, но чуть позже.

Мой скромный гайд по данной технологии на этом подходит к концу — далее идёт проектирование и написание конкретного кейса. Повторюсь, что если вы осознали вышесказанное без особых проблем или просто хотите знать больше нюансов, то вам сюда.

Топология сети

Наша оверлейная сеть будет представлять из себя неограниченное количество нод, соединённых друг с другом посредством udp-туннелей, созданных между их NAT.

Сеть НЕ будет анонимной, так как сторонний наблюдатель может отследить первоисточник сообщения, если, конечно, наблюдал за сетью достаточно долго.

Однако идея схожа с задачей на базе очередей

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 10

Если вкратце, мы моментально обрабатываем всё, что приходит нам, но отправляем сообщения только раз в некоторый период T. Если отправлять по одному сообщению сразу всем (пусть даже с мусором), и так будут делать все участники сети, то факт и момент передачи чего-либо не будет известен стороннему наблюдателю.

Мусорный (ложный) трафик является неотъемлемой частью теоретически доказуемых анонимных сетей.

Оригинал: https://habr.com/ru/articles/753902/.

Таким образом, наша сеть могла бы быть анонимной, если бы каждая нода, помимо полезного трафика слала ещё и мусор всем, но я, пожалуй, откажусь от данной условности в угоду разгрузки канала, но и не стану называть сеть анонимной.

Маршрутизация

Для полной автономности и децентрализованности сети я не стану использовать выделенные серверы, трекеры и т. д. Изначальная идея такова, что все участники равноправны, и никакое третье лицо с белым адресом им не нужно (кроме STUN).

Предположим, что есть две ноды (I) и (II), они обмениваются информацией, в том числе списками своих подключений. В это же время активна некая нода (III), которая оказалась подключённой к (I)
. Нода (II) узнала от (I), что есть некая (III). Стабилизация сети будет происходить при помощи взаимной маршрутизации. Дальнейшие действия можно описать простой схемой.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 11

  • 0. Пассивный обмен информацией и списками подключений — будет иллюстрирован позже.
  • 1. (II) отправляет запрос на установление соединения к (III), но не может это сделать напрямую, и просит об этом (I).
  • 2. (I) перенаправляет этот запрос.
  • 3. (III) получает запрос и инициализирует HOLE PUNCH к (II), однако пробрасывается новый случайный порт и его пока не знает (II), зато сам (III) узнал его от STUN.
  • 4. (III) отправляет ответ на запрос (II) с актуальным PUNCHED-портом, перенаправляя его через (I).
  • 5. (I) перенаправляет этот ответ.
  • 6. (II) получает ответ и инициализирует HOLE PUNCH к (III) на известный порт, полученный в ответе, таким образом, порты взаимно проброшены.
  • 7. Теперь (III) имеет полноправное соединение с (II), и начинается пассивный обмен информацией, как в шаге 0.

Есть правда один нюанс — к такой сети невозможно легко присоединиться извне. Вся она основана на взаимных подключениях. Следовательно, если ты хочешь войти в сеть, ты должен найти кого-то, кто уже в ней состоит, и в ручном режиме соединиться с ним (по взаимному согласию). Далее всё сработает автоматически по схеме, приведённой выше.

Что мы имеем?

Мы получили теоретически рабочую оверлейную сеть, где устройства могут общаться друг с другом без каких-либо препятствий, как если бы они находились в одной LAN (за исключением свободы выбора порта).

Принцип обмена сообщениями

  • Каждое сообщение является обычной строкой.
  • Каждое сообщение имеет известный и однозначный хеш.
  • Reply на чужое сообщение осуществляется по его хешу.

  • Весь форум — это один большой python set (aka множество):
    • Двух одинаковых сообщений быть не может.
    • Каждый участник синхронизирует свое множество с чужими.

  • Структуру форума можно визуализировать как дерево, где узлы это сообщения.
    • Отвечая на чужое сообщение, ты создаешь новую ветку от него.
    • Продолжая цепочку ответов, ты удлиняешь одну из веток.

Поскольку каждый участник хранит у себя свою версию содержимого форума, им нужно как-то договариваться об истинной версии. Но, тут всё просто: истинная версия набора сообщений — это та, которая больше. Другими словами, если у кого-то из твоих соседей имеются сообщения, которых нет у тебя, то ты должен их скачать и дописать себе.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 12

Примерная схема синхронизации данных

  • Каждый участник хранит сет сообщений и информацию о существующих сессиях.

Подбираясь к реализации, скажу, что можно просто суммировать свой сет сообщений с чужим.

Не забываем, что сессия NAT умрёт, если через неё несколько минут не будут передаваться данные, так что нужна будет пассивная отправка Keep Alive-пакетов.

На этом можно закончить планирование и перейти к написанию Proof of Concept-инструментов на Python для подтверждения теории практикой.

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 13

Реализация на Python

Заранее прошу прощения у всех, кто читает с телефона. Весь код проекта вышел достаточно массивным, так что я не смогу комментировать каждую строку отдельно. Будут приводиться логически разделённые куски, которые будет не очень удобно анализировать целиком через маленькое окошко фрейма с кодом по центру вашего небольшого экрана.

Импорт библиотек, логгер и небольшие формальности

from pathlib import Path
import time
import os
import sys


def cls():
    os.system("cls" if os.name == "nt" else "clear")


try:
    scriptname = Path(__file__).name
except:
    scriptname = "CnC.py"
try:
    verfrom = time.ctime(os.path.getmtime(__file__))
except:
    verfrom = "!No time!"


import threading
import socket
import random
import hashlib
import json
import logging
import pdb


class Debug(pdb.Pdb):
    def __init__(self, *args, **kwargs):
        super(Debug, self).__init__(*args, **kwargs)
        self.prompt = "CnC Debug Shell >>> "

    def shell(self):
        self.set_trace()


debug = Debug()
# logging.warn(f"Event logging is enabled. You can see it in {scriptname}.log")
logging.basicConfig(
    format="%(asctime)s %(message)s", level=logging.DEBUG, filename=f"{scriptname}.log"
)
logging.info("Hello world!")
logging.getLogger().setLevel(logging.DEBUG)


Теперь нужно подготовить простые функции для упрощения дальнейшей жизни:

def dat_to_bytes(diction: dict) -> bytes:
    return json.dumps(diction).encode("cp866")


def bytes_to_dat(byte: bytes) -> dict:
    return json.loads(byte.decode("cp866"))


def checksum(b):
    return hashlib.blake2s(b, digest_size=4).hexdigest()


_alreadyused = set()


def randomport():
    global _alreadyused
    p = random.randint(16000, 65535)
    while p in _alreadyused:
        p = random.randint(16000, 65535)
    _alreadyused.update({p})
    return p

Первые две превращают словарь в байты, и обратно.

Далее идёт взятие хэша и функция, дающая каждый раз уникальный порт.

Функция STUN() принимает локальный порт как обязательный аргумент и отправляет из него STUN request на сервер stun.ekiga.net.

Запрос представляет из себя фиксированную последовательность байт, так что я не вдаваясь в подробности дампнул его из Wireshark.

Возвращает функция тот самый порт X из начала статьи.

def STUN(port, host="stun.ekiga.net"):
    logging.debug(f"STUN request via {host}")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))
    sock.setblocking(0)
    server = socket.gethostbyname(host)
    work = True
    while work:
        sock.sendto(
            b"x00x01x00x00!x12xa4Bxd6x85yxb8x11x030x06xixdfB",
            (server, 3478),
        )
        for i in range(20):
            try:
                ans, addr = sock.recvfrom(2048)
                work = False
                break
            except:
                time.sleep(0.01)

    sock.close()
    return socket.inet_ntoa(ans[28:32]), int.from_bytes(ans[26:28], byteorder="big")

Пара вспомогательных функций для кодировки ip:port в одно число

def addr2int(ip, port: int):
    binport = bin(port)[2:].rjust(16, "0")
    binip = "".join([bin(int(i))[2:].rjust(8, "0") for i in ip.split(".")])
    return int(binip + binport, 2)


def int2addr(num):
    num = bin(num)[2:].rjust(48, "0")
    print(num)
    num = [
        str(int(i, 2))
        for i in [num[0:8], num[8:16], num[16:24], num[24:32], num[32:48]]
    ]
    return ".".join(num[0:4]), int(num[4])

Dump и Load будут сохранять/загружать текущий чекпоинт сети:

def data_dump():
    with open(f"{scriptname}.chat-savefile.json", "w") as f:
        json.dump(data, f)


def data_load():
    global data
    try:
        with open(f"{scriptname}.chat-savefile.json", "r") as f:
            data = json.load(f)
    except:
        logging.error(f"No save file exists! New one created")
        with open(f"{scriptname}.chat-savefile.json", "w") as f:
            json.dump({}, f)

▍ Основной код ноды

Класс, описывающий поведение сессии с другой нодой:

class Session:
    def __init__(self):
        # self.prefix="IDL"
        self.immortal = False
        self.local_port = randomport()
        for i in range(10):
            self.public_ip, self.public_port = STUN(self.local_port)
        self.socket = None
        self.client = None
        self.thread = None
        logging.info(f'"{self.public_ip}",{self.public_port}')

    def make_connection(self, ip, port, timeout=10):
        logging.debug(f"Start waiting for handshake with {ip}:{port}")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind(("0.0.0.0", self.local_port))
        sock.setblocking(0)
        while True:
            sock.sendto(b"Con. Request!", (ip, port))
            time.sleep(2)
            try:
                ans, addr = sock.recvfrom(9999)
                sock.sendto(b"Con. Request!", (ip, port))
                sock.close()
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                sock.bind(("0.0.0.0", self.local_port))
                sock.setblocking(0)
                self.client = (ip, port)
                self.socket = sock
                logging.debug(f"Hole with {self.client} punched!")
                break
            except Exception as e:
                assert timeout > 0
                timeout -= 1
                logging.debug(f"No handshake with {ip}:{port} yet...")

Из важного — тут происходит стабилизация туннеля в NAT. Вам может резать глаза строка for i in range(10): self.public_ip, self.public_port = STUN(self.local_port), которая повторяет запрос к STUN 10 РАЗ. Это является вынужденным костылём для увеличения срока жизни NAT-сессии, ведь зачастую роутеры создают её с небольшим временем жизни (порядка 10 секунд) и вы с «другом» можете просто не успеть синхронизировать подключения. При этом таймаут до смерти сессии растёт при прохождении через неё пакетов, для этого, собственно, и нужен спам STUN-запросами.

▍ Протоколы

Дальше случится очень много кода — это реализация описанных в первой части статьи механизмов взаимодействия между клиентами.

Введу такое понятие, как «протокол». Определяться он будет первыми 3 байтами UDP-пакета и определять, с какой целью оно было нам отправлено.

Реализация протоколов

Внимание: Впереди может быть очень длинный if-elif-else, который можно было бы заменить на match или замутить нечто высокоуровневое, масштабируемое и абстрактное. Как вы уже могли догадаться, автор не стал этого делать...

Описываем цикл жизни ноды:

def backlife_cycle(self, freq=1):
    global sessions
    if self.immortal:
        logging.warning(f"{self.client} session beacame immortal")
        self.life_cycle = aegis(self.life_cycle)
    th = threading.Thread(target=self.life_cycle, args=(freq,))
    th.start()
    self.thread = th
    logging.warning(f"Session with {self.client} stabilized!")
    # sessions.append(self)

def life_cycle(self, freq=1):
    global data
    global sessions
    global pool
    c = 0
    while 1:
        if len(pool):
            pref = pool.pop(0)
        else:
            pref = b"KPL"

        self.socket.sendto(pref, self.client)  # Keep-alive
        time.sleep(max(random.gauss(1 / freq, 3), 0))

        while True:
            try:
                ans, reply_addr = self.socket.recvfrom(9999)
                logging.debug(
                    f"{self.client[0]}: Recieved {ans[:3].decode('cp866')} from {reply_addr}: {ans}"
                )
            except:
                break

Каждая итерация её жизненного цикла отправляет KPL (Keep Alive) пакет. Теперь опишем поведение при получении KPL-пакета.

На каждый из них нам реагировать, само собой, не стоит.

  • Раз в 10 полученных KPL мы будем запрашивать синхронизацию датасетов.
  • Раз в 33 полученных KPL мы будем запрашивать синхронизацию подключений.

if ans[:3] == b"KPL":
    c += 1
    if c % 10 == 0:
        logging.debug(
            f"{self.client[0]}: Requesting Datagram     {c//10}"
        )
        self.socket.sendto(b"RQD", self.client)
        c += 1
    elif c % 33 == 0:
        logging.debug(
            f"{self.client[0]}: Requesting Session List {c//33}"
        )
        self.socket.sendto(b"RQS", self.client)
        c += 1

Поротоколы, необходимые для частичной синхронизации при помощи сравнения хешей.
По умолчанию я их не использовал, так как мои тесты не заходили дальше 4 устройств и десятка сообщений. Не могу быть уверен, что они отказоустойчивы, так что пока они отключены.


# ----------------------------------------Hashes (keys) sync (disabled now)----------------------------------------
elif ans[:3] == b"RQH":
    logging.debug(f"{self.client[0]}: Sending hashes")
    self.socket.sendto(
        b"HAS" + dat_to_bytes(list(data.keys())), self.client
    )

elif ans[:3] == b"HAS":
    missed_messages.update(
        set(bytes_to_dat(ans[3:])) - set(data.keys())
    )

Непосредственно протоколы синхронизации сообщений:

# ----------------------------------------Data sync----------------------------------------
elif ans[:3] == b"RQD":
    if ans[3:] != b"":
        logging.debug(f"{self.client[0]}: Sending specific datagram")
        n = {}
        r = set(bytes_to_dat(ans[3:]))
        for i in r:
            if i in data:
                n.update({i: data[i]})
        logging.debug(f"{self.client[0]}: Sending {n}")
        self.socket.sendto(b"DAT" + dat_to_bytes(n), self.client)
    else:
        logging.debug(f"{self.client[0]}: Sending datagram")
        self.socket.sendto(b"DAT" + dat_to_bytes(data), self.client)

elif ans[:3] == b"DAT":
    data.update(bytes_to_dat(ans[3:]))

Обмен списками подключений и подключение к ещё одному случайному участнику, с которым у нас пока нет сессии:

# ----------------------------------------IP list sync-------------------------------------
elif ans[:3] == b"RQS":
    logging.debug(f"{self.client[0]}: Sending Session List")
    sess = [i.client[0] if i.client else None for i in sessions]
    sess = set(sess)
    sess = sess - {None}
    sess = list(sess)
    sess.remove(self.client[0])
    self.socket.sendto(b"SES" + dat_to_bytes(sess), self.client)

elif ans[:3] == b"SES":
    sess = [i.client[0] if i.client else None for i in sessions]
    sess = set(sess)
    sess = sess - {None}
    logging.debug(
        f"{self.client[0]}: My sessions: {sess} Recieved: {set(bytes_to_dat(ans[3:]))} New: {list(set(bytes_to_dat(ans[3:]))-sess)}"
    )
    uncon = list(set(bytes_to_dat(ans[3:])) - sess)
    if not uncon:
        continue
    adr = random.choice(uncon)
    s = Session()
    sessions.append(s)
    self.socket.sendto(
        b"HOP"
        + socket.inet_aton(adr)
        + b"CON"
        + s.public_ip.encode("cp866")
        + b":"
        + str(s.public_port).encode("cp866"),
        self.client,
    )

Для маршрутизации внутри сети я использую протокол, который назвал HOP. Он предпологает, что каждый готов послужить ретранслятором для другого.

Допустим, есть пакет X, в котором есть заголовок (id протокола) и данные.
Мы хотим его отправить на 2.2.2.2, но не имеем с ним сессии. Для этого мы узнаём, кто же с ним эту самую сессию имеет (допустим 3.3.3.3). Мы отправляем к 3.3.3.3 пакет с таким содержанием: HOP+2.2.2.2+X.

Описываем принцип работы HOP:

# ----------------------------------------HOP Tracking-------------------------------------
elif ans[:3] == b"HOP":
    sess = [i.client[0] if i.client else None for i in sessions]

    ip = socket.inet_ntoa(ans[3:7])
    if ip in sess:
        s = sessions[sess.index(ip)]
        s.socket.sendto(ans[7:], s.client)

elif ans[:3] == b"CON":
    s = Session()
    adr, prt = ans[3:].decode("cp866").split(":")
    self.socket.sendto(
        b"HOP"
        + socket.inet_aton(adr)
        + b"RDY"
        + prt.encode("cp866")
        + b":"
        + s.public_ip.encode("cp866")
        + b":"
        + str(s.public_port).encode("cp866"),
        self.client,
    )
    try:
        s.make_connection(adr, int(prt))
        s.backlife_cycle(1)
        sessions.append(s)
    except:
        logging.error(f"{self.client[0]}: Connect initiation timeout!")

elif ans[:3] == b"RDY":
    myprt, adr, prt = ans[3:].decode("cp866").split(":")
    sess = [i.public_port for i in sessions]

    if int(myprt) in sess:
        s = sessions[sess.index(int(myprt))]
        try:
            s.make_connection(adr, int(prt))
            s.backlife_cycle(1)
        except:
            logging.error(
                f"{self.client[0]}: Connect stabilization timeout!"
            )
            sessions.remove(s)

Последний блок кода — отключенный механизм трекинга, который я писал до того, как придумал HOP. Извиняюсь за то, что вы его тут видите)

# ----------------------------------------Disabled Trash-----------------------------------
elif ans[:3] == b"TRK":
    adr, prt = ans[3:].decode("cp866").split(",")
    sess = [i.client[0] for i in sessions]
    sess.remove(self.client[0])
    s = sessions[sess.index(adr)]

    s.socket.sendto(
        b"CN0"
        + f"{self.client[0].encode('cp866')}:{prt.encode('cp866')}",
        s.client,
    )

elif ans[:3] == b"CN0":
    s = Session()
    self.socket.sendto(
        b"CN1"
        + f"{s.public_ip.encode('cp866')}:{str(s.public_port).encode('cp866')}",
        self.client,
    )
    adr, prt = ans[3:].decode("cp866").split(":")
    s.make_connection(adr, int(prt))
    sessions.append(s)
elif ans[:3] == b"TRK":
    pass

Закрываем врата в ад из бесконечного elif, обрабатывая любой мусор, который мы можем случайно получить при помощи else.

else:
    logging.warning(f"{self.client[0]}: Malformed! !!!{ans}!!!")

В сухом остатке: в этом сниппете обрабатываются протоколы взаимодействия, такие как:

  1. KPL — Keep Alive
  2. RQH — Request hashes
  3. HAS  — Пакет с хэшами
  4. RQD  — Request dataset
  5. DAT  — пакет с сообщениями
  6. RQS  — Request Sessions
  7. SES  — пакет с сессиями
  8. HOP  — Заголовок, который означает, что пакет нужно перенаправить (после него идёт адрес и любой другой пакет)
  9. CON  — Запрос подключения
  10. RDY  — Ответ о готовности к взаимному подключению

▍ Интерфейс

Теперь осталось завернуть этот функционал в красивый консольный интерфейс и по максимуму убрать ручной ввод кода пользователем.

Код милого интерфейса и визуализации сообщений в виде дерева

Для начала выведем пользователю базовую информацию. Делаем что-то вроде экрана загрузки.

def silent():
    logging.disable()


logging.warning(
    f"Logger for DEBUG is running! to diable it run {scriptname.split('.')[0]}.silent()"
)
print(
    f" Welcome to {scriptname} version from {verfrom} - event logging here: {scriptname}.log"
)
print(
    f"   Your Python version is {'.'.join([str(i) for i in sys.version_info[0:3]])}, branch {sys.version_info[3].upper()}"
)
try:
    c = STUN(12346)
    print(f"     Internet connection is stable on {socket.gethostname()}!")
    if 1:
        import requests

        r2 = requests.get(f"http://ipinfo.io/{c[0]}").json()["org"]
        print(
            f"       Gray  (local)  IP is {socket.gethostbyname(socket.gethostname())}"
        )
        print(f"       While (public) IP is {c[0]} - {r2}")
    else:
        print(
            f"       Gray  (local)  IP is {socket.gethostbyname(socket.gethostname())}"
        )
        print(f"       While (public) IP is {c[0]}")
except Exception as e:
    logging.critical(e)
    print(f" No internet connection found! I cant work offline!")

Функция для рекурсивного отображения нашей структуры (дерева):


import re

def get_tree(data):
    data = data.copy()
    data.update({"00000000": "Root"})
    # data.update({"11111111":"Root"})
    data.update({"ffffffff": "Lost"})
    dct = {}

    for key, message in data.items():
        if key + ": " + message not in dct:
            dct.update({key + ": " + message: []})

    for key, message in data.items():
        rep = [i[1:] for i in re.findall("@+[a-z0-9]{8}", message)]

        if len(rep) > 0:
            rep = rep[0]
        else:
            rep = "00000000"

        if rep not in data:
            rep = "ffffffff"
        if rep + ": " + data[rep] in dct:
            dct[rep + ": " + data[rep]].append(key + ": " + message)
        else:
            pass
            # dct.update({rep+": "+data[rep]:[key+": "+message]})
    # print(dct)
    dct["00000000: Root"].remove("00000000: Root")
    for k, v in list(dct.items()).copy():
        if v == []:
            dct.pop(k)
    return dct

Пишем MAIN с самим интерфейсом:


if __name__ == "__main__":
    data_load()
    print()
    inp = input(
        "You must to connect with one node (friend), which is already in net. (Press Enter when you and your friend are ready)"
    )
    s = Session()
    print()
    print(f"Send this to your friend {addr2int(s.public_ip,int(s.public_port))}")
    print()
    i, p = int2addr(int(input("Input the number from your friend: ")))
    print("Waiting for your friend")
    s.make_connection(i, p)
    sessions.append(s)
    s.backlife_cycle(1)
    while 1:
        print()
        print("╔════════════> Connect & Chan Panel <═════════════")
        print("╟> Type and enter your post")
        print("╟> (Empty input + Enter) to see Root")
        print("╟> ($00000000) to see specific thread")
        print("╟> (!) for new connection")
        print("╟> (#) to save checkpoint")
        print("╟> (`) dev console")
        print("╚══════════════════════════>")
        inp = input("Input: ")
        print("<════════════> Connect & Chan Info <════════════>")
        print(
            f" Connected Nodes: {len(sessions)} | Size of Chan tree: {len(dat_to_bytes(data))} bytesn"
        )

        if inp == "":
            ptree("00000000: Root", get_tree(data))

        elif inp == "!":
            s = Session()
            print(
                f"Send this to your friend {addr2int(s.public_ip,int(s.public_port))}"
            )
            print()
            inp = input("Input the number from your friend (Empty to cancel): ")
            if inp:
                i, p = int2addr(int(inp))
                print("Waiting for your friend")
                try:
                    s.make_connection(i, p)
                    sessions.append(s)
                    s.backlife_cycle(1)
                except:
                    print("Timeout(")
        elif inp == "`":
            try:
                debug.shell()
            except:
                pass
        elif inp == "#":
            data_dump()
        elif inp[0] == "$":
            if inp[1:] in data:
                ptree(f"{inp[1:]}: {data[inp[1:]]}", get_tree(data))
            else:
                print(f"There is no message with id [{inp[1:]}]")
        elif inp:
            data_add(inp)
        else:
            ptree("00000000: Root", get_tree(data))
        input("nnEnter to reload page")
        cls()


Кажется, всё готово. Теперь создаём сессию в google colab, которая будет играть роль нашего друга (тут должна быть шутка про отсутствие реальных).

Демонстрация

Вот что видит каждый пользователь после запуска

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 14

Обмениваемся номерами с устройством, которое уже состоит в сети

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 15

Так выглядит интерфейс управления

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 16

Попробуем написать что-нибудь

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 17

Видим, что друг получил данные

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 18

Ответим на уже существующее сообщение

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 19

И ещё отправим сообщение в ответ на фейковый 12312312 хэш.

Видим, что наш друг видит оба (а потерянное сообщение прикрепилось к ветке Lost):

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 20

Если мы рано или поздно скачаем сообщение с хэшем 12312312, то оно станет родительским для нашего.

Можно посмотреть конкретный трэд вместо Root:

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 21

Я пробовал подключать три и более пользователей. Через некоторое время каждый был подключен к каждому и они обменивались сообщениями — мне кажется, что это успех.

Логи ноды (я же зачем-то перенёс всё на логгер, верно?)

2023-07-27 21:05:30,651 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:34,622 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:34,622 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:37,502 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:37,502 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:37,502 34.74.93.102: Requesting Datagram     60
2023-07-27 21:05:37,502 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:37,502 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:39,647 34.74.93.102: Recieved DAT from ('34.74.93.102', 26698): b'DAT{"54b2b91c": "hello", "f41fe2a9": "Reply to @54b2b91c!", "894df995": "Malformed, lol @12345678", "1508abcd": "text", "0f0961d5": "#"}'
2023-07-27 21:05:39,647 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:44,532 34.74.93.102: Recieved RQD from ('34.74.93.102', 26698): b'RQD'
2023-07-27 21:05:44,532 34.74.93.102: Sending datagram
2023-07-27 21:05:44,532 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:48,152 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:05:53,749 34.74.93.102: Recieved RQD from ('34.74.93.102', 26698): b'RQD'
2023-07-27 21:05:53,749 34.74.93.102: Sending datagram
2023-07-27 21:05:53,749 34.74.93.102: Recieved RQS from ('34.74.93.102', 26698): b'RQS'
2023-07-27 21:05:53,749 34.74.93.102: Sending Session List
2023-07-27 21:05:53,750 34.74.93.102: Recieved RQD from ('34.74.93.102', 26698): b'RQD'
2023-07-27 21:05:53,750 34.74.93.102: Sending datagram
2023-07-27 21:05:53,750 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:01,225 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:03,857 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:03,857 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:03,857 34.74.93.102: Requesting Datagram     61
2023-07-27 21:06:07,235 34.74.93.102: Recieved DAT from ('34.74.93.102', 26698): b'DAT{"54b2b91c": "hello", "f41fe2a9": "Reply to @54b2b91c!", "894df995": "Malformed, lol @12345678", "1508abcd": "text", "0f0961d5": "#"}'
2023-07-27 21:06:07,235 34.74.93.102: Recieved RQD from ('34.74.93.102', 26698): b'RQD'
2023-07-27 21:06:07,235 34.74.93.102: Sending datagram
2023-07-27 21:06:07,235 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:11,860 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:11,860 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:14,504 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:24,636 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:24,636 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:24,636 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:24,636 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:28,006 34.74.93.102: Recieved RQD from ('34.74.93.102', 26698): b'RQD'
2023-07-27 21:06:28,006 34.74.93.102: Sending datagram
2023-07-27 21:06:28,006 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:28,006 34.74.93.102: Requesting Datagram     62
2023-07-27 21:06:28,007 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:28,007 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:28,007 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Recieved DAT from ('34.74.93.102', 26698): b'DAT{"54b2b91c": "hello", "f41fe2a9": "Reply to @54b2b91c!", "894df995": "Malformed, lol @12345678", "1508abcd": "text", "0f0961d5": "#"}'
2023-07-27 21:06:37,933 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Requesting Session List 19
2023-07-27 21:06:37,933 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Recieved KPL from ('34.74.93.102', 26698): b'KPL'
2023-07-27 21:06:37,933 34.74.93.102: Requesting Datagram     63
2023-07-27 21:06:47,334 34.74.93.102: Recieved SES from ('34.74.93.102', 26698): b'SES["34.147.33.206"]'
2023-07-27 21:06:47,334 34.74.93.102: My sessions: {'34.74.93.102'} Recieved: {'34.147.33.206'} New: ['34.147.33.206']
2023-07-27 21:06:47,334 STUN request via stun.ekiga.net

Видим практическое подтверждение всего, что было сказано раньше. Ноды обмениваются KPL, периодически сверяя датасеты и списки сессий.

As conclusion

image

Под конец можно сказать, что результат получился даже лучше, чем я изначально рассчитывал. Простая пробрасывалка портов превратилась в полноценное юзабельное приложение. На самом деле, ещё есть что доработать и прикрутить — совершенству нет предела. Как минимум можно распространять вместе со своим ip ещё и публичный ключ от асимметричной пары и передавать данные не в сыром виде, а зашифрованные этой парой так, чтобы прочитать их мог только желаемый получатель (только ваш друг). Можно прикрутить интерфейс поверх какого-нибудь electronа и получить готовое десктопное приложение…

Весь исходный код CnC.py

from pathlib import Path
import time
import os
import sys


def cls():
    os.system("cls" if os.name == "nt" else "clear")


try:
    scriptname = Path(__file__).name
except:
    scriptname = "CnC.py"
try:
    verfrom = time.ctime(os.path.getmtime(__file__))
except:
    verfrom = "!No time!"


import threading
import socket
import random
import hashlib
import json
import logging
import pdb


class Debug(pdb.Pdb):
    def __init__(self, *args, **kwargs):
        super(Debug, self).__init__(*args, **kwargs)
        self.prompt = "CnC Debug Shell >>> "

    def shell(self):
        self.set_trace()


debug = Debug()
# logging.warn(f"Event logging is enabled. You can see it in {scriptname}.log")
logging.basicConfig(
    format="%(asctime)s %(message)s", level=logging.DEBUG, filename=f"{scriptname}.log"
)
logging.info("Hello world!")
logging.getLogger().setLevel(logging.DEBUG)


def get_files():
    matches = []
    for root, dirs, files in os.walk(os.getcwd()):
        for file in files:
            if file.endswith(".html"):
                matches.append(os.path.splitext(file)[0])
    return matches


def dat_to_bytes(diction: dict) -> bytes:
    return json.dumps(diction).encode("cp866")


def bytes_to_dat(byte: bytes) -> dict:
    return json.loads(byte.decode("cp866"))


def checksum(b):
    return hashlib.blake2s(b, digest_size=4).hexdigest()


_alreadyused = set()


def randomport():
    global _alreadyused
    p = random.randint(16000, 65535)
    while p in _alreadyused:
        p = random.randint(16000, 65535)
    _alreadyused.update({p})
    return p


def aegis(f):
    def wr():
        while 1:
            try:
                f()
                break
            except Exception as e:
                logging.critical(e)

    return wr


def STUN(port, host="stun.ekiga.net"):
    logging.debug(f"STUN request via {host}")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))
    sock.setblocking(0)
    server = socket.gethostbyname(host)
    work = True
    while work:
        sock.sendto(
            b"x00x01x00x00!x12xa4Bxd6x85yxb8x11x030x06xixdfB",
            (server, 3478),
        )
        for i in range(20):
            try:
                ans, addr = sock.recvfrom(2048)
                work = False
                break
            except:
                time.sleep(0.01)

    sock.close()
    return socket.inet_ntoa(ans[28:32]), int.from_bytes(ans[26:28], byteorder="big")


def addr2int(ip, port: int):
    binport = bin(port)[2:].rjust(16, "0")
    binip = "".join([bin(int(i))[2:].rjust(8, "0") for i in ip.split(".")])
    return int(binip + binport, 2)


def int2addr(num):
    num = bin(num)[2:].rjust(48, "0")
    print(num)
    num = [
        str(int(i, 2))
        for i in [num[0:8], num[8:16], num[16:24], num[24:32], num[32:48]]
    ]
    return ".".join(num[0:4]), int(num[4])


pool = []  # Pool for commands to any session (like RQD to initiate data update)

missed_messages = (
    set()
)  # Storage for hashes of required but missed messages (to request them later)

data = {}
sessions = []


def mass_command(c):
    global pool
    pool.append(c)


def data_add(item):
    data.update({checksum(item.encode("cp866")): item})


def data_dump():
    with open(f"{scriptname}.chat-savefile.json", "w") as f:
        json.dump(icm2.data, f)


def data_load():
    global data
    try:
        with open(f"{scriptname}.chat-savefile.json", "r") as f:
            data = json.load(f)
    except:
        logging.error(f"No save file exists! New one created")
        with open(f"{scriptname}.chat-savefile.json", "w") as f:
            json.dump({}, f)


class Session:
    def __init__(self):
        # self.prefix="IDL"
        self.immortal = False
        self.local_port = randomport()
        for i in range(10):
            self.public_ip, self.public_port = STUN(self.local_port)
        self.socket = None
        self.client = None
        self.thread = None
        logging.info(f'"{self.public_ip}",{self.public_port}')

    def make_connection(self, ip, port, timeout=10):
        logging.debug(f"Start waiting for handshake with {ip}:{port}")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind(("0.0.0.0", self.local_port))
        sock.setblocking(0)
        while True:
            sock.sendto(b"Con. Request!", (ip, port))
            time.sleep(2)
            try:
                ans, addr = sock.recvfrom(9999)
                sock.sendto(b"Con. Request!", (ip, port))
                sock.close()
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                sock.bind(("0.0.0.0", self.local_port))
                sock.setblocking(0)
                self.client = (ip, port)
                self.socket = sock
                logging.debug(f"Hole with {self.client} punched!")
                break
            except Exception as e:
                assert timeout > 0
                timeout -= 1
                logging.debug(f"No handshake with {ip}:{port} yet...")

    def backlife_cycle(self, freq=1):
        global sessions
        if self.immortal:
            logging.warning(f"{self.client} session beacame immortal")
            self.life_cycle = aegis(self.life_cycle)
        th = threading.Thread(target=self.life_cycle, args=(freq,))
        th.start()
        self.thread = th
        logging.warning(f"Session with {self.client} stabilized!")
        # sessions.append(self)

    def life_cycle(self, freq=1):
        global data
        global sessions
        global pool
        c = 0
        while 1:
            if len(pool):
                pref = pool.pop(0)
            else:
                pref = b"KPL"

            self.socket.sendto(pref, self.client)  # Keep-alive
            time.sleep(max(random.gauss(1 / freq, 3), 0))

            while True:
                try:
                    ans, reply_addr = self.socket.recvfrom(9999)
                    logging.debug(
                        f"{self.client[0]}: Recieved {ans[:3].decode('cp866')} from {reply_addr}: {ans}"
                    )
                except:
                    break

                if ans[:3] == b"KPL":
                    c += 1
                    if c % 10 == 0:
                        logging.debug(
                            f"{self.client[0]}: Requesting Datagram     {c//10}"
                        )
                        self.socket.sendto(b"RQD", self.client)
                        c += 1
                    elif c % 33 == 0:
                        logging.debug(
                            f"{self.client[0]}: Requesting Session List {c//33}"
                        )
                        self.socket.sendto(b"RQS", self.client)
                        c += 1

                # ----------------------------------------Hashes (keys) sync (disabled now)----------------------------------------
                elif ans[:3] == b"RQH":
                    logging.debug(f"{self.client[0]}: Sending hashes")
                    self.socket.sendto(
                        b"HAS" + dat_to_bytes(list(data.keys())), self.client
                    )

                elif ans[:3] == b"HAS":
                    missed_messages.update(
                        set(bytes_to_dat(ans[3:])) - set(data.keys())
                    )

                # ----------------------------------------Data sync----------------------------------------
                elif ans[:3] == b"RQD":
                    if ans[3:] != b"":
                        logging.debug(f"{self.client[0]}: Sending specific datagram")
                        n = {}
                        r = set(bytes_to_dat(ans[3:]))
                        for i in r:
                            if i in data:
                                n.update({i: data[i]})
                        logging.debug(f"{self.client[0]}: Sending {n}")
                        self.socket.sendto(b"DAT" + dat_to_bytes(n), self.client)
                    else:
                        logging.debug(f"{self.client[0]}: Sending datagram")
                        self.socket.sendto(b"DAT" + dat_to_bytes(data), self.client)

                elif ans[:3] == b"DAT":
                    data.update(bytes_to_dat(ans[3:]))

                # ----------------------------------------IP list sync-------------------------------------
                elif ans[:3] == b"RQS":
                    logging.debug(f"{self.client[0]}: Sending Session List")
                    sess = [i.client[0] if i.client else None for i in sessions]
                    sess = set(sess)
                    sess = sess - {None}
                    sess = list(sess)
                    sess.remove(self.client[0])
                    self.socket.sendto(b"SES" + dat_to_bytes(sess), self.client)

                elif ans[:3] == b"SES":
                    sess = [i.client[0] if i.client else None for i in sessions]
                    sess = set(sess)
                    sess = sess - {None}
                    logging.debug(
                        f"{self.client[0]}: My sessions: {sess} Recieved: {set(bytes_to_dat(ans[3:]))} New: {list(set(bytes_to_dat(ans[3:]))-sess)}"
                    )
                    uncon = list(set(bytes_to_dat(ans[3:])) - sess)
                    if not uncon:
                        continue
                    adr = random.choice(uncon)
                    s = Session()
                    sessions.append(s)
                    self.socket.sendto(
                        b"HOP"
                        + socket.inet_aton(adr)
                        + b"CON"
                        + s.public_ip.encode("cp866")
                        + b":"
                        + str(s.public_port).encode("cp866"),
                        self.client,
                    )
                # ----------------------------------------HOP Tracking-------------------------------------
                elif ans[:3] == b"HOP":
                    sess = [i.client[0] if i.client else None for i in sessions]

                    ip = socket.inet_ntoa(ans[3:7])
                    if ip in sess:
                        s = sessions[sess.index(ip)]
                        s.socket.sendto(ans[7:], s.client)

                elif ans[:3] == b"CON":
                    s = Session()
                    adr, prt = ans[3:].decode("cp866").split(":")
                    self.socket.sendto(
                        b"HOP"
                        + socket.inet_aton(adr)
                        + b"RDY"
                        + prt.encode("cp866")
                        + b":"
                        + s.public_ip.encode("cp866")
                        + b":"
                        + str(s.public_port).encode("cp866"),
                        self.client,
                    )
                    try:
                        s.make_connection(adr, int(prt))
                        s.backlife_cycle(1)
                        sessions.append(s)
                    except:
                        logging.error(f"{self.client[0]}: Connect initiation timeout!")

                elif ans[:3] == b"RDY":
                    myprt, adr, prt = ans[3:].decode("cp866").split(":")
                    sess = [i.public_port for i in sessions]

                    if int(myprt) in sess:
                        s = sessions[sess.index(int(myprt))]
                        try:
                            s.make_connection(adr, int(prt))
                            s.backlife_cycle(1)
                        except:
                            logging.error(
                                f"{self.client[0]}: Connect stabilization timeout!"
                            )
                            sessions.remove(s)

                # ----------------------------------------Disabled Trash-----------------------------------
                elif ans[:3] == b"TRK":
                    adr, prt = ans[3:].decode("cp866").split(",")
                    sess = [i.client[0] for i in sessions]
                    sess.remove(self.client[0])
                    s = sessions[sess.index(adr)]

                    s.socket.sendto(
                        b"CN0"
                        + f"{self.client[0].encode('cp866')}:{prt.encode('cp866')}",
                        s.client,
                    )

                elif ans[:3] == b"CN0":
                    s = Session()
                    self.socket.sendto(
                        b"CN1"
                        + f"{s.public_ip.encode('cp866')}:{str(s.public_port).encode('cp866')}",
                        self.client,
                    )
                    adr, prt = ans[3:].decode("cp866").split(":")
                    s.make_connection(adr, int(prt))
                    sessions.append(s)
                elif ans[:3] == b"TRK":
                    pass
                else:
                    logging.warning(f"{self.client[0]}: Malformed! !!!{ans}!!!")


# It is like a legacy code, bruh: (https://stackoverflow.com/questions/51903172/how-to-display-a-tree-in-python-similar-to-msdos-tree-command)
def ptree(start, tree, indent_width=4):
    def _ptree(start, parent, tree, grandpa=None, indent=""):
        if parent != start:
            if grandpa is None:  # Ask grandpa kids!
                print(parent, end="")
            else:
                print(parent)
        if parent not in tree:
            return 
        for child in tree[parent][:-1]:
            print(indent + "├" + "─" * indent_width, end="")
            _ptree(start, child, tree, parent, indent + "│" + " " * 4)
        child = tree[parent][-1]
        print(indent + "└" + "─" * indent_width, end="")
        _ptree(start, child, tree, parent, indent + " " * 5)  # 4 -> 5

    parent = start
    print(start)
    _ptree(start, parent, tree)


def silent():
    logging.disable()


logging.warning(
    f"Logger for DEBUG is running! to diable it run {scriptname.split('.')[0]}.silent()"
)
print(
    f" Welcome to {scriptname} version from {verfrom} - event logging here: {scriptname}.log"
)
print(
    f"   Your Python version is {'.'.join([str(i) for i in sys.version_info[0:3]])}, branch {sys.version_info[3].upper()}"
)
try:
    c = STUN(12346)
    print(f"     Internet connection is stable on {socket.gethostname()}!")
    if 1:
        import requests

        r2 = requests.get(f"http://ipinfo.io/{c[0]}").json()["org"]
        print(
            f"       Gray  (local)  IP is {socket.gethostbyname(socket.gethostname())}"
        )
        print(f"       While (public) IP is {c[0]} - {r2}")
    else:
        print(
            f"       Gray  (local)  IP is {socket.gethostbyname(socket.gethostname())}"
        )
        print(f"       While (public) IP is {c[0]}")
except Exception as e:
    logging.critical(e)
    print(f" No internet connection found! I cant work offline!")


import re


def get_tree(data):
    data = data.copy()
    data.update({"00000000": "Root"})
    # data.update({"11111111":"Root"})
    data.update({"ffffffff": "Lost"})
    dct = {}

    for key, message in data.items():
        if key + ": " + message not in dct:
            dct.update({key + ": " + message: []})

    for key, message in data.items():
        rep = [i[1:] for i in re.findall("@+[a-z0-9]{8}", message)]

        if len(rep) > 0:
            rep = rep[0]
        else:
            rep = "00000000"

        if rep not in data:
            rep = "ffffffff"
        if rep + ": " + data[rep] in dct:
            dct[rep + ": " + data[rep]].append(key + ": " + message)
        else:
            pass
            # dct.update({rep+": "+data[rep]:[key+": "+message]})
    # print(dct)
    dct["00000000: Root"].remove("00000000: Root")
    for k, v in list(dct.items()).copy():
        if v == []:
            dct.pop(k)
    return dct


if __name__ == "__main__":
    data_load()
    print()
    inp = input(
        "You must to connect with one node (friend), which is already in net. (Press Enter when you and your friend are ready)"
    )
    s = Session()
    print()
    print(f"Send this to your friend {addr2int(s.public_ip,int(s.public_port))}")
    print()
    i, p = int2addr(int(input("Input the number from your friend: ")))
    print("Waiting for your friend")
    s.make_connection(i, p)
    sessions.append(s)
    s.backlife_cycle(1)
    while 1:
        print()
        print("╔════════════> Connect & Chan Panel <═════════════")
        print("╟> Type and enter your post")
        print("╟> (Empty input + Enter) to see Root")
        print("╟> ($00000000) to see specific thread")
        print("╟> (!) for new connection")
        print("╟> (#) to save checkpoint")
        print("╟> (`) dev console")
        print("╚══════════════════════════>")
        inp = input("Input: ")
        print("<════════════> Connect & Chan Info <════════════>")
        print(
            f" Connected Nodes: {len(sessions)} | Size of Chan tree: {len(dat_to_bytes(data))} bytesn"
        )

        if inp == "":
            ptree("00000000: Root", get_tree(data))

        elif inp == "!":
            s = Session()
            print(
                f"Send this to your friend {addr2int(s.public_ip,int(s.public_port))}"
            )
            print()
            inp = input("Input the number from your friend (Empty to cancel): ")
            if inp:
                i, p = int2addr(int(inp))
                print("Waiting for your friend")
                try:
                    s.make_connection(i, p)
                    sessions.append(s)
                    s.backlife_cycle(1)
                except:
                    print("Timeout(")
        elif inp == "`":
            try:
                debug.shell()
            except:
                pass
        elif inp == "#":
            data_dump()
        elif inp[0] == "$":
            if inp[1:] in data:
                ptree(f"{inp[1:]}: {data[inp[1:]]}", get_tree(data))
            else:
                print(f"There is no message with id [{inp[1:]}]")
        elif inp:
            data_add(inp)
        else:
            ptree("00000000: Root", get_tree(data))
        input("nnEnter to reload page")
        cls()

Хотя останавливаться тоже нужно уметь.

На этом я, пожалуй, поблагодарю вас за выделенное время на эту статью. Не уверен, что досюда все дочитали, особенно если вникали во весь код и объяснения. Если же ты, мой дорогой читатель, действительно ещё здесь…

Держи милые обои на рабочий стол

P2P-форум с нуля | от NAT hole punching до автономной и полностью децентрализованной сети - 23

Или тизер следующей моей статьи по ML, тут как знать...

Само собой, feedback и здравая критика в комментариях приветствуются. Спасибо!

Автор: Andрeй ✅

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js