Многопоточная закачка файла в S3

в 22:02, , рубрики: amazon s3, ruby, threading, upload, метки: , , ,

Эта статья иллюстрирует реальное применение и получение выигрыша в производительности на примере закачки файлов в хранилище Amazon S3 с использованием многопоточности на языке Ruby с использованием gem aws-sdk.

Начнем с простого

Реализовать закачку файла при помощи официального (т.е. разрабатываемого Amazon) gem aws-sdk достаточно просто. Если опустить подготовительную часть формирования параметров авторизации Amazon Web Services (AWS), то код занимает три строки:

def upload_to_s3(config, src, dst)
 # создаем объект доступа к S3, используя параметры авторизации из хэша config
 s3 = AWS::S3.new(
   :access_key_id => config['access_key_id'],
   :secret_access_key => сonfig['secret_access_key'])
 # создаем объект, связанный с файлом ‘dst’ на S3 (существующим или не существующим)
 s3_file = s3.buckets[config['bucket_name']].objects[dst]
 # записываем содержимое файла src в удаленный файл (заодно открываем общий доступ)
 s3_file.write(:file => src, :acl => :public_read)
end

Метод работает и показывает среднюю скорость закачки до 5Мб/c на файлах, больших 5Мб (на меньших файлах средняя скорость падает).
Однако, из опытов несложно заметить, что суммарная скорость одновременной закачки нескольких файлов выше, чем скорость закачки одного файла за раз. Получается, что есть некое ограничение пропускной способности на один поток? Попробуем распараллелить закачку, используя многопоточность и метод multipart_upload, и посмотреть, что получится.

Закачка файла частями

Для закачки файла частями в aws-sdk существуем метод multipart_upload. Посмотрим на его типовое применение:

def multipart_upload_to_s3(src, dst, config)
 s3 = AWS::S3.new(
   :access_key_id => config['access_key_id'],
   :secret_access_key => сonfig['secret_access_key'])
 s3_file = s3.buckets[config['bucket_name']].objects[dst]
 # открываем локальный src файл для чтения
 src_io = File.open(src, 'rb')
 # несколько полезных счетчиков
 read_size = 0
 uploaded_size = 0
 parts = 0
 # размер файла пригодится
 src_size = File.size(src)
 # начинаем закачку частями
 s3_file = s3_file.multipart_upload({:acl => :public_read}) do |upload|
   while read_size < src_size
     # считаем последовательность байт заданного размера из файла в буфер
     buff = src_io.readpartial(config['part_size'])
     # увеличим счетчики
     read_size += buff.size
     part_number = parts += 1
     # собственно, закачаем считанные данные на S3
     upload.add_part :data => buff, :part_number => part_number      
   end
 end
 # закроем исходный файл
 src_io.close

 s3_file
end

На вид получается посложнее, чем простая закачка. Однако, у нового метода есть существенные преимущества:

  • части могут закачиваться в произвольном порядке
  • нет необходимости ждать завершения закачки одной части, чтобы начать закачку другой

Эти два свойства позволяют нам использовать многопоточность при выполнении закачки.

Многопоточная закачка файла по частям

Есть несколько подходов для решения этой задачи:

  • фиксированное число потоков: файл делится на N частей (по количеству потоков N), каждая часть закачивается параллельно;
  • фиксированный размер части: файл делится на части размера, не превышающего значение S, каждая часть закачивается параллельно;
  • смешанный: файл делится на части размера, не превышающего значение S, полученные части закачиваются паралельно, но не более чем N потоками одновременно.

С практической точки зрения наиболее удобен именно смешанный подход, т.к.:

  • в API S3 есть ограничение на минимальный размер части (5Мб);
  • использование большого числа потоков снижает эффективность.

Приведу код многопоточной закачки файла:

def threaded_upload_to_s3(src, dst, config)
    s3 = AWS::S3.new(
      :access_key_id => config['access_key_id'],
      :secret_access_key => сonfig['secret_access_key'])
    s3_file = s3.buckets[config['bucket_name']].objects[dst]
    src_io = File.open(src, 'rb')
    read_size = 0
    uploaded_size = 0
    parts = 0
    src_size = File.size(src)
    s3_file = s3_file.multipart_upload({:acl => :public_read}) do |upload|
     # заведем массив для сохранения информации о потоках
     upload_threads = []
     # создадим мьютекс (или семафор), чтобы избежать “гонок”
     mutex = Mutex.new
     max_threads = [config['threads_count'], (src_size.to_f / config['part_size']).ceil].min
     # в цикле создадим требуемое количество потоков
     max_threads.times do |thread_number|
       upload_threads << (Thread.new do
         # мы в свежесозданном потоке
         while true
           # входим в участок кода, в котором одновременно может находится только один поток
           # выставляем блокировку
           mutex.lock
           # прерываем цикл, если все данные уже прочитаны из файла
           break unless read_size < src_size
           # считаем последовательность байт заданного размера из файла в буфер
           buff = src_io.readpartial(config['part_size'])
           # увеличим счетчики
           read_size += buff.size
           part_number = parts += 1
           # снимаем блокировку
           mutex.unlock

           # собственно, закачаем считанные данные на S3
           upload.add_part :data => buff, :part_number => part_number
         end
       end)
     end
     # дожидаемся завершения работы всех потоков
     upload_threads.each{|thread| thread.join}
    end
    src_io.close

    s3_file
 end

Для создания потоков мы используем стандартный класс Thread (основы работы с многопоточностью в Ruby описаны в статье habrahabr.ru/post/94574). Для реализации взаимных исключений мы используем простейшие двоичные семафоры (мьютексы), реализованные стандартным классом Mutex.

Для чего требуются семафоры? С их помощью мы отмечаем участок кода (называемый критическим), который может выполнять только один поток в один момент времени. Остальные потоки должны будут ждать, пока включивший семафор поток не покинет критический участок. Обычно семафоры используются для обеспечения правильного доступа к общим ресурсам. В данном случае, общими ресурсами являются: объект ввода src_io и переменные read_size и parts. Переменные buff и part_number объявлены как локальные внутри потока (т.е. блока Thread.new do … end), и поэтому не являются общими.

Более подробно про семафоры и многопоточность в Ruby можно прочитать в статье www.tutorialspoint.com/ruby/ruby_multithreading.htm (англ.)

Результаты сравнения

Замеряем скорость закачки разными методами на нескольких тестовых файлах (размером 1Мб, 10Мб, 50Мб, 150Мб) и сводим в таблицу:

Файл

Части Потоки

upload, Mb/s

multipart_upload, Mb/s

threaded_upload, Mb/s

1

64k

1

1

0.78

0.29 (37%)

0.29 (37%)

2

512k

1

1

2.88

1.84 (64%)

1.65 (57%)

3

1Mb

1

1

3.39

2.38 (70%)

2.58 (76%)

4

10Mb

2

2

5.06

4.50 (89%)

7.69 (152%)

5

50Mb

10

5

4.48

4.41 (98%)

9.02 (201%)

6

50Mb

10

10

4.33

4.44 (103%)

8.49 (196%)

7

150Mb

30

5

4.34

4.43 (102%)

9.22 (212%)

8

150Mb

30

10

4.48

4.52(101%)

8.90 (199%)

Тестирование проводилось закачкой файлов с машины в регионе EU West (Ireland) на хранилище S3 в этом же регионе. Проводилась серия из 10 последовательных испытаний для каждого файла.
Если оценить скорость закачки простым методом для испытаний 4-8, то погрешность измерения составляет около 8%, что вполне приемлемо.
Закачка по частям (multipart_upload) на небольших файлах показала худший результат по сравнению с простой и такой же — на больших файлах.
Многопоточная закачка (threaded_upload) показала ту же эффективность на файлах из одной части, что и закачка по частям (что достаточно очевидно). А вот на больших по объему файлах мы имеем значительное преимущество — до двух раз (по сравнению с обычной закачкой).
Задачи выяснить оптимальные размер части и количества потоков не ставилось, но увеличение потоков с 5 до 10 на больших файлах не дало значимого эффекта.

Заключение

Многопоточная закачка файла показала себя эффективнее обычной на файлах, которые состоят более чем из одной части, прирост скорости — до двух раз.
Кстати, удобно будет создать метод, который сам выбирает наиболее подходящий способ закачки в зависимости от размера файла.
Исходный код, приведенный в примерах, выложен на Github: github.com/whisk/s3up

Автор: whisk

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js