Оригинал этой статьи опубликован в журнале «Системный администратор» №3 (136) за март 2014.

В статье рассматриваются основные средства работы с потоками (threads) и процессами в языке и стандартной библиотеке Ruby
Немного о терминологии: англоязычный термин «thread» на русский переводится в двух вариантах — как «поток» и как «нить». Второй вариант точнее и не вызывает неоднозначности с потоками данных (streams), однако первый уже прижился в качестве основного. Кроме того, есть еще производные термины и варианты вроде «многонитевость» (или «многонитность»), но они мне не встречались и, честно говоря, режут глаз. Поэтому я буду использовать «поток».
Прежде, чем перейти к описаниям имеющегося инструментария, хотелось бы заметить, что Ruby создавался не как специальный язык параллельного программирования, при этом во время его создания и становления многозадачность уже стала привычной и необходимой. Из этих двух посылок, в общем-то, можно вывести текущую картину: никаких специфических концепций мы в нем не увидим, только поддержку привычной для всех языков общего назначения модели с некоторыми нюансами реализации.
Многопоточность
Потоки позволяют программисту распараллелить выполнение задачи в рамках одного процесса. Это дает заметный выигрыш в двух основных случаях: во-первых, когда есть аппаратные ресурсы для параллельных вычислений, т.е. многоядерная или многопроцессорная архитектура (что для современных компьютеров норма), и во-вторых, когда какие-то подзадачи вынуждены тратить время на ожидание внешних ресурсов, будь то дисковая подсистема, сеть, или действия пользователя.
Плохая новость в том, что в действительности потоки Ruby не параллельны, и выигрыша от многоядерности нам получить не удастся. Несмотря на то, что актуальные версии интерпретатора используют потоки операционной системы, управление ими построено так, что в определенный момент времени выполняется только один поток. В старых версиях (по 1.8.7 включительно) использовались так называемые «зеленые» потоки, исполняющиеся в рамках одного системного, в новых — действует механизм GIL (global interpreter lock). Хорошей же новостью можно считать то, что этот механизм защищает от некоторых (но не всех) потенциальных конфликтов между потоками. Ниже о синхронизации еще поговорим, а пока отметим, что второй выигрыш — в случае ожидания внешних ресурсов — нам остается вполне доступен, таким образом, польза от использования потоков в Ruby безусловно есть.
Работа с потоками в Ruby в основном сосредоточена в классе Thread
— в самом простом случае мы создаем объект этого класса из блока
(или множество объектов) и ждем завершения1.
puts 'begin'
th = Thread.new do
(1..3).each { |i| puts i }
end
# sleep 0
puts '---'
th.join
puts 'end'
Результат будет такой:
$ ruby demo01.rb
begin
---
1
2
3
end
Если же мы раскомментируем строчку «sleep 0
», положение строки с дефисами относительно цифр станет непредсказуемым,
а если вместо ноля напишем «1
», или даже «0.1
», дефисы будут стабильно выводиться после тройки, поскольку поток
полностью отработает раньше.
Что еще можно сделать с потоком, кроме как запустить его, и смиренно дождаться завершения (именно это делает метод join
)?
Можно его приостановить (метод класса Thread.stop
, вызываемый внутри потока), «разбудить», т.е. продолжить выполнение
с момента остановки (wakeup
), прекратить, не дожидаясь окончания работы (terminate
)… Такой пример:
th = Thread.new do
puts 'started'
Thread.stop
puts 'continued'
sleep 100
puts 'finished'
end
sleep 0.1
puts 'wakeup'
th.wakeup
sleep 0
puts 'terminate'
th.terminate
th.join
Даст следующий вывод:
$ ruby demo02.rb
started
wakeup
continued
terminate
Причем, если мы закомментируем строку «sleep 0
», то до «continued
» дело может и не дойти.
Обмен данными между потоками
Во-первых, мы можем передавать блоку аргументы при создании потока:
Thread.new 1, 2, 3 { |a, b, c| ... }
Во-вторых, в Ruby любой код возвращает какое-то значение, и результат выполнения блока мы вполне можем получить —
для этого нужно использовать метод value
вместо join
.
В-третьих, блок, из которого создается поток, как и любой другой, образует замыкание, т.е. в нем можно обращаться к любым данным, доступным в месте его объявления2. Однако здесь нужно помнить о том, что разные потоки могут обращаться к одним и тем же данным в произвольном порядке.
От одновременного обращения двух потоков к одной переменной нас защищает GIL, о котором было сказано выше,
он же делает многие (но не все) стандартные методы стандартных классов атомарными, но этого недостаточно.
Сколько раз выведет «true
» следующий код?
flag = true
5.times do
Thread.new do
puts 'true' if flag
flag = false
end
end
Thread.list.each do |th|
if th != Thread.current
th.join
end
end
Правильный ответ: непредсказуемо, если запустить скрипт достаточно много раз, можно увидеть все варианты от одного до пяти. Хотя проверка флага и присвоение ему значения сами по себе атомарны и не вызывают конфликтов, между ними легко может произойти переключение между потоками.
Самый простой способ обеспечить синхронизацию — использовать метод Thread.exclusive
, в предыдущем примере это бы выглядело так:
5.times do
Thread.new do
Thread.exclusive do
puts 'true' if flag
flag = false
end
end
end
В более сложных случаях, когда у нас, например, две переменные, обращение к каждой из которых нужно синхронизировать
независимо друг от друга, следует использовать объекты класса Mutex
и их метод synchronize
.
alpha = true
alpha_m = Mutex.new
beta = true
beta_m = Mutex.new
5.times do
Thread.new do
alpha_m.synchronize do
puts 'alpha' if alpha
alpha = false
end
beta_m.synchronize do
puts 'beta' if beta
beta = false
end
end
end
Собственно, Thread.exclusive
делает то же самое, но при этом использует один и тот же объект класса Mutex
на все случаи.
Кроме такой безусловной синхронизации объекты Mutex
позволяют и более гибко работать с блокировками — в каких-то случаях
не дожидаться освобождения заблокированного объекта, а выполнить другие действия (например, вывести сообщение об ошибке).
А еще ручное блокирование/разблокирование дает простор для глупых ошибок по невнимательности, поэтому я бы не рекомендовал
им пользоваться без особой на то необходимости.
В-четвертых, мы можем получать и устанавливать так называемые переменные потока посредством методов thread_variable_get
/set
.
th = Thread.new do
sleep 0.1
p Thread.current.thread_variable_get 'alpha'
end
th.thread_variable_set 'alpha', :alpha
th.join
Сюда же отнесем обращение к переменным, принадлежащим текущему «волокну» (fiber) потока — пример выше можно переписать так:
th = Thread.new do
sleep 0.1
p Thread.current['alpha']
end
th['alpha'] = :alpha
th.join
Это короче и наглядней, но надо помнить, что в общем случае «волокна» могут меняться.
И, в-пятых, для потоков применимы описываемые ниже способы взаимодействия между процессами.
Отступление о «волокнах»
«Волокна» (fibers) имеют косвенное отношение к теме статьи, но не упомянуть их нельзя, хотя бы из-за вышеописанного обращения к fiber-local переменным. По сути это сопрограммы, переключение между которыми происходит не средствами системы (или виртуальной машины), а вручную. Еще их можно охарактеризовать как подпрограммы, выполнение которых при каждом вызове начинается с того момента, на котором было остановлено в прошлый раз. Поясню примером:
f = Fiber.new do
current = Time.new
loop do
last = current
current = Time.new
Fiber.yield [last, current]
end
end
5.times do
p f.resume
sleep 1
end
В результате получим последовательность пар значений времени предыдущего вызова и текущего.
В целом, это довольно экзотический инструмент, которому в явном виде не так-то просто найти практическое применение.
Дополнительно о потоках
Для группировки потоков существует класс ThreadGroup
, который не предоставляет никакой особой функциональности,
кроме контроля за тем, что каждый поток принадлежит одной и только одной группе. Не добавленный ни в какую группу
явно, поток принадлежит ThreadGroup::Default
.
Все классы, упомянутые выше, принадлежат ядру языка и загружаются автоматически, однако есть еще кое-какие возможности,
предоставляемые уже модулями стандартной библиотеки3. Так, «require 'thread'
» предоставит нам классы Queue
и SizedQueue
, с функциональностью очереди, как ясно из названия. Во втором случае объем очереди ограничен,
и при достижении ограничения помещение нового элемента будет дожидаться, пока другой поток освободит место.
Еще одна полезная библиотека («require 'thwait'
» и класс ThreadsWait
) позволяет ожидать завершения некоего набора
потоков, как всех вместе, так и по очереди.
Процессы в Ruby
Собственно запуск программы на Ruby, как и на любом другом языке — есть запуск процесса. Который, в свою очередь, может порождать дочерние и общаться как с ними, так и с совершенно независимыми от него. Ключевое отличие дочернего процесса от потока — независимое адресное пространство — разные процессы не могут никаким образом обращаться к переменным друг друга.
С точки зрения программиста дочерние процессы делятся на два принципиально разных вида: подпроцессы, порождаемые
из того же кода посредством fork
, и внешние программы.
Как работает fork
? В привычных языках, типа C — это функция, в родительском процессе возвращающая идентификатор дочернего,
а в дочернем — ноль. В Ruby можно ее использовать точно так же, однако более элегантно воспользоваться вариантом с блоком,
который и станет выполняться в дочернем процессе.
pid = fork do
3.times do |i|
sleep 0.01
puts "Child [#{Process.pid}]: #{i}"
end
end
3.times do |i|
sleep 0.01
puts "Parent [#{Process.pid}]: #{i}"
end
Process.waitpid pid
Должен получиться примерно такой вывод:
$ ruby demo09.rb
Parent [9032]: 0
Child [9034]: 0
Parent [9032]: 1
Child [9034]: 1
Parent [9032]: 2
Child [9034]: 2
Что здесь важно помнить, так это то, что хотя блок при fork
и является замыканием, он получает доступ не к тому же окружению,
в котором определен, а к его копии на момент запуска. Таким образом обмен данными посредством внешних переменных невозможен,
а вопрос о синхронизации не имеет смысла.
Что касается внешних программ, то для их вызова служит несколько методов:
-
spawn
— асинхронный вызов, который нас и будет интересовать, возвращает идентификатор процесса; -
system
— синхронный вызов (т.е. метод дожидается завершения), возвращает индикатор успешности вызова; -
exec
— синхронный вызов, в случае неудачи вызывает исключение; -
`command`
или%x{command}
— самая простая форма — синхронный вызов, возвращает строку, соответствующую выводу программы.
Собственно, теме данной статьи соответствует только spawn
как асинхронный. Замечу лишь, что system
и exec
используют те же
аргументы. Аргументы описываются следующим образом:
spawn(‹env,› command ‹, args›*, ‹options›) → pid
Т.е. в начале идет необязательный параметр, устанавливающий дополнительные переменные окружения, затем команда, затем произвольное количество необязательных же аргументов, и наконец, если последний параметр — хэш, из него берутся опции, позволяющие управлять правами доступа, текущим каталогом и, самое главное, перенаправлениями ввода-вывода. В самом же простом случае достаточно указать только команду.
Сигналы
Процессы могут посылать друг другу сигналы и как-то на них реагировать. Вообще говоря, сигналы — это скорее механизм для общения операционной системы с процессами, и большинство из них зарезервировано под специальные нужды, однако кое-что можно задействовать и в прикладных целях. Выглядит это, например, так:
child = fork do
count = 0
Signal.trap :USR1 do
count += 1
puts "Signal USR1: #{count}"
end
Signal.trap :TERM do
puts 'Signal TERM'
exit
end
sleep 1000
puts 'Ooops!'
end
Signal.trap :CHLD do
puts 'Child died.'
end
Process.kill :USR1, child
sleep 0.01
Process.kill :USR1, child
Process.kill :TERM, child
Process.wait
В результате должно получиться:
$ ruby demo10.rb
Signal USR1: 1
Signal USR1: 2
Signal TERM
Child died.
Метод Process.kill
посылает сигнал, а Signal.trap
устанавливает обработчик сигнала. При этом нетрудно видеть,
что сигнал CHLD
мы не посылали — его отправила система, уведомляя родительский процесс о завершении дочернего.
Отдельно стоит обратить внимание на строку «sleep 0.01
» между двумя отправками. Если ее закомментировать,
то сигнал USR1
будет получен дочерним процессом только один раз, поскольку на момент второй отправки первый
еще не будет обработан — сигналы поступают в очередь и уже имеющиеся там не добавляются.
К сожалению, посредством сигналов мы можем сообщить процессу только о наступлении некоторого события, без подробной информации. А всю информацию между процессами нужно передавать средствами ввода-вывода.
Каналы ввода-вывода
Общим способом для любых дочерних процессов будет перенаправление ввода-вывода посредством каналов (pipes). Для внутренних подпроцессов это выглядит так:
rd, wr = IO.pipe
child = fork do
rd.close
wr.write 'From Child'
wr.close
end
wr.close
msg = rd.read
rd.close
p msg
Process.wait
Здесь существенно, что оба процесса первым делом закрывают ненужные «концы» канала. Если этого не сделать, то возможны проблемы с некорректным определением конца файла.
В случае внешних команд все похоже, а «пишущий конец» канала передаем в специальном хэш-значении в последнем параметре spawn:
rd, wr = IO.pipe
child = spawn "echo 'External Child'",
[ STDERR, STDOUT ] => wr
wr.close
msg = rd.read
rd.close
p msg
Process.wait
В приведенных примерах использовалась передача данных только в одну сторону, что, конечно, необязательно. Можно создавать произвольное количество каналов и назначать их как выводу, так и вводу.
Сокеты
Сокеты — это совсем универсальный механизм взаимодействия произвольных программ между собою, в том числе и по сети.
Здесь уже не важно, как и где запускается процесс-собеседник, нужно только знать адрес и протокол обмена (формат данных).
Сам по себе стандарт сокетов довольно низкоуровневый, так что подробное описание и примеры заняли бы слишком много места.
Отмечу лишь, что инструменты работы с сокетами находятся в модуле socket
стандартной библиотеки.
Кроме того, на сокетах основано взаимодействие уже максимального уровня — готовыми ruby-объектами в библиотеке dRuby, о которой я писал в одной из предыдущих статей4. Здесь же стоит сказать, что dRuby позволяет обращаться к объекту в другом процессе (и, возможно, на другой машине) как к локальному объекту Ruby со всеми его методами, свойствами и т.д.
Применение
Подведем некоторые итоги.
При использовании потоков мы остаемся в рамках одного процесса, что определяет как плюсы, так и минусы: с одной стороны, возможность использования общих переменных, с другой — взаимная зависимость. Дополнительный минус именно ruby-реализации — глобальный блокировщик, из-за которого реально в любой момент времени выполняется только один поток. Следует однако понимать, что GIL — это именно особенность реализации, а не языка, независимые реализации, такие как, например JRuby и Rubinius его не имеют; не исключено, что и будущие версии «эталонного» Ruby изменят свое поведение.
Оптимальный сценарий использования — распараллеливание ожидания: работа с сетью, фоновые действия когда один из потоков ждет и обрабатывает общение с пользователем, файловые операции. Основным средством взаимодействия между потоками является использование общих переменных (не забывая про синхронизацию).
При запуске нескольких процессов получаем полный паралеллизм, а вот средства коммуникации приходится прописывать отдельно.
Соответственно, лучше всего такой сценарий поведет себя на максимально независимых задачах, малосвязных, обмен данными между которыми можно свести к нескольким точкам. Взаимодействие строится на каналах ввода-вывода для дочерних процессов и сокетах/dRuby для независимых.
-
Полные тексты примеров находятся по адресу https://gist.github.com/shikhalev/9198544. ↩
-
О блоках и замыканиях см. статью «Блоки и контекст в Ruby» в номере 1-2 этого года. ↩
-
Подробную документацию на стандартную библиотеку можно найти по адресу http://rubydoc.info/stdlib/ (англ.) ↩
-
«Распределенный Ruby» в декабрьском номере 2013 года. ↩