Многие Руби-разработчики игнорируют потоки (threads), хотя это очень полезный инструмент. В данной статье мы рассмотрим создание IO потоков в Руби и покажем как Руби справляется с потоками в которых происходит много вычислительных операций. Попробуем применить альтернативные имплементации Руби, а так же узнаем, каких результатов можно добиться при помощи модуля DRb. В конце статьи посмотрим, как эти принципы используются в различных серверах для аппликаций на Ruby on Rails.
IO потоки в Руби
Рассмотрим небольшой пример:
def call_remote(host)
sleep 3 # симулируем долгий запрос к серверу
end
Если нам надо обратитьcя к двум серверам, например, чтобы очистить кэш, и мы дважды последовательно вызовем эту функцию:
call_remote 'host1/clear_caches'
call_remote 'host2/clear_caches'
то наша программа будет работать 6 секунд.
Мы можем ускорить исполнение программы, если будем использовать потоки, например, так:
threads = []
['host1', 'host2'].each do |host|
threads << Thread.new do
call_remote "#{host}/clear_caches"
end
end
threads.each(&:join)
Мы создали два потока, в каждом потоке обратились к своему серверу и командами #join сказали, что главной программе (главному потоку) надо подождать их завершения. Теперь наша программа успешно выполняется в два раза быстрее, за 3 секунды.
Больше потоков хороших и разных
Рассмотрим более сложный пример, в котором попробуем заполучить все закрытые баги и проблемы с GitHub о проекте Jekyll через предоставляемый API.
Поскольку мы не хотим произвести DoS атаку на GitHub, нужно ограничить количество одновременных потоков, заняться их планированием, запуском и по мере поступления собирать результаты.
Стандартная библиотека Ruby не предоставляет готовых инструментов для решения подобных задач, поэтому я реализовал свою собственную библиотеку FutureProof для создания групп потоков в Ruby, об использовании которой и хочу рассказать подробнее.
Ее принцип прост — необходимо создать новую группу, указав максимально допустимое количество одновременных потоков:
thread_pool = FutureProof::ThreadPool.new(5)
добавить в него задач:
thread_pool.submit i, 2, 5 do |a, b|
a + b
end
и попросить их значения:
thread_pool.values
Таким образом, чтобы получить нужную нам информацию о проекте Jekyll, будет достаточно следующего кода:
require 'future_proof'
require 'net/http'
thread_pool = FutureProof::ThreadPool.new(5)
10.times do |i|
thread_pool.submit i do |page|
uri = URI.parse(
"https://api.github.com/repos/mojombo/jekyll/issues?state=close&page=#{page + 1}&per_page=100.json"
)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
http.request(Net::HTTP::Get.new(uri.request_uri)).body
end
end
thread_pool.perform
puts thread_pool.values[3] # [{"url":"https://api.github.com/repo ...
В основе реализации библиотеки FutureProof находится класс Queue, который позволяет создавать безопасные для работы с множеством потоков очереди, гарантирующие, что несколько потоков не запишут в очередь значения одновременно поверх друг друга, и не считают одновременно одно и тоже значение.
В библиотеке разработана также обработка исключений — если таковое произошло во время исполнения потока, то thread_pool по прежнему сможет вернуть массив из полученных значений и вызовет исключение только в случае, если программист попробует напрямую обратиться к конкретному элементу массива.
Имплементация групп потоков является попыткой приблизить возможности Руби в работе с потоками к возможностям Java и java.util.concurrent, откуда отчасти и черпалось вдохновение.
Используя библиотеку FutureProof можно выполнять задачи, включающие работу с IO потоками, намного удобнее и эффективнее. Библиотека поддерживает версии Руби 1.9.3, 2.0, а так же Rubinius.
Потоки и вычислительные операции
Учитывая успешный опыт улучшения быстродействия программы при помощи потоков, проведем два теста, в одном из которых мы 10000 раз по два раза вычислим факториал 1000 последовательно, а в другом параллельно.
require 'benchmark'
factorial = Proc.new { |n|
1.upto(n).inject(1) { |i, n| i * n }
}
Benchmark.bm do |x|
x.report('sequential') do
10_000.times do
2.times do
factorial.call 1000
end
end
end
x.report('thready') do
10_000.times do
threads = []
2.times do
threads << Thread.new do
factorial.call 1000
end
end
threads.each &:join
end
end
end
В итоге мы получили достаточно неожиданный результат (используя Ruby 2.0) — параллельное исполнение произвелось на секунду дольше:
user system total real
sequential 24.130000 1.510000 25.640000 (25.696196)
thready 24.600000 2.420000 27.020000 (26.877708)
Одна из причин — мы усложнили код планированием потоков, а вторая — Руби использовал только одно ядро для исполнения этой программы. К сожалению, возможности заставить Руби использовать несколько ядер для одного ruby процесса на данный момент не предоставляется.
Позвольте показать вам результаты исполнения этого же скрипта на jRuby 1.7.4:
user system total real
sequential 33.180000 0.690000 33.870000 (33.090000)
thready 37.820000 3.830000 41.650000 (24.333000)
Как можно заметить, результат стал лучше. Т.к. замер происходил на компьютере с двумя ядрами, и одно из ядер было использовано только на 75%, улучшение не составило 200%. Но, следовательно, на компьютере с большим количеством ядер мы могли бы делать еще больше параллельных потоков и еще больше улучшить наш результат.
jRuby является альтернативной имплементацией Руби на JVM, превносящая в сам язык очень большие возможности.
Выбирая количество одновременных потоков необходимо помнить — мы можем без потери производительности располагать множество потоков, занимающихся IO операциями на одном ядре. Но мы будем немного терять в производительности, если количество потоков будет превосходить количество ядер в случае с вычислительными операциями.
В случае с оригинальной имплементацией Ruby (MRI) рекомендуется использовать только один поток для вычислительных операций. Настоящего параллелизма при помощи потоков мы можем добиться только используя jRuby и Rubinius.
Параллелизм на уровне процессов
Как мы теперь знаем, Ruby MRI для одного процесса ruby (в Unix системах) может использовать только одно ядро. Одним из вариантов, как мы можем обойти этот недостаток, является использование форков процессов, например так:
read, write = IO.pipe
result = 5
pid = fork do
result = result + 5
Marshal.dump(result, write)
exit 0
end
write.close
result = read.read
Process.wait(pid)
puts Marshal.load(result)
Форк процесса, на момент создания, копирует значение переменной result, равной 5, но главный процесс не увидит дальнейшее изменение переменной внутри форка, поэтому нам было необходимо наладить сообщение между форком и главным процессом при помощи IO.pipe.
Такой метод является действенным, но достаточно громоздким и неудобным. При помощи модуля DRb для дистрибутивного программирования, можно добиться более интересных результатов.
Применяем модуль DRb для синхронизации процессов
Модуль DRb является частью стандартной Ruby библиотеки и отвечает за возможности дистрибутивного программирования. В основе его идеи лежит возможность дать доступ к одному Руби объекту любому компьютеру в сети. Результаты всех манипуляций с этим объектом, его внутреннее состояние, видно всем подключенным компьютерам, и постоянно синхронизировано. В целом, возможности модуля очень широки, и достойны отдельной статьи.
Мне пришла в голову идея использовать кортежи Rinda::TupleSpace вместе с этой возможностью DRb, чтобы создать модуль Pthread, отвечающий за исполнение кода в отдельных процессах как на компьютере главной программы, так и на других подключенных машинах. Rinda::TupleSpace предлагает доступ к кортежам по имени и, как и объекты класса Queue, позволяют записывать и считывать кортежи только одному потоку или процессу в одно время.
Таким образом, появилось решение, которое позволяет в Ruby MRI исполнять код на нескольких ядрах:
Pthread::Pthread.new queue: 'fact', code: %{
1.upto(n).inject(1) { |i, n| i * n }
}, context: { n: 1000 }
Как вы могли заметить, код, который необходимо исполнить, подается как строка, т.к. в случае с процедурами DRb передает другому процессу только ссылку на нее, и для ее исполнения пользуется ресурсам процесса, который эту процедуру создал. Чтобы избавиться от контекста главного процесса, я подаю код другим процессам как строку, а значения переменных строки в дополнительном словаре. Пример, как к исполнению кода подключать дополнительные машины, можно найти на домашней странице проекта.
Библиотека Pthread поддерживает версии MRI 1.9.3 и 2.0.
Параллелизм в Ruby on Rails
Сервера для Ruby on Rails и библиотеки для исполнения фоновых задач можно разбить на две группы. Первая для обработки запросов пользователей или исполнения фоновых задач использует форки — дополнительные процессы. Таким образом, используя MRI и эти сервера и библиотеки, мы можем обрабатывать параллельно сразу несколько запросов, и выполнять несколько задач одновременно.
Однако, у этого способа есть недостаток. Форки процессов копируют память создавшего их процесса, и таким образом сервер Unicorn с тремя «работниками» может забирать 1ГБ памяти, едва начав работу. Тоже самое касается библиотек для исполнения фоновых задач, например Resque.
Создатели сервера Puma для Ruby on Rails учли особенности jRuby и Rubinius, и выпустили сервер, ориентированный в первую очередь на эти две имплементации. В отличии от того же Unicorn, Puma для одновременной обработки запросов использует потоки, которые требуют гораздо меньше памяти. Таким образом, Puma будет являться отличной альтернативой при использовании в связке вместе с jRuby или Rubinius. Поэтому же принципу утроена библиотека Sidekiq.
Заключение
Потоки являются очень сильным инструментом, позволяющим делать несколько вещей одновременно, особенно, когда это касается долгих IO операций или вычислений. В данной статье мы рассмотрели некоторые возможности и ограничения Руби и разных его имплементаций, а также применили две сторонние библиотеки для упрощения работы с потоками.
Таким образом, автор статьи рекомендует поиграться с Ruby, потоками, и при начинании будущих Rails проектов, посмотреть в сторону альтернативных имплементаций — jRuby и Rubinius.
Автор: RNik