发送多个并行 HTTP 请求

Web 应用程序中一个非常常见的用例是执行多个异步(例如 HTTP)请求,并在它们到达时收集它们的结果或者同时收集它们的所有结果(例如,在 Angular2 中使用 HTTP 服务 )。

1.当他们到达时逐个收集异步响应

这通常使用 mergeMap() 运算符来完成,运算符采用必须返回 Observable 的投影函数。即使先前的 Observable 尚未完成,运算符 mergeMap() 也会立即在内部订阅每个 Observable。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  .subscribe(val => console.log(val));

由于随机延迟,这会以不同的顺序打印对控制台的响应:

Response from url-3
Response from url-4
Response from url-2
Response from url-1

查看现场演示: https//jsbin.com/xaqudan/2/edit?js,console

每个响应(通过 next 调用发出的项目)立即由 mergeMap() 重新发送。

为了我们发送多个 HTTP 请求的目的,有必要提一下 mergeMap() 总共可以带三个参数:

  1. 需要返回 Observable 的投影函数。
  2. 结果选择器功能允许我们在进一步发出结果之前修改结果。
  3. 并发订阅的 Observable 的数量。

控制并行请求的数量

使用第三个参数,我们可以控制我们将处理多少个并行请求(假设执行 HTTP 请求的每个 Observable 都是)。

在以下示例中,我们将同时仅运行 2 个请求。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(1000);
}

let urls = ['url-1', 'url-2', 'url-3', 'url-4'];
let start = (new Date()).getTime();

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url), undefined, 2)
  .timestamp()
  .map(stamp => [stamp.timestamp - start, stamp.value])
  .subscribe(val => console.log(val));

观看现场演示: https//jsbin.com/sojejal/4/edit?js,console

请注意,前两个请求在 1 秒后完成,而其他两个请求在 2 秒后完成。

[1004, "Response from url-1"]
[1010, "Response from url-2"]
[2007, "Response from url-3"]
[2012, "Response from url-4"]

处理错误

如果任何源 Observable 失败(发送 error 通知),mergeMap() 会再次发送错误作为 error。如果我们希望每个 Observable 优雅地失败,我们需要使用例如 catch() 操作符。

function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`)
      }
      return value;
    });
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url).catch(() => Observable.empty()))
  .subscribe(val => console.log(val));

url-3 的响应会抛出一个错误,该错误将作为 error 通知发送。这是后来被 catch() 运算符捕获并替换为 Observable.empty(),这只是一个 complete 通知。因此,忽略此响应。

此示例的输出如下:

Response from url-4
Response from url-1
Response from url-2

观看现场演示: https//jsbin.com/kuqumud/4/edit?js,console

2.立即收集所有异步响应

按照前面的例子,我们可以用 toArray() 运算符收集所有响应。

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  .toArray()
  .subscribe(val => console.log(val));

但是,使用 toArray() 算子会产生重要影响。订阅者是否收到结果不仅通过完成所有 HTTP 请求来控制,还通过完成源 Observable(在我们的例子中为 Observable.from)来控制。这意味着我们不能使用永远不会完成的源 Observable(例如,Observable.fromEvent)。

另一种实现相同结果的方法是使用 Observable.forkJoin() ,它将我们想要订阅的 Observable 数组作为参数,并等待所有它们发出至少一个值并完成

function mockHTTPRequest(url) {
    return Observable.of(`Response from ${url}`)
      .delay(Math.random() * 1000);
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
var observables = urls.map(url => mockHTTPRequest(url));

Observable.forkJoin(observables)
  .subscribe(val => console.log(val));

这会将所有响应打印为单个数组:

["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]

观看现场演示: https//jsbin.com/fomoye/2/edit?js,console

Observable.forkJoin() 还将一个结果选择器函数作为可选参数,它允许我们在进一步发出之前修改最终结果:

Observable.forkJoin(observables, (...results) => {
    return results.length;
  })
  .subscribe(val => console.log(val));

这打印到控制台:

4

查看现场演示: https//jsbin.com/muwiqic/1/edit?js,console

请注意,结果选择器函数的参数已解压缩。

处理错误

对于错误处理,我们可以使用与前面示例中相同的方法与 catch() 运算符。

但是,有一件重要的事情需要注意。forkJoin() 要求每个源 Observable 至少发出一个值。如果我们使用 catch(() => Observable.empty()) 就像我们之前所做的那样,forkJoin() 永远不会发出任何东西,因为 Observable.empty() 只是一个通知。
这就是为什么我们需要使用例如 Observable.of(null),这是 null 值,然后是 complete 通知。

function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`)
      }
      return value;
    });
}

var urls = ['url-1', 'url-2', 'url-3', 'url-4'];

var observables = urls.map(url => mockHTTPRequest(url).catch(() => Observable.of(null)));

Observable.forkJoin(observables)
  .subscribe(val => console.log(val));

观看现场演示: https//jsbin.com/yidiked/2/edit?js,console

这打印到控制台:

["Response from url-1", "Response from url-2", null, "Response from url-4"]

请注意,错误由 null 替换。如果我们只使用 Observable.empty(),则 forkJoin() 将永远不会发出任何东西。