基本通道操作创建 put 关闭和缓冲区
core.async
是关于制作过程是需要从值,把值到渠道。
(require [clojure.core.async :as a])
用 chan
创建频道
你可以使用 chan
函数创建频道:
(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3.
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3
使用 >!!
和 >!
将值放入通道
你可以使用 >!!
将值放入通道:
(a/>!! my-channel :an-item)
除了 nil
之外,你可以将任何值(字符串,数字,地图,集合,对象,甚至其他频道等)放入频道中:
;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel
根据通道的缓冲区,>!!
可能会阻塞当前线程。
(let [ch (a/chan)] ;; unbuffered channel
(a/>!! ch :item)
;; the above call blocks, until another process
;; takes the item from the channel.
)
(let [ch (a/chan 3)] ;; channel with 3-size buffer
(a/>!! ch :item-1) ;; => true
(a/>!! ch :item-2) ;; => true
(a/>!! ch :item-3) ;; => true
(a/>!! ch :item-4)
;; now the buffer is full; blocks until :item-1 is taken from ch.
)
从 (go ...)
区块内部,你可以 - 并且应该 - 使用 a/>!
而不是 a/>!!
:
(a/go (a/>! ch :item))
逻辑行为与 a/>!!
相同,但只有 goroutine 的逻辑进程才会阻塞而不是实际的 OS 线程。
在 (go ...)
块中使用 a/>!!
是一种反模式:
;; NEVER DO THIS
(a/go
(a/>!! ch :item))
使用 <!!
从频道中获取值
你使用 <!!
从频道中获取值:
;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
如果通道中没有可用的项目,a/<!!
将阻止当前线程,直到在通道中放入一个值(或者通道关闭,请参阅后面的内容):
(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it
从 (go ...)
区块内部,你可以 - 并且应该 - 使用 a/<!
而不是 a/<!!
:
(a/go (let [x (a/<! ch)] ...))
逻辑行为与 a/<!!
相同,但只有 goroutine 的逻辑进程才会阻塞而不是实际的 OS 线程。
在 (go ...)
块中使用 a/<!!
是一种反模式:
;; NEVER DO THIS
(a/go
(a/<!! ch))
关闭渠道
你用 a/close!
关闭一个频道:
(a/close! ch)
一旦频道关闭,并且频道中的所有数据都已用完,拍摄将始终返回 nil
:
(def ch (a/chan 5))
;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)
;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil
异步放入 put!
作为 a/>!!
(可能阻止)的替代方法,你可以调用 a/put!
将值放入另一个线程的通道中,并带有可选的回调。
(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive
在 ClojureScript 中,由于无法阻止当前线程,因此不支持 a/>!!
,并且 put!
是将数据从 (go)
块外部放入通道的唯一方法。
异步采用 take!
作为 a/<!!
(可能会阻塞当前线程)的替代方法,你可以使用 a/take!
异步从通道中获取值,并将其传递给回调。
(a/take! ch (fn [x] (do something with x)))
使用下拉和滑动缓冲区
使用 drop 和 slip 缓冲区,puts 永远不会阻塞,但是,当缓冲区已满时,会丢失数据。丢弃缓冲区会丢失最后添加的数据,而滑动缓冲区会丢失添加的第一个数据。
删除缓冲区示例:
(def ch (a/chan (a/dropping-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed
;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost
滑动缓冲区示例:
(def ch (a/chan (a/sliding-buffer 2)))
;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true
;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost