一個來源 - 工作流水線 - 一個水槽

我們希望並行處理資料並將其推送到其他工作人員處理的行。

由於 Workers 既消費又生產資料,我們必須建立兩個佇列:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

第一波工人從 first_input_source 讀取一個專案,處理該專案,並在 first_output_sink 中寫入結果:

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

第二波工人使用 first_output_sink 作為其輸入源並讀取,然後處理寫入另一個輸出接收器:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

現在 second_output_sink 是接收器,讓我們將它轉​​換為陣列:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }