基本通道操作创建 put 关闭和缓冲区

core.async 是关于制作过程是需要从值,把值到渠道

(require [clojure.core.async :as a])

chan 创建频道

你可以使用 chan 函数创建频道:

(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3. 
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3

使用 >!!>! 将值放入通道

你可以使用 >!! 将值放入通道:

(a/>!! my-channel :an-item)

除了 nil 之外,你可以将任何值(字符串,数字,地图,集合,对象,甚至其他频道等)放入频道中:

;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel

根据通道的缓冲区,>!! 可能会阻塞当前线程。

(let [ch (a/chan)] ;; unbuffered channel
  (a/>!! ch :item) 
  ;; the above call blocks, until another process 
  ;; takes the item from the channel.
  )
(let [ch (a/chan 3)] ;; channel with 3-size buffer
  (a/>!! ch :item-1) ;; => true
  (a/>!! ch :item-2) ;; => true
  (a/>!! ch :item-3) ;; => true
  (a/>!! ch :item-4) 
  ;; now the buffer is full; blocks until :item-1 is taken from ch.
  )

(go ...) 区块内部,你可以 - 并且应该 - 使用 a/>! 而不是 a/>!!

 (a/go (a/>! ch :item))

逻辑行为与 a/>!! 相同,但只有 goroutine 的逻辑进程才会阻塞而不是实际的 OS 线程。

(go ...) 块中使用 a/>!! 是一种反模式:

;; NEVER DO THIS
(a/go 
  (a/>!! ch :item))

使用 <!! 从频道中获取值

你使用 <!! 从频道中获取值:

;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do 
  (a/>!! ch :item-1)
  (a/>!! ch :item-2)
  (a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2

如果通道中没有可用的项目,a/<!! 将阻止当前线程,直到在通道中放入一个值(或者通道关闭,请参阅后面的内容):

(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it

(go ...) 区块内部,你可以 - 并且应该 - 使用 a/<! 而不是 a/<!!

 (a/go (let [x (a/<! ch)] ...))

逻辑行为与 a/<!! 相同,但只有 goroutine 的逻辑进程才会阻塞而不是实际的 OS 线程。

(go ...) 块中使用 a/<!! 是一种反模式:

;; NEVER DO THIS
(a/go 
  (a/<!! ch))

关闭渠道

你用 a/close! 关闭一个频道:

(a/close! ch)

一旦频道关闭,并且频道中的所有数据都已用完,拍摄将始终返回 nil

(def ch (a/chan 5))

;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)

;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil

;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil

异步放入 put!

作为 a/>!!(可能阻止)的替代方法,你可以调用 a/put! 将值放入另一个线程的通道中,并带有可选的回调。

(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive 

在 ClojureScript 中,由于无法阻止当前线程,因此不支持 a/>!!,并且 put! 是将数据从 (go) 块外部放入通道的唯一方法。

异步采用 take!

作为 a/<!!(可能会阻塞当前线程)的替代方法,你可以使用 a/take! 异步从通道中获取值,并将其传递给回调。

(a/take! ch (fn [x] (do something with x)))

使用下拉和滑动缓冲区

使用 drop 和 slip 缓冲区,puts 永远不会阻塞,但是,当缓冲区已满时,会丢失数据。丢弃缓冲区会丢失最后添加的数据,而滑动缓冲区会丢失添加的第一个数据。

删除缓冲区示例:

(def ch (a/chan (a/dropping-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed

;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost

滑动缓冲区示例:

(def ch (a/chan (a/sliding-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true

;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost