| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960 |
- //
- // Flatten.swift
- // ReactiveSwift
- //
- // Created by Neil Pankey on 11/30/15.
- // Copyright © 2015 GitHub. All rights reserved.
- //
- import enum Result.NoError
- /// The ways in which a stream of producers can be combined into one stream.
- public enum FlattenStrategy: Equatable {
-     /// Merge the producers: a value received on any input producer is
-     /// forwarded to the output producer right away.
-     ///
-     /// The output completes only once every input has completed.
-     case merge
-     /// Concatenate the producers: values are delivered in the order of the
-     /// producers themselves.
-     ///
-     /// The output completes only once every input has completed.
-     case concat
-     /// Follow only the most recent producer: producers received before it
-     /// are disposed of.
-     ///
-     /// The output completes only once both the producer-of-producers and the
-     /// latest producer have completed.
-     case latest
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Joins the producers sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// - note: A failure of `signal` or of an active inner producer is
-     ///         forwarded on the returned signal immediately.
-     ///
-     /// - note: `interrupted` events on inner producers are treated like
-     ///         `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Selects between merging, concatenating and switching to
-     ///               the latest producer.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
-         let flattened: Signal<Value.Value, Error>
-         switch strategy {
-         case .merge:
-             flattened = merge()
-         case .concat:
-             flattened = concat()
-         case .latest:
-             flattened = switchToLatest()
-         }
-         return flattened
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError {
-     /// Joins the producers sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// The receiver cannot fail, so its error type is first promoted to the
-     /// error type of the inner producers.
-     ///
-     /// - note: A failure of an active inner producer is forwarded on the
-     ///         returned signal immediately.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
-         let promoted = promoteErrors(Value.Error.self)
-         return promoted.flatten(strategy)
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError, Value.Error == NoError {
-     /// Joins the producers sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
-         let flattened: Signal<Value.Value, Value.Error>
-         switch strategy {
-         case .merge:
-             flattened = merge()
-         case .concat:
-             flattened = concat()
-         case .latest:
-             flattened = switchToLatest()
-         }
-         return flattened
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Value.Error == NoError {
-     /// Joins the producers sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Each inner producer has its error type promoted to the receiver's
-     /// error type before flattening.
-     ///
-     /// - note: A failure of `signal` is forwarded on the returned signal
-     ///         immediately.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
-         return flatMap(strategy) { inner in inner.promoteErrors(Error.self) }
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Joins the producers sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// - note: A failure of `producer` or of an active inner producer is
-     ///         forwarded on the returned producer immediately.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening producers.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
-         let flattened: SignalProducer<Value.Value, Error>
-         switch strategy {
-         case .merge:
-             flattened = merge()
-         case .concat:
-             flattened = concat()
-         case .latest:
-             flattened = switchToLatest()
-         }
-         return flattened
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == NoError {
-     /// Joins the producers sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// The receiver cannot fail, so its error type is first promoted to the
-     /// error type of the inner producers.
-     ///
-     /// - note: A failure of an active inner producer is forwarded on the
-     ///         returned producer immediately.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening producers.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
-         let promoted = promoteErrors(Value.Error.self)
-         return promoted.flatten(strategy)
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == NoError, Value.Error == NoError {
-     /// Joins the producers sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening producers.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
-         let flattened: SignalProducer<Value.Value, Value.Error>
-         switch strategy {
-         case .merge:
-             flattened = merge()
-         case .concat:
-             flattened = concat()
-         case .latest:
-             flattened = switchToLatest()
-         }
-         return flattened
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Value.Error == NoError {
-     /// Joins the producers sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Each inner producer has its error type promoted to the receiver's
-     /// error type before flattening.
-     ///
-     /// - note: A failure of `producer` is forwarded on the returned producer
-     ///         immediately.
-     ///
-     /// - warning: `interrupted` events on inner producers are treated like
-     ///            `completed` events on inner producers.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening producers.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
-         return flatMap(strategy) { inner in inner.promoteErrors(Error.self) }
-     }
- }
- extension SignalProtocol where Value: SignalProtocol, Error == Value.Error {
-     /// Joins the signals sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Every inner signal is wrapped in a `SignalProducer` and the resulting
-     /// signal of producers is flattened.
-     ///
-     /// - note: An error emitted by `signal` or by an active inner signal is
-     ///         forwarded on the returned signal immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
-         return map(SignalProducer.init).flatten(strategy)
-     }
- }
- extension SignalProtocol where Value: SignalProtocol, Error == NoError {
-     /// Joins the signals sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// The receiver cannot fail, so its error type is first promoted to the
-     /// error type of the inner signals.
-     ///
-     /// - note: An error emitted by an active inner signal is forwarded on the
-     ///         returned signal immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
-         let promoted = promoteErrors(Value.Error.self)
-         return promoted.flatten(strategy)
-     }
- }
- extension SignalProtocol where Value: SignalProtocol, Error == NoError, Value.Error == NoError {
-     /// Joins the signals sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Every inner signal is wrapped in a `SignalProducer` and the resulting
-     /// signal of producers is flattened.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
-         return map(SignalProducer.init).flatten(strategy)
-     }
- }
- extension SignalProtocol where Value: SignalProtocol, Value.Error == NoError {
-     /// Joins the signals sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Each inner signal has its error type promoted to the receiver's error
-     /// type before flattening.
-     ///
-     /// - note: An error emitted by `signal` is forwarded on the returned
-     ///         signal immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
-         return flatMap(strategy) { inner in inner.promoteErrors(Error.self) }
-     }
- }
- extension SignalProtocol where Value: Sequence, Error == NoError {
-     /// Flattens each sequence sent by `signal` into its elements, following
-     /// the semantics of `strategy`.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening the sequences.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Iterator.Element, Error> {
-         return flatMap(strategy) { sequence in .init(sequence) }
-     }
- }
- extension SignalProducerProtocol where Value: SignalProtocol, Error == Value.Error {
-     /// Joins the signals sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Every inner signal is wrapped in a `SignalProducer` and the resulting
-     /// producer of producers is flattened.
-     ///
-     /// - note: An error emitted by `producer` or by an active inner signal is
-     ///         forwarded on the returned producer immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
-         return map(SignalProducer.init).flatten(strategy)
-     }
- }
- extension SignalProducerProtocol where Value: SignalProtocol, Error == NoError {
-     /// Joins the signals sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// The receiver cannot fail, so its error type is first promoted to the
-     /// error type of the inner signals.
-     ///
-     /// - note: An error emitted by an active inner signal is forwarded on the
-     ///         returned producer immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
-         let promoted = promoteErrors(Value.Error.self)
-         return promoted.flatten(strategy)
-     }
- }
- extension SignalProducerProtocol where Value: SignalProtocol, Error == NoError, Value.Error == NoError {
-     /// Joins the signals sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Every inner signal is wrapped in a `SignalProducer` and the resulting
-     /// producer of producers is flattened.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
-         return map(SignalProducer.init).flatten(strategy)
-     }
- }
- extension SignalProducerProtocol where Value: SignalProtocol, Value.Error == NoError {
-     /// Joins the signals sent on `producer` into a single producer of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Each inner signal has its error type promoted to the receiver's error
-     /// type before flattening.
-     ///
-     /// - note: An error emitted by `producer` is forwarded on the returned
-     ///         producer immediately.
-     ///
-     /// - warning: `interrupted` events on inner signals are treated like
-     ///            `completed` events on inner signals.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening signals.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
-         return flatMap(strategy) { inner in inner.promoteErrors(Error.self) }
-     }
- }
- extension SignalProducerProtocol where Value: Sequence, Error == NoError {
-     /// Flattens each sequence sent by `producer` into its elements,
-     /// following the semantics of `strategy`.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening the sequences.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Iterator.Element, Error> {
-         return flatMap(strategy) { sequence in .init(sequence) }
-     }
- }
- extension SignalProtocol where Value: PropertyProtocol {
-     /// Joins the properties sent on `signal` into a single signal of their
-     /// values, using the operator selected by `strategy`.
-     ///
-     /// Each property contributes through its `producer`.
-     ///
-     /// - note: A failure of `signal` is forwarded on the returned signal
-     ///         immediately.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening the properties.
-     public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
-         return flatMap(strategy) { property in property.producer }
-     }
- }
- extension SignalProducerProtocol where Value: PropertyProtocol {
-     /// Joins the properties sent on `producer` into a single producer of
-     /// their values, using the operator selected by `strategy`.
-     ///
-     /// Each property contributes through its `producer`.
-     ///
-     /// - note: A failure of `producer` is forwarded on the returned producer
-     ///         immediately.
-     ///
-     /// - parameters:
-     ///   - strategy: Strategy used when flattening the properties.
-     public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
-         return flatMap(strategy) { property in property.producer }
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Builds a signal that relays the values of every producer sent on
-     /// `signal`, starting each inner producer only after the previous one
-     /// has completed.
-     ///
-     /// - note: A failure of any inner producer is forwarded on the returned
-     ///         signal immediately.
-     ///
-     /// - note: The returned signal completes only when `signal` and all
-     ///         producers emitted from `signal` complete.
-     fileprivate func concat() -> Signal<Value.Value, Error> {
-         return Signal<Value.Value, Error> { relay in
-             // `innerDisposables` collects the disposables of started inner
-             // producers; `all` additionally holds the observation of `self`.
-             let all = CompositeDisposable()
-             let innerDisposables = CompositeDisposable()
-             all += innerDisposables
-             all += self.observeConcat(relay, innerDisposables)
-             return all
-         }
-     }
-
-     /// Observes the receiver, queueing every producer it sends and starting
-     /// the queued producers one at a time. Values and failures of the active
-     /// producer are passed to `observer`.
-     ///
-     /// - parameters:
-     ///   - observer: Receives the flattened events.
-     ///   - disposable: When non-nil, the disposable of each started inner
-     ///                 producer is added to it, and removed again once that
-     ///                 producer completes or is interrupted.
-     ///
-     /// - returns: The disposable returned by observing the receiver.
-     fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
-         let concatState = Atomic(ConcatState<Value.Value, Error>())
-
-         func startNextIfNeeded() {
-             while let next = concatState.modify({ $0.dequeue() }) {
-                 next.startWithSignal { innerSignal, innerDisposable in
-                     let registration = disposable?.add(innerDisposable)
-                     innerSignal.observe { innerEvent in
-                         switch innerEvent {
-                         case .value, .failed:
-                             observer.action(innerEvent)
-                         case .completed, .interrupted:
-                             registration?.remove()
-
-                             // Only recurse when the enclosing loop is not in the
-                             // middle of starting this producer; otherwise the loop
-                             // itself picks up the next one.
-                             let canStartNext: Bool = concatState.modify { current in
-                                 current.active = nil
-                                 return !current.isStarting
-                             }
-                             if canStartNext {
-                                 startNextIfNeeded()
-                             }
-                         }
-                     }
-                 }
-                 concatState.modify { current in current.isStarting = false }
-             }
-         }
-
-         return observe { outerEvent in
-             switch outerEvent {
-             case .value(let inner):
-                 concatState.modify { current in current.queue.append(inner.producer) }
-                 startNextIfNeeded()
-             case .failed(let error):
-                 observer.send(error: error)
-             case .completed:
-                 // Completion is queued as an empty producer so that it is
-                 // delivered after every producer queued before it.
-                 concatState.modify { current in
-                     current.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
-                 }
-                 startNextIfNeeded()
-             case .interrupted:
-                 observer.sendInterrupted()
-             }
-         }
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Builds a producer that relays the values of every producer sent on
-     /// `producer`, starting each inner producer only after the previous one
-     /// has completed.
-     ///
-     /// - note: An error emitted by any inner producer is emitted by the
-     ///         returned producer.
-     ///
-     /// - note: The returned producer completes only when `producer` and all
-     ///         producers emitted from `producer` complete.
-     fileprivate func concat() -> SignalProducer<Value.Value, Error> {
-         return SignalProducer<Value.Value, Error> { relay, composite in
-             self.startWithSignal { outerSignal, outerDisposable in
-                 composite += outerDisposable
-                 // The observation's own disposable is deliberately dropped,
-                 // as in the original.
-                 _ = outerSignal.observeConcat(relay, composite)
-             }
-         }
-     }
- }
- extension SignalProducerProtocol {
-     /// Returns a producer that sends the events of `self` followed by those
-     /// of `next`, by flattening both with the `.concat` strategy.
-     public func concat(_ next: SignalProducer<Value, Error>) -> SignalProducer<Value, Error> {
-         let sources: [SignalProducer<Value, Error>] = [ self.producer, next ]
-         return SignalProducer<SignalProducer<Value, Error>, Error>(sources).flatten(.concat)
-     }
-
-     /// Returns a producer that sends the events of `self` followed by
-     /// `value`.
-     public func concat(value: Value) -> SignalProducer<Value, Error> {
-         let trailing = SignalProducer<Value, Error>(value: value)
-         return concat(trailing)
-     }
-
-     /// Returns a producer that sends the events of `previous` followed by
-     /// those of `self`.
-     public func prefix<P: SignalProducerProtocol>(_ previous: P) -> SignalProducer<Value, Error>
-         where P.Value == Value, P.Error == Error
-     {
-         return previous.concat(self.producer)
-     }
-
-     /// Returns a producer that sends `value` followed by the events of
-     /// `self`.
-     public func prefix(value: Value) -> SignalProducer<Value, Error> {
-         let leading = SignalProducer<Value, Error>(value: value)
-         return prefix(leading)
-     }
- }
- /// Bookkeeping for the concat operator: the producer currently running and
- /// the producers still waiting their turn.
- private final class ConcatState<Value, Error: Swift.Error> {
-     typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
-
-     /// The producer that is currently active, or `nil` when none is.
-     var active: SignalProducer? = nil
-
-     /// Producers that have been received but not yet started.
-     var queue: [SignalProducer] = []
-
-     /// `true` while the active producer is in the process of being started.
-     /// Used to prevent deep recursion.
-     var isStarting: Bool = false
-
-     /// Takes the next producer off the queue if one should be started.
-     ///
-     /// - note: The caller *must* reset `isStarting` to `false` once the
-     ///         returned producer has been started.
-     ///
-     /// - returns: The producer to start, or `nil` when a producer is already
-     ///            active or the queue is empty.
-     func dequeue() -> SignalProducer? {
-         guard active == nil, !queue.isEmpty else {
-             return nil
-         }
-
-         let next = queue.removeFirst()
-         active = next
-         isStarting = true
-         return next
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Merges a `signal` of producers into one signal that forwards the
-     /// events of the inner producers as they arrive.
-     fileprivate func merge() -> Signal<Value.Value, Error> {
-         return Signal<Value.Value, Error> { relay in
-             // `innerDisposables` collects the disposables of started inner
-             // producers; `all` additionally holds the observation of `self`.
-             let all = CompositeDisposable()
-             let innerDisposables = CompositeDisposable()
-             all += innerDisposables
-             all += self.observeMerge(relay, innerDisposables)
-             return all
-         }
-     }
-
-     /// Observes the receiver and starts every producer it sends, passing the
-     /// values and failures of all of them to `observer`.
-     ///
-     /// - parameters:
-     ///   - observer: Receives the merged events.
-     ///   - disposable: Receives the disposable of each started inner producer;
-     ///                 the entry is removed once that producer completes or is
-     ///                 interrupted.
-     ///
-     /// - returns: The disposable returned by observing the receiver.
-     fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
-         // Starts at 1 to account for the receiver itself; each started inner
-         // producer adds one more.
-         let pending = Atomic(1)
-
-         // Sends completion once the receiver and every inner producer are done.
-         func finishOne() {
-             let isLast: Bool = pending.modify { count in
-                 count -= 1
-                 return count == 0
-             }
-             if isLast {
-                 observer.sendCompleted()
-             }
-         }
-
-         return self.observe { outerEvent in
-             switch outerEvent {
-             case .value(let producer):
-                 producer.startWithSignal { innerSignal, innerDisposable in
-                     pending.modify { count in count += 1 }
-                     let registration = disposable.add(innerDisposable)
-                     innerSignal.observe { innerEvent in
-                         switch innerEvent {
-                         case .value, .failed:
-                             observer.action(innerEvent)
-                         case .completed, .interrupted:
-                             registration.remove()
-                             finishOne()
-                         }
-                     }
-                 }
-             case .failed(let error):
-                 observer.send(error: error)
-             case .completed:
-                 finishOne()
-             case .interrupted:
-                 observer.sendInterrupted()
-             }
-         }
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Merges a `producer` of producers into one producer that forwards the
-     /// events of the inner producers as they arrive.
-     fileprivate func merge() -> SignalProducer<Value.Value, Error> {
-         return SignalProducer<Value.Value, Error> { relay, composite in
-             self.startWithSignal { outerSignal, outerDisposable in
-                 composite += outerDisposable
-                 // The observation's own disposable is deliberately dropped,
-                 // as in the original.
-                 _ = outerSignal.observeMerge(relay, composite)
-             }
-         }
-     }
- }
- extension SignalProtocol {
-     /// Combines the signals in `signals` into one `Signal` that emits every
-     /// value from each of them and completes once all of them have completed.
-     ///
-     /// - parameters:
-     ///   - signals: A sequence of signals to merge.
-     public static func merge<Seq: Sequence, S: SignalProtocol>(_ signals: Seq) -> Signal<Value, Error>
-         where S.Value == Value, S.Error == Error, Seq.Iterator.Element == S
-     {
-         let merged: SignalProducer<Value, Error> = SignalProducer<S, Error>(signals).flatten(.merge)
-         return merged.startAndRetrieveSignal()
-     }
-
-     /// Combines the given signals into one `Signal` that emits every value
-     /// from each of them and completes once all of them have completed.
-     ///
-     /// - parameters:
-     ///   - signals: The signals to merge, as a variadic list.
-     public static func merge<S: SignalProtocol>(_ signals: S...) -> Signal<Value, Error>
-         where S.Value == Value, S.Error == Error
-     {
-         // Forwards the variadic arguments, as an array, to the sequence overload.
-         return Signal.merge(signals)
-     }
- }
- extension SignalProducerProtocol {
-     /// Combines the producers in `producers` into one `SignalProducer` that
-     /// emits every value from each of them and completes once all of them
-     /// have completed.
-     ///
-     /// - parameters:
-     ///   - producers: A sequence of producers to merge.
-     public static func merge<Seq: Sequence, S: SignalProducerProtocol>(_ producers: Seq) -> SignalProducer<Value, Error>
-         where S.Value == Value, S.Error == Error, Seq.Iterator.Element == S
-     {
-         let merged: SignalProducer<Value, Error> = SignalProducer(producers).flatten(.merge)
-         return merged
-     }
-
-     /// Combines the given producers into one `SignalProducer` that emits
-     /// every value from each of them and completes once all of them have
-     /// completed.
-     ///
-     /// - parameters:
-     ///   - producers: The producers to merge, as a variadic list.
-     public static func merge<S: SignalProducerProtocol>(_ producers: S...) -> SignalProducer<Value, Error>
-         where S.Value == Value, S.Error == Error
-     {
-         // Forwards the variadic arguments, as an array, to the sequence overload.
-         return SignalProducer.merge(producers)
-     }
- }
- extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Builds a signal that forwards the values of the most recent producer
-     /// sent on `signal`, ignoring values from earlier inner producers.
-     ///
-     /// An error sent on `signal` or on the latest inner producer is sent on
-     /// the returned signal.
-     ///
-     /// The returned signal completes when `signal` and the latest inner
-     /// producer have both completed.
-     fileprivate func switchToLatest() -> Signal<Value.Value, Error> {
-         return Signal<Value.Value, Error> { relay in
-             // `latest` holds the disposable of the current inner producer;
-             // `all` additionally holds the observation of `self`.
-             let all = CompositeDisposable()
-             let latest = SerialDisposable()
-             all += latest
-             all += self.observeSwitchToLatest(relay, latest)
-             return all
-         }
-     }
-
-     /// Observes the receiver and starts every producer it sends, replacing
-     /// the previously started one, while passing the values and failures of
-     /// the inner producers to `observer`.
-     ///
-     /// - parameters:
-     ///   - observer: Receives the flattened events.
-     ///   - latestInnerDisposable: Its `inner` is set to the disposable of each
-     ///                            newly started inner producer.
-     ///
-     /// - returns: The disposable returned by observing the receiver.
-     fileprivate func observeSwitchToLatest(_ observer: Observer<Value.Value, Error>, _ latestInnerDisposable: SerialDisposable) -> Disposable? {
-         let latestState = Atomic(LatestState<Value, Error>())
-
-         return self.observe { outerEvent in
-             switch outerEvent {
-             case .value(let innerProducer):
-                 innerProducer.startWithSignal { innerSignal, innerDisposable in
-                     // When we replace the disposable below, this prevents the
-                     // generated `interrupted` event from doing any work.
-                     latestState.modify { flags in
-                         flags.replacingInnerSignal = true
-                     }
-
-                     latestInnerDisposable.inner = innerDisposable
-
-                     latestState.modify { flags in
-                         flags.replacingInnerSignal = false
-                         flags.innerSignalComplete = false
-                     }
-
-                     innerSignal.observe { innerEvent in
-                         switch innerEvent {
-                         case .value, .failed:
-                             observer.action(innerEvent)
-
-                         case .completed:
-                             let outerIsDone: Bool = latestState.modify { flags in
-                                 flags.innerSignalComplete = true
-                                 return flags.outerSignalComplete
-                             }
-                             if outerIsDone {
-                                 observer.sendCompleted()
-                             }
-
-                         case .interrupted:
-                             // An interruption caused by a newer producer arriving
-                             // must not be reported to the observer.
-                             let shouldFinish: Bool = latestState.modify { flags in
-                                 let isReplacing = flags.replacingInnerSignal
-                                 if !isReplacing {
-                                     flags.innerSignalComplete = true
-                                 }
-                                 return !isReplacing && flags.outerSignalComplete
-                             }
-                             if shouldFinish {
-                                 observer.sendCompleted()
-                             }
-                         }
-                     }
-                 }
-             case .failed(let error):
-                 observer.send(error: error)
-             case .completed:
-                 let innerIsDone: Bool = latestState.modify { flags in
-                     flags.outerSignalComplete = true
-                     return flags.innerSignalComplete
-                 }
-                 if innerIsDone {
-                     observer.sendCompleted()
-                 }
-             case .interrupted:
-                 observer.sendInterrupted()
-             }
-         }
-     }
- }
- extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
-     /// Builds a producer that forwards the values of the most recent producer
-     /// sent on `producer`, ignoring values from earlier inner producers.
-     ///
-     /// An error sent on `producer` or on the latest inner producer is sent on
-     /// the returned producer.
-     ///
-     /// The returned producer completes when `producer` and the latest inner
-     /// producer have both completed.
-     fileprivate func switchToLatest() -> SignalProducer<Value.Value, Error> {
-         return SignalProducer<Value.Value, Error> { relay, composite in
-             // Holds the disposable of whichever inner producer is current.
-             let latest = SerialDisposable()
-             composite += latest
-
-             self.startWithSignal { outerSignal, outerDisposable in
-                 composite += outerDisposable
-                 composite += outerSignal.observeSwitchToLatest(relay, latest)
-             }
-         }
-     }
- }
- /// Flags shared between the outer and inner observations of the
- /// switch-to-latest operator.
- private struct LatestState<Value, Error: Swift.Error> {
-     /// Set once the outer signal has completed.
-     var outerSignalComplete = false
-     /// Set once the current inner signal has finished; starts as `true`
-     /// because no inner signal exists initially.
-     var innerSignalComplete = true
-
-     /// Set while the inner disposable is being swapped for a new producer.
-     var replacingInnerSignal = false
- }
- extension SignalProtocol {
-     /// Maps each value from `signal` to a new producer with `transform`,
-     /// then flattens the resulting producers into a signal of values
-     /// according to `strategy`.
-     ///
-     /// If `signal` or any of the created producers fail, the returned signal
-     /// will forward that failure immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, Error>) -> Signal<U, Error> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new non-failing producer with
-     /// `transform`, then flattens the resulting producers into a signal of
-     /// values according to `strategy`.
-     ///
-     /// If `signal` fails, the returned signal will forward that failure
-     /// immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> Signal<U, Error> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new signal with `transform`, then
-     /// flattens the resulting signals into a signal of values according to
-     /// `strategy`.
-     ///
-     /// If `signal` or any of the created signals emit an error, the returned
-     /// signal will forward that error immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, Error>) -> Signal<U, Error> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new non-failing signal with
-     /// `transform`, then flattens the resulting signals into a signal of
-     /// values according to `strategy`.
-     ///
-     /// If `signal` emits an error, the returned signal will forward that
-     /// error immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> Signal<U, Error> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new property with `transform`,
-     /// then flattens the resulting properties into a signal of values
-     /// according to `strategy`.
-     ///
-     /// If `signal` emits an error, the returned signal will forward that
-     /// error immediately.
-     public func flatMap<P: PropertyProtocol>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> P) -> Signal<P.Value, Error> {
-         let properties = map(transform)
-         return properties.flatten(strategy)
-     }
- }
- extension SignalProtocol where Error == NoError {
-     /// Maps each value from `signal` to a new producer with `transform`,
-     /// then flattens the resulting producers into a signal of values
-     /// according to `strategy`.
-     ///
-     /// If any of the created producers fail, the returned signal will
-     /// forward that failure immediately.
-     public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, E>) -> Signal<U, E> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new non-failing producer with
-     /// `transform`, then flattens the resulting producers into a signal of
-     /// values according to `strategy`.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> Signal<U, NoError> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new signal with `transform`, then
-     /// flattens the resulting signals into a signal of values according to
-     /// `strategy`.
-     ///
-     /// If any of the created signals emit an error, the returned signal
-     /// will forward that error immediately.
-     public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, E>) -> Signal<U, E> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
-
-     /// Maps each value from `signal` to a new non-failing signal with
-     /// `transform`, then flattens the resulting signals into a signal of
-     /// values according to `strategy`.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> Signal<U, NoError> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
- }
- extension SignalProducerProtocol {
-     /// Maps each value from `self` to a new producer with `transform`, then
-     /// flattens the resulting producers into a producer of values according
-     /// to `strategy`.
-     ///
-     /// If `self` or any of the created producers fail, the returned producer
-     /// will forward that failure immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, Error>) -> SignalProducer<U, Error> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `self` to a new non-failing producer with
-     /// `transform`, then flattens the resulting producers into a producer of
-     /// values according to `strategy`.
-     ///
-     /// If `self` fails, the returned producer will forward that failure
-     /// immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> SignalProducer<U, Error> {
-         let producers = map(transform)
-         return producers.flatten(strategy)
-     }
-
-     /// Maps each value from `self` to a new signal with `transform`, then
-     /// flattens the resulting signals into a producer of values according to
-     /// `strategy`.
-     ///
-     /// If `self` or any of the created signals emit an error, the returned
-     /// producer will forward that error immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, Error>) -> SignalProducer<U, Error> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
-
-     /// Maps each value from `self` to a new non-failing signal with
-     /// `transform`, then flattens the resulting signals into a producer of
-     /// values according to `strategy`.
-     ///
-     /// If `self` emits an error, the returned producer will forward that
-     /// error immediately.
-     public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> SignalProducer<U, Error> {
-         let signals = map(transform)
-         return signals.flatten(strategy)
-     }
-
-     /// Maps each value from `self` to a new property with `transform`, then
-     /// flattens the resulting properties into a producer of values according
-     /// to `strategy`.
-     ///
-     /// If `self` emits an error, the returned producer will forward that
-     /// error immediately.
-     public func flatMap<P: PropertyProtocol>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> P) -> SignalProducer<P.Value, Error> {
-         let properties = map(transform)
-         return properties.flatten(strategy)
-     }
- }
- extension SignalProducerProtocol where Error == NoError {
- /// Turns every value of `self` into a producer by calling `transform`, then
- /// flattens the resulting producers (into a producer of values), according
- /// to the semantics of the given strategy.
- ///
- /// If any of the created producers fail, the returned producer will
- /// forward that failure immediately.
- ///
- /// - parameters:
- ///   - strategy: The strategy used to flatten the inner producers.
- ///   - transform: A closure that receives a value of `self` and returns a
- ///                producer of `U` values with error type `E`.
- ///
- /// - returns: A producer of the values of the flattened producers.
- public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, E>) -> SignalProducer<U, E> {
- let innerProducers = map(transform)
- return innerProducers.flatten(strategy)
- }
-
- /// Turns every value of `self` into a producer by calling `transform`, then
- /// flattens the resulting producers (into a producer of values), according
- /// to the semantics of the given strategy.
- ///
- /// - parameters:
- ///   - strategy: The strategy used to flatten the inner producers.
- ///   - transform: A closure that receives a value of `self` and returns a
- ///                producer whose error type is `NoError`.
- ///
- /// - returns: A producer of the values of the flattened producers.
- public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> SignalProducer<U, NoError> {
- let innerProducers = map(transform)
- return innerProducers.flatten(strategy)
- }
-
- /// Turns every value of `self` into a signal by calling `transform`, then
- /// flattens the resulting signals (into a producer of values), according
- /// to the semantics of the given strategy.
- ///
- /// If any of the created signals emit an error, the returned
- /// producer will forward that error immediately.
- ///
- /// - parameters:
- ///   - strategy: The strategy used to flatten the inner signals.
- ///   - transform: A closure that receives a value of `self` and returns a
- ///                signal of `U` values with error type `E`.
- ///
- /// - returns: A producer of the values of the flattened signals.
- public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, E>) -> SignalProducer<U, E> {
- let innerSignals = map(transform)
- return innerSignals.flatten(strategy)
- }
-
- /// Turns every value of `self` into a signal by calling `transform`, then
- /// flattens the resulting signals (into a producer of values), according
- /// to the semantics of the given strategy.
- ///
- /// - parameters:
- ///   - strategy: The strategy used to flatten the inner signals.
- ///   - transform: A closure that receives a value of `self` and returns a
- ///                signal whose error type is `NoError`.
- ///
- /// - returns: A producer of the values of the flattened signals.
- public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> SignalProducer<U, NoError> {
- let innerSignals = map(transform)
- return innerSignals.flatten(strategy)
- }
- }
- extension SignalProtocol {
- /// Recovers from a failure of the input signal by starting the producer
- /// returned from `handler` in its place.
- ///
- /// - parameters:
- ///   - handler: A closure that receives the error of `self` and returns the
- ///              producer to start in place of the failed signal.
- ///
- /// - returns: A signal that forwards the values, completion and
- ///            interruption of `self`, and the events of the replacement
- ///            producer once `self` has failed.
- public func flatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>) -> Signal<Value, F> {
- return Signal { downstream in
- // A fresh serial disposable receives the disposable of the
- // replacement producer if `self` fails.
- let replacementDisposable = SerialDisposable()
- return self.observeFlatMapError(handler, downstream, replacementDisposable)
- }
- }
-
- /// Observes `self`, forwarding everything except a failure to `observer`.
- /// On failure, the producer returned from `handler` is started, its
- /// disposable is stored in `serialDisposable`, and its signal is observed
- /// by `observer`.
- ///
- /// - parameters:
- ///   - handler: A closure that maps the error of `self` to the producer
- ///              started in its place.
- ///   - observer: The observer receiving the forwarded events.
- ///   - serialDisposable: The disposable whose `inner` is set to the
- ///                       disposable of the replacement producer.
- ///
- /// - returns: The disposable returned from observing `self`.
- fileprivate func observeFlatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>, _ observer: Observer<Value, F>, _ serialDisposable: SerialDisposable) -> Disposable? {
- return self.observe { event in
- switch event {
- case .completed:
- observer.sendCompleted()
- case .interrupted:
- observer.sendInterrupted()
- case .value(let value):
- observer.send(value: value)
- case .failed(let error):
- // Swap the failure for the events of the replacement producer.
- let replacement = handler(error)
- replacement.startWithSignal { replacementSignal, replacementDisposable in
- serialDisposable.inner = replacementDisposable
- replacementSignal.observe(observer)
- }
- }
- }
- }
- }
- extension SignalProducerProtocol {
- /// Recovers from a failure of the input producer by starting the producer
- /// returned from `handler` in its place.
- ///
- /// - parameters:
- ///   - handler: A closure that receives the error of `self` and returns the
- ///              producer to start in place of the failed producer.
- ///
- /// - returns: A producer that, when started, starts `self` and hands its
- ///            signal to `observeFlatMapError` together with `handler`.
- public func flatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>) -> SignalProducer<Value, F> {
- return SignalProducer { observer, producerDisposable in
- // One serial disposable, added to the producer's disposable, first
- // holds the disposable of `self` and is handed on to
- // `observeFlatMapError`, which reassigns it on failure.
- let currentDisposable = SerialDisposable()
- producerDisposable += currentDisposable
- self.startWithSignal { upstream, upstreamDisposable in
- currentDisposable.inner = upstreamDisposable
- _ = upstream.observeFlatMapError(handler, observer, currentDisposable)
- }
- }
- }
- }
|