Flatten.swift 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. //
  2. // Flatten.swift
  3. // ReactiveSwift
  4. //
  5. // Created by Neil Pankey on 11/30/15.
  6. // Copyright © 2015 GitHub. All rights reserved.
  7. //
  8. import enum Result.NoError
  9. /// Describes how multiple producers should be joined together.
  10. public enum FlattenStrategy: Equatable {
  11. /// The producers should be merged, so that any value received on any of the
  12. /// input producers will be forwarded immediately to the output producer.
  13. ///
  14. /// The resulting producer will complete only when all inputs have
  15. /// completed.
  16. case merge
  17. /// The producers should be concatenated, so that their values are sent in
  18. /// the order of the producers themselves.
  19. ///
  20. /// The resulting producer will complete only when all inputs have
  21. /// completed.
  22. case concat
  23. /// Only the events from the latest input producer should be considered for
  24. /// the output. Any producers received before that point will be disposed
  25. /// of.
  26. ///
  27. /// The resulting producer will complete only when the producer-of-producers
  28. /// and the latest producer has completed.
  29. case latest
  30. }
  31. extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  32. /// Flattens the inner producers sent upon `signal` (into a single signal of
  33. /// values), according to the semantics of the given strategy.
  34. ///
  35. /// - note: If `signal` or an active inner producer fails, the returned
  36. /// signal will forward that failure immediately.
  37. ///
  38. /// - note: `interrupted` events on inner producers will be treated like
  39. /// `Completed events on inner producers.
  40. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
  41. switch strategy {
  42. case .merge:
  43. return self.merge()
  44. case .concat:
  45. return self.concat()
  46. case .latest:
  47. return self.switchToLatest()
  48. }
  49. }
  50. }
  51. extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError {
  52. /// Flattens the inner producers sent upon `signal` (into a single signal of
  53. /// values), according to the semantics of the given strategy.
  54. ///
  55. /// - note: If an active inner producer fails, the returned signal will
  56. /// forward that failure immediately.
  57. ///
  58. /// - warning: `interrupted` events on inner producers will be treated like
  59. /// `completed` events on inner producers.
  60. ///
  61. /// - parameters:
  62. /// - strategy: Strategy used when flattening signals.
  63. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
  64. return self
  65. .promoteErrors(Value.Error.self)
  66. .flatten(strategy)
  67. }
  68. }
  69. extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError, Value.Error == NoError {
  70. /// Flattens the inner producers sent upon `signal` (into a single signal of
  71. /// values), according to the semantics of the given strategy.
  72. ///
  73. /// - warning: `interrupted` events on inner producers will be treated like
  74. /// `completed` events on inner producers.
  75. ///
  76. /// - parameters:
  77. /// - strategy: Strategy used when flattening signals.
  78. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
  79. switch strategy {
  80. case .merge:
  81. return self.merge()
  82. case .concat:
  83. return self.concat()
  84. case .latest:
  85. return self.switchToLatest()
  86. }
  87. }
  88. }
  89. extension SignalProtocol where Value: SignalProducerProtocol, Value.Error == NoError {
  90. /// Flattens the inner producers sent upon `signal` (into a single signal of
  91. /// values), according to the semantics of the given strategy.
  92. ///
  93. /// - note: If `signal` fails, the returned signal will forward that failure
  94. /// immediately.
  95. ///
  96. /// - warning: `interrupted` events on inner producers will be treated like
  97. /// `completed` events on inner producers.
  98. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
  99. return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
  100. }
  101. }
  102. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  103. /// Flattens the inner producers sent upon `producer` (into a single
  104. /// producer of values), according to the semantics of the given strategy.
  105. ///
  106. /// - note: If `producer` or an active inner producer fails, the returned
  107. /// producer will forward that failure immediately.
  108. ///
  109. /// - warning: `interrupted` events on inner producers will be treated like
  110. /// `completed` events on inner producers.
  111. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
  112. switch strategy {
  113. case .merge:
  114. return self.merge()
  115. case .concat:
  116. return self.concat()
  117. case .latest:
  118. return self.switchToLatest()
  119. }
  120. }
  121. }
  122. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == NoError {
  123. /// Flattens the inner producers sent upon `producer` (into a single
  124. /// producer of values), according to the semantics of the given strategy.
  125. ///
  126. /// - note: If an active inner producer fails, the returned producer will
  127. /// forward that failure immediately.
  128. ///
  129. /// - warning: `interrupted` events on inner producers will be treated like
  130. /// `completed` events on inner producers.
  131. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
  132. return self
  133. .promoteErrors(Value.Error.self)
  134. .flatten(strategy)
  135. }
  136. }
  137. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == NoError, Value.Error == NoError {
  138. /// Flattens the inner producers sent upon `producer` (into a single
  139. /// producer of values), according to the semantics of the given strategy.
  140. ///
  141. /// - warning: `interrupted` events on inner producers will be treated like
  142. /// `completed` events on inner producers.
  143. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
  144. switch strategy {
  145. case .merge:
  146. return self.merge()
  147. case .concat:
  148. return self.concat()
  149. case .latest:
  150. return self.switchToLatest()
  151. }
  152. }
  153. }
  154. extension SignalProducerProtocol where Value: SignalProducerProtocol, Value.Error == NoError {
  155. /// Flattens the inner producers sent upon `signal` (into a single signal of
  156. /// values), according to the semantics of the given strategy.
  157. ///
  158. /// - note: If `signal` fails, the returned signal will forward that failure
  159. /// immediately.
  160. ///
  161. /// - warning: `interrupted` events on inner producers will be treated like
  162. /// `completed` events on inner producers.
  163. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
  164. return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
  165. }
  166. }
  167. extension SignalProtocol where Value: SignalProtocol, Error == Value.Error {
  168. /// Flattens the inner signals sent upon `signal` (into a single signal of
  169. /// values), according to the semantics of the given strategy.
  170. ///
  171. /// - note: If `signal` or an active inner signal emits an error, the
  172. /// returned signal will forward that error immediately.
  173. ///
  174. /// - warning: `interrupted` events on inner signals will be treated like
  175. /// `completed` events on inner signals.
  176. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
  177. return self
  178. .map(SignalProducer.init)
  179. .flatten(strategy)
  180. }
  181. }
  182. extension SignalProtocol where Value: SignalProtocol, Error == NoError {
  183. /// Flattens the inner signals sent upon `signal` (into a single signal of
  184. /// values), according to the semantics of the given strategy.
  185. ///
  186. /// - note: If an active inner signal emits an error, the returned signal
  187. /// will forward that error immediately.
  188. ///
  189. /// - warning: `interrupted` events on inner signals will be treated like
  190. /// `completed` events on inner signals.
  191. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
  192. return self
  193. .promoteErrors(Value.Error.self)
  194. .flatten(strategy)
  195. }
  196. }
  197. extension SignalProtocol where Value: SignalProtocol, Error == NoError, Value.Error == NoError {
  198. /// Flattens the inner signals sent upon `signal` (into a single signal of
  199. /// values), according to the semantics of the given strategy.
  200. ///
  201. /// - warning: `interrupted` events on inner signals will be treated like
  202. /// `completed` events on inner signals.
  203. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
  204. return self
  205. .map(SignalProducer.init)
  206. .flatten(strategy)
  207. }
  208. }
  209. extension SignalProtocol where Value: SignalProtocol, Value.Error == NoError {
  210. /// Flattens the inner signals sent upon `signal` (into a single signal of
  211. /// values), according to the semantics of the given strategy.
  212. ///
  213. /// - note: If `signal` emits an error, the returned signal will forward
  214. /// that error immediately.
  215. ///
  216. /// - warning: `interrupted` events on inner signals will be treated like
  217. /// `completed` events on inner signals.
  218. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
  219. return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
  220. }
  221. }
  222. extension SignalProtocol where Value: Sequence, Error == NoError {
  223. /// Flattens the `sequence` value sent by `signal` according to
  224. /// the semantics of the given strategy.
  225. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Iterator.Element, Error> {
  226. return self.flatMap(strategy) { .init($0) }
  227. }
  228. }
  229. extension SignalProducerProtocol where Value: SignalProtocol, Error == Value.Error {
  230. /// Flattens the inner signals sent upon `producer` (into a single producer
  231. /// of values), according to the semantics of the given strategy.
  232. ///
  233. /// - note: If `producer` or an active inner signal emits an error, the
  234. /// returned producer will forward that error immediately.
  235. ///
  236. /// - warning: `interrupted` events on inner signals will be treated like
  237. /// `completed` events on inner signals.
  238. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
  239. return self
  240. .map(SignalProducer.init)
  241. .flatten(strategy)
  242. }
  243. }
  244. extension SignalProducerProtocol where Value: SignalProtocol, Error == NoError {
  245. /// Flattens the inner signals sent upon `producer` (into a single producer
  246. /// of values), according to the semantics of the given strategy.
  247. ///
  248. /// - note: If an active inner signal emits an error, the returned producer
  249. /// will forward that error immediately.
  250. ///
  251. /// - warning: `interrupted` events on inner signals will be treated like
  252. /// `completed` events on inner signals.
  253. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
  254. return self
  255. .promoteErrors(Value.Error.self)
  256. .flatten(strategy)
  257. }
  258. }
  259. extension SignalProducerProtocol where Value: SignalProtocol, Error == NoError, Value.Error == NoError {
  260. /// Flattens the inner signals sent upon `producer` (into a single producer
  261. /// of values), according to the semantics of the given strategy.
  262. ///
  263. /// - warning: `interrupted` events on inner signals will be treated like
  264. /// `completed` events on inner signals.
  265. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
  266. return self
  267. .map(SignalProducer.init)
  268. .flatten(strategy)
  269. }
  270. }
  271. extension SignalProducerProtocol where Value: SignalProtocol, Value.Error == NoError {
  272. /// Flattens the inner signals sent upon `producer` (into a single producer
  273. /// of values), according to the semantics of the given strategy.
  274. ///
  275. /// - note: If `producer` emits an error, the returned producer will forward
  276. /// that error immediately.
  277. ///
  278. /// - warning: `interrupted` events on inner signals will be treated like
  279. /// `completed` events on inner signals.
  280. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
  281. return self.flatMap(strategy) { $0.promoteErrors(Error.self) }
  282. }
  283. }
  284. extension SignalProducerProtocol where Value: Sequence, Error == NoError {
  285. /// Flattens the `sequence` value sent by `producer` according to
  286. /// the semantics of the given strategy.
  287. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Iterator.Element, Error> {
  288. return self.flatMap(strategy) { .init($0) }
  289. }
  290. }
  291. extension SignalProtocol where Value: PropertyProtocol {
  292. /// Flattens the inner properties sent upon `signal` (into a single signal of
  293. /// values), according to the semantics of the given strategy.
  294. ///
  295. /// - note: If `signal` fails, the returned signal will forward that failure
  296. /// immediately.
  297. public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
  298. return self.flatMap(strategy) { $0.producer }
  299. }
  300. }
  301. extension SignalProducerProtocol where Value: PropertyProtocol {
  302. /// Flattens the inner properties sent upon `signal` (into a single signal of
  303. /// values), according to the semantics of the given strategy.
  304. ///
  305. /// - note: If `signal` fails, the returned signal will forward that failure
  306. /// immediately.
  307. public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
  308. return self.flatMap(strategy) { $0.producer }
  309. }
  310. }
  311. extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  312. /// Returns a signal which sends all the values from producer signal emitted
  313. /// from `signal`, waiting until each inner producer completes before
  314. /// beginning to send the values from the next inner producer.
  315. ///
  316. /// - note: If any of the inner producers fail, the returned signal will
  317. /// forward that failure immediately
  318. ///
  319. /// - note: The returned signal completes only when `signal` and all
  320. /// producers emitted from `signal` complete.
  321. fileprivate func concat() -> Signal<Value.Value, Error> {
  322. return Signal<Value.Value, Error> { relayObserver in
  323. let disposable = CompositeDisposable()
  324. let relayDisposable = CompositeDisposable()
  325. disposable += relayDisposable
  326. disposable += self.observeConcat(relayObserver, relayDisposable)
  327. return disposable
  328. }
  329. }
  330. fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
  331. let state = Atomic(ConcatState<Value.Value, Error>())
  332. func startNextIfNeeded() {
  333. while let producer = state.modify({ $0.dequeue() }) {
  334. producer.startWithSignal { signal, inner in
  335. let handle = disposable?.add(inner)
  336. signal.observe { event in
  337. switch event {
  338. case .completed, .interrupted:
  339. handle?.remove()
  340. let shouldStart: Bool = state.modify {
  341. $0.active = nil
  342. return !$0.isStarting
  343. }
  344. if shouldStart {
  345. startNextIfNeeded()
  346. }
  347. case .value, .failed:
  348. observer.action(event)
  349. }
  350. }
  351. }
  352. state.modify { $0.isStarting = false }
  353. }
  354. }
  355. return observe { event in
  356. switch event {
  357. case let .value(value):
  358. state.modify { $0.queue.append(value.producer) }
  359. startNextIfNeeded()
  360. case let .failed(error):
  361. observer.send(error: error)
  362. case .completed:
  363. state.modify { state in
  364. state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
  365. }
  366. startNextIfNeeded()
  367. case .interrupted:
  368. observer.sendInterrupted()
  369. }
  370. }
  371. }
  372. }
  373. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  374. /// Returns a producer which sends all the values from each producer emitted
  375. /// from `producer`, waiting until each inner producer completes before
  376. /// beginning to send the values from the next inner producer.
  377. ///
  378. /// - note: If any of the inner producers emit an error, the returned
  379. /// producer will emit that error.
  380. ///
  381. /// - note: The returned producer completes only when `producer` and all
  382. /// producers emitted from `producer` complete.
  383. fileprivate func concat() -> SignalProducer<Value.Value, Error> {
  384. return SignalProducer<Value.Value, Error> { observer, disposable in
  385. self.startWithSignal { signal, signalDisposable in
  386. disposable += signalDisposable
  387. _ = signal.observeConcat(observer, disposable)
  388. }
  389. }
  390. }
  391. }
  392. extension SignalProducerProtocol {
  393. /// `concat`s `next` onto `self`.
  394. public func concat(_ next: SignalProducer<Value, Error>) -> SignalProducer<Value, Error> {
  395. return SignalProducer<SignalProducer<Value, Error>, Error>([ self.producer, next ]).flatten(.concat)
  396. }
  397. /// `concat`s `value` onto `self`.
  398. public func concat(value: Value) -> SignalProducer<Value, Error> {
  399. return self.concat(SignalProducer(value: value))
  400. }
  401. /// `concat`s `self` onto initial `previous`.
  402. public func prefix<P: SignalProducerProtocol>(_ previous: P) -> SignalProducer<Value, Error>
  403. where P.Value == Value, P.Error == Error
  404. {
  405. return previous.concat(self.producer)
  406. }
  407. /// `concat`s `self` onto initial `value`.
  408. public func prefix(value: Value) -> SignalProducer<Value, Error> {
  409. return self.prefix(SignalProducer(value: value))
  410. }
  411. }
  412. private final class ConcatState<Value, Error: Swift.Error> {
  413. typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
  414. /// The active producer, if any.
  415. var active: SignalProducer? = nil
  416. /// The producers waiting to be started.
  417. var queue: [SignalProducer] = []
  418. /// Whether the active producer is currently starting.
  419. /// Used to prevent deep recursion.
  420. var isStarting: Bool = false
  421. /// Dequeue the next producer if one should be started.
  422. ///
  423. /// - note: The caller *must* set `isStarting` to false after the returned
  424. /// producer has been started.
  425. ///
  426. /// - returns: The `SignalProducer` to start or `nil` if no producer should
  427. /// be started.
  428. func dequeue() -> SignalProducer? {
  429. if active != nil {
  430. return nil
  431. }
  432. active = queue.first
  433. if active != nil {
  434. queue.removeFirst()
  435. isStarting = true
  436. }
  437. return active
  438. }
  439. }
  440. extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  441. /// Merges a `signal` of SignalProducers down into a single signal, biased
  442. /// toward the producer added earlier. Returns a Signal that will forward
  443. /// events from the inner producers as they arrive.
  444. fileprivate func merge() -> Signal<Value.Value, Error> {
  445. return Signal<Value.Value, Error> { relayObserver in
  446. let disposable = CompositeDisposable()
  447. let relayDisposable = CompositeDisposable()
  448. disposable += relayDisposable
  449. disposable += self.observeMerge(relayObserver, relayDisposable)
  450. return disposable
  451. }
  452. }
  453. fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
  454. let inFlight = Atomic(1)
  455. let decrementInFlight = {
  456. let shouldComplete: Bool = inFlight.modify {
  457. $0 -= 1
  458. return $0 == 0
  459. }
  460. if shouldComplete {
  461. observer.sendCompleted()
  462. }
  463. }
  464. return self.observe { event in
  465. switch event {
  466. case let .value(producer):
  467. producer.startWithSignal { innerSignal, innerDisposable in
  468. inFlight.modify { $0 += 1 }
  469. let handle = disposable.add(innerDisposable)
  470. innerSignal.observe { event in
  471. switch event {
  472. case .completed, .interrupted:
  473. handle.remove()
  474. decrementInFlight()
  475. case .value, .failed:
  476. observer.action(event)
  477. }
  478. }
  479. }
  480. case let .failed(error):
  481. observer.send(error: error)
  482. case .completed:
  483. decrementInFlight()
  484. case .interrupted:
  485. observer.sendInterrupted()
  486. }
  487. }
  488. }
  489. }
  490. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  491. /// Merges a `signal` of SignalProducers down into a single signal, biased
  492. /// toward the producer added earlier. Returns a Signal that will forward
  493. /// events from the inner producers as they arrive.
  494. fileprivate func merge() -> SignalProducer<Value.Value, Error> {
  495. return SignalProducer<Value.Value, Error> { relayObserver, disposable in
  496. self.startWithSignal { signal, signalDisposable in
  497. disposable += signalDisposable
  498. _ = signal.observeMerge(relayObserver, disposable)
  499. }
  500. }
  501. }
  502. }
  503. extension SignalProtocol {
  504. /// Merges the given signals into a single `Signal` that will emit all
  505. /// values from each of them, and complete when all of them have completed.
  506. public static func merge<Seq: Sequence, S: SignalProtocol>(_ signals: Seq) -> Signal<Value, Error>
  507. where S.Value == Value, S.Error == Error, Seq.Iterator.Element == S
  508. {
  509. return SignalProducer<S, Error>(signals)
  510. .flatten(.merge)
  511. .startAndRetrieveSignal()
  512. }
  513. /// Merges the given signals into a single `Signal` that will emit all
  514. /// values from each of them, and complete when all of them have completed.
  515. public static func merge<S: SignalProtocol>(_ signals: S...) -> Signal<Value, Error>
  516. where S.Value == Value, S.Error == Error
  517. {
  518. return Signal.merge(signals)
  519. }
  520. }
  521. extension SignalProducerProtocol {
  522. /// Merges the given producers into a single `SignalProducer` that will emit
  523. /// all values from each of them, and complete when all of them have
  524. /// completed.
  525. public static func merge<Seq: Sequence, S: SignalProducerProtocol>(_ producers: Seq) -> SignalProducer<Value, Error>
  526. where S.Value == Value, S.Error == Error, Seq.Iterator.Element == S
  527. {
  528. return SignalProducer(producers).flatten(.merge)
  529. }
  530. /// Merges the given producers into a single `SignalProducer` that will emit
  531. /// all values from each of them, and complete when all of them have
  532. /// completed.
  533. public static func merge<S: SignalProducerProtocol>(_ producers: S...) -> SignalProducer<Value, Error>
  534. where S.Value == Value, S.Error == Error
  535. {
  536. return SignalProducer.merge(producers)
  537. }
  538. }
  539. extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  540. /// Returns a signal that forwards values from the latest signal sent on
  541. /// `signal`, ignoring values sent on previous inner signal.
  542. ///
  543. /// An error sent on `signal` or the latest inner signal will be sent on the
  544. /// returned signal.
  545. ///
  546. /// The returned signal completes when `signal` and the latest inner
  547. /// signal have both completed.
  548. fileprivate func switchToLatest() -> Signal<Value.Value, Error> {
  549. return Signal<Value.Value, Error> { observer in
  550. let composite = CompositeDisposable()
  551. let serial = SerialDisposable()
  552. composite += serial
  553. composite += self.observeSwitchToLatest(observer, serial)
  554. return composite
  555. }
  556. }
  557. fileprivate func observeSwitchToLatest(_ observer: Observer<Value.Value, Error>, _ latestInnerDisposable: SerialDisposable) -> Disposable? {
  558. let state = Atomic(LatestState<Value, Error>())
  559. return self.observe { event in
  560. switch event {
  561. case let .value(innerProducer):
  562. innerProducer.startWithSignal { innerSignal, innerDisposable in
  563. state.modify {
  564. // When we replace the disposable below, this prevents
  565. // the generated Interrupted event from doing any work.
  566. $0.replacingInnerSignal = true
  567. }
  568. latestInnerDisposable.inner = innerDisposable
  569. state.modify {
  570. $0.replacingInnerSignal = false
  571. $0.innerSignalComplete = false
  572. }
  573. innerSignal.observe { event in
  574. switch event {
  575. case .interrupted:
  576. // If interruption occurred as a result of a new
  577. // producer arriving, we don't want to notify our
  578. // observer.
  579. let shouldComplete: Bool = state.modify { state in
  580. if !state.replacingInnerSignal {
  581. state.innerSignalComplete = true
  582. }
  583. return !state.replacingInnerSignal && state.outerSignalComplete
  584. }
  585. if shouldComplete {
  586. observer.sendCompleted()
  587. }
  588. case .completed:
  589. let shouldComplete: Bool = state.modify {
  590. $0.innerSignalComplete = true
  591. return $0.outerSignalComplete
  592. }
  593. if shouldComplete {
  594. observer.sendCompleted()
  595. }
  596. case .value, .failed:
  597. observer.action(event)
  598. }
  599. }
  600. }
  601. case let .failed(error):
  602. observer.send(error: error)
  603. case .completed:
  604. let shouldComplete: Bool = state.modify {
  605. $0.outerSignalComplete = true
  606. return $0.innerSignalComplete
  607. }
  608. if shouldComplete {
  609. observer.sendCompleted()
  610. }
  611. case .interrupted:
  612. observer.sendInterrupted()
  613. }
  614. }
  615. }
  616. }
  617. extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
  618. /// Returns a signal that forwards values from the latest signal sent on
  619. /// `signal`, ignoring values sent on previous inner signal.
  620. ///
  621. /// An error sent on `signal` or the latest inner signal will be sent on the
  622. /// returned signal.
  623. ///
  624. /// The returned signal completes when `signal` and the latest inner
  625. /// signal have both completed.
  626. fileprivate func switchToLatest() -> SignalProducer<Value.Value, Error> {
  627. return SignalProducer<Value.Value, Error> { observer, disposable in
  628. let latestInnerDisposable = SerialDisposable()
  629. disposable += latestInnerDisposable
  630. self.startWithSignal { signal, signalDisposable in
  631. disposable += signalDisposable
  632. disposable += signal.observeSwitchToLatest(observer, latestInnerDisposable)
  633. }
  634. }
  635. }
  636. }
  637. private struct LatestState<Value, Error: Swift.Error> {
  638. var outerSignalComplete: Bool = false
  639. var innerSignalComplete: Bool = true
  640. var replacingInnerSignal: Bool = false
  641. }
  642. extension SignalProtocol {
  643. /// Maps each event from `signal` to a new signal, then flattens the
  644. /// resulting producers (into a signal of values), according to the
  645. /// semantics of the given strategy.
  646. ///
  647. /// If `signal` or any of the created producers fail, the returned signal
  648. /// will forward that failure immediately.
  649. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, Error>) -> Signal<U, Error> {
  650. return map(transform).flatten(strategy)
  651. }
  652. /// Maps each event from `signal` to a new signal, then flattens the
  653. /// resulting producers (into a signal of values), according to the
  654. /// semantics of the given strategy.
  655. ///
  656. /// If `signal` fails, the returned signal will forward that failure
  657. /// immediately.
  658. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> Signal<U, Error> {
  659. return map(transform).flatten(strategy)
  660. }
  661. /// Maps each event from `signal` to a new signal, then flattens the
  662. /// resulting signals (into a signal of values), according to the
  663. /// semantics of the given strategy.
  664. ///
  665. /// If `signal` or any of the created signals emit an error, the returned
  666. /// signal will forward that error immediately.
  667. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, Error>) -> Signal<U, Error> {
  668. return map(transform).flatten(strategy)
  669. }
  670. /// Maps each event from `signal` to a new signal, then flattens the
  671. /// resulting signals (into a signal of values), according to the
  672. /// semantics of the given strategy.
  673. ///
  674. /// If `signal` emits an error, the returned signal will forward that
  675. /// error immediately.
  676. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> Signal<U, Error> {
  677. return map(transform).flatten(strategy)
  678. }
  679. /// Maps each event from `signal` to a new property, then flattens the
  680. /// resulting properties (into a signal of values), according to the
  681. /// semantics of the given strategy.
  682. ///
  683. /// If `signal` emits an error, the returned signal will forward that
  684. /// error immediately.
  685. public func flatMap<P: PropertyProtocol>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> P) -> Signal<P.Value, Error> {
  686. return map(transform).flatten(strategy)
  687. }
  688. }
  689. extension SignalProtocol where Error == NoError {
  690. /// Maps each event from `signal` to a new signal, then flattens the
  691. /// resulting signals (into a signal of values), according to the
  692. /// semantics of the given strategy.
  693. ///
  694. /// If any of the created signals emit an error, the returned signal
  695. /// will forward that error immediately.
  696. public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, E>) -> Signal<U, E> {
  697. return map(transform).flatten(strategy)
  698. }
  699. /// Maps each event from `signal` to a new signal, then flattens the
  700. /// resulting signals (into a signal of values), according to the
  701. /// semantics of the given strategy.
  702. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> Signal<U, NoError> {
  703. return map(transform).flatten(strategy)
  704. }
  705. /// Maps each event from `signal` to a new signal, then flattens the
  706. /// resulting signals (into a signal of values), according to the
  707. /// semantics of the given strategy.
  708. ///
  709. /// If any of the created signals emit an error, the returned signal
  710. /// will forward that error immediately.
  711. public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, E>) -> Signal<U, E> {
  712. return map(transform).flatten(strategy)
  713. }
  714. /// Maps each event from `signal` to a new signal, then flattens the
  715. /// resulting signals (into a signal of values), according to the
  716. /// semantics of the given strategy.
  717. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> Signal<U, NoError> {
  718. return map(transform).flatten(strategy)
  719. }
  720. }
  721. extension SignalProducerProtocol {
  722. /// Maps each event from `self` to a new producer, then flattens the
  723. /// resulting producers (into a producer of values), according to the
  724. /// semantics of the given strategy.
  725. ///
  726. /// If `self` or any of the created producers fail, the returned producer
  727. /// will forward that failure immediately.
  728. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, Error>) -> SignalProducer<U, Error> {
  729. return map(transform).flatten(strategy)
  730. }
  731. /// Maps each event from `self` to a new producer, then flattens the
  732. /// resulting producers (into a producer of values), according to the
  733. /// semantics of the given strategy.
  734. ///
  735. /// If `self` fails, the returned producer will forward that failure
  736. /// immediately.
  737. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> SignalProducer<U, Error> {
  738. return map(transform).flatten(strategy)
  739. }
  740. /// Maps each event from `self` to a new producer, then flattens the
  741. /// resulting signals (into a producer of values), according to the
  742. /// semantics of the given strategy.
  743. ///
  744. /// If `self` or any of the created signals emit an error, the returned
  745. /// producer will forward that error immediately.
  746. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, Error>) -> SignalProducer<U, Error> {
  747. return map(transform).flatten(strategy)
  748. }
  749. /// Maps each event from `self` to a new producer, then flattens the
  750. /// resulting signals (into a producer of values), according to the
  751. /// semantics of the given strategy.
  752. ///
  753. /// If `self` emits an error, the returned producer will forward that
  754. /// error immediately.
  755. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> SignalProducer<U, Error> {
  756. return map(transform).flatten(strategy)
  757. }
  758. /// Maps each event from `self` to a new property, then flattens the
  759. /// resulting properties (into a producer of values), according to the
  760. /// semantics of the given strategy.
  761. ///
  762. /// If `self` emits an error, the returned producer will forward that
  763. /// error immediately.
  764. public func flatMap<P: PropertyProtocol>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> P) -> SignalProducer<P.Value, Error> {
  765. return map(transform).flatten(strategy)
  766. }
  767. }
  768. extension SignalProducerProtocol where Error == NoError {
  769. /// Maps each event from `self` to a new producer, then flattens the
  770. /// resulting producers (into a producer of values), according to the
  771. /// semantics of the given strategy.
  772. ///
  773. /// If any of the created producers fail, the returned producer will
  774. /// forward that failure immediately.
  775. public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, E>) -> SignalProducer<U, E> {
  776. return map(transform).flatten(strategy)
  777. }
  778. /// Maps each event from `self` to a new producer, then flattens the
  779. /// resulting producers (into a producer of values), according to the
  780. /// semantics of the given strategy.
  781. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> SignalProducer<U, NoError>) -> SignalProducer<U, NoError> {
  782. return map(transform).flatten(strategy)
  783. }
  784. /// Maps each event from `self` to a new producer, then flattens the
  785. /// resulting signals (into a producer of values), according to the
  786. /// semantics of the given strategy.
  787. ///
  788. /// If any of the created signals emit an error, the returned
  789. /// producer will forward that error immediately.
  790. public func flatMap<U, E>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, E>) -> SignalProducer<U, E> {
  791. return map(transform).flatten(strategy)
  792. }
  793. /// Maps each event from `self` to a new producer, then flattens the
  794. /// resulting signals (into a producer of values), according to the
  795. /// semantics of the given strategy.
  796. public func flatMap<U>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> Signal<U, NoError>) -> SignalProducer<U, NoError> {
  797. return map(transform).flatten(strategy)
  798. }
  799. }
  800. extension SignalProtocol {
  801. /// Catches any failure that may occur on the input signal, mapping to a new
  802. /// producer that starts in its place.
  803. public func flatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>) -> Signal<Value, F> {
  804. return Signal { observer in
  805. self.observeFlatMapError(handler, observer, SerialDisposable())
  806. }
  807. }
  808. fileprivate func observeFlatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>, _ observer: Observer<Value, F>, _ serialDisposable: SerialDisposable) -> Disposable? {
  809. return self.observe { event in
  810. switch event {
  811. case let .value(value):
  812. observer.send(value: value)
  813. case let .failed(error):
  814. handler(error).startWithSignal { signal, disposable in
  815. serialDisposable.inner = disposable
  816. signal.observe(observer)
  817. }
  818. case .completed:
  819. observer.sendCompleted()
  820. case .interrupted:
  821. observer.sendInterrupted()
  822. }
  823. }
  824. }
  825. }
  826. extension SignalProducerProtocol {
  827. /// Catches any failure that may occur on the input producer, mapping to a
  828. /// new producer that starts in its place.
  829. public func flatMapError<F>(_ handler: @escaping (Error) -> SignalProducer<Value, F>) -> SignalProducer<Value, F> {
  830. return SignalProducer { observer, disposable in
  831. let serialDisposable = SerialDisposable()
  832. disposable += serialDisposable
  833. self.startWithSignal { signal, signalDisposable in
  834. serialDisposable.inner = signalDisposable
  835. _ = signal.observeFlatMapError(handler, observer, serialDisposable)
  836. }
  837. }
  838. }
  839. }