为什么需要 Observable

RxJS 说是弥补关于数据处理场景的一块空白 → 推送多个数据

这里把我们对数据的处理方式做了两个维度的分类:

TypeSINGLEMULTIPLE
PullFunctionIterator
PushPromiseObservable

Pull 和 Push

Pull 就是我要的时候,就能要到。生产者无感。

  • 每个 JS 函数都是在 Pull
  • ES2015 的生成器(function*)弥补了"拉取多个值"的空白

Push 就是我决定啥时候给,消费者无感。

  • 典型例子就是 Promise,等 then 的时候自然会 push 给你

从生产者和消费者的角度理解:

生产者消费者
Pull被动:等着要数据的时候再产生数据主动:会决定啥时候去要数据
Push主动:按照自己的节奏产生数据,然后推出去被动:就等着数据(等回调)

代码示例

1. 拉取单个数据

function getNum() {
  return Math.floor(Math.random() * 10)
}

function singlePull() {
  const num = getNum()
  console.log(num, 'singlePull')
}

2. 推送单个数据

function getNumPromise() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(getNum())
    }, 1000)
  })
}

function singlePush() {
  const promise = getNumPromise()
  promise.then(num => console.log(num, 'singlePush'))
}

3. 拉取多个数据

function* getNumIterator() {
  let i = 0;
  while (i < 3) {
    i++;
    yield getNum();
  }
  return i;
}

function multiplePull() {
  const it = getNumIterator()
  while (true) {
    const { value, done } = it.next()
    if (done) break
    console.log(value, 'multiplePull')
  }
}

4. 推送多个数据

function getNumObservable() {
  return new Observable(subscriber => {
    subscriber.next(getNum())
    subscriber.next(getNum())
    subscriber.next(getNum())
    setTimeout(() => {
      subscriber.next(getNum())
      subscriber.complete()
    }, 200)
  })
}

function multiplePush() {
  const ob = getNumObservable()
  ob.subscribe(val => {
    console.log(val, 'multiplePush')
  })
}

核心概念

英文中文描述
Observable可观察对象可调用的未来值或事件的集合
Observer观察者知道如何去监听由 Observable 提供的值
Subscription订阅Observable 的执行,主要用于取消 Observable 的执行
Operators操作符纯函数,如 map, filter, concat, flatmap 等处理集合
Subject主体相当于 EventEmitter,是将多值或事件多路推送给多个 Observer 的唯一方式
Scheduler调度器用来控制并发,中央集权的调度员

个人理解

  • Observable: 核心集合对象。会(在未来)产生值和事件。
  • Observer: 对 Observable 产生的值和事件的处理函数。
  • Subscription: 代表 Observable 的执行实例,可用于结束 Observable。
  • Operators: 操作符,一系列无副作用函数。
  • Subject: 唯一的多播(multicasting)方式,可多播到多个 Observer。
  • Scheduler: 控制并发,类似于一个共用的锁。

Observable 的使用

创建 Observable

方式一:new 一个

const ob = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.complete()
})

方式二:使用操作符

of(1, 2, 3)
from([1, 2, 3])
interval(200)

订阅 Observable

of(1,2,3).subscribe(val => {
  console.log(val, 'of')
})

取消订阅

const subscription = interval(200).subscribe(val => {
  console.log(val, 'interval');
})
setTimeout(() => subscription.unsubscribe(), 1000)

Observable 的执行时机

重要:Observable 是在被 subscribe 了才执行的。

这是理解"多播"与"单播"的关键点:

let n = 0;
function getNum() {
  return n++;
}

const ob = new Observable(subscriber => {
  subscriber.next(getNum());
})

console.log(n) // 0

ob.subscribe(val => console.log(val, 'unicast')); // 1

参考:RxJS Official - Executing Observables

Observer

Observer 是一个消费者,消费的是 Observable 中流转的值。是一个最多有 3 个回调函数的对象。

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
}

注:回调可以省略。

使用示例:

const ob = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)	
  subscriber.complete()
})
ob.subscribe(observer)

Subject 多播 vs 单播

RxJS Subject 是一个特殊类型的 Observable,核心点就是它是多播(multicast)的。普通的 Observable 是单播的(unicast)。

const { Observable, Subject } = require("rxjs");

let n = 0;
function getNum() {
  return n++;
}

// 单播:每次订阅都会重新执行
const unicastObservable = new Observable(observer => {
  observer.next(getNum());
})

unicastObservable.subscribe(val => console.log(val, 'unicast')); // 0
unicastObservable.subscribe(val => console.log(val, 'unicast')); // 1

// 多播:多次订阅共享一次计算结果
const multicastObservable = new Subject();
setTimeout(() => {
  multicastObservable.next(getNum());
}, 0);

multicastObservable.subscribe(val => console.log(val, 'multicast')); // 2
multicastObservable.subscribe(val => console.log(val, 'multicast')); // 2
  • 单播(unicast): 每订阅(subscribe)一次,就执行了一次
  • 多播(multicast): 多次订阅,共享的一次计算结果

Operators 操作符

RxJS 有两类操作符:Pipeable OperatorsCreation Operators

Creation Operators

可以理解为一种生成新的 Observable 的独立函数。比如 of(1,2,3) 生成一个 Observable,会一个个发出 1, 2, 3。

Pipeable Operators

本质上就是一个纯函数,输入 Observable,输出新的 Observable。

const { Observable, pipe, filter } = require('rxjs')

const observable = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.next(4)
})

// 通过 pipe 构建自己的 operator
function onlyEven() {
  return pipe(
    filter((value) => value % 2 === 0)
  )
}

observable.pipe(
  onlyEven()
).subscribe(v => console.log(v)) // 2, 4

参考:RxJS Official - Operators

Subscription

Subscription 是一个对象,表示一次性资源,常作为一个 Observable 的执行器。有一个重要的方法是 unsubscribe 用于取消订阅。

本质上就是一个 Observable 的执行对象,保存着一个取消订阅的方法。

const { interval } = require('rxjs');

const obab = interval(200)
const subscription = obab.subscribe(val => console.log(val, 'interval'))

setTimeout(() => {
  subscription.unsubscribe()
}, 2000);

Scheduler

Scheduler 负责控制 subscription 的开始和数据的传输。

涉及知识点:JS 执行机制

参考:

API 速查

interval(period: number, scheduler)

隔 period 毫秒产生一个递增数字。

interval(200) // 每 200ms 发出一个递增数字

总结:RxJS 是一个强大的响应式编程库,核心是 Observable。理解 Pull/Push、单播/多播、以及各类操作符的用法,是掌握 RxJS 的关键。