-
-
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
| import Foundation | |
| import Combine | |
| // see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f | |
| // Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing. | |
| // $ swift --version | |
| // Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8) | |
| // Target: x86_64-apple-darwin20.1.0 | |
| // $ xcodebuild -version | |
| // Xcode 12.2 | |
| // Build version 12B5044c | |
| let workers: [AnyPublisher<String, Never>] = (0..<20).map { | |
| Just<Int>($0) | |
| .flatMap { value in | |
| Future<String, Never>() { promise in | |
| DispatchQueue.global().async { | |
| print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]") | |
| promise(.success(String(value))) | |
| } | |
| } | |
| .eraseToAnyPublisher() | |
| } | |
| .eraseToAnyPublisher() | |
| } | |
| var cancellables: Set<AnyCancellable> = .init() | |
| print("Workers initial count: \(workers.count)") | |
| Publishers | |
| .MergeMany(workers) | |
| .collect() | |
| .receive(on: DispatchQueue.main) | |
| .sink(receiveValue: { | |
| print("Workers result count: \($0.count)") | |
| dump($0) | |
| }) | |
| .store(in: &cancellables) | |
| // - OUTPUT -------------------------------------------------------- | |
| // Workers initial count: 20 | |
| // Working: isMainThread [false] [0] | |
| // Working: isMainThread [false] [1] | |
| // Working: isMainThread [false] [2] | |
| // Working: isMainThread [false] [3] | |
| // Working: isMainThread [false] [4] | |
| // Working: isMainThread [false] [5] | |
| // Working: isMainThread [false] [6] | |
| // Working: isMainThread [false] [7] | |
| // Working: isMainThread [false] [8] | |
| // Working: isMainThread [false] [9] | |
| // Working: isMainThread [false] [11] | |
| // Working: isMainThread [false] [12] | |
| // Working: isMainThread [false] [10] | |
| // Working: isMainThread [false] [13] | |
| // Working: isMainThread [false] [14] | |
| // Working: isMainThread [false] [15] | |
| // Working: isMainThread [false] [16] | |
| // Working: isMainThread [false] [17] | |
| // Working: isMainThread [false] [18] | |
| // Working: isMainThread [false] [19] | |
| // Workers result count: 20 | |
| // ▿ 20 elements | |
| // - "0" | |
| // - "1" | |
| // - "2" | |
| // - "3" | |
| // - "4" | |
| // - "5" | |
| // - "7" | |
| // - "6" | |
| // - "8" | |
| // - "9" | |
| // - "11" | |
| // - "12" | |
| // - "10" | |
| // - "13" | |
| // - "14" | |
| // - "15" | |
| // - "16" | |
| // - "17" | |
| // - "18" | |
| // - "19" | |
| // - OUTPUT -------------------------------------------------------- |
FYI
CombineのMergeMany, receive(on, collectで複数のバックグラウンド処理を待ち合わせると、一部のバックグラウンド処理が実行されないコードのgist
→ https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f
なるほど!スレッドチェックしてませんでした。
これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。
はい。その通りだと思います。
さらに言うと、Justとmapの間にあることでJust<Map< ... >>にならないようにしているとも思います。
解決案2: Justとmapを連続しない
それに関連する話で、実は最初に出した .subscribe(on: DispatchQueue.global())にこだわった解決案2があり、次のようにJustとmapを連続しないことでmapはバックグラウンドスレッドで動作するようになりますね。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.eraseToAnyPublisher() // ここでJust<Map< ...> >になるのを防ぎAnyPublisherにしておく
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.subscribe(on: DispatchQueue.global())
.eraseToAnyPublisher()
}Justとmapを連続すると型がJust<Map< ...>>になり何が起こっているのか
この理由はあくまで予測なのですが、Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしているんだと思います。Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況なんだと予想しています(CombineではHotという概念が表面的には示されていないのであくまで「Hot的な動作」という意味です)。
なぜそう思うかというと、複数のSubscriberがsubscribeした際に結果が共有されるようになるためです。検証のためにコピペでsinkを複数でやってみます。次のような感じです。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.eraseToAnyPublisher() // この部分があるかないかでmapの動作回数も変わります
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.subscribe(on: DispatchQueue.global())
.eraseToAnyPublisher()
}
var cancellables: Set<AnyCancellable> = .init()
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables).eraseToAnyPublisher()があると- map処理が40回になる
- subscribeされる数を2倍にしたので
subscribe(on:)で指定したバックグラウンドキューで実行される
- map処理が40回になる
.eraseToAnyPublisher()がないと- map処理は20回以下になる
- subscribeされた数を気にしなくなる
subscribe(on:)で指定したバックグラウンドキューで実行されない
- map処理は20回以下になる
これ不思議なんですが、実は.mapだけじゃなく.filterでも同様だったと思います。このことから憶測ですがAppleのCombineは意図的にオペレータの組み合わせでHot化させて無駄を省こうとしてくれていて、それによってsubscribe(on:)でキューを指定してもそれに応じないんじゃないかと思っています。FutureもいわゆるHotなので、キューを指定してもそれに従わないのと同じなんだと思います。
整理してみます
- Cold動作で指定したバックグラウンドキューで動作させるなら
- Justとmapの間に
.receive(on:)を置く - Justとmapの間に
.eraseToAnyPublisher()を置いて、.subscribe(on:)を置く
- Justとmapの間に
- Hot動作 が望ましいなら
- Futureにする(結局そうなるとDispatchQueue.global().async)
という感じでしょうか。私の予測が入り混じっているので全然自信はないです。
そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。
printしたらmapのreceive finishedが20回あるがreceive value:が19回の件
あくまで憶測ですがcollect()の動作が2種類ある気がします。
動作的には .subscribe(on)されてないColdなPublisherたちを下流にあるcollect()が扱う場合は、何かしらのタイミングで終了したという判断をしないといけないために上流がイベントを流さずにその上流ストリームも終了させられてしまう。一方、HotなPublisherもしくは.subscribe(on:)されたColdなPublisherたちのcollect()は、非同期実行を待つ前提であるため上流のストリームの終了を待つという感じなのかな、と思いました。思い込みかもしれませんが...。
以上、長々と憶測が多くなり失礼しましたー🙇
なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。
Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況
確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。
Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている
&
eraseToAnyPublisherでColdのまま動作
このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)
そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。
手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。
Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。
原因は難しいですね。
@yimajo がおっしゃるとおり、collectが最後の完了まで待たないのかなと思いきや、ログを出して解析してみると
以下のログは20回出力されていて、20回の全件がfinishedしたから正しくcollectしているようです。
ただ、map処理の以下ログは19回しかなくて、mapが処理されてないけど、正しい数finishedされてるようですね。謎です。