create
const { Observable } = require('rxjs');// 创建 Observablesvar observable = Observable.create(observer => { var id = setInterval(() => { subscriber.next(123); subscriber.error('err: ...'); subscriber.complete('done.'); }, 1000); // 提供取消和清理 interval 资源的方法 return function unsubscribe() { clearInterval(id); };});// 订阅 Observablesvar subscription = observable.subscribe( v => console.log(v), e => console.log(e), done => console.log(done),);// .add() 合并多个订阅,以便同时取消订阅// subscription.add(childSubscription);// 撤销 add的合并// subscription.remove(childSubscription);// 3s后取消订阅setTimeout(() => { subscription.unsubscribe();}, 3000)
of(...items) 和 from([...items])
按顺序发出每个元素, from 可以代替 fromPromise
let { of, from } = require("rxjs");of(1, 2, 3).subscribe(v => l(v)); // 1,2,3from([1, 2, 3]).subscribe(v => l(v));// 1,2,3
bindCallback
把普通的回调函数转化为 Observable
let { bindCallback } = require("rxjs");function test(cb) { cb(1); }let o$ = bindCallback(test)();o$.subscribe(v => l(v)); // 1
bindNodeCallback
把标准的node回调函数转化为 Observable
let { bindNodeCallback } = require("rxjs");let fs = require("fs");// fs.readFile("./test.jss", "utf8", (e, d) => (!!e ? l(e) : l(d)));let o$ = bindNodeCallback(fs.readFile)("./test.jss", "utf8");o$.subscribe(v => l(v), e => l(e));
combineLatest
组合多个Observable以创建一个Observable, 其值是根据每个输入Observable的最新值计算
let { combineLatest, of, timer } = require("rxjs");let o1$ = of(233);let o2$ = of(12);let o3$ = timer(0, 1000);let o$ = combineLatest(o1$, o2$, o3$);// let o$ = combineLatest([o1$, o2$, o3$]); // 和上面一样o$.subscribe(v => l(v));// [ 233, 12, 0 ]// [ 233, 12, 1 ]// [ 233, 12, 2 ]// ...
empty
仅仅发出 complete 通知,其他什么也不做
let { empty } = require("rxjs");const l = console.log;empty().subscribe(l, l, () => l("done")); // done
forkJoin
连接传递的Observables发出的最后一个值。
let { forkJoin, of } = require("rxjs");forkJoin(of(1), of(2, 3)).subscribe(l); // [ 1, 3 ]
merge
把多个流的合并到一个 流, 按顺序执行,异步流会被延后
let { merge, of, from } = require("rxjs");merge(of(1), of(4), from(["a", "b"]), 1).subscribe(l); // 1 4 a b
concat
它顺序地从给定的Observable中发出所有值,必须等上一个完成才执行下一个
let { concat, interval, of } = require("rxjs");let { take } = require("rxjs/operators");const l = console.log;let o1$ = interval(1000).pipe(take(4)); // take 接收源 最初的N个值let o2$ = of(233);let o$ = concat(o1$, o2$);o$.subscribe(v => l(v)); // 0,1,2,3,233
never
创建一个不向观察者发出任何项的 Observable
let { never } = require("rxjs");never().subscribe(l, l, () => l("done")); //
range
range(start: number = 0, count: number = 0)
let { range } = require("rxjs");// 从2开始 发出后续 11位数字range(2, 11).subscribe(l); // 2..12
throwError
仅仅发出 error 通知
let { throwError } = require("rxjs");throwError("err message").subscribe(l, l, () => l("done")); // err message
timer
let { timer } = require("rxjs");// 延迟3秒发送第一个值,然后马上结束timer(3000).subscribe( v=> console.log(v)) //每隔3s发送一个值,第一个值延迟5stimer(5000, 3000).subscribe( v=> console.log(v))
interval
在指定的时间间隔内发出序列号 0+
let { interval } = require("rxjs");let { take } = require("rxjs/operators");interval(200) .pipe(take(3)) .subscribe(l); // 0,1,2
defer
懒惰地创建Observable
let { defer, of } = require("rxjs");let o$ = defer(() => Math.random() > 0.5 ? of(1) : of(2) );o$.subscribe(v => l(v));
zip
其值根据其每个输入Observable的值按顺序计算
let { zip, of } = require("rxjs");let { map } = require("rxjs/operators");let age$ = of(27, 25, 29);let name$ = of("Foo", "Bar", "Beer");let isDev$ = of(true, true, false);zip(age$, name$, isDev$) .pipe(map(([age, name, isdev]) => ({ age, name, isdev }))) .subscribe(l);// res{ age: 27, name: 'Foo', isdev: true }{ age: 25, name: 'Bar', isdev: true }{ age: 29, name: 'Beer', isdev: false }
iif
iif(test, then, else)
let { iif, of } = require("rxjs");iif(() => Math.random() > 0.5, of("then"), of("else")).subscribe(l); // then | else
fromEvent
let { fromEvent } = require("rxjs");const clicks$ = fromEvent(document, 'click');clicks$.subscribe(x => console.log(x));
generate
仔细看有点像 for循环
let { generate } = require("rxjs");generate(0, x => x < 3, x => x + 1).subscribe( v => l(v), err => {}, () => l("done"),);// 0, 1, 2, done
identity
没有做处理,返回值本身
let { identity } = require("rxjs");let x = identity(233);l(x); // 233
isObservable
let { isObservable } = require("rxjs");l(isObservable(2)); // false
onErrorResumeNext
将无错误地移动到下一个源
let { onErrorResumeNext, of } = require("rxjs");let { map } = require("rxjs/operators");onErrorResumeNext( of(1, 2, 0, 1).pipe( map(el => { if (el === 0) throw Error("000"); return el + el; }), ), of("2"),).subscribe(l, err => console.error(err), () => l("done")); // 2, 4, 2, done
pairs
将对象转换为Observable
let { pairs, of } = require("rxjs");pairs({ name: "ajanuw", age: 14,}).subscribe(l); // [ 'name', 'ajanuw' ] [ 'age', 14 ]
race
返回,第一个Observable的反映的输出,然后结束
let { race, of, interval } = require("rxjs");race(interval(1000), of(2, 22), of(3)).subscribe(l, l, () => l("done")); // 2, 22, done