基本通道操作建立 put 關閉和緩衝區

core.async 是關於製作過程是需要從值,把值到渠道

(require [clojure.core.async :as a])

chan 建立頻道

你可以使用 chan 函式建立頻道:

(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3. 
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3

使用 >!!>! 將值放入通道

你可以使用 >!! 將值放入通道:

(a/>!! my-channel :an-item)

除了 nil 之外,你可以將任何值(字串,數字,地圖,集合,物件,甚至其他頻道等)放入頻道中:

;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel

根據通道的緩衝區,>!! 可能會阻塞當前執行緒。

(let [ch (a/chan)] ;; unbuffered channel
  (a/>!! ch :item) 
  ;; the above call blocks, until another process 
  ;; takes the item from the channel.
  )
(let [ch (a/chan 3)] ;; channel with 3-size buffer
  (a/>!! ch :item-1) ;; => true
  (a/>!! ch :item-2) ;; => true
  (a/>!! ch :item-3) ;; => true
  (a/>!! ch :item-4) 
  ;; now the buffer is full; blocks until :item-1 is taken from ch.
  )

(go ...) 區塊內部,你可以 - 並且應該 - 使用 a/>! 而不是 a/>!!

 (a/go (a/>! ch :item))

邏輯行為與 a/>!! 相同,但只有 goroutine 的邏輯程序才會阻塞而不是實際的 OS 執行緒。

(go ...) 塊中使用 a/>!! 是一種反模式:

;; NEVER DO THIS
(a/go 
  (a/>!! ch :item))

使用 <!! 從頻道中獲取值

你使用 <!! 從頻道中獲取值:

;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do 
  (a/>!! ch :item-1)
  (a/>!! ch :item-2)
  (a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2

如果通道中沒有可用的專案,a/<!! 將阻止當前執行緒,直到在通道中放入一個值(或者通道關閉,請參閱後面的內容):

(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it

(go ...) 區塊內部,你可以 - 並且應該 - 使用 a/<! 而不是 a/<!!

 (a/go (let [x (a/<! ch)] ...))

邏輯行為與 a/<!! 相同,但只有 goroutine 的邏輯程序才會阻塞而不是實際的 OS 執行緒。

(go ...) 塊中使用 a/<!! 是一種反模式:

;; NEVER DO THIS
(a/go 
  (a/<!! ch))

關閉渠道

你用 a/close! 關閉一個頻道:

(a/close! ch)

一旦頻道關閉,並且頻道中的所有資料都已用完,拍攝將始終返回 nil

(def ch (a/chan 5))

;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)

;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil

;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil

非同步放入 put!

作為 a/>!!(可能阻止)的替代方法,你可以呼叫 a/put! 將值放入另一個執行緒的通道中,並帶有可選的回撥。

(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive 

在 ClojureScript 中,由於無法阻止當前執行緒,因此不支援 a/>!!,並且 put! 是將資料從 (go) 塊外部放入通道的唯一方法。

非同步採用 take!

作為 a/<!!(可能會阻塞當前執行緒)的替代方法,你可以使用 a/take! 非同步從通道中獲取值,並將其傳遞給回撥。

(a/take! ch (fn [x] (do something with x)))

使用下拉和滑動緩衝區

使用 drop 和 slip 緩衝區,puts 永遠不會阻塞,但是,當緩衝區已滿時,會丟失資料。丟棄緩衝區會丟失最後新增的資料,而滑動緩衝區會丟失新增的第一個資料。

刪除緩衝區示例:

(def ch (a/chan (a/dropping-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed

;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost

滑動緩衝區示例:

(def ch (a/chan (a/sliding-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true

;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost