Signal.swift 75 KB


  1. import Foundation
  2. import Result
  /// A push-driven stream that sends Events over time, parameterized by the type
  /// of values being sent (`Value`) and the type of failure that can occur
  /// (`Error`). If no failures should be possible, NoError can be specified for
  /// `Error`.
  ///
  /// An observer of a Signal will see the exact same sequence of events as all
  /// other observers. In other words, events will be sent to all observers at the
  /// same time.
  ///
  /// Signals are generally used to represent event streams that are already “in
  /// progress,” like notifications, user input, etc. To represent streams that
  /// must first be _started_, see the SignalProducer type.
  ///
  /// A Signal is kept alive until either of the following happens:
  ///    1. its input observer receives a terminating event; or
  ///    2. it has no active observers, and is not being retained.
  public final class Signal<Value, Error: Swift.Error> {
      public typealias Observer = ReactiveSwift.Observer<Value, Error>

      /// The disposable returned by the signal generator. It is disposed of
      /// when the signal terminates, or at the latest when it deinitializes.
      private var generatorDisposable: Disposable?

      /// The state of the signal.
      ///
      /// `state` synchronizes using Read-Copy-Update. Reads on the event delivery
      /// routine are thus wait-free. But modifications, e.g. inserting observers,
      /// still have to be serialized, and are required not to mutate in place.
      ///
      /// This suits `Signal` as reads of `state` happen on the critical path of
      /// event delivery, while observers bag manipulation or termination
      /// generally has a constant occurrence.
      ///
      /// As `SignalState` is a packed object reference (a tagged pointer) that is
      /// naturally aligned, reads of it are guaranteed to be atomic on all
      /// supported hardware architectures of Swift (ARM and x86).
      private var state: SignalState<Value, Error>

      /// Used to ensure that state updates are serialized.
      private let updateLock: NSLock

      /// Used to ensure that events are serialized during delivery to observers.
      private let sendLock: NSLock

      /// Initialize a Signal that will immediately invoke the given generator,
      /// then forward events sent to the given observer.
      ///
      /// - note: The disposable returned from the closure will be automatically
      ///         disposed if a terminating event is sent to the observer. This
      ///         includes a terminating event sent synchronously from within
      ///         the generator itself. The Signal itself will remain alive
      ///         until the observer is released.
      ///
      /// - parameters:
      ///   - generator: A closure that accepts an implicitly created observer
      ///                that will act as an event emitter for the signal.
      public init(_ generator: (Observer) -> Disposable?) {
          state = .alive(AliveState())

          updateLock = NSLock()
          updateLock.name = "org.reactivecocoa.ReactiveSwift.Signal.updateLock"

          sendLock = NSLock()
          sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal.sendLock"

          let observer = Observer { [weak self] event in
              guard let signal = self else {
                  return
              }

              // Thread Safety Notes on `Signal.state`.
              //
              // - Check if the signal is at a specific state.
              //
              //   Read directly.
              //
              // - Deliver `value` events with the alive state.
              //
              //   `sendLock` must be acquired.
              //
              // - Replace the alive state with another.
              //   (e.g. observers bag manipulation)
              //
              //   `updateLock` must be acquired.
              //
              // - Transition from `alive` to `terminating` as a result of receiving
              //   a termination event.
              //
              //   `updateLock` must be acquired, and should fail gracefully if the
              //   signal has terminated.
              //
              // - Check if the signal is terminating. If it is, invoke `tryTerminate`
              //   which transitions the state from `terminating` to `terminated`, and
              //   delivers the termination event.
              //
              //   Both `sendLock` and `updateLock` must be acquired. The check can be
              //   relaxed, but the state must be checked again after the locks are
              //   acquired. Fail gracefully if the state has changed since the relaxed
              //   read, i.e. a concurrent sender has already handled the termination
              //   event.
              //
              // Exploiting the relaxation of reads, please note that false positives
              // are intentionally allowed in the `terminating` checks below. As a
              // result, normal event deliveries need not acquire `updateLock`.
              // Nevertheless, this should not cause the termination event being
              // sent multiple times, since `tryTerminate` would not respond to false
              // positives.

              /// Try to terminate the signal.
              ///
              /// If the signal is alive or has terminated, it fails gracefully. In
              /// other words, calling this method as a result of a false positive
              /// `terminating` check is permitted.
              ///
              /// - note: The `updateLock` would be acquired.
              ///
              /// - returns: `true` if the attempt succeeds. `false` otherwise.
              @inline(__always)
              func tryTerminate() -> Bool {
                  // Acquire `updateLock`. If the termination has still not yet been
                  // handled, take it over and bump the status to `terminated`.
                  signal.updateLock.lock()

                  if case let .terminating(state) = signal.state {
                      signal.state = .terminated
                      signal.updateLock.unlock()

                      // Observers are called out after `updateLock` is released.
                      for observer in state.observers {
                          observer.action(state.event)
                      }

                      return true
                  }

                  signal.updateLock.unlock()
                  return false
              }

              if event.isTerminating {
                  // Recursive events are disallowed for `value` events, but are permitted
                  // for termination events. Specifically:
                  //
                  // - `interrupted`
                  //   It can inadvertently be sent by downstream consumers as part of the
                  //   `SignalProducer` mechanics.
                  //
                  // - `completed`
                  //   If a downstream consumer weakly references an object, invocation of
                  //   such consumer may cause a race condition with its weak retain against
                  //   the last strong release of the object. If the `Lifetime` of the
                  //   object is being referenced by an upstream `take(during:)`, a
                  //   signal recursion might occur.
                  //
                  // So we would treat termination events specially. If it happens to
                  // occur while the `sendLock` is acquired, the observer call-out and
                  // the disposal would be delegated to the current sender, or
                  // occasionally one of the senders waiting on `sendLock`.

                  signal.updateLock.lock()

                  if case let .alive(state) = signal.state {
                      let newSnapshot = TerminatingState(observers: state.observers,
                                                         event: event)
                      signal.state = .terminating(newSnapshot)
                      signal.updateLock.unlock()

                      if signal.sendLock.try() {
                          // Check whether the terminating state has been handled by a
                          // concurrent sender. If not, handle it.
                          let shouldDispose = tryTerminate()
                          signal.sendLock.unlock()

                          if shouldDispose {
                              signal.swapDisposable()?.dispose()
                          }
                      }
                  } else {
                      // Already terminating or terminated: drop the event.
                      signal.updateLock.unlock()
                  }
              } else {
                  var shouldDispose = false

                  // The `terminating` status check is performed twice for two different
                  // purposes:
                  //
                  // 1. Within the main protected section
                  //    It guarantees that a recursive termination event sent by a
                  //    downstream consumer, is immediately processed and need not compete
                  //    with concurrent pending senders (if any).
                  //
                  //    Termination events sent concurrently may also be caught here, but
                  //    not necessarily all of them due to data races.
                  //
                  // 2. After the main protected section
                  //    It ensures the termination event sent concurrently that are not
                  //    caught by (1) due to data races would still be processed.
                  //
                  // The related PR on the race conditions:
                  // https://github.com/ReactiveCocoa/ReactiveSwift/pull/112

                  signal.sendLock.lock()
                  // Start of the main protected section.

                  if case let .alive(state) = signal.state {
                      for observer in state.observers {
                          observer.action(event)
                      }

                      // Check if the status has been bumped to `terminating` due to a
                      // concurrent or a recursive termination event.
                      if case .terminating = signal.state {
                          shouldDispose = tryTerminate()
                      }
                  }

                  // End of the main protected section.
                  signal.sendLock.unlock()

                  // Check if the status has been bumped to `terminating` due to a
                  // concurrent termination event that has not been caught in the main
                  // protected section.
                  if !shouldDispose, case .terminating = signal.state {
                      signal.sendLock.lock()
                      shouldDispose = tryTerminate()
                      signal.sendLock.unlock()
                  }

                  if shouldDispose {
                      // Dispose only after notifying observers, so disposal
                      // logic is consistently the last thing to run.
                      signal.swapDisposable()?.dispose()
                  }
              }
          }

          generatorDisposable = generator(observer)

          // If the generator sent a terminating event before returning, the
          // termination path above ran while `generatorDisposable` was still
          // `nil` and therefore had nothing to dispose. Honor the documented
          // contract by disposing of the freshly stored disposable now rather
          // than leaving it alive until `deinit`. `swapDisposable()` yields
          // `nil` if the disposable has already been taken.
          if case .terminated = state {
              swapDisposable()?.dispose()
          }
      }

      /// Swap the generator disposable with `nil`.
      ///
      /// - returns:
      ///   The generator disposable, or `nil` if it has been disposed of.
      private func swapDisposable() -> Disposable? {
          if let d = generatorDisposable {
              generatorDisposable = nil
              return d
          }
          return nil
      }

      deinit {
          // A signal can deinitialize only when it is not retained and has no
          // active observers. So `state` need not be swapped.
          swapDisposable()?.dispose()
      }

      /// A Signal that never sends any events to its observers.
      public static var never: Signal {
          return self.init { _ in nil }
      }

      /// A Signal that completes immediately without emitting any value.
      public static var empty: Signal {
          return self.init { observer in
              observer.sendCompleted()
              return nil
          }
      }

      /// Create a `Signal` that will be controlled by sending events to an
      /// input observer.
      ///
      /// - note: The `Signal` will remain alive until a terminating event is sent
      ///         to the input observer, or until it has no observers and there
      ///         are no strong references to it.
      ///
      /// - parameters:
      ///   - disposable: An optional disposable to associate with the signal, and
      ///                 to be disposed of when the signal terminates.
      ///
      /// - returns: A tuple of `output: Signal`, the output end of the pipe,
      ///            and `input: Observer`, the input end of the pipe.
      public static func pipe(disposable: Disposable? = nil) -> (output: Signal, input: Observer) {
          // The generator runs synchronously inside `init`, so `observer` is
          // always assigned before it is read below.
          var observer: Observer!
          let signal = self.init { innerObserver in
              observer = innerObserver
              return disposable
          }

          return (signal, observer)
      }

      /// Observe the Signal by sending any future events to the given observer.
      ///
      /// - note: If the Signal has already terminated, the observer will
      ///         immediately receive an `interrupted` event.
      ///
      /// - parameters:
      ///   - observer: An observer to forward the events to.
      ///
      /// - returns: A `Disposable` which can be used to disconnect the observer,
      ///            or `nil` if the signal has already terminated.
      @discardableResult
      public func observe(_ observer: Observer) -> Disposable? {
          var token: RemovalToken?

          // Replace the alive state with a copy that includes the new observer
          // and a self-retaining reference.
          updateLock.lock()
          if case let .alive(snapshot) = state {
              var observers = snapshot.observers
              token = observers.insert(observer)
              state = .alive(AliveState(observers: observers, retaining: self))
          }
          updateLock.unlock()

          guard let removalToken = token else {
              // The signal is terminating or has terminated.
              observer.sendInterrupted()
              return nil
          }

          return ActionDisposable { [weak self] in
              guard let signal = self else {
                  return
              }

              signal.updateLock.lock()
              if case let .alive(snapshot) = signal.state {
                  var observers = snapshot.observers
                  observers.remove(using: removalToken)
                  // Drop the self-retaining reference once the last observer
                  // has been removed.
                  signal.state = .alive(AliveState(observers: observers,
                                                   retaining: observers.isEmpty ? nil : signal))
              }
              signal.updateLock.unlock()
          }
      }
  }
  /// The lifecycle state of a `Signal`.
  ///
  /// A signal starts out `alive`, moves to `terminating` once a termination
  /// event has been received, and ends up `terminated` after that event has
  /// been taken over for delivery.
  ///
  /// `SignalState` is guaranteed to be laid out as a tagged pointer by the Swift
  /// compiler in the support targets of the Swift 3.0.1 ABI.
  ///
  /// The Swift compiler has also an optimization for enums with payloads that
  /// are all reference counted, and at most one no-payload case.
  private enum SignalState<Value, Error: Swift.Error> {
      /// The `Signal` is alive. The payload carries the current observers.
      case alive(AliveState<Value, Error>)

      /// The `Signal` has received a termination event, and is about to be
      /// terminated. The payload carries the observers and the pending event.
      case terminating(TerminatingState<Value, Error>)

      /// The `Signal` has terminated.
      case terminated
  }
  // As the amount of state would definitely span over a cache line,
  // `AliveState` and `TerminatingState` are reference types, so that a state
  // change is an atomic update of the reference instead.
  //
  // Note that in-place mutation must not be introduced to `AliveState` or
  // `TerminatingState`. Copy the state into a new instance instead.
  /// Immutable snapshot of a `Signal` that is alive: its bag of observers plus
  /// an optional reference that keeps the signal itself retained.
  private final class AliveState<Value, Error: Swift.Error> {
      /// The observers of the `Signal`.
      fileprivate let observers: Bag<Signal<Value, Error>.Observer>

      /// A self-retaining reference. It is set when there are one or more
      /// active observers.
      fileprivate let retaining: Signal<Value, Error>?

      /// Create an alive state.
      ///
      /// - parameters:
      ///   - observers: The latest bag of observers. Defaults to an empty bag.
      ///   - retaining: The self-retaining reference of the `Signal`, if
      ///                necessary. Defaults to `nil`.
      init(
          observers: Bag<Signal<Value, Error>.Observer> = Bag(),
          retaining: Signal<Value, Error>? = nil
      ) {
          self.observers = observers
          self.retaining = retaining
      }
  }
  /// Immutable snapshot of a terminating `Signal`: the observers still to be
  /// notified, together with the termination event to deliver to them.
  private final class TerminatingState<Value, Error: Swift.Error> {
      /// The observers of the `Signal`.
      fileprivate let observers: Bag<Signal<Value, Error>.Observer>

      /// The termination event.
      fileprivate let event: Event<Value, Error>

      /// Create a terminating state.
      ///
      /// - parameters:
      ///   - observers: The latest bag of observers.
      ///   - event: The termination event.
      init(
          observers: Bag<Signal<Value, Error>.Observer>,
          event: Event<Value, Error>
      ) {
          self.observers = observers
          self.event = event
      }
  }
  /// A protocol used to constrain `Signal` operators.
  public protocol SignalProtocol {
      /// The type of values being sent on the signal.
      associatedtype Value

      /// The type of error that can occur on the signal. If errors aren't
      /// possible then `NoError` can be used.
      associatedtype Error: Swift.Error

      /// Extracts a signal from the receiver.
      var signal: Signal<Value, Error> { get }

      /// Observes the Signal by sending any future events to the given
      /// observer.
      ///
      /// - parameters:
      ///   - observer: An observer to forward the events to.
      ///
      /// - returns: An optional `Disposable` for the observation.
      @discardableResult
      func observe(_ observer: Signal<Value, Error>.Observer) -> Disposable?
  }
  extension Signal: SignalProtocol {
      /// A `Signal` is its own underlying signal.
      public var signal: Signal { return self }
  }
  extension SignalProtocol {
      /// Convenience override for observe(_:) to allow trailing-closure style
      /// invocations.
      ///
      /// - parameters:
      ///   - action: A closure that will accept an event of the signal.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observe(_ action: @escaping Signal<Value, Error>.Observer.Action) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(action)
          return observe(observer)
      }

      /// Observe the `Signal` by invoking the given callback when `value` or
      /// `failed` events are received.
      ///
      /// - parameters:
      ///   - result: A closure that accepts an instance of
      ///             `Result<Value, Error>`, which is `.success(Value)` for a
      ///             `value` event and `.failure(Error)` for a `failed` event.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observeResult(_ result: @escaping (Result<Value, Error>) -> Void) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(
              value: { result(.success($0)) },
              failed: { result(.failure($0)) }
          )
          return observe(observer)
      }

      /// Observe the `Signal` by invoking the given callback when a `completed`
      /// event is received.
      ///
      /// - parameters:
      ///   - completed: A closure that is called when a `completed` event is
      ///                received.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observeCompleted(_ completed: @escaping () -> Void) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(completed: completed)
          return observe(observer)
      }

      /// Observe the `Signal` by invoking the given callback when a `failed`
      /// event is received.
      ///
      /// - parameters:
      ///   - error: A closure that is called when a `failed` event is received.
      ///            It accepts the error as its parameter.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observeFailed(_ error: @escaping (Error) -> Void) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(failed: error)
          return observe(observer)
      }

      /// Observe the `Signal` by invoking the given callback when an
      /// `interrupted` event is received. If the Signal has already terminated,
      /// the callback will be invoked immediately.
      ///
      /// - parameters:
      ///   - interrupted: A closure that is invoked when an `interrupted` event
      ///                  is received.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observeInterrupted(_ interrupted: @escaping () -> Void) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(interrupted: interrupted)
          return observe(observer)
      }
  }
  extension SignalProtocol where Error == NoError {
      /// Observe the Signal by invoking the given callback when `value` events
      /// are received.
      ///
      /// - parameters:
      ///   - value: A closure that accepts the value of each `value` event.
      ///
      /// - returns: An optional `Disposable` which can be used to stop the
      ///            invocation of the callback. Disposing of the Disposable will
      ///            have no effect on the Signal itself.
      @discardableResult
      public func observeValues(_ value: @escaping (Value) -> Void) -> Disposable? {
          let observer = Signal<Value, Error>.Observer(value: value)
          return observe(observer)
      }
  }
  extension SignalProtocol {
      /// Map each value in the signal to a new value.
      ///
      /// - parameters:
      ///   - transform: A closure that accepts a value from the `value` event and
      ///                returns a new value.
      ///
      /// - returns: A signal that will send new values.
      public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U, Error> {
          return Signal { downstream in
              // Every upstream event is forwarded, with values transformed.
              let forward: Signal<Value, Error>.Observer.Action = { event in
                  downstream.action(event.map(transform))
              }
              return self.observe(forward)
          }
      }

      /// Map errors in the signal to a new error.
      ///
      /// - parameters:
      ///   - transform: A closure that accepts the current error object and
      ///                returns a new type of error object.
      ///
      /// - returns: A signal that will send new type of errors.
      public func mapError<F>(_ transform: @escaping (Error) -> F) -> Signal<Value, F> {
          return Signal { downstream in
              // Every upstream event is forwarded, with errors transformed.
              let forward: Signal<Value, Error>.Observer.Action = { event in
                  downstream.action(event.mapError(transform))
              }
              return self.observe(forward)
          }
      }

      /// Preserve only the values of the signal that pass the given predicate.
      ///
      /// - parameters:
      ///   - predicate: A closure that accepts a value and returns a `Bool`
      ///                denoting whether the value has passed the test.
      ///
      /// - returns: A signal that will send only the values passing the given
      ///            predicate.
      public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value, Error> {
          return Signal { downstream in
              return self.observe { (event: Event<Value, Error>) -> Void in
                  if let candidate = event.value {
                      // Value events are forwarded only when they pass.
                      if predicate(candidate) {
                          downstream.send(value: candidate)
                      }
                  } else {
                      // Events without a value are forwarded untouched.
                      downstream.action(event)
                  }
              }
          }
      }
  }
  extension SignalProtocol where Value: OptionalProtocol {
      /// Unwrap non-`nil` values and forward them on the returned signal, `nil`
      /// values are dropped.
      ///
      /// - returns: A signal that sends only non-nil values.
      public func skipNil() -> Signal<Value.Wrapped, Error> {
          let nonNil = filter { wrapped in wrapped.optional != nil }
          // The force unwrap cannot fail: `nil` values were filtered out above.
          return nonNil.map { wrapped in wrapped.optional! }
      }
  }
  extension SignalProtocol {
      /// Take up to `count` values from the signal and then complete.
      ///
      /// - precondition: `count` must be non-negative number.
      ///
      /// - parameters:
      ///   - count: A number of values to take from the signal.
      ///
      /// - returns: A signal that will yield the first `count` values from
      ///            `self`.
      public func take(first count: Int) -> Signal<Value, Error> {
          precondition(count >= 0)

          return Signal { downstream in
              // Taking nothing: complete right away without observing `self`.
              guard count != 0 else {
                  downstream.sendCompleted()
                  return nil
              }

              var delivered = 0

              return self.observe { event in
                  if let value = event.value {
                      if delivered < count {
                          delivered += 1
                          downstream.send(value: value)
                      }

                      // Complete as soon as the quota has been reached.
                      if delivered == count {
                          downstream.sendCompleted()
                      }
                  } else {
                      // Events without a value pass straight through.
                      downstream.action(event)
                  }
              }
          }
      }
  }
  /// A reference-type wrapper around the array of values being accumulated by
  /// the `collect` operators.
  private final class CollectState<Value> {
      var values: [Value] = []

      /// Collects a new value.
      func append(_ value: Value) {
          values.append(value)
      }

      /// Check if there are any items remaining.
      ///
      /// - note: This is deliberately `false` when no value has ever been
      ///         collected, which means an empty array should be sent as the
      ///         result of collect.
      var isEmpty: Bool {
          guard values.isEmpty else {
              return false
          }

          // A capacity of zero means nothing has ever been collected, because
          // `flush()` keeps the capacity of the array. This also guarantees
          // retro-compatibility around the original `collect()` operator.
          return values.capacity > 0
      }

      /// Removes all values previously collected if any.
      func flush() {
          // Keeping the capacity avoids consecutive allocations for sequences
          // of regular or similar size, and lets `isEmpty` track whether any
          // value was ever collected.
          values.removeAll(keepingCapacity: true)
      }
  }
  extension SignalProtocol {
      /// Collect all values sent by the signal then forward them as a single
      /// array and complete.
      ///
      /// - note: When `self` completes without collecting any value, it will send
      ///         an empty array of values.
      ///
      /// - returns: A signal that will yield an array of values when `self`
      ///            completes.
      public func collect() -> Signal<[Value], Error> {
          // A predicate that never matches, so values are only sent on
          // completion.
          return collect { _, _ in false }
      }

      /// Collect values from `self` in arrays of `count` values, forwarding each
      /// array as soon as it is full.
      ///
      /// - note: When the count is reached the array is sent and the signal
      ///         starts over yielding a new array of values.
      ///
      /// - note: When `self` completes any remaining values will be sent, so the
      ///         last array may have fewer than `count` values. Alternatively,
      ///         if no values were collected at all, an empty array of values
      ///         will be sent.
      ///
      /// - precondition: `count` should be greater than zero.
      ///
      /// - parameters:
      ///   - count: The number of values in each forwarded array.
      ///
      /// - returns: A signal that will yield arrays of collected values.
      public func collect(count: Int) -> Signal<[Value], Error> {
          precondition(count > 0)

          return collect { values in values.count == count }
      }

      /// Collect values until the collected array passes the given predicate,
      /// then forward them as a single array and start over.
      ///
      /// - note: When `self` completes any remaining values will be sent, so the
      ///         last array may not match `predicate`. Alternatively, if no
      ///         values were collected at all, an empty array of values will be
      ///         sent.
      ///
      /// ````
      /// let (signal, observer) = Signal<Int, NoError>.pipe()
      ///
      /// signal
      ///     .collect { values in values.reduce(0, +) == 8 }
      ///     .observeValues { print($0) }
      ///
      /// observer.send(value: 1)
      /// observer.send(value: 3)
      /// observer.send(value: 4)
      /// observer.send(value: 7)
      /// observer.send(value: 1)
      /// observer.send(value: 5)
      /// observer.send(value: 6)
      /// observer.sendCompleted()
      ///
      /// // Output:
      /// // [1, 3, 4]
      /// // [7, 1]
      /// // [5, 6]
      /// ````
      ///
      /// - parameters:
      ///   - predicate: Predicate to match when values should be sent (returning
      ///                `true`) or alternatively when they should be collected
      ///                (where it should return `false`). The most recent value
      ///                (`value`) is included in `values` and will be the end of
      ///                the current array of values if the predicate returns
      ///                `true`.
      ///
      /// - returns: A signal that collects values passing the predicate and, when
      ///            `self` completes, forwards them as a single array and
      ///            completes.
      public func collect(_ predicate: @escaping (_ values: [Value]) -> Bool) -> Signal<[Value], Error> {
          return Signal { downstream in
              let buffer = CollectState<Value>()

              return self.observe { event in
                  switch event {
                  case let .value(next):
                      // The new value is part of the array being tested.
                      buffer.append(next)
                      if predicate(buffer.values) {
                          downstream.send(value: buffer.values)
                          buffer.flush()
                      }

                  case let .failed(error):
                      downstream.send(error: error)

                  case .interrupted:
                      downstream.sendInterrupted()

                  case .completed:
                      // Send what is left, or an empty array if nothing was
                      // ever collected.
                      if !buffer.isEmpty {
                          downstream.send(value: buffer.values)
                      }
                      downstream.sendCompleted()
                  }
              }
          }
      }

      /// Repeatedly collect an array of values up to a matching `value` value.
      /// Then forward them as single array and wait for value events.
      ///
      /// - note: When `self` completes any remaining values will be sent, so the
      ///         last array may not match `predicate`. Alternatively, if no
      ///         values were collected an empty array will be sent.
      ///
      /// ````
      /// let (signal, observer) = Signal<Int, NoError>.pipe()
      ///
      /// signal
      ///     .collect { values, value in value == 7 }
      ///     .observeValues { print($0) }
      ///
      /// observer.send(value: 1)
      /// observer.send(value: 1)
      /// observer.send(value: 7)
      /// observer.send(value: 7)
      /// observer.send(value: 5)
      /// observer.send(value: 6)
      /// observer.sendCompleted()
      ///
      /// // Output:
      /// // [1, 1]
      /// // [7]
      /// // [7, 5, 6]
      /// ````
      ///
      /// - parameters:
      ///   - predicate: Predicate to match when values should be sent (returning
      ///                `true`) or alternatively when they should be collected
      ///                (where it should return `false`). The most recent value
      ///                (`value`) is not included in `values` and will be the
      ///                start of the next array of values if the predicate
      ///                returns `true`.
      ///
      /// - returns: A signal that will yield an array of values based on a
      ///            predicate which matches the values collected and the next
      ///            value.
      public func collect(_ predicate: @escaping (_ values: [Value], _ value: Value) -> Bool) -> Signal<[Value], Error> {
          return Signal { downstream in
              let buffer = CollectState<Value>()

              return self.observe { event in
                  switch event {
                  case let .value(next):
                      // The new value is tested against the array collected so
                      // far, and then starts (or joins) the next array.
                      if predicate(buffer.values, next) {
                          downstream.send(value: buffer.values)
                          buffer.flush()
                      }
                      buffer.append(next)

                  case let .failed(error):
                      downstream.send(error: error)

                  case .interrupted:
                      downstream.sendInterrupted()

                  case .completed:
                      // Send what is left, or an empty array if nothing was
                      // ever collected.
                      if !buffer.isEmpty {
                          downstream.send(value: buffer.values)
                      }
                      downstream.sendCompleted()
                  }
              }
          }
      }

      /// Forward all events onto the given scheduler, instead of whichever
      /// scheduler they originally arrived upon.
      ///
      /// - parameters:
      ///   - scheduler: A scheduler to deliver events on.
      ///
      /// - returns: A signal that will yield `self` values on provided scheduler.
      public func observe(on scheduler: SchedulerProtocol) -> Signal<Value, Error> {
          return Signal { downstream in
              return self.observe { event in
                  // Each event is handed to the scheduler for delivery.
                  scheduler.schedule {
                      downstream.action(event)
                  }
              }
          }
      }
  }
  /// Mutable per-input bookkeeping used by `combineLatest(with:)`.
  private final class CombineLatestState<Value> {
      /// The most recent value received from the input, if any.
      var latestValue: Value? = nil

      /// Whether the input has completed.
      var isCompleted: Bool = false
  }
  763. extension SignalProtocol {
  /// Observe `self` on behalf of `combineLatest(with:)`, recording its latest
  /// value and completion in `signalState`.
  ///
  /// - parameters:
  ///   - signalState: The bookkeeping for `self`.
  ///   - otherState: The bookkeeping for the other input.
  ///   - lock: The lock held while the two states are read and written.
  ///   - observer: Receives an empty value once both inputs have a latest
  ///               value, `completed` once both inputs have completed, and
  ///               `failed`/`interrupted` events as they arrive.
  ///
  /// - returns: The disposable of the underlying observation.
  private func observeWithStates<U>(_ signalState: CombineLatestState<Value>, _ otherState: CombineLatestState<U>, _ lock: NSLock, _ observer: Signal<(), Error>.Observer) -> Disposable? {
      return self.observe { event in
          switch event {
          case let .value(latest):
              lock.lock()
              defer { lock.unlock() }

              signalState.latestValue = latest
              // Notify only when the other input has produced a value too.
              if otherState.latestValue != nil {
                  observer.send(value: ())
              }

          case .completed:
              lock.lock()
              defer { lock.unlock() }

              signalState.isCompleted = true
              // Complete only when the other input has completed as well.
              if otherState.isCompleted {
                  observer.sendCompleted()
              }

          case let .failed(error):
              observer.send(error: error)

          case .interrupted:
              observer.sendInterrupted()
          }
      }
  }
  /// Combine the latest value of the receiver with the latest value from the
  /// given signal.
  ///
  /// - note: The returned signal will not send a value until both inputs have
  ///         sent at least one value each.
  ///
  /// - note: If either signal is interrupted, the returned signal will also
  ///         be interrupted.
  ///
  /// - parameters:
  ///   - other: A signal to combine `self`'s value with.
  ///
  /// - returns: A signal that will yield a tuple containing values of `self`
  ///            and given signal.
  public func combineLatest<U>(with other: Signal<U, Error>) -> Signal<(Value, U), Error> {
      return Signal { downstream in
          let lock = NSLock()
          lock.name = "org.reactivecocoa.ReactiveSwift.combineLatestWith"

          let signalState = CombineLatestState<Value>()
          let otherState = CombineLatestState<U>()

          // Invoked only after both inputs have produced a value, so both
          // force unwraps are safe.
          let emitPair = {
              downstream.send(value: (signalState.latestValue!, otherState.latestValue!))
          }

          // Intermediate observer shared by both inputs.
          let relay = Signal<(), Error>.Observer(
              value: emitPair,
              failed: downstream.send(error:),
              completed: downstream.sendCompleted,
              interrupted: downstream.sendInterrupted
          )

          let disposable = CompositeDisposable()
          disposable += self.observeWithStates(signalState, otherState, lock, relay)
          disposable += other.observeWithStates(otherState, signalState, lock, relay)

          return disposable
      }
  }
  /// Postpone `value` and `completed` events by `interval`, delivering them
  /// on `scheduler`.
  ///
  /// - note: `failed` and `interrupted` events are not delayed; they are
  ///         scheduled on `scheduler` right away.
  ///
  /// - precondition: `interval` must be non-negative.
  ///
  /// - parameters:
  ///   - interval: Interval to delay `value` and `completed` events by.
  ///   - scheduler: A scheduler to deliver delayed events on.
  ///
  /// - returns: A signal that will delay `value` and `completed` events and
  ///            will yield them on given scheduler.
  public func delay(_ interval: TimeInterval, on scheduler: DateSchedulerProtocol) -> Signal<Value, Error> {
    precondition(interval >= 0)

    return Signal { observer in
      return self.observe { event in
        switch event {
        case .value, .completed:
          // The due date is computed relative to the moment the event arrives.
          let dueDate = scheduler.currentDate.addingTimeInterval(interval)
          scheduler.schedule(after: dueDate) {
            observer.action(event)
          }

        case .failed, .interrupted:
          scheduler.schedule {
            observer.action(event)
          }
        }
      }
    }
  }
  /// Drop the first `count` values of `self`, then forward everything else.
  ///
  /// - precondition: `count` must be non-negative.
  ///
  /// - parameters:
  ///   - count: A number of values to skip.
  ///
  /// - returns: A signal that will skip the first `count` values, then
  ///            forward everything afterward. When `count` is zero, the
  ///            underlying `signal` is returned as is.
  public func skip(first count: Int) -> Signal<Value, Error> {
    precondition(count >= 0)

    guard count != 0 else {
      return signal
    }

    return Signal { observer in
      var remaining = count

      return self.observe { event in
        // Only `value` events are ever dropped; anything else passes through.
        guard case .value = event, remaining > 0 else {
          observer.action(event)
          return
        }
        remaining -= 1
      }
    }
  }
  /// Turn every event of `self` into a plain value of the returned signal, so
  /// that events can be manipulated like any other value.
  ///
  /// In other words, this brings Events “into the monad”.
  ///
  /// - note: When a `completed` or `failed` event is received, the resulting
  ///         signal will send the event itself and then complete. When an
  ///         `interrupted` event is received, the resulting signal will send
  ///         the event itself and then interrupt.
  ///
  /// - returns: A signal that sends events as its values.
  public func materialize() -> Signal<Event<Value, Error>, NoError> {
    return Signal { observer in
      return self.observe { event in
        observer.send(value: event)

        guard event.isTerminating else { return }

        if case .interrupted = event {
          observer.sendInterrupted()
        } else {
          // `completed` and `failed` both end the materialized signal normally.
          observer.sendCompleted()
        }
      }
    }
  }
  897. }
  extension SignalProtocol where Value: EventProtocol, Error == NoError {
    /// Unwrap a signal whose _values_ are events into a signal of those
    /// events themselves.
    ///
    /// - returns: A signal that sends values carried by `self` events.
    public func dematerialize() -> Signal<Value.Value, Value.Error> {
      return Signal<Value.Value, Value.Error> { observer in
        return self.observe { outerEvent in
          switch outerEvent {
          case let .value(wrapped):
            // Replay the wrapped event as a real event of the new signal.
            observer.action(wrapped.event)

          case .completed:
            observer.sendCompleted()

          case .interrupted:
            observer.sendInterrupted()

          case .failed:
            fatalError("NoError is impossible to construct")
          }
        }
      }
    }
  }
  extension SignalProtocol {
    /// Attach side effects that run when the signal sends particular events.
    ///
    /// For each received event, `event` runs first, then the closure specific
    /// to the kind of event, then `terminated` if the event is terminating,
    /// and finally the event is forwarded to the returned signal.
    ///
    /// - parameters:
    ///   - event: A closure that accepts an event and is invoked on every
    ///            received event.
    ///   - failed: A closure that accepts error object and is invoked for
    ///             failed event.
    ///   - completed: A closure that is invoked for `completed` event.
    ///   - interrupted: A closure that is invoked for `interrupted` event.
    ///   - terminated: A closure that is invoked for any terminating event.
    ///   - disposed: A closure added to the disposable that the returned
    ///               signal is created with; presumably run when that
    ///               disposable is disposed of.
    ///   - value: A closure that accepts a value from `value` event.
    ///
    /// - returns: A signal with attached side-effects for given event cases.
    public func on(
      event: ((Event<Value, Error>) -> Void)? = nil,
      failed: ((Error) -> Void)? = nil,
      completed: (() -> Void)? = nil,
      interrupted: (() -> Void)? = nil,
      terminated: (() -> Void)? = nil,
      disposed: (() -> Void)? = nil,
      value: ((Value) -> Void)? = nil
    ) -> Signal<Value, Error> {
      return Signal { observer in
        let disposable = CompositeDisposable()

        _ = disposed.map(disposable.add)

        disposable += signal.observe { receivedEvent in
          event?(receivedEvent)

          switch receivedEvent {
          case let .value(receivedValue):
            value?(receivedValue)

          case let .failed(error):
            failed?(error)
            terminated?()

          case .completed:
            completed?()
            terminated?()

          case .interrupted:
            interrupted?()
            terminated?()
          }

          // Side effects always run before the event is forwarded.
          observer.action(receivedEvent)
        }

        return disposable
      }
    }
  }
  /// State shared by the two observations made in `sample(with:)`.
  private struct SampleState<Value> {
    /// The most recent value received from the sampled signal, if any.
    var latestValue: Value?

    /// Whether the sampled signal has completed.
    var isSignalCompleted = false

    /// Whether the sampler has completed.
    var isSamplerCompleted = false
  }
  973. extension SignalProtocol {
  /// Each time `sampler` sends a value, send the latest value of `self`
  /// paired with that sampler value.
  ///
  /// - note: If `sampler` fires before a value has been observed on `self`,
  ///         nothing happens.
  ///
  /// - parameters:
  ///   - sampler: A signal that will trigger the delivery of `value` event
  ///              from `self`.
  ///
  /// - returns: A signal that will send values from `self` and `sampler`,
  ///            sampled (possibly multiple times) by `sampler`, then complete
  ///            once both input signals have completed, or interrupt if
  ///            either input signal is interrupted.
  public func sample<T>(with sampler: Signal<T, NoError>) -> Signal<(Value, T), Error> {
    return Signal { observer in
      let state = Atomic(SampleState<Value>())
      let disposable = CompositeDisposable()

      disposable += self.observe { event in
        switch event {
        case let .value(value):
          state.modify { $0.latestValue = value }

        case .completed:
          // Complete only if the sampler has already completed.
          let samplerIsDone: Bool = state.modify { snapshot -> Bool in
            snapshot.isSignalCompleted = true
            return snapshot.isSamplerCompleted
          }
          guard samplerIsDone else { return }
          observer.sendCompleted()

        case let .failed(error):
          observer.send(error: error)

        case .interrupted:
          observer.sendInterrupted()
        }
      }

      disposable += sampler.observe { event in
        switch event {
        case let .value(samplerValue):
          guard let latest = state.value.latestValue else { return }
          observer.send(value: (latest, samplerValue))

        case .completed:
          // Complete only if the sampled signal has already completed.
          let signalIsDone: Bool = state.modify { snapshot -> Bool in
            snapshot.isSamplerCompleted = true
            return snapshot.isSignalCompleted
          }
          guard signalIsDone else { return }
          observer.sendCompleted()

        case .interrupted:
          observer.sendInterrupted()

        case .failed:
          // The sampler's error type is `NoError`; nothing to forward.
          return
        }
      }

      return disposable
    }
  }
  /// Send the latest value of `self` each time `sampler` sends a `value`
  /// event.
  ///
  /// - note: If `sampler` fires before a value has been observed on `self`,
  ///         nothing happens.
  ///
  /// - parameters:
  ///   - sampler: A signal that will trigger the delivery of `value` event
  ///              from `self`.
  ///
  /// - returns: A signal that will send values from `self`, sampled (possibly
  ///            multiple times) by `sampler`, then complete once both input
  ///            signals have completed, or interrupt if either input signal
  ///            is interrupted.
  public func sample(on sampler: Signal<(), NoError>) -> Signal<Value, Error> {
    // Reuse `sample(with:)` and drop the unit value contributed by `sampler`.
    let paired = sample(with: sampler)
    return paired.map { pair in pair.0 }
  }
  /// Each time `self` sends a value, send it paired with the latest value of
  /// `samplee`.
  /// This is like a flipped version of `sample(with:)`, but `samplee`'s
  /// terminal events are completely ignored.
  ///
  /// - note: If `self` fires before a value has been observed on `samplee`,
  ///         nothing happens.
  ///
  /// - parameters:
  ///   - samplee: A signal whose latest value is sampled by `self`.
  ///
  /// - returns: A signal that will send values from `self` and `samplee`,
  ///            sampled (possibly multiple times) by `self`, then terminate
  ///            once `self` has terminated. **`samplee`'s terminated events
  ///            are ignored**.
  public func withLatest<U>(from samplee: Signal<U, NoError>) -> Signal<(Value, U), Error> {
    return Signal { observer in
      let latestSample = Atomic<U?>(nil)
      let disposable = CompositeDisposable()

      // Only values of `samplee` are observed; its terminal events never
      // reach the returned signal.
      disposable += samplee.observeValues { sample in
        latestSample.value = sample
      }

      disposable += self.observe { event in
        switch event {
        case let .value(value):
          guard let sample = latestSample.value else { return }
          observer.send(value: (value, sample))

        case let .failed(error):
          observer.send(error: error)

        case .completed:
          observer.sendCompleted()

        case .interrupted:
          observer.sendInterrupted()
        }
      }

      return disposable
    }
  }
  /// Each time `self` sends a value, send it paired with the latest value of
  /// the signal started from `samplee`.
  /// This is like a flipped version of `sample(with:)`, but `samplee`'s
  /// terminal events are completely ignored.
  ///
  /// - note: If `self` fires before a value has been observed on `samplee`,
  ///         nothing happens.
  ///
  /// - parameters:
  ///   - samplee: A producer whose latest value is sampled by `self`.
  ///
  /// - returns: A signal that will send values from `self` and `samplee`,
  ///            sampled (possibly multiple times) by `self`, then terminate
  ///            once `self` has terminated. **`samplee`'s terminated events
  ///            are ignored**.
  public func withLatest<U>(from samplee: SignalProducer<U, NoError>) -> Signal<(Value, U), Error> {
    return Signal { observer in
      let composite = CompositeDisposable()

      // Start the producer and delegate to the `Signal`-based overload.
      samplee.startWithSignal { sampleeSignal, sampleeDisposable in
        composite += sampleeDisposable
        composite += self.withLatest(from: sampleeSignal).observe(observer)
      }

      return composite
    }
  }
  1117. }
  1118. extension SignalProtocol {
  /// Forward events from `self` until `lifetime` ends, at which point the
  /// returned signal completes.
  ///
  /// - parameters:
  ///   - lifetime: A lifetime whose `ended` signal will cause the returned
  ///               signal to complete.
  ///
  /// - returns: A signal that will deliver events until `lifetime` ends.
  public func take(during lifetime: Lifetime) -> Signal<Value, Error> {
    let lifetimeEnded = lifetime.ended
    return take(until: lifetimeEnded)
  }
  /// Forward events from `self` until `trigger` sends a `value` or
  /// `completed` event, at which point the returned signal completes.
  ///
  /// - parameters:
  ///   - trigger: A signal whose `value` or `completed` events will stop the
  ///              delivery of `value` events from `self`.
  ///
  /// - returns: A signal that will deliver events until `trigger` sends
  ///            `value` or `completed` events.
  public func take(until trigger: Signal<(), NoError>) -> Signal<Value, Error> {
    return Signal { observer in
      let disposable = CompositeDisposable()

      disposable += self.observe(observer)

      disposable += trigger.observe { event in
        switch event {
        case .failed, .interrupted:
          // These trigger events do not end the returned signal.
          return

        case .value, .completed:
          observer.sendCompleted()
        }
      }

      return disposable
    }
  }
  /// Forward nothing from `self` until `trigger` sends a `value` or
  /// `completed` event, at which point the returned signal behaves exactly
  /// like `signal`.
  ///
  /// - parameters:
  ///   - trigger: A signal whose `value` or `completed` events will start the
  ///              delivery of events on `self`.
  ///
  /// - returns: A signal that will deliver events once the `trigger` sends
  ///            `value` or `completed` events.
  public func skip(until trigger: Signal<(), NoError>) -> Signal<Value, Error> {
    return Signal { observer in
      let serial = SerialDisposable()

      serial.inner = trigger.observe { event in
        switch event {
        case .failed, .interrupted:
          return

        case .value, .completed:
          // Swap the trigger observation for the observation of `self`.
          serial.inner = self.observe(observer)
        }
      }

      return serial
    }
  }
  /// Forward events from `self` with history: values of the returned signal
  /// are tuples whose first member is the previous value and whose second
  /// member is the current value. `initial` is supplied as the first member
  /// when `self` sends its first value.
  ///
  /// - parameters:
  ///   - initial: A value that will be combined with the first value sent by
  ///              `self`.
  ///
  /// - returns: A signal that sends tuples that contain previous and current
  ///            sent values of `self`.
  public func combinePrevious(_ initial: Value) -> Signal<(Value, Value), Error> {
    // Only the second member of the seed is ever read, as the "previous"
    // value of the first pair.
    let seed = (initial, initial)
    return scan(seed) { lastPair, current in
      return (lastPair.1, current)
    }
  }
  /// Accumulate the values of `self` with `combine`, and send only the final
  /// accumulated value before completing.
  ///
  /// - parameters:
  ///   - initial: Initial value for the accumulator.
  ///   - combine: A closure that accepts accumulator and sent value of
  ///              `self`.
  ///
  /// - returns: A signal that sends accumulated value after `self` completes.
  public func reduce<U>(_ initial: U, _ combine: @escaping (U, Value) -> U) -> Signal<U, Error> {
    // `self` may send no values at all. To cover that case, `initial` is
    // pushed through a pipe that `take(last:)` is already listening to,
    // before the scanned values are piped in behind it.
    let (pipedSignal, pipedObserver) = Signal<U, Error>.pipe()
    let lastValueSignal = pipedSignal.take(last: 1)

    // `take(last:)` is observing the pipe by now, so the seed is retained.
    pipedObserver.send(value: initial)

    // Feed the running accumulation of `self` into the pipe.
    self.scan(initial, combine)
      .observe(pipedObserver)

    return lastValueSignal
  }
  /// Fold the values of `self` into a running accumulation. When `self`
  /// emits its first value, `combine` is invoked with `initial` as the first
  /// argument and that emitted value as the second argument. The result is
  /// emitted from the signal returned from `scan`. That result is then passed
  /// to `combine` as the first argument when the next value is emitted, and
  /// so on.
  ///
  /// - parameters:
  ///   - initial: Initial value for the accumulator.
  ///   - combine: A closure that accepts accumulator and sent value of
  ///              `self`.
  ///
  /// - returns: A signal that sends accumulated value each time `self` emits
  ///            own value.
  public func scan<U>(_ initial: U, _ combine: @escaping (U, Value) -> U) -> Signal<U, Error> {
    return Signal { observer in
      var runningTotal = initial

      return self.observe { event in
        switch event {
        case let .value(value):
          runningTotal = combine(runningTotal, value)
          observer.send(value: runningTotal)

        case let .failed(error):
          observer.send(error: error)

        case .completed:
          observer.sendCompleted()

        case .interrupted:
          observer.sendInterrupted()
        }
      }
    }
  }
  1241. }
  extension SignalProtocol where Value: Equatable {
    /// Forward only those values from `self` which are not equal to the
    /// immediately preceding value.
    ///
    /// - note: The first value is always forwarded.
    ///
    /// - returns: A signal that does not send two equal values sequentially.
    public func skipRepeats() -> Signal<Value, Error> {
      let areEqual: (Value, Value) -> Bool = { lhs, rhs in lhs == rhs }
      return skipRepeats(areEqual)
    }
  }
  1253. extension SignalProtocol {
  /// Forward only those values from `self` for which `isRepeat` returns
  /// `false` when compared with the previously forwarded value.
  ///
  /// - note: The first value is always forwarded.
  ///
  /// - parameters:
  ///   - isRepeat: A closure that accepts previous and current values of
  ///               `self` and returns `Bool` whether these values are
  ///               repeating.
  ///
  /// - returns: A signal that forwards only those values that fail given
  ///            `isRepeat` predicate.
  public func skipRepeats(_ isRepeat: @escaping (Value, Value) -> Bool) -> Signal<Value, Error> {
    return self
      .scan((nil, false)) { (accumulated: (Value?, Bool), next: Value) -> (value: Value?, repeated: Bool) in
        // A repeat keeps the previously forwarded value as the reference;
        // the first value, or any non-repeat, becomes the new reference.
        guard let previous = accumulated.0, isRepeat(previous, next) else {
          return (next, false)
        }
        return (previous, true)
      }
      .filter { entry in !entry.repeated }
      .map { entry in entry.value }
      .skipNil()
  }
  /// Forward no values from `self` until `predicate` returns `false`, at
  /// which point the returned signal behaves exactly like `signal`.
  ///
  /// - parameters:
  ///   - predicate: A closure that accepts a value and returns whether `self`
  ///                should still not forward that value to a `signal`.
  ///
  /// - returns: A signal that sends only forwarded values from `self`.
  public func skip(while predicate: @escaping (Value) -> Bool) -> Signal<Value, Error> {
    return Signal { observer in
      var isSkipping = true

      return self.observe { event in
        if case let .value(value) = event {
          // Once the predicate has failed it is never consulted again.
          isSkipping = isSkipping && predicate(value)
          if isSkipping {
            return
          }
        }

        observer.action(event)
      }
    }
  }
  /// Forward events from `self` until `replacement` begins sending events.
  ///
  /// - parameters:
  ///   - replacement: A signal to wait for values from and start sending
  ///                  them as a replacement to `self`'s values.
  ///
  /// - returns: A signal which passes through `value`, `failed`, and
  ///            `interrupted` events from `self` until `replacement` sends
  ///            an event, at which point the returned signal will send that
  ///            event and switch to passing through events from `replacement`
  ///            instead, regardless of whether `self` has sent events
  ///            already.
  public func take(untilReplacement signal: Signal<Value, Error>) -> Signal<Value, Error> {
    return Signal { observer in
      let disposable = CompositeDisposable()

      let originalDisposable = self.observe { event in
        // Completion of `self` is swallowed: the replacement may still send.
        if case .completed = event {
          return
        }
        observer.action(event)
      }
      disposable += originalDisposable

      disposable += signal.observe { event in
        // Stop listening to `self` as soon as the replacement sends anything.
        originalDisposable?.dispose()
        observer.action(event)
      }

      return disposable
    }
  }
  /// Wait until `self` completes and then forward the final `count` values
  /// on the returned signal.
  ///
  /// - note: If `count` is zero or negative, no values are retained and the
  ///         returned signal only forwards the terminal event of `self`.
  ///
  /// - note: If `self` fails or is interrupted, the buffered values are not
  ///         sent; only the terminal event is forwarded.
  ///
  /// - parameters:
  ///   - count: Number of last events to send after `self` completes.
  ///
  /// - returns: A signal that receives up to `count` values from `self`
  ///            after `self` completes.
  public func take(last count: Int) -> Signal<Value, Error> {
    // Clamp so that a non-positive `count` means "retain nothing". Without
    // this, the eviction loop below would try to remove from an empty buffer
    // and trap on the first value.
    let capacity = max(count, 0)

    return Signal { observer in
      var buffer: [Value] = []
      buffer.reserveCapacity(capacity)

      return self.observe { event in
        switch event {
        case let .value(value):
          guard capacity > 0 else { return }

          // To avoid exceeding the reserved capacity of the buffer,
          // we remove then add. Remove elements until we have room to
          // add one more.
          while buffer.count >= capacity {
            buffer.remove(at: 0)
          }
          buffer.append(value)

        case let .failed(error):
          observer.send(error: error)

        case .completed:
          buffer.forEach(observer.send(value:))
          observer.sendCompleted()

        case .interrupted:
          observer.sendInterrupted()
        }
      }
    }
  }
  /// Forward values from `self` for as long as `predicate` returns `true`.
  /// The first value for which it returns `false` is not forwarded; the
  /// returned signal completes instead.
  ///
  /// - parameters:
  ///   - predicate: A closure that accepts value and returns `Bool` value
  ///                whether `self` should forward it to `signal` and continue
  ///                sending other events.
  ///
  /// - returns: A signal that sends events until the values sent by `self`
  ///            fail the given `predicate`.
  public func take(while predicate: @escaping (Value) -> Bool) -> Signal<Value, Error> {
    return Signal { observer in
      return self.observe { event in
        switch event {
        case let .value(value) where !predicate(value):
          observer.sendCompleted()

        case .value, .failed, .completed, .interrupted:
          observer.action(event)
        }
      }
    }
  }
  1391. }
  /// Buffers and completion flags for the two inputs of `zip(with:)`.
  private struct ZipState<Left, Right> {
    /// Values received from each input that have not been paired yet.
    var values: (left: [Left], right: [Right]) = ([], [])

    /// Whether each input has completed.
    var isCompleted: (left: Bool, right: Bool) = (false, false)

    /// `true` once either input has completed with nothing left in its buffer.
    var isFinished: Bool {
      let leftExhausted = isCompleted.left && values.left.isEmpty
      let rightExhausted = isCompleted.right && values.right.isEmpty
      return leftExhausted || rightExhausted
    }
  }
  1399. extension SignalProtocol {
  /// Zip elements of two signals into pairs. The elements of any Nth pair
  /// are the Nth elements of the two input signals.
  ///
  /// - parameters:
  ///   - other: A signal to zip values with.
  ///
  /// - returns: A signal that sends tuples of `self` and `other`.
  public func zip<U>(with other: Signal<U, Error>) -> Signal<(Value, U), Error> {
    return Signal { observer in
      let state = Atomic(ZipState<Value, U>())
      let disposable = CompositeDisposable()

      // Pair up the oldest buffered values, if both sides have one, and
      // complete once the state reports that it is finished.
      let flush = {
        var pair: (Value, U)?
        var finished = false

        state.modify { state in
          if !state.values.left.isEmpty && !state.values.right.isEmpty {
            pair = (state.values.left.removeFirst(), state.values.right.removeFirst())
          }
          finished = state.isFinished
        }

        if let pair = pair {
          observer.send(value: pair)
        }

        if finished {
          observer.sendCompleted()
        }
      }

      disposable += self.observe { event in
        switch event {
        case let .value(value):
          state.modify { $0.values.left.append(value) }
          flush()

        case .completed:
          state.modify { $0.isCompleted.left = true }
          flush()

        case let .failed(error):
          observer.send(error: error)

        case .interrupted:
          observer.sendInterrupted()
        }
      }

      disposable += other.observe { event in
        switch event {
        case let .value(value):
          state.modify { $0.values.right.append(value) }
          flush()

        case .completed:
          state.modify { $0.isCompleted.right = true }
          flush()

        case let .failed(error):
          observer.send(error: error)

        case .interrupted:
          observer.sendInterrupted()
        }
      }

      return disposable
    }
  }
  /// Throttle values sent by the receiver, so that at least `interval`
  /// seconds pass between each, then forward them on the given scheduler.
  ///
  /// - note: If multiple values are received before the interval has elapsed,
  ///         the latest value is the one that will be passed on.
  ///
  /// - note: If the input signal terminates while a value is being throttled,
  ///         that value will be discarded and the returned signal will
  ///         terminate immediately.
  ///
  /// - note: If the device time changed backwards to before the previous
  ///         date while a value is being throttled, and if there is a new
  ///         value sent, the new value will be passed anyway.
  ///
  /// - precondition: `interval` must be non-negative.
  ///
  /// - parameters:
  ///   - interval: Number of seconds to wait between sent values.
  ///   - scheduler: A scheduler to deliver events on.
  ///
  /// - returns: A signal that sends values at least `interval` seconds
  ///            apart on a given scheduler.
  public func throttle(_ interval: TimeInterval, on scheduler: DateSchedulerProtocol) -> Signal<Value, Error> {
    precondition(interval >= 0)

    return Signal { observer in
      let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState())
      let schedulerDisposable = SerialDisposable()

      let disposable = CompositeDisposable()
      disposable += schedulerDisposable

      disposable += self.observe { event in
        guard let value = event.value else {
          // Terminal events replace whatever delivery is currently pending.
          schedulerDisposable.inner = scheduler.schedule {
            observer.action(event)
          }
          return
        }

        let scheduleDate: Date = state.modify { state -> Date in
          state.pendingValue = value

          // Aim for `interval` after the previous delivery, unless there was
          // none or its date lies after the scheduler's current date.
          let proposedDate: Date
          if let previousDate = state.previousDate, previousDate.compare(scheduler.currentDate) != .orderedDescending {
            proposedDate = previousDate.addingTimeInterval(interval)
          } else {
            proposedDate = scheduler.currentDate
          }

          // Never schedule earlier than the scheduler's current date.
          if proposedDate.compare(scheduler.currentDate) == .orderedAscending {
            return scheduler.currentDate
          }
          return proposedDate
        }

        schedulerDisposable.inner = scheduler.schedule(after: scheduleDate) {
          // Take the pending value, if there still is one, and record when
          // it was delivered.
          let pendingValue: Value? = state.modify { state -> Value? in
            guard let pending = state.pendingValue else {
              return nil
            }
            state.pendingValue = nil
            state.previousDate = scheduleDate
            return pending
          }

          if let pendingValue = pendingValue {
            observer.send(value: pendingValue)
          }
        }
      }

      return disposable
    }
  }
  /// Conditionally throttle values sent on the receiver whenever
  /// `shouldThrottle` is true, forwarding values on the given scheduler.
  ///
  /// - note: While `shouldThrottle` remains false, values are forwarded on the
  ///         given scheduler. If multiple values are received while
  ///         `shouldThrottle` is true, the latest value is the one that will
  ///         be passed on.
  ///
  /// - note: If the input signal terminates while a value is being throttled,
  ///         that value will be discarded and the returned signal will
  ///         terminate immediately.
  ///
  /// - note: If `shouldThrottle` completes before the receiver, and its last
  ///         value is `true`, the returned signal will remain in the throttled
  ///         state, emitting no further values until it terminates.
  ///
  /// - parameters:
  ///   - shouldThrottle: A boolean property that controls whether values
  ///                     should be throttled.
  ///   - scheduler: A scheduler to deliver events on.
  ///
  /// - returns: A signal that sends values only while `shouldThrottle` is false.
  public func throttle<P: PropertyProtocol>(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> Signal<Value, Error>
    where P.Value == Bool
  {
    return Signal { observer in
      let initial: ThrottleWhileState<Value> = .resumed
      let state = Atomic(initial)
      let schedulerDisposable = SerialDisposable()

      let disposable = CompositeDisposable()
      disposable += schedulerDisposable

      disposable += shouldThrottle.producer
        .skipRepeats()
        .startWithValues { isThrottling in
          let valueToSend = state.modify { state -> Value? in
            if state.isTerminated {
              return nil
            }

            guard !isThrottling else {
              state = .throttled(nil)
              return nil
            }

            // Resuming: release the value held back while throttled, if any.
            var heldValue: Value? = nil
            if case let .throttled(value?) = state {
              heldValue = value
            }
            state = .resumed
            return heldValue
          }

          if let value = valueToSend {
            schedulerDisposable.inner = scheduler.schedule {
              observer.send(value: value)
            }
          }
        }

      disposable += self.observe { event in
        let eventToSend = state.modify { state -> Event<Value, Error>? in
          guard case let .value(value) = event else {
            // Any terminal event ends throttling and is always forwarded.
            state = .terminated
            return event
          }

          switch state {
          case .resumed:
            return event

          case .throttled:
            // Remember only the latest value while throttled.
            state = .throttled(value)
            return nil

          case .terminated:
            return nil
          }
        }

        if let event = eventToSend {
          schedulerDisposable.inner = scheduler.schedule {
            observer.action(event)
          }
        }
      }

      return disposable
    }
  }
  /// Debounce values sent by the receiver, such that at least `interval`
  /// seconds pass after the receiver has last sent a value, then forward the
  /// latest value on the given scheduler.
  ///
  /// - note: If multiple values are received before the interval has elapsed,
  ///         the latest value is the one that will be passed on.
  ///
  /// - note: If the input signal terminates while a value is being debounced,
  ///         that value will be discarded and the returned signal will
  ///         terminate immediately.
  ///
  /// - precondition: `interval` must be non-negative.
  ///
  /// - parameters:
  ///   - interval: A number of seconds to wait before sending a value.
  ///   - scheduler: A scheduler to send values on.
  ///
  /// - returns: A signal that sends values that are sent from `self` at least
  ///            `interval` seconds apart.
  public func debounce(_ interval: TimeInterval, on scheduler: DateSchedulerProtocol) -> Signal<Value, Error> {
    precondition(interval >= 0)

    return self
      .materialize()
      .flatMap(.latest) { event -> SignalProducer<Event<Value, Error>, NoError> in
        let single = SignalProducer<Event<Value, Error>, NoError>(value: event)

        // Terminal events are only moved onto the scheduler; values are
        // additionally delayed by `interval`.
        return event.isTerminating
          ? single.observe(on: scheduler)
          : single.delay(interval, on: scheduler)
      }
      .dematerialize()
  }
  1648. }
  1649. extension SignalProtocol {
  1650. /// Forward only those values from `self` that have unique identities across
  1651. /// the set of all values that have been seen.
  1652. ///
  1653. /// - note: This causes the identities to be retained to check for
  1654. /// uniqueness.
  1655. ///
  1656. /// - parameters:
  1657. /// - transform: A closure that accepts a value and returns identity
  1658. /// value.
  1659. ///
  1660. /// - returns: A signal that sends unique values during its lifetime.
  1661. public func uniqueValues<Identity: Hashable>(_ transform: @escaping (Value) -> Identity) -> Signal<Value, Error> {
  1662. return Signal { observer in
  1663. var seenValues: Set<Identity> = []
  1664. return self
  1665. .observe { event in
  1666. switch event {
  1667. case let .value(value):
  1668. let identity = transform(value)
  1669. if !seenValues.contains(identity) {
  1670. seenValues.insert(identity)
  1671. fallthrough
  1672. }
  1673. case .failed, .completed, .interrupted:
  1674. observer.action(event)
  1675. }
  1676. }
  1677. }
  1678. }
  1679. }
  1680. extension SignalProtocol where Value: Hashable {
  1681. /// Forward only those values from `self` that are unique across the set of
  1682. /// all values that have been seen.
  1683. ///
  1684. /// - note: This causes the values to be retained to check for uniqueness.
  1685. /// Providing a function that returns a unique value for each sent
  1686. /// value can help you reduce the memory footprint.
  1687. ///
  1688. /// - returns: A signal that sends unique values during its lifetime.
  1689. public func uniqueValues() -> Signal<Value, Error> {
  1690. return uniqueValues { $0 }
  1691. }
  1692. }
  1693. private struct ThrottleState<Value> {
  1694. var previousDate: Date? = nil
  1695. var pendingValue: Value? = nil
  1696. }
  1697. private enum ThrottleWhileState<Value> {
  1698. case resumed
  1699. case throttled(Value?)
  1700. case terminated
  1701. var isTerminated: Bool {
  1702. switch self {
  1703. case .terminated:
  1704. return true
  1705. case .resumed, .throttled:
  1706. return false
  1707. }
  1708. }
  1709. }
  1710. extension SignalProtocol {
  1711. /// Combines the values of all the given signals, in the manner described by
  1712. /// `combineLatestWith`.
  1713. public static func combineLatest<B>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>) -> Signal<(Value, B), Error> {
  1714. return a.combineLatest(with: b)
  1715. }
  1716. /// Combines the values of all the given signals, in the manner described by
  1717. /// `combineLatestWith`.
  1718. public static func combineLatest<B, C>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>) -> Signal<(Value, B, C), Error> {
  1719. return combineLatest(a, b)
  1720. .combineLatest(with: c)
  1721. .map(repack)
  1722. }
  1723. /// Combines the values of all the given signals, in the manner described by
  1724. /// `combineLatestWith`.
  1725. public static func combineLatest<B, C, D>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>) -> Signal<(Value, B, C, D), Error> {
  1726. return combineLatest(a, b, c)
  1727. .combineLatest(with: d)
  1728. .map(repack)
  1729. }
  1730. /// Combines the values of all the given signals, in the manner described by
  1731. /// `combineLatestWith`.
  1732. public static func combineLatest<B, C, D, E>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>) -> Signal<(Value, B, C, D, E), Error> {
  1733. return combineLatest(a, b, c, d)
  1734. .combineLatest(with: e)
  1735. .map(repack)
  1736. }
  1737. /// Combines the values of all the given signals, in the manner described by
  1738. /// `combineLatestWith`.
  1739. public static func combineLatest<B, C, D, E, F>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>) -> Signal<(Value, B, C, D, E, F), Error> {
  1740. return combineLatest(a, b, c, d, e)
  1741. .combineLatest(with: f)
  1742. .map(repack)
  1743. }
  1744. /// Combines the values of all the given signals, in the manner described by
  1745. /// `combineLatestWith`.
  1746. public static func combineLatest<B, C, D, E, F, G>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>) -> Signal<(Value, B, C, D, E, F, G), Error> {
  1747. return combineLatest(a, b, c, d, e, f)
  1748. .combineLatest(with: g)
  1749. .map(repack)
  1750. }
  1751. /// Combines the values of all the given signals, in the manner described by
  1752. /// `combineLatestWith`.
  1753. public static func combineLatest<B, C, D, E, F, G, H>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>) -> Signal<(Value, B, C, D, E, F, G, H), Error> {
  1754. return combineLatest(a, b, c, d, e, f, g)
  1755. .combineLatest(with: h)
  1756. .map(repack)
  1757. }
  1758. /// Combines the values of all the given signals, in the manner described by
  1759. /// `combineLatestWith`.
  1760. public static func combineLatest<B, C, D, E, F, G, H, I>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>) -> Signal<(Value, B, C, D, E, F, G, H, I), Error> {
  1761. return combineLatest(a, b, c, d, e, f, g, h)
  1762. .combineLatest(with: i)
  1763. .map(repack)
  1764. }
  1765. /// Combines the values of all the given signals, in the manner described by
  1766. /// `combineLatestWith`.
  1767. public static func combineLatest<B, C, D, E, F, G, H, I, J>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>, _ j: Signal<J, Error>) -> Signal<(Value, B, C, D, E, F, G, H, I, J), Error> {
  1768. return combineLatest(a, b, c, d, e, f, g, h, i)
  1769. .combineLatest(with: j)
  1770. .map(repack)
  1771. }
  1772. /// Combines the values of all the given signals, in the manner described by
  1773. /// `combineLatestWith`. No events will be sent if the sequence is empty.
  1774. public static func combineLatest<S: Sequence>(_ signals: S) -> Signal<[Value], Error>
  1775. where S.Iterator.Element == Signal<Value, Error>
  1776. {
  1777. var generator = signals.makeIterator()
  1778. if let first = generator.next() {
  1779. let initial = first.map { [$0] }
  1780. return IteratorSequence(generator).reduce(initial) { signal, next in
  1781. signal.combineLatest(with: next).map { $0.0 + [$0.1] }
  1782. }
  1783. }
  1784. return .never
  1785. }
  1786. /// Zips the values of all the given signals, in the manner described by
  1787. /// `zipWith`.
  1788. public static func zip<B>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>) -> Signal<(Value, B), Error> {
  1789. return a.zip(with: b)
  1790. }
  1791. /// Zips the values of all the given signals, in the manner described by
  1792. /// `zipWith`.
  1793. public static func zip<B, C>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>) -> Signal<(Value, B, C), Error> {
  1794. return zip(a, b)
  1795. .zip(with: c)
  1796. .map(repack)
  1797. }
  1798. /// Zips the values of all the given signals, in the manner described by
  1799. /// `zipWith`.
  1800. public static func zip<B, C, D>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>) -> Signal<(Value, B, C, D), Error> {
  1801. return zip(a, b, c)
  1802. .zip(with: d)
  1803. .map(repack)
  1804. }
  1805. /// Zips the values of all the given signals, in the manner described by
  1806. /// `zipWith`.
  1807. public static func zip<B, C, D, E>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>) -> Signal<(Value, B, C, D, E), Error> {
  1808. return zip(a, b, c, d)
  1809. .zip(with: e)
  1810. .map(repack)
  1811. }
  1812. /// Zips the values of all the given signals, in the manner described by
  1813. /// `zipWith`.
  1814. public static func zip<B, C, D, E, F>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>) -> Signal<(Value, B, C, D, E, F), Error> {
  1815. return zip(a, b, c, d, e)
  1816. .zip(with: f)
  1817. .map(repack)
  1818. }
  1819. /// Zips the values of all the given signals, in the manner described by
  1820. /// `zipWith`.
  1821. public static func zip<B, C, D, E, F, G>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>) -> Signal<(Value, B, C, D, E, F, G), Error> {
  1822. return zip(a, b, c, d, e, f)
  1823. .zip(with: g)
  1824. .map(repack)
  1825. }
  1826. /// Zips the values of all the given signals, in the manner described by
  1827. /// `zipWith`.
  1828. public static func zip<B, C, D, E, F, G, H>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>) -> Signal<(Value, B, C, D, E, F, G, H), Error> {
  1829. return zip(a, b, c, d, e, f, g)
  1830. .zip(with: h)
  1831. .map(repack)
  1832. }
  1833. /// Zips the values of all the given signals, in the manner described by
  1834. /// `zipWith`.
  1835. public static func zip<B, C, D, E, F, G, H, I>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>) -> Signal<(Value, B, C, D, E, F, G, H, I), Error> {
  1836. return zip(a, b, c, d, e, f, g, h)
  1837. .zip(with: i)
  1838. .map(repack)
  1839. }
  1840. /// Zips the values of all the given signals, in the manner described by
  1841. /// `zipWith`.
  1842. public static func zip<B, C, D, E, F, G, H, I, J>(_ a: Signal<Value, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>, _ j: Signal<J, Error>) -> Signal<(Value, B, C, D, E, F, G, H, I, J), Error> {
  1843. return zip(a, b, c, d, e, f, g, h, i)
  1844. .zip(with: j)
  1845. .map(repack)
  1846. }
  1847. /// Zips the values of all the given signals, in the manner described by
  1848. /// `zipWith`. No events will be sent if the sequence is empty.
  1849. public static func zip<S: Sequence>(_ signals: S) -> Signal<[Value], Error>
  1850. where S.Iterator.Element == Signal<Value, Error>
  1851. {
  1852. var generator = signals.makeIterator()
  1853. if let first = generator.next() {
  1854. let initial = first.map { [$0] }
  1855. return IteratorSequence(generator).reduce(initial) { signal, next in
  1856. signal.zip(with: next).map { $0.0 + [$0.1] }
  1857. }
  1858. }
  1859. return .never
  1860. }
  1861. }
  1862. extension SignalProtocol {
  1863. /// Forward events from `self` until `interval`. Then if signal isn't
  1864. /// completed yet, fails with `error` on `scheduler`.
  1865. ///
  1866. /// - note: If the interval is 0, the timeout will be scheduled immediately.
  1867. /// The signal must complete synchronously (or on a faster
  1868. /// scheduler) to avoid the timeout.
  1869. ///
  1870. /// - parameters:
  1871. /// - error: Error to send with failed event if `self` is not completed
  1872. /// when `interval` passes.
  1873. /// - interval: Number of seconds to wait for `self` to complete.
  1874. /// - scheudler: A scheduler to deliver error on.
  1875. ///
  1876. /// - returns: A signal that sends events for at most `interval` seconds,
  1877. /// then, if not `completed` - sends `error` with failed event
  1878. /// on `scheduler`.
  1879. public func timeout(after interval: TimeInterval, raising error: Error, on scheduler: DateSchedulerProtocol) -> Signal<Value, Error> {
  1880. precondition(interval >= 0)
  1881. return Signal { observer in
  1882. let disposable = CompositeDisposable()
  1883. let date = scheduler.currentDate.addingTimeInterval(interval)
  1884. disposable += scheduler.schedule(after: date) {
  1885. observer.send(error: error)
  1886. }
  1887. disposable += self.observe(observer)
  1888. return disposable
  1889. }
  1890. }
  1891. }
  1892. extension SignalProtocol where Error == NoError {
  1893. /// Promote a signal that does not generate failures into one that can.
  1894. ///
  1895. /// - note: This does not actually cause failures to be generated for the
  1896. /// given signal, but makes it easier to combine with other signals
  1897. /// that may fail; for example, with operators like
  1898. /// `combineLatestWith`, `zipWith`, `flatten`, etc.
  1899. ///
  1900. /// - parameters:
  1901. /// - _ An `ErrorType`.
  1902. ///
  1903. /// - returns: A signal that has an instantiatable `ErrorType`.
  1904. public func promoteErrors<F: Swift.Error>(_: F.Type) -> Signal<Value, F> {
  1905. return Signal { observer in
  1906. return self.observe { event in
  1907. switch event {
  1908. case let .value(value):
  1909. observer.send(value: value)
  1910. case .failed:
  1911. fatalError("NoError is impossible to construct")
  1912. case .completed:
  1913. observer.sendCompleted()
  1914. case .interrupted:
  1915. observer.sendInterrupted()
  1916. }
  1917. }
  1918. }
  1919. }
  1920. /// Forward events from `self` until `interval`. Then if signal isn't
  1921. /// completed yet, fails with `error` on `scheduler`.
  1922. ///
  1923. /// - note: If the interval is 0, the timeout will be scheduled immediately.
  1924. /// The signal must complete synchronously (or on a faster
  1925. /// scheduler) to avoid the timeout.
  1926. ///
  1927. /// - parameters:
  1928. /// - interval: Number of seconds to wait for `self` to complete.
  1929. /// - error: Error to send with `failed` event if `self` is not completed
  1930. /// when `interval` passes.
  1931. /// - scheudler: A scheduler to deliver error on.
  1932. ///
  1933. /// - returns: A signal that sends events for at most `interval` seconds,
  1934. /// then, if not `completed` - sends `error` with `failed` event
  1935. /// on `scheduler`.
  1936. public func timeout<NewError: Swift.Error>(
  1937. after interval: TimeInterval,
  1938. raising error: NewError,
  1939. on scheduler: DateSchedulerProtocol
  1940. ) -> Signal<Value, NewError> {
  1941. return self
  1942. .promoteErrors(NewError.self)
  1943. .timeout(after: interval, raising: error, on: scheduler)
  1944. }
  1945. }
  1946. extension SignalProtocol {
  1947. /// Apply `operation` to values from `self` with `success`ful results
  1948. /// forwarded on the returned signal and `failure`s sent as failed events.
  1949. ///
  1950. /// - parameters:
  1951. /// - operation: A closure that accepts a value and returns a `Result`.
  1952. ///
  1953. /// - returns: A signal that receives `success`ful `Result` as `value` event
  1954. /// and `failure` as failed event.
  1955. public func attempt(_ operation: @escaping (Value) -> Result<(), Error>) -> Signal<Value, Error> {
  1956. return attemptMap { value in
  1957. return operation(value).map {
  1958. return value
  1959. }
  1960. }
  1961. }
  1962. /// Apply `operation` to values from `self` with `success`ful results mapped
  1963. /// on the returned signal and `failure`s sent as failed events.
  1964. ///
  1965. /// - parameters:
  1966. /// - operation: A closure that accepts a value and returns a result of
  1967. /// a mapped value as `success`.
  1968. ///
  1969. /// - returns: A signal that sends mapped values from `self` if returned
  1970. /// `Result` is `success`ful, `failed` events otherwise.
  1971. public func attemptMap<U>(_ operation: @escaping (Value) -> Result<U, Error>) -> Signal<U, Error> {
  1972. return Signal { observer in
  1973. self.observe { event in
  1974. switch event {
  1975. case let .value(value):
  1976. operation(value).analysis(
  1977. ifSuccess: observer.send(value:),
  1978. ifFailure: observer.send(error:)
  1979. )
  1980. case let .failed(error):
  1981. observer.send(error: error)
  1982. case .completed:
  1983. observer.sendCompleted()
  1984. case .interrupted:
  1985. observer.sendInterrupted()
  1986. }
  1987. }
  1988. }
  1989. }
  1990. }
  1991. extension SignalProtocol where Error == NoError {
  1992. /// Apply a failable `operation` to values from `self` with successful
  1993. /// results forwarded on the returned signal and thrown errors sent as
  1994. /// failed events.
  1995. ///
  1996. /// - parameters:
  1997. /// - operation: A failable closure that accepts a value.
  1998. ///
  1999. /// - returns: A signal that forwards successes as `value` events and thrown
  2000. /// errors as `failed` events.
  2001. public func attempt(_ operation: @escaping (Value) throws -> Void) -> Signal<Value, AnyError> {
  2002. return self
  2003. .promoteErrors(AnyError.self)
  2004. .attempt(operation)
  2005. }
  2006. /// Apply a failable `operation` to values from `self` with successful
  2007. /// results mapped on the returned signal and thrown errors sent as
  2008. /// failed events.
  2009. ///
  2010. /// - parameters:
  2011. /// - operation: A failable closure that accepts a value and attempts to
  2012. /// transform it.
  2013. ///
  2014. /// - returns: A signal that sends successfully mapped values from `self`, or
  2015. /// thrown errors as `failed` events.
  2016. public func attemptMap<U>(_ operation: @escaping (Value) throws -> U) -> Signal<U, AnyError> {
  2017. return self
  2018. .promoteErrors(AnyError.self)
  2019. .attemptMap(operation)
  2020. }
  2021. }
  2022. extension SignalProtocol where Error == AnyError {
  2023. /// Apply a failable `operation` to values from `self` with successful
  2024. /// results forwarded on the returned signal and thrown errors sent as
  2025. /// failed events.
  2026. ///
  2027. /// - parameters:
  2028. /// - operation: A failable closure that accepts a value.
  2029. ///
  2030. /// - returns: A signal that forwards successes as `value` events and thrown
  2031. /// errors as `failed` events.
  2032. public func attempt(_ operation: @escaping (Value) throws -> Void) -> Signal<Value, AnyError> {
  2033. return attemptMap { value in
  2034. try operation(value)
  2035. return value
  2036. }
  2037. }
  2038. /// Apply a failable `operation` to values from `self` with successful
  2039. /// results mapped on the returned signal and thrown errors sent as
  2040. /// failed events.
  2041. ///
  2042. /// - parameters:
  2043. /// - operation: A failable closure that accepts a value and attempts to
  2044. /// transform it.
  2045. ///
  2046. /// - returns: A signal that sends successfully mapped values from `self`, or
  2047. /// thrown errors as `failed` events.
  2048. public func attemptMap<U>(_ operation: @escaping (Value) throws -> U) -> Signal<U, AnyError> {
  2049. return attemptMap { value in
  2050. ReactiveSwift.materialize {
  2051. try operation(value)
  2052. }
  2053. }
  2054. }
  2055. }