一个来源 - 工作流水线 - 一个水槽

我们希望并行处理数据并将其推送到其他工作人员处理的行。

由于 Workers 既消费又生产数据,我们必须创建两个队列:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

第一波工人从 first_input_source 读取一个项目,处理该项目,并在 first_output_sink 中写入结果:

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

第二波工人使用 first_output_sink 作为其输入源并读取,然后处理写入另一个输出接收器:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

现在 second_output_sink 是接收器,让我们将它转​​换为数组:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }