| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974 |
- //
- // Flatten.swift
- // ReactiveCocoa
- //
- // Created by Neil Pankey on 11/30/15.
- // Copyright © 2015 GitHub. All rights reserved.
- //
- import enum Result.NoError
/// The policy used to combine the events of several inner producers into a
/// single stream.
public enum FlattenStrategy: Equatable {
	/// Interleave the inputs: a value received on any input producer is
	/// forwarded to the output producer right away.
	///
	/// The output completes only once every input has completed.
	case Merge

	/// Play the inputs back to back: values are sent in the order of the
	/// producers themselves.
	///
	/// The output completes only once every input has completed.
	case Concat

	/// Follow the newest input only: events are taken from the latest input
	/// producer, and producers received before it are disposed of.
	///
	/// The output completes only when both the producer-of-producers and the
	/// latest producer have completed.
	case Latest
}
extension SignalType where Value: SignalProducerType, Error == Value.Error {
	/// Collapses a signal of producers into a single signal of values,
	/// combining the inner producers as dictated by `strategy`.
	///
	/// - note: A failure on `signal` or on an active inner producer is
	///         forwarded immediately by the returned signal.
	///
	/// - note: `Interrupted` events on inner producers will be treated like
	///         `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
		let flattened: Signal<Value.Value, Error>

		switch strategy {
		case .Merge:
			flattened = self.merge()

		case .Concat:
			flattened = self.concat()

		case .Latest:
			flattened = self.switchToLatest()
		}

		return flattened
	}
}
extension SignalType where Value: SignalProducerType, Error == NoError {
	/// Collapses a non-failing signal of producers into a single signal of
	/// values, combining the inner producers as dictated by `strategy`.
	///
	/// - note: A failure on an active inner producer is forwarded immediately
	///         by the returned signal.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
		// `self` cannot fail, so its error type is first lifted to the inner
		// producers' error type; the flattening itself happens afterwards.
		let promoted = self.promoteErrors(Value.Error.self)
		return promoted.flatten(strategy)
	}
}
extension SignalType where Value: SignalProducerType, Error == NoError, Value.Error == NoError {
	/// Collapses a signal of producers into a single signal of values,
	/// combining the inner producers as dictated by `strategy`. Neither the
	/// outer signal nor the inner producers can fail.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
		let flattened: Signal<Value.Value, Value.Error>

		switch strategy {
		case .Merge:
			flattened = self.merge()

		case .Concat:
			flattened = self.concat()

		case .Latest:
			flattened = self.switchToLatest()
		}

		return flattened
	}
}
extension SignalType where Value: SignalProducerType, Value.Error == NoError {
	/// Collapses a signal of non-failing producers into a single signal of
	/// values, combining the inner producers as dictated by `strategy`.
	///
	/// - note: A failure on `signal` is forwarded immediately by the returned
	///         signal.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
		// Each inner producer has its error type lifted to the outer signal's
		// error type so the two can be flattened together.
		return self.flatMap(strategy) { inner in
			inner.promoteErrors(Error.self)
		}
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == Value.Error {
	/// Collapses a producer of producers into a single producer of values,
	/// combining the inner producers as dictated by `strategy`.
	///
	/// - note: A failure on `producer` or on an active inner producer is
	///         forwarded immediately by the returned producer.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening producers.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
		let flattened: SignalProducer<Value.Value, Error>

		switch strategy {
		case .Merge:
			flattened = self.merge()

		case .Concat:
			flattened = self.concat()

		case .Latest:
			flattened = self.switchToLatest()
		}

		return flattened
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == NoError {
	/// Collapses a non-failing producer of producers into a single producer of
	/// values, combining the inner producers as dictated by `strategy`.
	///
	/// - note: A failure on an active inner producer is forwarded immediately
	///         by the returned producer.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening producers.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
		// `self` cannot fail, so its error type is first lifted to the inner
		// producers' error type; the flattening itself happens afterwards.
		let promoted = self.promoteErrors(Value.Error.self)
		return promoted.flatten(strategy)
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == NoError, Value.Error == NoError {
	/// Flattens the inner producers sent upon `producer` (into a single
	/// producer of values), according to the semantics of the given strategy.
	/// Neither the outer producer nor the inner producers can fail.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening producers.
	///
	/// - returns: A producer that forwards the values of the inner producers,
	///            combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
		switch strategy {
		case .Merge:
			return self.merge()

		case .Concat:
			return self.concat()

		case .Latest:
			return self.switchToLatest()
		}
	}
}
extension SignalProducerType where Value: SignalProducerType, Value.Error == NoError {
	/// Flattens the inner producers sent upon `producer` (into a single
	/// producer of values), according to the semantics of the given strategy.
	///
	/// - note: If `producer` fails, the returned producer will forward that
	///         failure immediately.
	///
	/// - warning: `Interrupted` events on inner producers will be treated like
	///            `Completed` events on inner producers.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening producers.
	///
	/// - returns: A producer that forwards the values of the inner producers,
	///            combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
		// The inner producers cannot fail, so each one has its error type
		// lifted to the outer producer's error type before flattening.
		return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
	}
}
extension SignalType where Value: SignalType, Error == Value.Error {
	/// Collapses a signal of signals into a single signal of values, combining
	/// the inner signals as dictated by `strategy`.
	///
	/// - note: An error emitted by `signal` or by an active inner signal is
	///         forwarded immediately by the returned signal.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
		// Every inner signal is wrapped in a producer so that the
		// producer-based flattening can be reused.
		return map(SignalProducer.init).flatten(strategy)
	}
}
extension SignalType where Value: SignalType, Error == NoError {
	/// Collapses a non-failing signal of signals into a single signal of
	/// values, combining the inner signals as dictated by `strategy`.
	///
	/// - note: An error emitted by an active inner signal is forwarded
	///         immediately by the returned signal.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
		// `self` cannot fail, so its error type is first lifted to the inner
		// signals' error type; the flattening itself happens afterwards.
		let promoted = self.promoteErrors(Value.Error.self)
		return promoted.flatten(strategy)
	}
}
extension SignalType where Value: SignalType, Error == NoError, Value.Error == NoError {
	/// Collapses a signal of signals into a single signal of values, combining
	/// the inner signals as dictated by `strategy`. Neither the outer signal
	/// nor the inner signals can fail.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
		// Every inner signal is wrapped in a producer so that the
		// producer-based flattening can be reused.
		return map(SignalProducer.init).flatten(strategy)
	}
}
extension SignalType where Value: SignalType, Value.Error == NoError {
	/// Collapses a signal of non-failing signals into a single signal of
	/// values, combining the inner signals as dictated by `strategy`.
	///
	/// - note: An error emitted by `signal` is forwarded immediately by the
	///         returned signal.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
		// Each inner signal has its error type lifted to the outer signal's
		// error type so the two can be flattened together.
		return self.flatMap(strategy) { inner in
			inner.promoteErrors(Error.self)
		}
	}
}
extension SignalType where Value: SequenceType, Error == NoError {
	/// Collapses a signal of sequences into a single signal of their elements,
	/// combining the sequences as dictated by `strategy`.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening the sequences.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatten(strategy: FlattenStrategy) -> Signal<Value.Generator.Element, Error> {
		// Each sequence becomes a producer of its elements, which is then
		// flattened like any other inner producer.
		return self.flatMap(strategy) { sequence in
			.init(values: sequence)
		}
	}
}
extension SignalProducerType where Value: SignalType, Error == Value.Error {
	/// Collapses a producer of signals into a single producer of values,
	/// combining the inner signals as dictated by `strategy`.
	///
	/// - note: An error emitted by `producer` or by an active inner signal is
	///         forwarded immediately by the returned producer.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
		// Every inner signal is wrapped in a producer so that the
		// producer-based flattening can be reused.
		return map(SignalProducer.init).flatten(strategy)
	}
}
extension SignalProducerType where Value: SignalType, Error == NoError {
	/// Flattens the inner signals sent upon `producer` (into a single producer
	/// of values), according to the semantics of the given strategy.
	///
	/// - note: If an active inner signal emits an error, the returned producer
	///         will forward that error immediately.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	///
	/// - returns: A producer that forwards the values of the inner signals,
	///            combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
		// `self` cannot fail, so its error type is lifted to the inner
		// signals' error type before flattening.
		return self
			.promoteErrors(Value.Error.self)
			.flatten(strategy)
	}
}
extension SignalProducerType where Value: SignalType, Error == NoError, Value.Error == NoError {
	/// Flattens the inner signals sent upon `producer` (into a single producer
	/// of values), according to the semantics of the given strategy. Neither
	/// the outer producer nor the inner signals can fail.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	///
	/// - returns: A producer that forwards the values of the inner signals,
	///            combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
		// Every inner signal is wrapped in a producer so that the
		// producer-based flattening can be reused.
		return self
			.map(SignalProducer.init)
			.flatten(strategy)
	}
}
extension SignalProducerType where Value: SignalType, Value.Error == NoError {
	/// Flattens the inner signals sent upon `producer` (into a single producer
	/// of values), according to the semantics of the given strategy.
	///
	/// - note: If `producer` emits an error, the returned producer will forward
	///         that error immediately.
	///
	/// - warning: `Interrupted` events on inner signals will be treated like
	///            `Completed` events on inner signals.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening signals.
	///
	/// - returns: A producer that forwards the values of the inner signals,
	///            combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
		// The inner signals cannot fail, so each one has its error type
		// lifted to the outer producer's error type before flattening.
		return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
	}
}
extension SignalProducerType where Value: SequenceType, Error == NoError {
	/// Flattens the `sequence` value sent by `producer` according to
	/// the semantics of the given strategy.
	///
	/// - parameters:
	///   - strategy: Strategy used when flattening the sequences.
	///
	/// - returns: A producer that sends the elements of every sequence sent by
	///            `producer`, combined as `strategy` dictates.
	// The unused-result hint must mention `start`: the result is a producer,
	// not a signal, so there is nothing to `observe`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func flatten(strategy: FlattenStrategy) -> SignalProducer<Value.Generator.Element, Error> {
		// Each sequence becomes a producer of its elements, which is then
		// flattened like any other inner producer.
		return self.flatMap(strategy) { .init(values: $0) }
	}
}
extension SignalType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a signal that relays the values of every producer sent on
	/// `signal`, one producer at a time: the next inner producer only begins
	/// once the previous one has completed.
	///
	/// - note: A failure on any inner producer is forwarded immediately by the
	///         returned signal.
	///
	/// - note: The returned signal completes only when `signal` and all
	///         producers emitted from `signal` complete.
	private func concat() -> Signal<Value.Value, Error> {
		return Signal<Value.Value, Error> { downstream in
			let lifetime = CompositeDisposable()
			// Handed to the concat state so that inner producers can be
			// torn down together with the returned signal.
			let innerLifetime = CompositeDisposable()

			lifetime += innerLifetime
			lifetime += self.observeConcat(downstream, innerLifetime)

			return lifetime
		}
	}

	/// Observes `self` and queues every received producer on a fresh
	/// `ConcatState` that relays to `observer`.
	///
	/// - parameters:
	///   - observer: Receives the concatenated events.
	///   - disposable: Optional composite passed on to the `ConcatState`.
	///
	/// - returns: The disposable returned by observing `self`.
	private func observeConcat(observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
		let queue = ConcatState(observer: observer, disposable: disposable)

		return self.observe { event in
			switch event {
			case let .Next(inner):
				queue.enqueueSignalProducer(inner.producer)

			case .Completed:
				// The outer signal is done, but queued producers may still be
				// running. A final, empty producer is queued whose only
				// purpose is to complete `observer` once its turn arrives.
				let terminator = SignalProducer<Value.Value, Error>.empty.on(completed: {
					observer.sendCompleted()
				})
				queue.enqueueSignalProducer(terminator)

			case let .Failed(error):
				observer.sendFailed(error)

			case .Interrupted:
				observer.sendInterrupted()
			}
		}
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a producer that relays the values of every producer sent by
	/// `producer`, one producer at a time: the next inner producer only begins
	/// once the previous one has completed.
	///
	/// - note: An error emitted by any inner producer is emitted by the
	///         returned producer.
	///
	/// - note: The returned producer completes only when `producer` and all
	///         producers emitted from `producer` complete.
	private func concat() -> SignalProducer<Value.Value, Error> {
		return SignalProducer<Value.Value, Error> { downstream, lifetime in
			self.startWithSignal { outerSignal, outerDisposable in
				// Tie the outer producer's work to the lifetime of the
				// returned producer before any event is observed.
				lifetime += outerDisposable
				outerSignal.observeConcat(downstream, lifetime)
			}
		}
	}
}
extension SignalProducerType {
	/// Returns a producer that sends the events of `self` followed by those of
	/// `next`, using the `.Concat` flatten strategy.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func concat(next: SignalProducer<Value, Error>) -> SignalProducer<Value, Error> {
		let ordered = [ self.producer, next ]
		return SignalProducer<SignalProducer<Value, Error>, Error>(values: ordered).flatten(.Concat)
	}

	/// Returns a producer that sends the events of `self` followed by a
	/// producer created from the single `value`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func concat(value value: Value) -> SignalProducer<Value, Error> {
		let trailing = SignalProducer<Value, Error>(value: value)
		return self.concat(trailing)
	}

	/// Returns a producer that sends the events of `previous` followed by
	/// those of `self`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func prefix<P: SignalProducerType where P.Value == Value, P.Error == Error>(previous: P) -> SignalProducer<Value, Error> {
		let following = self.producer
		return previous.concat(following)
	}

	/// Returns a producer that sends a producer created from the single
	/// `value`, followed by the events of `self`.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public func prefix(value value: Value) -> SignalProducer<Value, Error> {
		let leading = SignalProducer<Value, Error>(value: value)
		return self.prefix(leading)
	}
}
/// Bookkeeping for one started `concat`: the queue of producers and the
/// observer that receives their events.
private final class ConcatState<Value, Error: ErrorType> {
	/// The observer of a started `concat` producer.
	let observer: Observer<Value, Error>

	/// The top level disposable of a started `concat` producer.
	let disposable: CompositeDisposable?

	/// The active producer (kept at the front until it terminates), followed
	/// by the producers still waiting to be started.
	let queuedSignalProducers: Atomic<[SignalProducer<Value, Error>]> = Atomic([])

	init(observer: Signal<Value, Error>.Observer, disposable: CompositeDisposable?) {
		self.observer = observer
		self.disposable = disposable
	}

	/// Appends `producer` to the queue and starts it right away when the queue
	/// was empty. Does nothing once `disposable` has been disposed.
	func enqueueSignalProducer(producer: SignalProducer<Value, Error>) {
		guard !(disposable?.disposed ?? false) else {
			return
		}

		var wasIdle = true

		queuedSignalProducers.modify { current in
			// An empty queue means no producer is active, so the new one
			// must be started by this call.
			var updated = current
			wasIdle = updated.isEmpty
			updated.append(producer)
			return updated
		}

		if wasIdle {
			startNextSignalProducer(producer)
		}
	}

	/// Drops the producer at the front of the queue and returns the one that
	/// follows it, if any. Returns `nil` once `disposable` has been disposed.
	func dequeueSignalProducer() -> SignalProducer<Value, Error>? {
		guard !(disposable?.disposed ?? false) else {
			return nil
		}

		var upNext: SignalProducer<Value, Error>?

		queuedSignalProducers.modify { current in
			// The active producer stays at the front of the queue until it
			// terminates; dequeueing happens at that point, so the front
			// entry is the one to discard.
			var remaining = current
			if !remaining.isEmpty {
				remaining.removeAtIndex(0)
			}
			upNext = remaining.first
			return remaining
		}

		return upNext
	}

	/// Subscribes to the given signal producer.
	///
	/// `Next` and `Failed` events are relayed to `observer`. `Completed` and
	/// `Interrupted` events are not relayed; they advance the queue instead.
	func startNextSignalProducer(signalProducer: SignalProducer<Value, Error>) {
		signalProducer.startWithSignal { signal, innerDisposable in
			let handle = self.disposable?.addDisposable(innerDisposable) ?? nil

			signal.observe { event in
				switch event {
				case .Next, .Failed:
					self.observer.action(event)

				case .Completed, .Interrupted:
					// This producer is finished: unregister it, then move on
					// to whichever producer is queued behind it.
					handle?.remove()

					if let following = self.dequeueSignalProducer() {
						self.startNextSignalProducer(following)
					}
				}
			}
		}
	}
}
extension SignalType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a signal that starts every producer sent on `signal` as it
	/// arrives and forwards the events of all of them as they occur.
	private func merge() -> Signal<Value.Value, Error> {
		return Signal<Value.Value, Error> { downstream in
			let lifetime = CompositeDisposable()
			// Collects the disposables of the started inner producers.
			let innerLifetime = CompositeDisposable()

			lifetime += innerLifetime
			lifetime += self.observeMerge(downstream, innerLifetime)

			return lifetime
		}
	}

	/// Observes `self`, starts each received producer, and relays the inner
	/// `Next` and `Failed` events to `observer`.
	///
	/// - parameters:
	///   - observer: Receives the merged events.
	///   - disposable: Composite to which each inner producer's disposable is
	///                 added while that producer is running.
	///
	/// - returns: The disposable returned by observing `self`.
	private func observeMerge(observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
		// Counts the outer signal (the initial 1) plus every running inner
		// producer.
		let activeCount = Atomic(1)

		let finishOne = {
			// The result of `modify` is compared against 1, i.e. it is
			// treated as the count before this decrement: a 1 means the last
			// active participant just finished.
			let previous = activeCount.modify { $0 - 1 }
			if previous == 1 {
				observer.sendCompleted()
			}
		}

		return self.observe { outerEvent in
			switch outerEvent {
			case let .Next(producer):
				producer.startWithSignal { innerSignal, innerDisposable in
					activeCount.modify { $0 + 1 }

					let handle = disposable.addDisposable(innerDisposable)

					innerSignal.observe { innerEvent in
						switch innerEvent {
						case .Next, .Failed:
							observer.action(innerEvent)

						case .Completed, .Interrupted:
							handle.remove()
							finishOne()
						}
					}
				}

			case .Completed:
				finishOne()

			case let .Failed(error):
				observer.sendFailed(error)

			case .Interrupted:
				observer.sendInterrupted()
			}
		}
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a producer that, once started, starts `self`, then starts every
	/// producer it sends and forwards the events of all of them as they
	/// occur.
	private func merge() -> SignalProducer<Value.Value, Error> {
		return SignalProducer<Value.Value, Error> { downstream, lifetime in
			self.startWithSignal { outerSignal, outerDisposable in
				// Tie the outer producer's work to the lifetime of the
				// returned producer before any event is observed.
				lifetime.addDisposable(outerDisposable)
				outerSignal.observeMerge(downstream, lifetime)
			}
		}
	}
}
extension SignalType {
	/// Combines the signals in `signals` into one `Signal` that emits every
	/// value sent by any of them and completes once all of them have
	/// completed.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public static func merge<Seq: SequenceType, S: SignalType where S.Value == Value, S.Error == Error, Seq.Generator.Element == S>(signals: Seq) -> Signal<Value, Error> {
		// The signals are sent through a producer so that the `.Merge`
		// flatten strategy can be applied to them.
		let source = SignalProducer<S, Error>(values: signals)

		// Assigned inside the `startWithSignal` closure below.
		var merged: Signal<Value, Error>!

		source.startWithSignal { outerSignal, _ in
			merged = outerSignal.flatten(.Merge)
		}

		return merged
	}

	/// Variadic convenience for `merge(_:)`: combines the listed signals into
	/// one `Signal` that emits every value sent by any of them and completes
	/// once all of them have completed.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public static func merge<S: SignalType where S.Value == Value, S.Error == Error>(signals: S...) -> Signal<Value, Error> {
		// `signals` arrives as an array, which the sequence-based overload
		// accepts.
		return Signal.merge(signals)
	}
}
extension SignalProducerType {
	/// Combines the producers in `producers` into one `SignalProducer` that
	/// emits every value sent by any of them and completes once all of them
	/// have completed.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public static func merge<Seq: SequenceType, S: SignalProducerType where S.Value == Value, S.Error == Error, Seq.Generator.Element == S>(producers: Seq) -> SignalProducer<Value, Error> {
		// The producers are sent through an outer producer so that the
		// `.Merge` flatten strategy can be applied to them.
		return SignalProducer(values: producers)
			.flatten(.Merge)
	}

	/// Variadic convenience for `merge(_:)`: combines the listed producers
	/// into one `SignalProducer` that emits every value sent by any of them
	/// and completes once all of them have completed.
	@warn_unused_result(message="Did you forget to call `start` on the producer?")
	public static func merge<S: SignalProducerType where S.Value == Value, S.Error == Error>(producers: S...) -> SignalProducer<Value, Error> {
		// `producers` arrives as an array, which the sequence-based overload
		// accepts.
		return SignalProducer.merge(producers)
	}
}
extension SignalType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a signal that forwards values from the most recent producer sent
	/// on `signal`, ignoring values from producers sent earlier.
	///
	/// A failure on `signal` or on the latest inner producer is sent on the
	/// returned signal.
	///
	/// The returned signal completes when `signal` and the latest inner
	/// producer have both completed.
	private func switchToLatest() -> Signal<Value.Value, Error> {
		return Signal<Value.Value, Error> { downstream in
			let lifetime = CompositeDisposable()
			// Holds the disposable of whichever inner producer is current.
			let latestInner = SerialDisposable()

			lifetime += latestInner
			lifetime += self.observeSwitchToLatest(downstream, latestInner)

			return lifetime
		}
	}

	/// Observes `self`, starts each received producer, and relays the `Next`
	/// and `Failed` events of the inner producers to `observer`.
	///
	/// - parameters:
	///   - observer: Receives the flattened events.
	///   - latestInnerDisposable: Serial disposable whose inner disposable is
	///                            replaced each time a new producer starts.
	///
	/// - returns: The disposable returned by observing `self`.
	private func observeSwitchToLatest(observer: Observer<Value.Value, Error>, _ latestInnerDisposable: SerialDisposable) -> Disposable? {
		let shared = Atomic(LatestState<Value, Error>())

		return self.observe { outerEvent in
			switch outerEvent {
			case let .Next(innerProducer):
				innerProducer.startWithSignal { innerSignal, innerDisposable in
					shared.modify { current in
						// When we replace the disposable below, this prevents
						// the generated Interrupted event from doing any work.
						var updated = current
						updated.replacingInnerSignal = true
						return updated
					}

					latestInnerDisposable.innerDisposable = innerDisposable

					shared.modify { current in
						// The replacement is over and a fresh inner producer
						// is now running.
						var updated = current
						updated.replacingInnerSignal = false
						updated.innerSignalComplete = false
						return updated
					}

					innerSignal.observe { innerEvent in
						switch innerEvent {
						case .Next, .Failed:
							observer.action(innerEvent)

						case .Completed:
							let before = shared.modify { current in
								var updated = current
								updated.innerSignalComplete = true
								return updated
							}

							if before.outerSignalComplete {
								observer.sendCompleted()
							}

						case .Interrupted:
							// If interruption occurred as a result of a new
							// producer arriving, we don't want to notify our
							// observer.
							let before = shared.modify { current in
								var updated = current
								if !updated.replacingInnerSignal {
									updated.innerSignalComplete = true
								}
								return updated
							}

							if !before.replacingInnerSignal && before.outerSignalComplete {
								observer.sendCompleted()
							}
						}
					}
				}

			case .Completed:
				let before = shared.modify { current in
					var updated = current
					updated.outerSignalComplete = true
					return updated
				}

				// Complete now only if no inner producer is still running.
				if before.innerSignalComplete {
					observer.sendCompleted()
				}

			case let .Failed(error):
				observer.sendFailed(error)

			case .Interrupted:
				observer.sendInterrupted()
			}
		}
	}
}
extension SignalProducerType where Value: SignalProducerType, Error == Value.Error {
	/// Builds a producer that forwards values from the most recent producer
	/// sent by `producer`, ignoring values from producers sent earlier.
	///
	/// A failure on `producer` or on the latest inner producer is sent on the
	/// returned producer.
	///
	/// The returned producer completes when `producer` and the latest inner
	/// producer have both completed.
	private func switchToLatest() -> SignalProducer<Value.Value, Error> {
		return SignalProducer<Value.Value, Error> { downstream, lifetime in
			// Holds the disposable of whichever inner producer is current.
			let latestInner = SerialDisposable()
			lifetime.addDisposable(latestInner)

			self.startWithSignal { outerSignal, outerDisposable in
				lifetime += outerDisposable
				lifetime += outerSignal.observeSwitchToLatest(downstream, latestInner)
			}
		}
	}
}
/// Flags shared between the outer and inner observers of a `switchToLatest`.
private struct LatestState<Value, Error: ErrorType> {
	/// Set once the outer signal has sent `Completed`.
	var outerSignalComplete = false

	/// Set when the current inner signal has terminated. Starts out `true`
	/// and is reset to `false` whenever a new inner producer is started.
	var innerSignalComplete = true

	/// Raised while the inner disposable is being swapped for a new one, so
	/// that an `Interrupted` event received during the swap is ignored.
	var replacingInnerSignal = false
}
extension SignalType {
	/// Transforms each value of `signal` into a producer, then flattens the
	/// resulting producers (into a signal of values), according to the
	/// semantics of the given strategy.
	///
	/// A failure on `signal` or on any of the created producers is forwarded
	/// immediately by the returned signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, Error>) -> Signal<U, Error> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of `signal` into a non-failing producer, then
	/// flattens the resulting producers (into a signal of values), according
	/// to the semantics of the given strategy.
	///
	/// A failure on `signal` is forwarded immediately by the returned signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, NoError>) -> Signal<U, Error> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of `signal` into a signal, then flattens the
	/// resulting signals (into a signal of values), according to the
	/// semantics of the given strategy.
	///
	/// An error emitted by `signal` or by any of the created signals is
	/// forwarded immediately by the returned signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, Error>) -> Signal<U, Error> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of `signal` into a non-failing signal, then
	/// flattens the resulting signals (into a signal of values), according to
	/// the semantics of the given strategy.
	///
	/// An error emitted by `signal` is forwarded immediately by the returned
	/// signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, NoError>) -> Signal<U, Error> {
		return self
			.map(transform)
			.flatten(strategy)
	}
}
extension SignalType where Error == NoError {
	/// Transforms each value of the non-failing `signal` into a producer, then
	/// flattens the resulting producers (into a signal of values), according
	/// to the semantics of the given strategy.
	///
	/// A failure on any of the created producers is forwarded immediately by
	/// the returned signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U, E>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, E>) -> Signal<U, E> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of the non-failing `signal` into a non-failing
	/// producer, then flattens the resulting producers (into a signal of
	/// values), according to the semantics of the given strategy.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, NoError>) -> Signal<U, NoError> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of the non-failing `signal` into a signal, then
	/// flattens the resulting signals (into a signal of values), according to
	/// the semantics of the given strategy.
	///
	/// An error emitted by any of the created signals is forwarded
	/// immediately by the returned signal.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U, E>(strategy: FlattenStrategy, transform: Value -> Signal<U, E>) -> Signal<U, E> {
		return self
			.map(transform)
			.flatten(strategy)
	}

	/// Transforms each value of the non-failing `signal` into a non-failing
	/// signal, then flattens the resulting signals (into a signal of values),
	/// according to the semantics of the given strategy.
	@warn_unused_result(message="Did you forget to call `observe` on the signal?")
	public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, NoError>) -> Signal<U, NoError> {
		return self
			.map(transform)
			.flatten(strategy)
	}
}
- extension SignalProducerType {
-     /// Transforms every value sent by this producer into an inner producer
-     /// using `transform`, then flattens the resulting producers into a
-     /// single producer of values, following the semantics of `strategy`.
-     ///
-     /// If this producer or any of the created producers fails, the returned
-     /// producer will forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, Error>) -> SignalProducer<U, Error> {
-         let innerProducers = map(transform)
-         return innerProducers.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into an inner producer
-     /// using `transform`, then flattens the resulting producers into a
-     /// single producer of values, following the semantics of `strategy`.
-     ///
-     /// The created producers cannot fail (`NoError`). If this producer
-     /// fails, the returned producer will forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, NoError>) -> SignalProducer<U, Error> {
-         let innerProducers = map(transform)
-         return innerProducers.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into a signal using
-     /// `transform`, then flattens the resulting signals into a single
-     /// producer of values, following the semantics of `strategy`.
-     ///
-     /// If this producer or any of the created signals fails, the returned
-     /// producer will forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, Error>) -> SignalProducer<U, Error> {
-         let innerSignals = map(transform)
-         return innerSignals.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into a signal using
-     /// `transform`, then flattens the resulting signals into a single
-     /// producer of values, following the semantics of `strategy`.
-     ///
-     /// The created signals cannot fail (`NoError`). If this producer fails,
-     /// the returned producer will forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, NoError>) -> SignalProducer<U, Error> {
-         let innerSignals = map(transform)
-         return innerSignals.flatten(strategy)
-     }
- }
- extension SignalProducerType where Error == NoError {
-     /// Transforms every value sent by this producer into an inner producer
-     /// using `transform`, then flattens the resulting producers into a
-     /// single producer of values, following the semantics of `strategy`.
-     ///
-     /// If any of the created producers fails, the returned producer will
-     /// forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U, E>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, E>) -> SignalProducer<U, E> {
-         let innerProducers = map(transform)
-         return innerProducers.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into an inner producer
-     /// using `transform`, then flattens the resulting producers into a
-     /// single producer of values, following the semantics of `strategy`.
-     ///
-     /// Neither this producer nor the created producers can fail (`NoError`).
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> SignalProducer<U, NoError>) -> SignalProducer<U, NoError> {
-         let innerProducers = map(transform)
-         return innerProducers.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into a signal using
-     /// `transform`, then flattens the resulting signals into a single
-     /// producer of values, following the semantics of `strategy`.
-     ///
-     /// If any of the created signals fails, the returned producer will
-     /// forward that failure immediately.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U, E>(strategy: FlattenStrategy, transform: Value -> Signal<U, E>) -> SignalProducer<U, E> {
-         let innerSignals = map(transform)
-         return innerSignals.flatten(strategy)
-     }
-
-     /// Transforms every value sent by this producer into a signal using
-     /// `transform`, then flattens the resulting signals into a single
-     /// producer of values, following the semantics of `strategy`.
-     ///
-     /// Neither this producer nor the created signals can fail (`NoError`).
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMap<U>(strategy: FlattenStrategy, transform: Value -> Signal<U, NoError>) -> SignalProducer<U, NoError> {
-         let innerSignals = map(transform)
-         return innerSignals.flatten(strategy)
-     }
- }
- extension SignalType {
-     /// Recovers from a failure of this signal: the error is passed to
-     /// `handler`, and the producer it returns is started in place of the
-     /// failed signal.
-     @warn_unused_result(message="Did you forget to call `observe` on the signal?")
-     public func flatMapError<F>(handler: Error -> SignalProducer<Value, F>) -> Signal<Value, F> {
-         return Signal { observer in
-             let replacementDisposable = SerialDisposable()
-             return self.observeFlatMapError(handler, observer, replacementDisposable)
-         }
-     }
-
-     /// Observes this signal on behalf of `flatMapError`.
-     ///
-     /// Values, completion and interruption are passed straight through to
-     /// `observer`. On failure, the producer returned by `handler` is started,
-     /// its disposable is stored in `serialDisposable`, and all of its events
-     /// are forwarded to `observer`.
-     ///
-     /// Returns the disposable produced by observing this signal.
-     private func observeFlatMapError<F>(handler: Error -> SignalProducer<Value, F>, _ observer: Observer<Value, F>, _ serialDisposable: SerialDisposable) -> Disposable? {
-         return self.observe { event in
-             switch event {
-             case .Completed:
-                 observer.sendCompleted()
-             case .Interrupted:
-                 observer.sendInterrupted()
-             case .Next(let value):
-                 observer.sendNext(value)
-             case .Failed(let error):
-                 // Swap in the replacement producer and pipe everything it
-                 // emits to the downstream observer.
-                 let replacement = handler(error)
-                 replacement.startWithSignal { replacementSignal, replacementDisposable in
-                     serialDisposable.innerDisposable = replacementDisposable
-                     replacementSignal.observe(observer)
-                 }
-             }
-         }
-     }
- }
- extension SignalProducerType {
-     /// Recovers from a failure of this producer: the error is passed to
-     /// `handler`, and the producer it returns is started in place of the
-     /// failed one.
-     @warn_unused_result(message="Did you forget to call `start` on the producer?")
-     public func flatMapError<F>(handler: Error -> SignalProducer<Value, F>) -> SignalProducer<Value, F> {
-         return SignalProducer { observer, lifetimeDisposable in
-             // Holds whichever disposable is currently relevant: first the
-             // upstream's, later the one assigned by `observeFlatMapError`
-             // when a replacement producer is started.
-             let currentDisposable = SerialDisposable()
-             lifetimeDisposable.addDisposable(currentDisposable)
-
-             self.startWithSignal { upstream, upstreamDisposable in
-                 currentDisposable.innerDisposable = upstreamDisposable
-                 upstream.observeFlatMapError(handler, observer, currentDisposable)
-             }
-         }
-     }
- }
|