- PVSM.RU - https://www.pvsm.ru -
Эта статья иллюстрирует реальное применение и получение выигрыша в производительности на примере закачки файлов в хранилище Amazon S3 [1] с использованием многопоточности на языке Ruby с использованием gem aws-sdk [2].
Реализовать закачку файла при помощи официального (т.е. разрабатываемого Amazon) gem aws-sdk достаточно просто. Если опустить подготовительную часть формирования параметров авторизации Amazon Web Services (AWS), то код занимает три строки:
def upload_to_s3(config, src, dst)
# создаем объект доступа к S3, используя параметры авторизации из хэша config
s3 = AWS::S3.new(
:access_key_id => config['access_key_id'],
:secret_access_key => сonfig['secret_access_key'])
# создаем объект, связанный с файлом ‘dst’ на S3 (существующим или не существующим)
s3_file = s3.buckets[config['bucket_name']].objects[dst]
# записываем содержимое файла src в удаленный файл (заодно открываем общий доступ)
s3_file.write(:file => src, :acl => :public_read)
end
Метод работает и показывает среднюю скорость закачки до 5Мб/c на файлах, больших 5Мб (на меньших файлах средняя скорость падает).
Однако, из опытов несложно заметить, что суммарная скорость одновременной закачки нескольких файлов выше, чем скорость закачки одного файла за раз. Получается, что есть некое ограничение пропускной способности на один поток? Попробуем распараллелить закачку, используя многопоточность и метод multipart_upload [3], и посмотреть, что получится.
Для закачки файла частями в aws-sdk существуем метод multipart_upload. Посмотрим на его типовое применение:
def multipart_upload_to_s3(src, dst, config)
s3 = AWS::S3.new(
:access_key_id => config['access_key_id'],
:secret_access_key => сonfig['secret_access_key'])
s3_file = s3.buckets[config['bucket_name']].objects[dst]
# открываем локальный src файл для чтения
src_io = File.open(src, 'rb')
# несколько полезных счетчиков
read_size = 0
uploaded_size = 0
parts = 0
# размер файла пригодится
src_size = File.size(src)
# начинаем закачку частями
s3_file = s3_file.multipart_upload({:acl => :public_read}) do |upload|
while read_size < src_size
# считаем последовательность байт заданного размера из файла в буфер
buff = src_io.readpartial(config['part_size'])
# увеличим счетчики
read_size += buff.size
part_number = parts += 1
# собственно, закачаем считанные данные на S3
upload.add_part :data => buff, :part_number => part_number
end
end
# закроем исходный файл
src_io.close
s3_file
end
На вид получается посложнее, чем простая закачка. Однако, у нового метода есть существенные преимущества:
Эти два свойства позволяют нам использовать многопоточность при выполнении закачки.
Есть несколько подходов для решения этой задачи:
С практической точки зрения наиболее удобен именно смешанный подход, т.к.:
Приведу код многопоточной закачки файла:
def threaded_upload_to_s3(src, dst, config)
s3 = AWS::S3.new(
:access_key_id => config['access_key_id'],
:secret_access_key => сonfig['secret_access_key'])
s3_file = s3.buckets[config['bucket_name']].objects[dst]
src_io = File.open(src, 'rb')
read_size = 0
uploaded_size = 0
parts = 0
src_size = File.size(src)
s3_file = s3_file.multipart_upload({:acl => :public_read}) do |upload|
# заведем массив для сохранения информации о потоках
upload_threads = []
# создадим мьютекс (или семафор), чтобы избежать “гонок”
mutex = Mutex.new
max_threads = [config['threads_count'], (src_size.to_f / config['part_size']).ceil].min
# в цикле создадим требуемое количество потоков
max_threads.times do |thread_number|
upload_threads << (Thread.new do
# мы в свежесозданном потоке
while true
# входим в участок кода, в котором одновременно может находится только один поток
# выставляем блокировку
mutex.lock
# прерываем цикл, если все данные уже прочитаны из файла
break unless read_size < src_size
# считаем последовательность байт заданного размера из файла в буфер
buff = src_io.readpartial(config['part_size'])
# увеличим счетчики
read_size += buff.size
part_number = parts += 1
# снимаем блокировку
mutex.unlock
# собственно, закачаем считанные данные на S3
upload.add_part :data => buff, :part_number => part_number
end
end)
end
# дожидаемся завершения работы всех потоков
upload_threads.each{|thread| thread.join}
end
src_io.close
s3_file
end
Для создания потоков мы используем стандартный класс Thread (основы работы с многопоточностью в Ruby описаны в статье habrahabr.ru/post/94574 [4]). Для реализации взаимных исключений мы используем простейшие двоичные семафоры (мьютексы), реализованные стандартным классом Mutex.
Для чего требуются семафоры? С их помощью мы отмечаем участок кода (называемый критическим), который может выполнять только один поток в один момент времени. Остальные потоки должны будут ждать, пока включивший семафор поток не покинет критический участок. Обычно семафоры используются для обеспечения правильного доступа к общим ресурсам. В данном случае, общими ресурсами являются: объект ввода src_io и переменные read_size и parts. Переменные buff и part_number объявлены как локальные внутри потока (т.е. блока Thread.new do … end), и поэтому не являются общими.
Более подробно про семафоры и многопоточность в Ruby можно прочитать в статье www.tutorialspoint.com/ruby/ruby_multithreading.htm [5] (англ.)
Замеряем скорость закачки разными методами на нескольких тестовых файлах (размером 1Мб, 10Мб, 50Мб, 150Мб) и сводим в таблицу:
Файл | Части | Потоки | upload, Mb/s | multipart_upload, Mb/s | threaded_upload, Mb/s | |
1 | 64k | 1 | 1 | 0.78 | 0.29 (37%) | 0.29 (37%) |
2 | 512k | 1 | 1 | 2.88 | 1.84 (64%) | 1.65 (57%) |
3 | 1Mb | 1 | 1 | 3.39 | 2.38 (70%) | 2.58 (76%) |
4 | 10Mb | 2 | 2 | 5.06 | 4.50 (89%) | 7.69 (152%) |
5 | 50Mb | 10 | 5 | 4.48 | 4.41 (98%) | 9.02 (201%) |
6 | 50Mb | 10 | 10 | 4.33 | 4.44 (103%) | 8.49 (196%) |
7 | 150Mb | 30 | 5 | 4.34 | 4.43 (102%) | 9.22 (212%) |
8 | 150Mb | 30 | 10 | 4.48 | 4.52(101%) | 8.90 (199%) |
Тестирование проводилось закачкой файлов с машины в регионе EU West (Ireland) на хранилище S3 в этом же регионе. Проводилась серия из 10 последовательных испытаний для каждого файла.
Если оценить скорость закачки простым методом для испытаний 4-8, то погрешность измерения составляет около 8%, что вполне приемлемо.
Закачка по частям (multipart_upload) на небольших файлах показала худший результат по сравнению с простой и такой же — на больших файлах.
Многопоточная закачка (threaded_upload) показала ту же эффективность на файлах из одной части, что и закачка по частям (что достаточно очевидно). А вот на больших по объему файлах мы имеем значительное преимущество — до двух раз (по сравнению с обычной закачкой).
Задачи выяснить оптимальные размер части и количества потоков не ставилось, но увеличение потоков с 5 до 10 на больших файлах не дало значимого эффекта.
Многопоточная закачка файла показала себя эффективнее обычной на файлах, которые состоят более чем из одной части, прирост скорости — до двух раз.
Кстати, удобно будет создать метод, который сам выбирает наиболее подходящий способ закачки в зависимости от размера файла.
Исходный код, приведенный в примерах, выложен на Github: github.com/whisk/s3up [6]
Автор: whisk
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/ruby/4501
Ссылки в тексте:
[1] Amazon S3: http://aws.amazon.com/s3/
[2] aws-sdk: http://aws.amazon.com/sdkforruby/
[3] multipart_upload: http://docs.amazonwebservices.com/AWSRubySDK/latest/AWS/S3/MultipartUpload.html
[4] habrahabr.ru/post/94574: http://habrahabr.ru/post/94574
[5] www.tutorialspoint.com/ruby/ruby_multithreading.htm: http://www.tutorialspoint.com/ruby/ruby_multithreading.htm
[6] github.com/whisk/s3up: https://github.com/whisk/s3up
Нажмите здесь для печати.