Недавно мы писали о том, как перед нами впервые встала задача крупномасштабной миграции данных пользователей между дата-центрами и о том как мы ее решили.
В этот раз мы подробнее остановимся на том, каким образом осуществлялась миграция фотографий пользователей и какие структуры данных использовались для ограничения создаваемой нагрузки на сервера с фотографиями.
Ежедневно пользователи Badoo загружают примерно 3 миллиона фотографий. Для их хранения мы выделили специальный кластер серверов, занимающихся также изменением размеров, наложением «водяных знаков», импортом фотографий из других социальных сетей и прочими манипуляциями с файлами.
Все машины этого кластера можно условно разделить на три группы. Первая ― это серверы, отвечающие за быструю отдачу фотографий пользователям (можно сказать, собственная реализация CDN). В контексте миграции эти серверы нам не будут интересны. Вторая группа ― это хранилища с дисками, на которых, собственно, и находятся все фотографии. И третья группа ― это серверы, предоставляющие интерфейс ко второй группе, условно назовём их фотосерверами. На них по оптоволокну смонтированы дисковые массивы хранилищ, на эти же машины происходит загрузка фотографий и здесь же работают все скрипты, выполняющие какие-либо операции с файлами.
Таким образом, для PHP-кода совершенно неважно, на каком именно диске какого хранилища находится фотография. Все, что нужно сделать, это перенести фотографии пользователя с одного фотосервера на другой и обновить эту информацию в базе данных и некоторых демонах. Здесь важно отметить, что все фотографии пользователя всегда находятся на одном фотосервере.
Постановка задачи
Суммарный объем всех фотографий, когда-либо загруженных нашими пользователями, составляет примерно 600 Тб. В это число входят как оригиналы фотографий, так и набор фотографий с измененными размерами, необходимый для отображения в том или ином случае.
Грубая оценка показывает, что если 190 миллионов пользователей загрузили 600 ТБ данных, то 1,5 миллиона пользователей из Тайланда (самой крупной из перенесённых между дата-центрами стран) загрузили 4,7 ТБ. Пропускная способность канала между нашими дата-центрами на на момент миграции составляла 200 Мбит/с. Путем нехитрых вычислений мы получаем 55 часов на перенесение всех фотографий пользователей из Тайланда. Естественно, этот канал уже частично занят другими данными, постоянно циркулирующими между дата-центрами, и в действительности понадобится больше, чем 55 часов. А наша цель ― успеть за 8 часов.
Можно было бы перенести только оригиналы фотографий и сделать для них «ресайзы» на новом фотосервере, но это привело бы к нежелательному росту нагрузки на процессоры. Поэтому мы решили перенести фотографии заранее, рассчитывая, что пользователи не успеют загрузить много новых фотографий за время, необходимое на перенесение фотографий всей страны.
То есть сначала мы просто копируем уже существующие фотографии, а во время миграции всех данных пользователя (когда сайт для него недоступен и невозможно что-то изменить в своих фотографиях) проверяем, были ли какие-то изменения после переноса фотографий. Если изменения были, то копируем фотографии еще раз (вернее, сделаем rsync), и только после этого обновим данные в базах и демонах, чтобы фотографии пользователя стали показываться и загружаться на новом фотосервере.
Как показала практика, наши ожидания оправдались, и делать rsync второй раз пришлось для очень незначительного процента пользователей.
Еще одним ограничением для нас являлась производительность дисков хранилищ. Мы выяснили, что даже имея канал в сотни террабит, мы не сможем использовать его на полную мощность, поскольку на фотосерверы постоянно загружаются фотографии, с ними производятся различные операции. Кроме того, наш CDN «читает» эти фотографии с дисков, и дополнительная нагрузка на чтение может существенно замедлить повседневные операции. То есть интенсивность миграции фотографий должна быть искусственно ограничена.
Реализация
Первое, что пришло нам на ум ― это очередь в базе данных (мы используем MySQL), в которой будут находиться все пользователи, чьи фотографии нужно перенести. Очередь будет обрабатываться в несколько процессов. Ограничивая число процессов на один фотосервер, мы тем самым решим проблему ограничения нагрузки на диски. Ограничение на общее число процессов позволит нам регулировать загруженность канала между дата-центрами.
Допустим, у нас есть таблица MigrationPhoto следующей структуры:
CREATE TABLE MigrationPhoto (
user_id INT PRIMARY KEY,
updated TIMESTAMP,
photoserver VARCHAR(255), # фотосервер c фотографиями пользователя
script_name VARCHAR(255), # имя процесса, обрабатывающего пользователя
done TINYINT(1), # 1 если фотографии были перенесены, иначе 0
KEY photoserver (photoserver),
KEY script_name (script_name)
)
Сначала добавляем всех пользователей, фотографии которых мы хотим перенести, в нашу таблицу:
INSERT INTO MigrationPhoto (user_id,photoserver) VALUES (00000000, 'photoserver1')
Ограничение на суммарное число процессов легко достигается с помощью расширения pcntl и не представляет большого интереса, поэтому далее будем рассматривать один процесс, занимающийся переносом.
Нам нужно обеспечить конкретное число процессов на один фотосервер. Сначала разберемся, пользователи с каких фотосерверов вообще есть в очереди. Чтобы не делать каждый раз SELECT photoserver, COUNT(*) FROM MigrationPhoto, заведем отдельную таблицу:
CREATE TABLE MigrationPhotoCounters (
photoserver VARCHAR(255) PRIMARY KEY,
users INT
)
Мы будем заполнять ее при вставке каждого пользователя в таблицу MigrationPhoto:
INSERT INTO MigrationPhotoCounters (photoserver, users) VALUES ('photoserver1', 1) ON DUPLICATE KEY UPDATE users = users + VALUES(users)
Либо после заполнения MigrationPhoto сделаем так:
INSERT INTO MigrationPhotoCounters (bphotos_server, users) VALUES (SELECT photoserver, COUNT(*) AS users FROM MigrationPhoto)
Имея такую таблицу, при запуске каждого процесса будем делать
SELECT photoserver FROM MigrationPhotoCounters WHERE users>0 ORDER BY RAND()
Получив список всех фотосерверов, определим, для какого из них можно запустить процесс так, чтобы не превысить ограничения. Для этого поставим в базе лок, имя которого состоит из имени фотосервера и порядкового номера процесса в рамках данного сервера:
$processNumber = null;
foreach ($photoservers as $serverName) {
for ($i = 1; $i <= PROCESSES_PER_SERVER; $i++) {
$lock = executeQuery("SELECT GET_LOCK('migration" . $serverName . '_' . $processNumber . "', 0)");
if ($lock === '1') {
$processNumber = $i;
break;
}
}
if ($processNumber) {
$serverName = $serverName;
$scriptName = 'migration' . $serverName . '_' . $processNumber;
break;
}
}
Таким образом, мы перебираем в двух вложенных циклах все фотосерверы и номера процессов для них. Если выполнены все итерации внутреннего цикла и переменная $processNumber не определена, значит, для данного фотосервера достигнут лимит количества процессов. Если выполнены все итерации внешнего цикла, значит, такой лимит достигнут для всех фотосерверов, на которых еще имеются пользователи, подлежащие переносу.
Допустим, мы выбрали фотосервер photoserver1, и это второй процесс для него, то есть идентификатор процесса будет $scriptName = 'migration_photoserver1_2'.
Перед тем как двигаться дальше, вернем в общую очередь тех пользователей, которые по тем или иным причинам остались помеченными выбранным нами идентификатором процесса ($scriptName) при предыдущих запусках:
UPDATE MIgrationPhoto SET script_name = NULL WHERE done = 0 AND script_name = 'migration_photoserver1_2'
Пометим порцию пользователей как обрабатываемую данным процессом:
UPDATE SET MigrationPhoto script_name='migration_photoserver1_2' WHERE photoserver='photoserver1' AND done = 0 AND script_name IS NULL LIMIT 100;
Теперь возьмем из очереди несколько пользователей, которые соответствуют выбранному нами фотосерверу и еще не обрабатываются другим процессом:
SELECT * FROM MigrationPhoto WHERE script_name='migration_photoserver1_2' AND done=0;
После этого мы можем быть уверены, что выбранные нами записи не обрабатывает ни один другой процесс.
Запоминаем, когда мы перенесли фотографии пользователя:
UPDATE MigrationPhoto SET updated=NOW() WHERE user_id = 00000000
Выполняем все необходимые нам операции, упрощенно их можно представить как rsync. После успешного перенесения фотографий нам нужно отметить это в базе (для каждого из выбранных пользователей):
BEGIN;
UPDATE MigrationPhotoCounters SET users = users - 1 WHERE photoserver = 'photoserver1';
UPDATE MigrationPhoto SET done = 1 WHERE user_id = 00000000;
COMMIT;
Может случиться так, что из 100 взятых на обработку пользователей для некоторых не удастся осуществить перенос по самым разнообразным причинам. Таких пользователей нужно вернуть в очередь, чтобы перенести их позже:
UPDATE MigrationPhoto SET script_name = NULL WHERE user_id IN (<failed_ids>)
Завершаем процесс:
SELECT RELEASE_LOCK('migration_photoserver1_2')
Казалось бы, на этом можно закончить. Но у нашей схемы есть принципиальный недостаток.
Предположим, что запустился процесс, у которого $scriptName='migration_photoserver1_10', при этом PROCESSES_PER_SERVER=10. И этот процесс упал, не вернув взятых им пользователей в очередь. Для того чтобы эти пользователи снова были выбраны, либо снова должен запуститься процесс с таким же $scriptName, либо кто-то должен выставить этим пользователям в базе script_name=NULL. Запуска процесса с таким же $scriptName может больше и не случиться.
Например, у нас 100 фотосерверов в MigrationPhotoCounters, ограничение на суммарное число процессов ― 50, ограничение на число процессов на один ― 10, тогда очевидно, что если в какой-то момент на один фотосервер пришлось 10 процессов, то в дальнейшем этот фотосервер может получать только по одному процессу. Поэтому напишем еще один скрипт, который, допустим, один раз в минуту будет устанавливать script_name=NULL для тех пользователей, процессы которых сейчас не запущены:
foreach ($photoservers as $serverName) {
for ($i = 1; $i <= PROCESSES_PER_SERVER; $i++) {
$lock = executeQuery("SELECT GET_LOCK('migration" . $serverName . '_' . $processNumber . "', 0)");
if ($lock === '1') {
executeQuery("UPDATE MigrationPhoto SET script_name = NULL WHERE done = 0 AND script_name = 'migration" . $serverName . '_' . $processNumber . "'");
}
}
}
Теперь, даже в случае падения процесса, его пользователи станут доступны для обработки другим процессам. Кроме того, это позволит менять ограничение числа процессов на один фотосервер «на лету».
Когда процесс будет завершен и начнется миграция всех остальных данных пользователя, достаточно лишь проверить, что фотографии пользователя не менялись со времени, указанного в поле updated таблицы MigrationPhoto. А если менялись ― то просто повторить rsync. Это не займет много времени, так как практически никто из пользователей не меняет все свои фотографии за 2-3 суток.
В итоге у нас было 63 фотосервера, с которых мы читали фотографии, и 30 серверов, на которые писали. Все это происходило силами 80 процессов, с ограничением не более 3 процессов на один фотосервер. При таких ограничениях трафик составлял 150 Мбит/с. Перенесение фотографий для пользователей из Тайланда заняло чуть менее трех суток. Учитывая объем данных, мы получили отличный результат.
Заключение
Конечно, наша схема допускает расширения и улучшения. Например, можно ограничивать количество процессов обработки для каждого фотосервера индивидуально. Можно регулировать это количество в зависимости от того, сколько пользователей конкретного фотосервера находится в очереди. Можно добавить приоритеты, прогрессивный тайм-аут на следующую обработку записи (в случае неудачной текущей), максимальное количество неудачных обработок, логи, графики и еще много чего.
Но нам хотелось донести саму идею параллельной обработки очереди с заданными ограничениями, которая, конечно, может быть использована не только для переноса файлов между серверами.
Сегодня мы не стали описывать процесс обработки разнообразных ошибок и исключительных ситуаций, равно как и механизм создания и поддержания заданного количества процессов, и реализацию переноса файлов. Но если это вам интересно ― спрашивайте, и мы обязательно ответим в комментариях.
Антон Степаненко, Team Lead, PHP-разработчик
Автор: AntonStepanenko