RxSwift之操作介绍

ps: Observable 被观察对象; Observers 观察者;

前言

学习资源:RxSwift

步骤

  1. 从Github拉下RxSwift

  2. 1
    2
    3
    4
    Open Rx.xcworkspace.
    Build the RxSwift-macOS scheme (Product → Build).
    Open Rx playground in the Project navigator.
    Show the Debug Area (View → Debug Area → Show Debug Area).

创建序列

创建普通Observable序列

1
2
3
4
5
6
7
let myJust = { (element: String) -> Observable<String> in
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}

Range序列

1
2
3
4
// 发送指定范围Integers
Observable.range(start: 1, count: 10)
.subscribe { print($0) }
.disposed(by: disposeBag)

RepeatElement序列

1
2
3
4
Observable.repeatElement("R")
.take(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

Generate序列

1
2
3
4
5
6
7
8
// 根据条件是否为真,生成元素组成序列
Observable.generate(
initialState: 0,
condition: { $0 < 3 },
iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

deferred

1
2
3
4
5
6
7
8
9
10
11
12
13
// 通过工厂方法创建Observable序列,为每一个订阅者生成一个新的序列
let deferredSequence = Observable<String>.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting...")
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐵")
return Disposables.create()
}
}

error序列

1
2
3
4
// 不发送任何元素,立刻返回一个错误
Observable<Int>.error(TestError.test)
.subscribe { print($0) }
.disposed(by: disposeBag)

Never序列

1
2
// 序列永远不会被执行也不会传递任何事件
let neverSequence = Observable<String>.never()

Empty序列

1
2
3
4
5
6
// 序列只传递complete事件
Observable<Int>.empty()
.subscribe({ ( event ) in
print(event)
})
.disposed(by: disposeBag)

Just序列

1
2
3
4
5
// 只包含一个元素的序列
Observable.just("Rose")
.subscribe({ ( event ) in
print(event)
})

Of序列

1
2
3
4
5
// 包含一系列元素
Observable.of("🐶", "🐱", "🐭", "🐹")
.subscribe(onNext: { element in
print(element)
})

from序列

1
2
3
4
// 通过序列对象(包括数组,字典,Set)创建Observable序列
Observable.from(["R", "O", "S", "E"])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

Subject

Subject可以看作是代理或者桥梁。既是订阅者又是订阅源,也就是既可以订阅其他Observable对象,又可以向订阅者发送事件。

其中PublishSubjectReplaySubjectBehaviorSubject即将被清除时也不会自动发送完成(Completed)事件。

PublishSubject

publishSubject遵循了ObserverType协议。广播事件给所有的订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
example("PublishSubject") {
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("dog")
subject.onNext("cat")
subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("A") // 广播给两个订阅者
subject.onNext("B")
}
--- PublishSubject example ---
Subscription: 1 Event: next(dog)
Subscription: 1 Event: next(cat)
Subscription: 1 Event: next(A)
Subscription: 2 Event: next(A)
Subscription: 1 Event: next(B)
Subscription: 2 Event: next(B)

ReplaySubject

广播事件给所有订阅者,并会补发发送事件给新的订阅者。通过bufferSize可以指定给新的订阅者缓存几个已发送事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
example("ReplaySubject") {
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1) // 指定缓存大小
subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("dog")
subject.onNext("cat")
subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("A")
subject.onNext("B")
}
--- ReplaySubject example ---
Subscription: 1 Event: next(dog)
Subscription: 1 Event: next(cat)
Subscription: 2 Event: next(cat) // 先补发
Subscription: 1 Event: next(A)
Subscription: 2 Event: next(A)
Subscription: 1 Event: next(B)
Subscription: 2 Event: next(B)

BehaviorSubject

广播所有事件给订阅者,发送最近的一次发送事件给新的订阅者,如果没有最近的发送事件则发送默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
example("BehaviorSubject") {
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "Bow") // 指定默认值
subject.addObserver("1").disposed(by: disposeBag)// 此时添加新订阅者时无最近事件,则会发送默认值
subject.onNext("Dog")
subject.onNext("Cat")
subject.addObserver("2").disposed(by: disposeBag)// 最近事件发送Cat
subject.onNext("A")
subject.addObserver("3").disposed(by: disposeBag)
subject.onNext("Apple")
}
--- BehaviorSubject example ---
Subscription: 1 Event: next(Bow)
Subscription: 1 Event: next(Dog)
Subscription: 1 Event: next(Cat)
Subscription: 2 Event: next(Cat)
Subscription: 1 Event: next(A)
Subscription: 2 Event: next(A)
Subscription: 3 Event: next(A)
Subscription: 1 Event: next(Apple)
Subscription: 2 Event: next(Apple)
Subscription: 3 Event: next(Apple)

Variable

基于BehaviorSubject的封装,也是发送最近的发送事件或默认值给新的订阅者。但Variable会保留当前值状态,并且永远都不会发送错误(Error)事件,然而,它会在析构(deinit)的时候自动发送Completed事件。

调用Variable实例的asObservable方法获取BehaviorSubject序列。Variable并没有实现on操作符,所以通过暴露value属性设置当前值于获取当前值。通过设置一个新的值就可以将这个值添加到BehaviorSubject序列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
example("Variable") {
let disposeBag = DisposeBag()
let variable = Variable("Bow")
variable.asObservable().addObserver("1").disposed(by: disposeBag)
variable.value = "Dog"
variable.value = "Cat"
variable.asObservable().addObserver("2").disposed(by: disposeBag)
variable.value = "A"
variable.value = "B"
}
--- Variable example ---
Subscription: 1 Event: next(Bow)
Subscription: 1 Event: next(Dog)
Subscription: 1 Event: next(Cat)
Subscription: 2 Event: next(Cat)
Subscription: 1 Event: next(A)
Subscription: 2 Event: next(A)
Subscription: 1 Event: next(B)
Subscription: 2 Event: next(B)
Subscription: 1 Event: completed
Subscription: 2 Event: completed

Combination(合并操作)

通过操作符可以将多个观察源(序列)转化为一个观察源(序列)

startWith

在开始发送事件元素前发送指定序列的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
example("startWith") {
let disposeBag = DisposeBag()
Observable.of("🐶", "🐱", "🐭", "🐹")
.startWith("1️⃣")
.startWith("2️⃣")
.startWith("3️⃣", "🅰️", "🅱️")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- startWith example ---
3️⃣
🅰️
🅱️
2️⃣
1️⃣
🐶
🐱
🐭
🐹

merge

将多个源序列中的元素组合成一个新的Observable序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
example("merge") {
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()
Observable.of(subject1, subject2, subject3)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🅰️")
subject2.onNext("🆎")
subject3.onNext("Rose")
subject1.onNext("🅱️")
}
--- merge example ---
🅰️
🆎
Rose
🅱️

zip

将多个序列中的元素根据一定的原则组合在一起,序列之间的元素一一对应后再合并。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
example("zip") {
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
let secondStringSubject = PublishSubject<String>()
Observable.zip(stringSubject, intSubject, secondStringSubject) { stringElement, intElement, anotherElement in
"\(stringElement) \(intElement) \(anotherElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
intSubject.onNext(1)
intSubject.onNext(2)
secondStringSubject.onNext("Rose")
}
--- zip example ---
🅰️ 1 Rose

combineLatest

将多个序列的元素按一定的原则组合在一起形成一个新的Observable序列,当每一个源序列至少发送一个元素时,combine序列就开始发送第一个元素,并且接下来任意一个序列发送一个元素,combine序列都会使用每一个源序列中最新的元素组合发送新元素。ps:combineLates还支持传入数组以及任意集合型的Observables序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
example("combineLatest") {
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("A")
stringSubject.onNext(️"B")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("AB")
}
example("Array.combineLatest") {
let disposeBag = DisposeBag()
let stringObservable = Observable.just("Heart")
let fruitObservable = Observable.from(["Apple", "Orange"])
let animalObservable = Observable.of("Dog", "Cat", "Mouse")
Observable.combineLatest([stringObservable, fruitObservable, animalObservable]) {
"\($0[0]) \($0[1]) \($0[2])"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- Array.combineLatest example ---
Heart Apple Dog
Heart Orange Dog
Heart Orange Cat
Heart Orange Mouse

switchLatest

将事件序列的序列切换到最新的一条序列当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
example("switchLatest") {
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")
let variable = Variable(subject1)
variable.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🏈")
subject1.onNext("🏀")
variable.value = subject2
subject1.onNext("⚾️")
subject2.onNext("🍐")
}
--- switchLatest example ---
⚽️
🏈
🏀
🍎
🍐

Transform(转换操作)

map

对序列中的每个元素进行转换

1
2
3
4
5
6
7
8
9
10
11
example("map") {
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- map example ---
1
4
9

flatMap && flatMapLatest

map做转换时容易出现“升维”的情况,即转变之后从一个序列变成了一个序列的序列。Swift中过滤nil的最佳方案就是flatMap

flatMapLatest则是只获取最新的一个元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
example("flatMap and flatMapLatest") {
let disposeBag = DisposeBag()
struct Player {
var score: Variable<Int>
}
let 👦🏻 = Player(score: Variable(80))
let 👧🏼 = Player(score: Variable(90))
let player = Variable(👦🏻)
player.asObservable()
.flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
👦🏻.score.value = 85
player.value = 👧🏼
👦🏻.score.value = 95 // Will be printed when using flatMap, but will not be printed when using flatMapLatest
👧🏼.score.value = 100
}
--- flatMap example ---
80
85
90
95
100
--- flatMapLatest example ---
80
85
90
100

scan

累计每次的运算结果,作为下次运算的输入值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
example("scan") {
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000)
.scan(1) { aggregateValue, newValue in
aggregateValue + newValue
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- scan example ---
11
111
1111

Filtering and Conditional(过滤与条件操作)

filter

只发送符合条件的元素

1
2
3
4
5
6
7
8
9
10
11
example("filter") {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "A", "a")
.filter { $0 == "A" }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- filter example ---
A
A

distinctUntilChanged

发送符合条件的元素,且会过滤连续重复的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
example("distinctUntilChanged") {
let disposeBag = DisposeBag()
Observable.of("A", "B", "A", "A", "B")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- distinctUntilChanged example ---
A
B
A
B

elementAt

获取制定index位置的元素,越界则抛出error错误。

1
2
3
4
5
6
7
8
9
10
example("elementAt") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "A", "A", "B")
.elementAt(1)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- elementAt example ---
O

single

获取符合条件的第一个元素,如果无条件则获取序列第一个元素,有多个元素或者无符合条件的元素时会抛出error错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
example("single") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "S", "S", "E")
.single() // 多个元素会抛出错误
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- single example ---
R
Received unhandled error: Filtering_and_Conditional_Operators.xcplaygroundpage:64:__lldb_expr_42 -> Sequence contains more than one element.
example("single with conditions") {
let disposeBag = DisposeBag()
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.single { $0 == "🔵" }
.subscribe { print($0) }
.disposed(by: disposeBag)
}
--- single with conditions example ---
error(Sequence doesn't contain any elements.)

take

获取序列中的指定数量的前n个元素

1
2
3
4
5
6
7
8
9
10
11
12
example("take") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "S", "S", "E")
.take(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- take example ---
R
O
S

takeLast

从序列的尾部获取指定数量的元素。确定范围后元素顺序取

1
2
3
4
5
6
7
8
9
10
11
12
example("takeLast") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "O", "S", "E")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- takeLast example ---
O
S
E

takeWhile

从序列的头部顺序获取符合条件的元素,一旦不满足条件就停止匹配元素。

1
2
3
4
5
6
7
8
9
10
11
12
example("takeWhile") {
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- takeWhile example ---
1
2
3

takeUntil

发送源(source)Observable序列的元素,一旦关联(reference)Observable序列发送元素发送Completed消息,停止发送源Observable序列的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
example("takeUntil") {
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("R")
sourceSequence.onNext("O")
referenceSequence.onNext("A")
sourceSequence.onNext("S")
sourceSequence.onNext("E")
}
--- takeUntil example ---
next(R)
next(O)
completed

skip

直接过滤掉头部指定数量的n个元素

1
2
3
4
5
6
7
8
9
10
11
12
example("skip") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "O", "S", "E")
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- skip example ---
O
S
E

skipWhile

跳过满足条件的值直到条件为false时开始获取序列的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
example("skipWhile") {
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6, 1)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- skipWhile example ---
4
5
6
1

skipWhileWithIndex

基于skipWhile的基础,不过条件中提供了index参数,可将index作为条件判断的一个方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
example("skipWhileWithIndex") {
let disposeBag = DisposeBag()
Observable.of("R", "O", "O", "S", "E")
.skipWhileWithIndex { element, index in
index < 3
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
print("------")
Observable.of("R", "O", "O", "S", "E")
.skipWhileWithIndex { element, index in
element == "R"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- skipWhileWithIndex example ---
S
E
------
O
O
S
E

skipUntil

跳过源序列的元素,直到关联序列开始发送元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
example("skipUntil") {
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.skipUntil(referenceSequence)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSequence.onNext("R")
sourceSequence.onNext("O")
referenceSequence.onNext("🔴")
sourceSequence.onNext("S")
sourceSequence.onNext("E")
}
--- skipUntil example ---
S
E

Mathematical and Aggregate Operators(聚合操作)

toArray

将序列转换成一个数组,将这个数组当成一个新元素发送然后completed

1
2
3
4
5
6
7
8
9
10
11
example("toArray") {
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
}
--- toArray example ---
next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
completed

reduce

设置初始值,与序列中的每一个值进行指定运算,返回最后结果当作一个值发送。

1
2
3
4
5
6
7
8
9
10
example("reduce") {
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- reduce example ---
1111

concat

串联多个序列,新的序列只有在前一个序列completed后才会发送新序列中的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
example("concat") {
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "J")
let variable = Variable(subject1)
variable.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("B")
subject1.onNext("C")
variable.value = subject2
subject2.onNext("I would be ignored")
subject2.onNext("O")
subject1.onCompleted()// 上一个序列completed,发送新序列的元素"O"
subject2.onNext("L")
}
--- concat example ---
next(A)
next(B)
next(C)
next(O)
next(L)

Connectable(连接操作)

Connectable序列其实很像一般的序列,但不会像一般序列一旦被订阅就会发送值,它只会在调用connect后才会发送值。因此,你可以通过该序列在所有订阅者订阅之后再发送值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 没有连接操作
func sampleWithoutConnectableOperators() {
printExampleHeader(#function)
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)// 在主线程每隔一秒发射一个信号
_ = interval
.subscribe(onNext: { print("Subscription: 1, Event: \($0)") })
delay(5) {
_ = interval
.subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
}
}
--- sampleWithoutConnectableOperators() example ---
Subscription: 1, Event: 0 // 第一个订阅者开始发送值,值从0开始
Subscription: 1, Event: 1
Subscription: 1, Event: 2
Subscription: 1, Event: 3
Subscription: 1, Event: 4
Subscription: 1, Event: 5
Subscription: 2, Event: 0 // 5s后第二个订阅者开始发送值,值从0开始
Subscription: 1, Event: 6
Subscription: 2, Event: 1
.....

publish

将一个普通序列转化为可连接(connectable)的序列,只有在调用connect()之后才开始发送值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func sampleWithPublish() {
printExampleHeader(#function)
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish() // 转换成可连接序列
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
delay(2) { _ = intSequence.connect() } // 2s后开始发送值
delay(4) {
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
delay(6) {
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
}
--- sampleWithPublish() example ---
Subscription 1:, Event: 0 // 2s后第一个订阅者开始发送值
Subscription 1:, Event: 1
Subscription 2:, Event: 1 // 4s后第二个订阅者开始发送值,值从1开始
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3 // 6s后第三个订阅者开始发送值,值从3开始
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
....

replay

将普通序列转化为可连接序列,并会在调用connect()之后才开始发送值,且新的订阅者出现时会补发指定个数的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func sampleWithReplayBuffer() {
printExampleHeader(#function)
let timeInterval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)// 在主线程每隔一秒发射一个信号
_ = timeInterval
.subscribe(onNext: { print("第\($0 + 1)s") })
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.replay(2)
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
delay(2) {
print("订阅者1订阅")
_ = intSequence.connect()
}
delay(5) {
print("订阅者2订阅")
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
delay(9) {
print("订阅者3订阅")
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
}
--- sampleWithReplayBuffer() example ---
第1s
第2s
订阅者1订阅
第3s
Subscription 1:, Event: 0 // 2s后订阅者1开始订阅
第4s
Subscription 1:, Event: 1
第5s
订阅者2订阅
Subscription 2:, Event: 0 // 5s后订阅者2先补发值
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2 // 订阅者2发送值
第6s
Subscription 1:, Event: 3
Subscription 2:, Event: 3
第7s
Subscription 1:, Event: 4
Subscription 2:, Event: 4
第8s
Subscription 1:, Event: 5
Subscription 2:, Event: 5
第9s
订阅者3订阅
Subscription 3:, Event: 4 // 9s后订阅者3先补发值
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6 // 订阅者3发送值
第10s
Subscription 1:, Event: 7
Subscription 2:, Event: 7
Subscription 3:, Event: 7

multicast

将普通序列转化为可连接序列,并会在调用connect()之后才开始发送值。序列发送值时指定的subject 也会发射值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func sampleWithMulticast() {
printExampleHeader(#function)
let subject = PublishSubject<Int>()
_ = subject
.subscribe(onNext: { print("Subject: \($0)") })
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.multicast(subject)
_ = intSequence
.subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })
delay(2) { _ = intSequence.connect() }
delay(4) {
_ = intSequence
.subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
}
delay(6) {
_ = intSequence
.subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
}
}
--- sampleWithMulticast() example ---
Subject: 0
Subscription 1:, Event: 0
Subject: 1
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subject: 2
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subject: 3
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subject: 4
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4

Error(错误操作)

catchErrorJustReturn

捕获到错误时,返回指定的值然后终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
example("catchErrorJustReturn") {
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("Rose")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("R")
sequenceThatFails.onNext("O")
sequenceThatFails.onError(TestError.test)
}
--- catchErrorJustReturn example ---
next(R)
next(O)
next(Rose)
completed

catchError

捕获错误,切换到其他序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
example("catchError") {
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
recoverySequence.onNext("Rose")
sequenceThatFails.onNext("R")
sequenceThatFails.onNext("O")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("Jolie")
}
--- catchError example ---
next(R)
next(O)
Error: test
next(Jolie) // 如何没有发送Jolie信息则sequenceThatFails不会发送信息

retry

捕获到错误后可对序列进行重新订阅。retry中可以传参设置最大重新订阅次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
example("retry") {
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("Apple")
observer.onNext("Orange")
if count == 1 {
observer.onError(TestError.test)
print("Error encountered")
count += 1
}
observer.onNext("Dog")
observer.onNext("Cat")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- retry example ---
Apple
Orange
Error encountered
Apple
Orange
Dog
Cat

debug(调试操作)

debug

打印所有的订阅,值发送与清理事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
example("debug") {
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("Apple")
if count < 5 {
observer.onError(TestError.test)
print("Error encountered")
count += 1
}
observer.onNext("Dog")
observer.onNext("Cat")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
--- debug example ---
2017-06-06 16:33:21.310: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> subscribed // 订阅
2017-06-06 16:33:21.312: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> Event next(Apple) // 发送值
Apple
Error encountered
2017-06-06 16:33:21.315: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> Event next(Apple)
Apple
Error encountered
2017-06-06 16:33:21.316: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> Event next(Apple)
Apple
Error encountered
2017-06-06 16:33:21.318: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> Event error(test)
Received unhandled error: Debugging_Operators.xcplaygroundpage:40:__lldb_expr_96 -> test
2017-06-06 16:33:21.318: Debugging_Operators.xcplaygroundpage:39 (__lldb_expr_96) -> isDisposed // 清理

RxSwift.Resources.total

提供所有Rx资源分配了内存的数量,检测内存泄漏的时候很有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
example("RxSwift.Resources.total") {
print(RxSwift.Resources.total) // ---0
let disposeBag = DisposeBag()
print(RxSwift.Resources.total) // --- 2
let variable = Variable("Apple")
let subscription1 = variable.asObservable().subscribe(onNext: { print($0) })// --- Apple
print(RxSwift.Resources.total)// --- 11
let subscription2 = variable.asObservable().subscribe(onNext: { print($0) })// --- Apple
print(RxSwift.Resources.total) // --- 14
subscription1.dispose()
print(RxSwift.Resources.total)// --- 12
subscription2.dispose()
print(RxSwift.Resources.total)// --- 10
}
print(RxSwift.Resources.total)// --- 0

其它操作

doOn副作用

1
2
3
4
5
// 每一次发送事件时执行并不对事件做任何处理返回原始事件
Observable.of("R", "O", "S", "E")
.do(onNext: { print("do Next:", $0) }, onError: { print("do Error:", $0) }, onCompleted: { print("do Completed") })
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

发送事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable<String>.create { observerOfString -> Disposable in
print("Observable created")
observerOfString.on(.next("😉")) // 发送事件数据信号
observerOfString.on(.completed) // 发送事件完成信号
return Disposables.create()
}
.subscribe { event in
print(event)
}
Observable<String>.create({ (observerOfString ) -> Disposable in
observerOfString.onNext("Hello")
observerOfString.onCompleted()
return Disposables.create()
})

订阅

被观察对象只有存在订阅者时才会执行订阅闭包中的事件。

1
2
3
4
5
6
someObservable.subscribe(
onNext: { print("Element:", $0) },
onError: { print("Error:", $0) },
onCompleted: { print("Completed") },
onDisposed: { print("Disposed") }
)

易混淆

map VS flatMap

map是对数组中的所有元素一一映射。flatMap在多维数组时,则会对序列做降维操作,也就是原本是二位的序列,处理后会变成一维。

1
2
3
4
5
let numbers = [[1,2,3,4],[4,5,6]];
var res = numbers.map { $0.map{ $0 + 2 } }
// [[3, 4, 5, 6], [6, 7, 8]]
var flatRes = numbers.flatMap { $0.map{ $0 + 2 } }
// [3, 4, 5, 6, 6, 7, 8]

Swift中过滤nil的最佳方案就是flatMap。原理: flatMap的源码实现中遍历元素时对元素进行了if let语句解包处理,只有解包成功的才会添加到结果集中,因而具备自动去nil值的效果。

1
2
3
4
// 过滤nil
let values: [Int?] = [1, 2, 3, nil, 4]
value.flatMap { $0 }
---- 结果是 [1, 2, 3, 4]

takeWhile VS filter

takeWhile按顺序获取符合条件的元素,一旦条件不符合,后面的元素也不进行匹配。

filter则会对序列中的所有元素进行条件判断获取符合条件的所有元素。

reduce VS scan

reduce: 指定初始值与序列所有元素进行累积运算,只发送最后一次的运算结果。

scan:累计每次的运算结果,作为下次运算的输入值,每次的运算结果都会发送。

reducescan的基础上只获取最后一个元素。

ReplaySubject VS Replay

ReplaySubject: 既是订阅者又是订阅源,也就是既可以订阅其他Observable对象,又可以向订阅者发送事件。通过bufferSize可以指定给新的订阅者缓存几个已发送事件。

replay:将普通序列转化为可连接序列,并会在调用connect()之后才开始发送值,且新的订阅者出现时会补发指定个数(通过bufferSize)的值。

参考资料

谈谈 Swift 中的 map 和 flatMap