Skip to content
Menu
CFC Studio
  • 实验室主页
  • CFC 招新简章
  • 友情链接
  • RSS订阅
CFC Studio

通俗易懂的方式理解 RxJS

Posted on 2017年2月27日2017年3月23日 by Mr.code

序言

今早看民工叔的文章的时候, 发现对Rxjs所知甚少, 于是去官方看了下教程, 整理出一些东西, 写成此文。
Rxjs据说会在2017年流行起来, 因为其处理异步逻辑,数据流, 事件非常擅长。 但是其学习曲线相比Promise, EventEmitter陡峭了不少。 而且民工叔也说:”由于RxJS的抽象程度很高,所以,可以用很简短代码表达很复杂的含义,这对开发人员的要求也会比较高,需要有比较强的归纳能力。” 本文就Rx.js的几个核心概念做出阐述。 尽可能以通俗易懂的方式解释这些概念。要是本文有误或不完善的地方,欢迎指出。

Observable到底是什么

先上代码:

let foo = Rx.Observable.create(observer => {  
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(x => console.log(x));  
foo.subscribe(y => console.log(y));

输出

"Hello"
42  
"Hello"
42

这里可以把foo想象成一个函数,这意味着你每次调用foo都会导致传入Rx.Observable.create里的回调函数重新执行一次, 调用的方式为foo.subscribe(callback), 相当于foo()。 接收函数返回值的方式也从var a = foo()改为通过传入回调函数的方式获取。第三行的observer.next表示返回一个值, 你可以调用多次,每次调用observer.next后, 会先将next里的值返回给foo.subcribe里的回调函数, 执行完后再返回。observer.complete, observer.error来控制流程。 具体看代码:

var observable = Rx.Observable.create(observer => {  
  try {
    observer.next(1);
    console.log('hello');
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next(4);
  } catch (err) {
    observer.error(err); 
  }
});

let = subcription = observable.subscribe(value => {  
  console.log(value)
})

运行结果:

1  
hello  
2  
3

如上的第一个回调函数里的结构是推荐的结构。 当observable的执行出现异常的时候,通过observer.error将错误返回, 然而observable.subscribe的回调函数无法接收到.因为observer.complete已经调用, 因此observer.next(4)的返回是无效的. Observable不是可以返回多个值的Promise 虽然获得Promise的值的方式也是通过then函数这种类似的方式, 但是new Promise(callback)里的callback回调永远只会执行一次!因为Promise的状态是不可逆的。

可以使用其他方式创建Observable, 看代码:

var clicks = Rx.Observable.fromEvent(document, 'click');  
clicks.subscribe(x => console.log(x));

当用户对document产生一个click行为的时候, 就会打印事件对象到控制台上。

Observer是什么

先看代码:

let foo = Rx.Observable.create(observer => {  
  console.log('Hello');
  observer.next(42);
});

let observer = x => console.log(x);  
foo.subscribe(observer);

代码中的第二个变量就是observer. 没错, observer就是当Observable”返回”值的时候接受那个值的函数!第一行中的observer其实就是通过foo.subscribe传入的callback. 只不过稍加封装了。 怎么封装的? 看代码:

let foo = Rx.Observable.create(observer => {  
  try {
    console.log('Hello');
    observer.next(42);
    observer.complete();
    observer.next(10);
  } catch(e) { observer.error(e) }

});

let observer = {  
  next(value) { console.log(value) },
  complete() { console.log('completed'),
  error(err) { console.error(err) }
}
foo.subscribe(observer);

你看到observer被定义成了一个对象, 其实这才是完整的observer. 传入一个callback到observable.subcribe相当于传入了{ next: callback }。

Subcription里的陷阱

Subscription是什么, 先上代码:

var observable = Rx.Observable.interval(1000);  
var subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {  
  subscription.unsubscribe();
}, 3100)

运行结果:

0  
1  
2

Rx.Observable.interval可以返回一个能够发射(返回)0, 1, 2, 3…, n数字的Observable, 返回的时间间隔这里是1000ms。 第二行中的变量就是subscription。 subscription有一个unsubscribe方法, 这个方法可以让subscription订阅的observable发射的数据被observer忽略掉.通俗点说就是取消订阅。

unsubscribe存在一个陷阱。 先看代码:

var foo = Rx.Observable.create((observer) => {  
  var i = 0
  setInterval(() => {
    observer.next(i++)
    console.log('hello')
  }, 1000)
})

const subcription = foo.subscribe((i) => console.log(i))  
subcription.unsubscribe()

运行结果:

hello  
hello  
hello  
......
hello

unsubscribe只会让observer忽略掉observable发射的数据,但是setInterval依然会继续执行。 这看起来似乎是一个愚蠢的设计。 所以不建议这样写。

Subject

Subject是一种能够发射数据给多个observer的Observable, 这让Subject看起来就好像是EventEmitter。 先上代码:

var subject = new Rx.Subject();

subject.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);  
subject.next(2);

运行结果:

observerA: 1  
observerB: 1  
observerA: 2  
observerB: 2

与Observable不同的是, Subject发射数据给多个observer。 其次, 定义subject的时候并没有传入callback, 这是因为subject自带next, complete, error等方法。从而可以发射数据给observer。 这和EventEmitter很类似。observer并不知道他subscribe的是Obervable还是Subject。 对observer来说是透明的。 而且Subject还有各种派生, 比如说:

BehaviorSubject 能够保留最近的数据,使得当有subscribe的时候,立马发射出去。看代码:

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);  
subject.next(2);

subject.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

运行结果:

observerA: 0  
observerA: 1  
observerA: 2  
observerB: 2  
observerA: 3  
observerB: 3

ReplaySubject 能够保留最近的一些数据, 使得当有subscribe的时候,将这些数据发射出去。看代码:

var subject = new Rx.ReplaySubject(3); 

subject.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);  
subject.next(2);  
subject.next(3);  
subject.next(4);

subject.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出结果:

observerA: 1  
observerA: 2  
observerA: 3  
observerA: 4  
observerB: 2  
observerB: 3  
observerB: 4  
observerA: 5  
observerB: 5

第一行的声明表示ReplaySubject最大能够记录的数据的数量是3。

AsyncSubject 只会发射结束前的一个数据。 看代码:

var subject = new Rx.AsyncSubject();

subject.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);  
subject.next(2);  
subject.next(3);  
subject.next(4);

subject.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);  
subject.complete();

输出结果:

observerA: 5  
observerB: 5

既然subject有next, error, complete三种方法, 那subject就可以作为observer! 看代码:

var subject = new Rx.Subject();

subject.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject);

输出结果:

observerA: 1  
observerB: 1  
observerA: 2  
observerB: 2  
observerA: 3  
observerB: 3

也就是说, observable.subscribe可以传入一个subject来订阅其消息。 这就好像是Rxjs中的一颗语法糖, Rxjs有专门的实现。

Multicasted Observables 是一种借助Subject来将数据发射给多个observer的Observable。 看代码:

var source = Rx.Observable.from([1, 2, 3]);  
var subject = new Rx.Subject();  
var multicasted = source.multicast(subject);

multicasted.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({  
  next: (v) => console.log('observerB: ' + v)
});

multicasted.connect();

Rx.Observable.from能够逐一发射数组中的元素, 在multicasted.connect()调用之前的任何subscribe都不会导致source发射数据。multicasted.connect()相当于之前的observable.subscribe(subject)。因此不能将multicasted.connect()写在subscribe的前面。因为这会导致在执行multicasted.connect()的时候source发射数据, 但是subject又没保存数据, 导致两个subscribe无法接收到任何数据。

最好是第一个subscribe的时候能够得到当前已有的数据, 最后一个unsubscribe的时候就停止Observable的执行, 相当于Observable发射的数据都被忽略。

refCount就是能够返回这样的Observable的方法

var source = Rx.Observable.interval(500);  
var subject = new Rx.Subject();  
var refCounted = source.multicast(subject).refCount();  
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');  
subscription1 = refCounted.subscribe({  
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {  
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {  
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {  
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

输出结果:

observerA subscribed  
observerA: 0  
observerB subscribed  
observerA: 1  
observerB: 1  
observerA unsubscribed  
observerB: 2  
observerB unsubscribed

What’s Operators?

Observable上有很多方法, 比如说map, filter, merge等等。 他们基于调用它们的observable,返回一个全新的observable。 而且他们都是纯方法。 operators分为两种, instance operators 和 static operators。 instance operators是存在于observable实例上的方法, 也就是实例方法; static operators是存在于Observable这个类型上的方法, 也就是静态方法。Rxjs拥有很多强大的operators。

自己实现一个operators:

function multiplyByTen(input) {  
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);  
var output = multiplyByTen(input);  
output.subscribe(x => console.log(x));

输出结果:

10  
20  
30  
40

Rx.js实践import React from ‘react’;

import ReactDOM from 'react-dom';  
import Rx from 'rx';

class Main extends React.Component {  
  constructor (props) {
    super(props);
    this.state = {count: 0};
  }

  // Click events are now observables! No more proactive approach.
  componentDidMount () {
    const plusBtn = document.getElementById('plus');
    const minusBtn = document.getElementById('minus');

    const plus$ = Rx.Observable.fromEvent(plusBtn, 'click').map(e => 1);
    const minus$ = Rx.Observable.fromEvent(minusBtn, 'click').map(e => -1);

    Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n)
      .subscribe(value => this.setState({count: value}));
  }

  render () {
    return (
        <div>
          <button id="plus">+</button>
          <button id="minus">-</button>
          <div>count: {this.state.count}</div>
        </div>
    );
  }
}

ReactDOM.render(<Main/>, document.getElementById('app'));

merge用于合并两个observable产生一个新的observable。 scan类似于Array中的reduce。 这个例子实现了点击plus的时候+1, 点击minus的时候-1。

Rx.js适用的场景

  • 多个复杂的异步或事件组合在一起。
  • 处理多个数据序列

假如没有被复杂的异步,事件, 数据序列困扰, 如果promise已经足够的话, 就没必要适用Rx.js。

Summary

  • Observable, Observer, Subscription, Subscrib, Subject概念。
  • RxJS适用于解决复杂的异步,事件问题。

文章参考

  • 让我们一起来学习 RxJS —by 饿了么前端
  • 21-use-rxjs-for-orchestrating-asynchronous-and-event-based-computations
  • RxJS文档
  • RxJS 入门指引和初步应用 —by 民工叔

发表评论 取消回复

您的电子邮箱地址不会被公开。 必填项已用*标注

分类

  • CFC 周刊 (4)
  • CFC 技术 (44)
  • CFC 日常 (3)
  • 未分类 (15)
  • 活动通知 (3)

标签

ACM Android anime animeloop animeloop-cli APP Apple aria2 Array Blog CFC数据结构与算法训练指南 CoreData CQUT Don't Starve Hexo iBooks JavaScript macOS Matlab moeoverflow OpenCV Programming README RxJS SQLite SQLite3 Steam Swift Theme Web Xcode 主题模板 动漫 博客 反编译 妹子 循环 教程 数据库 游戏 算法 装逼 视频 重庆理工大学 饥荒

登录
©2025 CFC Studio | Powered by WordPress & Superb Themes