ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rxJS

2021-02-17 00:01:02  阅读:208  来源: 互联网

标签:Observable observerA console next subscribe rxJS subject


概述

Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

Observable(可观察对象)

  • 定义

    Observables 是多个值的惰性推送集合

  • 创建 Observables

    Rx.Observable.create 是 Observable 构造函数的别名,它接收一个参数:subscribe 函数。

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});
  • 订阅 Observables

    订阅 Observable 像是调用函数, 并提供接收数据的回调函数。

observable.subscribe(x => console.log(x));

注:
   observable.subscribe 
   Observable.create(function subscribe(observer) {...}) 
   中的 subscribe 有着同样的名字,这并不是一个巧合。
   这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的。
  • 执行 Observables
Observable.create(function subscribe(observer) {...}) 中...的代码表示 “Observable 执行”
它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:

"Next"     通知: 发送一个值,比如数字、字符串、对象,等等。 -- 最重要
"Error"    通知: 发送一个 JavaScript 错误 或 异常。
"Complete" 通知: 不再发送任何值。
  • 清理 Observable 执行

    当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

1. 
var observable = Rx.Observable.from([10, 20, 30]);
var subscription1 = observable.subscribe(x => console.log(x));

// 清理
subscription1.unsubscribe();

2.
var observable = Rx.Observable.create(function subscribe(observer) {
  // 追踪 interval 资源
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  // 提供取消和清理 interval 资源的方法
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});
var subscription2 = observable.subscribe(x => console.log(x));

// 清理
subscription2.unsubscribe();

Observer (观察者)

  • 定义

    观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete

1.
var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

2. 
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。

三种类型的回调函数都可以直接作为参数来提供:
observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

Subscription (订阅)

  • 定义

    Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe

Subject (主体)

  • 定义

    1. Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者

    2. Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

  1. 每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法。
var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
  1. 每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。
var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
  • 多播的 Observables

    多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
 
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
 
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
// This is, under the hood, `source.subscribe(subject)`:
// 决定了何时启动共享的 Observable 执行
multicasted.connect(); 
  • BehaviorSubject

    “当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(3);
 
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
  • ReplaySubject

    ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

import { ReplaySubject } from 'rxjs';

// 为新的订阅者缓冲3个值
// (还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录:new Rx.ReplaySubject(100, 500 /* windowTime */);)
const subject = new ReplaySubject(3);
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(5);
 
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
  • AsyncSubject

    AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(5);
subject.complete();
 
// Logs:
// observerA: 5
// observerB: 5

Operators

帮助学习的网站:
https://rxmarbles.com/
https://reactive.how/rxjs/explorer
  • 定义

    操作符是函数,它基于当前的 Observable 创建一个新的 Observable(接收一个 Observable 作为输入、并生成一个新的 Observable 作为输出)。这是一个无副作用的操作:前面的 Observable 保持不变

    实例操作符 vs. 静态操作符

    1. 实例操作符:Observable 实例上的方法
    2. 静态操作符:静态操作符是附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。它们只接收非 Observable 参数,比如数字,然后创建一个新的 Observable ,而不是将一个输入 Observable 转换为输出 Observable 。
  • 工作方式
    弹珠图

  • 操作符分类
    TODO

调度器

  • 定义

    调度器控制着何时启动 subscription 和何时发送通知、用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调。(调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者)

    1. 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
    2. 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
    3. 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
  • 调度器类型

调度器目的
null不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
Rx.Scheduler.queue当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
Rx.Scheduler.asap微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。
Rx.Scheduler.async使用 setInterval 的调度。用于基于时间的操作符。

标签:Observable,observerA,console,next,subscribe,rxJS,subject
来源: https://blog.csdn.net/weixin_39807883/article/details/113829871

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有