Signal.swift 61 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825
  1. import Foundation
  2. import Result
  3. /// A push-driven stream that sends Events over time, parameterized by the type
  4. /// of values being sent (`Value`) and the type of failure that can occur
  5. /// (`Error`). If no failures should be possible, NoError can be specified for
  6. /// `Error`.
  7. ///
  8. /// An observer of a Signal will see the exact same sequence of events as all
  9. /// other observers. In other words, events will be sent to all observers at the
  10. /// same time.
  11. ///
  12. /// Signals are generally used to represent event streams that are already “in
  13. /// progress,” like notifications, user input, etc. To represent streams that
  14. /// must first be _started_, see the SignalProducer type.
  15. ///
  16. /// Signals do not need to be retained. A Signal will be automatically kept
  17. /// alive until the event stream has terminated.
  18. public final class Signal<Value, Error: ErrorType> {
  19. public typealias Observer = ReactiveCocoa.Observer<Value, Error>
  20. private let atomicObservers: Atomic<Bag<Observer>?> = Atomic(Bag())
  21. /// Initialize a Signal that will immediately invoke the given generator,
  22. /// then forward events sent to the given observer.
  23. ///
  24. /// - note: The disposable returned from the closure will be automatically
  25. /// disposed if a terminating event is sent to the observer. The
  26. /// Signal itself will remain alive until the observer is released.
  27. ///
  28. /// - parameters:
  29. /// - generator: A closure that accepts an implicitly created observer
  30. /// that will act as an event emitter for the signal.
  31. public init(@noescape _ generator: Observer -> Disposable?) {
  32. /// Used to ensure that events are serialized during delivery to
  33. /// observers.
  34. let sendLock = NSLock()
  35. sendLock.name = "org.reactivecocoa.ReactiveCocoa.Signal"
  36. let generatorDisposable = SerialDisposable()
  37. /// When set to `true`, the Signal should interrupt as soon as possible.
  38. let interrupted = Atomic(false)
  39. let observer = Observer { event in
  40. if case .Interrupted = event {
  41. // Normally we disallow recursive events, but Interrupted is
  42. // kind of a special snowflake, since it can inadvertently be
  43. // sent by downstream consumers.
  44. //
  45. // So we'll flag Interrupted events specially, and if it
  46. // happened to occur while we're sending something else, we'll
  47. // wait to deliver it.
  48. interrupted.value = true
  49. if sendLock.tryLock() {
  50. self.interrupt()
  51. sendLock.unlock()
  52. generatorDisposable.dispose()
  53. }
  54. } else {
  55. if let observers = (event.isTerminating ? self.atomicObservers.swap(nil) : self.atomicObservers.value) {
  56. sendLock.lock()
  57. for observer in observers {
  58. observer.action(event)
  59. }
  60. let shouldInterrupt = !event.isTerminating && interrupted.value
  61. if shouldInterrupt {
  62. self.interrupt()
  63. }
  64. sendLock.unlock()
  65. if event.isTerminating || shouldInterrupt {
  66. // Dispose only after notifying observers, so disposal
  67. // logic is consistently the last thing to run.
  68. generatorDisposable.dispose()
  69. }
  70. }
  71. }
  72. }
  73. generatorDisposable.innerDisposable = generator(observer)
  74. }
  75. /// A Signal that never sends any events to its observers.
  76. public static var never: Signal {
  77. return self.init { _ in nil }
  78. }
  79. /// A Signal that completes immediately without emitting any value.
  80. public static var empty: Signal {
  81. return self.init { observer in
  82. observer.sendCompleted()
  83. return nil
  84. }
  85. }
  86. /// Create a Signal that will be controlled by sending events to the given
  87. /// observer.
  88. ///
  89. /// - note: The Signal will remain alive until a terminating event is sent
  90. /// to the observer.
  91. ///
  92. /// - returns: A tuple made of signal and observer.
  93. public static func pipe() -> (Signal, Observer) {
  94. var observer: Observer!
  95. let signal = self.init { innerObserver in
  96. observer = innerObserver
  97. return nil
  98. }
  99. return (signal, observer)
  100. }
  101. /// Interrupts all observers and terminates the stream.
  102. private func interrupt() {
  103. if let observers = self.atomicObservers.swap(nil) {
  104. for observer in observers {
  105. observer.sendInterrupted()
  106. }
  107. }
  108. }
  109. /// Observe the Signal by sending any future events to the given observer.
  110. ///
  111. /// - note: If the Signal has already terminated, the observer will
  112. /// immediately receive an `Interrupted` event.
  113. ///
  114. /// - parameters:
  115. /// - observer: An observer to forward the events to.
  116. ///
  117. /// - returns: An optional `Disposable` which can be used to disconnect the
  118. /// observer. Disposing of the Disposable will have no effect on
  119. /// the Signal itself.
  120. public func observe(observer: Observer) -> Disposable? {
  121. var token: RemovalToken?
  122. atomicObservers.modify { observers in
  123. guard var observers = observers else {
  124. return nil
  125. }
  126. token = observers.insert(observer)
  127. return observers
  128. }
  129. if let token = token {
  130. return ActionDisposable { [weak self] in
  131. self?.atomicObservers.modify { observers in
  132. guard var observers = observers else {
  133. return nil
  134. }
  135. observers.removeValueForToken(token)
  136. return observers
  137. }
  138. }
  139. } else {
  140. observer.sendInterrupted()
  141. return nil
  142. }
  143. }
  144. }
  145. public protocol SignalType {
  146. /// The type of values being sent on the signal.
  147. associatedtype Value
  148. /// The type of error that can occur on the signal. If errors aren't
  149. /// possible then `NoError` can be used.
  150. associatedtype Error: ErrorType
  151. /// Extracts a signal from the receiver.
  152. var signal: Signal<Value, Error> { get }
  153. /// Observes the Signal by sending any future events to the given observer.
  154. func observe(observer: Signal<Value, Error>.Observer) -> Disposable?
  155. }
  156. extension Signal: SignalType {
  157. public var signal: Signal {
  158. return self
  159. }
  160. }
  161. extension SignalType {
  162. /// Convenience override for observe(_:) to allow trailing-closure style
  163. /// invocations.
  164. ///
  165. /// - parameters:
  166. /// - action: A closure that will accept an event of the signal
  167. ///
  168. /// - returns: An optional `Disposable` which can be used to stop the
  169. /// invocation of the callback. Disposing of the Disposable will
  170. /// have no effect on the Signal itself.
  171. public func observe(action: Signal<Value, Error>.Observer.Action) -> Disposable? {
  172. return observe(Observer(action))
  173. }
  174. @available(*, deprecated, message="This Signal may emit errors which must be handled explicitly, or observed using observeResult:")
  175. public func observeNext(next: Value -> Void) -> Disposable? {
  176. return observe(Observer(next: next))
  177. }
  178. /// Observe the `Signal` by invoking the given callback when `next` or
  179. /// `failed` event are received.
  180. ///
  181. /// - parameters:
  182. /// - result: A closure that accepts instance of `Result<Value, Error>`
  183. /// enum that contains either a `Success(Value)` or
  184. /// `Failure<Error>` case.
  185. ///
  186. /// - returns: An optional `Disposable` which can be used to stop the
  187. /// invocation of the callback. Disposing of the Disposable will
  188. /// have no effect on the Signal itself.
  189. public func observeResult(result: (Result<Value, Error>) -> Void) -> Disposable? {
  190. return observe(
  191. Observer(
  192. next: { result(.Success($0)) },
  193. failed: { result(.Failure($0)) }
  194. )
  195. )
  196. }
  197. /// Observe the `Signal` by invoking the given callback when a `completed`
  198. /// event is received.
  199. ///
  200. /// - parameters:
  201. /// - completed: A closure that is called when `Completed` event is
  202. /// received.
  203. ///
  204. /// - returns: An optional `Disposable` which can be used to stop the
  205. /// invocation of the callback. Disposing of the Disposable will
  206. /// have no effect on the Signal itself.
  207. public func observeCompleted(completed: () -> Void) -> Disposable? {
  208. return observe(Observer(completed: completed))
  209. }
  210. /// Observe the `Signal` by invoking the given callback when a `failed`
  211. /// event is received.
  212. ///
  213. /// - parameters:
  214. /// - error: A closure that is called when `Failed` event is received. It
  215. /// accepts an error parameter.
  216. ///
  217. /// - returns: An optional `Disposable` which can be used to stop the
  218. /// invocation of the callback. Disposing of the Disposable will
  219. /// have no effect on the Signal itself.
  220. public func observeFailed(error: Error -> Void) -> Disposable? {
  221. return observe(Observer(failed: error))
  222. }
  223. /// Observe the `Signal` by invoking the given callback when an
  224. /// `interrupted` event is received. If the Signal has already terminated,
  225. /// the callback will be invoked immediately.
  226. ///
  227. /// - parameters:
  228. /// - interrupted: A closure that is invoked when `Interrupted` event is
  229. /// received
  230. ///
  231. /// - returns: An optional `Disposable` which can be used to stop the
  232. /// invocation of the callback. Disposing of the Disposable will
  233. /// have no effect on the Signal itself.
  234. public func observeInterrupted(interrupted: () -> Void) -> Disposable? {
  235. return observe(Observer(interrupted: interrupted))
  236. }
  237. }
  238. extension SignalType where Error == NoError {
  239. /// Observe the Signal by invoking the given callback when `next` events are
  240. /// received.
  241. ///
  242. /// - parameters:
  243. /// - next: A closure that accepts a value when `Next` event is received.
  244. ///
  245. /// - returns: An optional `Disposable` which can be used to stop the
  246. /// invocation of the callback. Disposing of the Disposable will
  247. /// have no effect on the Signal itself.
  248. public func observeNext(next: Value -> Void) -> Disposable? {
  249. return observe(Observer(next: next))
  250. }
  251. }
  252. extension SignalType {
  253. /// Map each value in the signal to a new value.
  254. ///
  255. /// - parameters:
  256. /// - transform: A closure that accepts a value from the `Next` event and
  257. /// returns a new value.
  258. ///
  259. /// - returns: A signal that will send new values.
  260. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  261. public func map<U>(transform: Value -> U) -> Signal<U, Error> {
  262. return Signal { observer in
  263. return self.observe { event in
  264. observer.action(event.map(transform))
  265. }
  266. }
  267. }
  268. /// Map errors in the signal to a new error.
  269. ///
  270. /// - parameters:
  271. /// - transform: A closure that accepts current error object and returns
  272. /// a new type of error object.
  273. ///
  274. /// - returns: A signal that will send new type of errors.
  275. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  276. public func mapError<F>(transform: Error -> F) -> Signal<Value, F> {
  277. return Signal { observer in
  278. return self.observe { event in
  279. observer.action(event.mapError(transform))
  280. }
  281. }
  282. }
  283. /// Preserve only the values of the signal that pass the given predicate.
  284. ///
  285. /// - parameters:
  286. /// - predicate: A closure that accepts value and returns `Bool` denoting
  287. /// whether value has passed the test.
  288. ///
  289. /// - returns: A signal that will send only the values passing the given
  290. /// predicate.
  291. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  292. public func filter(predicate: Value -> Bool) -> Signal<Value, Error> {
  293. return Signal { observer in
  294. return self.observe { (event: Event<Value, Error>) -> Void in
  295. guard let value = event.value else {
  296. observer.action(event)
  297. return
  298. }
  299. if predicate(value) {
  300. observer.sendNext(value)
  301. }
  302. }
  303. }
  304. }
  305. }
  306. extension SignalType where Value: OptionalType {
  307. /// Unwrap non-`nil` values and forward them on the returned signal, `nil`
  308. /// values are dropped.
  309. ///
  310. /// - returns: A signal that sends only non-nil values.
  311. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  312. public func ignoreNil() -> Signal<Value.Wrapped, Error> {
  313. return filter { $0.optional != nil }.map { $0.optional! }
  314. }
  315. }
  316. extension SignalType {
  317. /// Take up to `n` values from the signal and then complete.
  318. ///
  319. /// - precondition: `count` must be non-negative number.
  320. ///
  321. /// - parameters:
  322. /// - count: A number of values to take from the signal.
  323. ///
  324. /// - returns: A signal that will yield the first `count` values from `self`
  325. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  326. public func take(count: Int) -> Signal<Value, Error> {
  327. precondition(count >= 0)
  328. return Signal { observer in
  329. if count == 0 {
  330. observer.sendCompleted()
  331. return nil
  332. }
  333. var taken = 0
  334. return self.observe { event in
  335. guard let value = event.value else {
  336. observer.action(event)
  337. return
  338. }
  339. if taken < count {
  340. taken += 1
  341. observer.sendNext(value)
  342. }
  343. if taken == count {
  344. observer.sendCompleted()
  345. }
  346. }
  347. }
  348. }
  349. }
  350. /// A reference type which wraps an array to auxiliate the collection of values
  351. /// for `collect` operator.
  352. private final class CollectState<Value> {
  353. var values: [Value] = []
  354. /// Collects a new value.
  355. func append(value: Value) {
  356. values.append(value)
  357. }
  358. /// Check if there are any items remaining.
  359. ///
  360. /// - note: This method also checks if there weren't collected any values
  361. /// and, in that case, it means an empty array should be sent as the
  362. /// result of collect.
  363. var isEmpty: Bool {
  364. /// We use capacity being zero to determine if we haven't collected any
  365. /// value since we're keeping the capacity of the array to avoid
  366. /// unnecessary and expensive allocations). This also guarantees
  367. /// retro-compatibility around the original `collect()` operator.
  368. return values.isEmpty && values.capacity > 0
  369. }
  370. /// Removes all values previously collected if any.
  371. func flush() {
  372. // Minor optimization to avoid consecutive allocations. Can
  373. // be useful for sequences of regular or similar size and to
  374. // track if any value was ever collected.
  375. values.removeAll(keepCapacity: true)
  376. }
  377. }
  378. extension SignalType {
  379. /// Collect all values sent by the signal then forward them as a single
  380. /// array and complete.
  381. ///
  382. /// - note: When `self` completes without collecting any value, it will send
  383. /// an empty array of values.
  384. ///
  385. /// - returns: A signal that will yield an array of values when `self`
  386. /// completes.
  387. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  388. public func collect() -> Signal<[Value], Error> {
  389. return collect { _,_ in false }
  390. }
  391. /// Collect at most `count` values from `self`, forward them as a single
  392. /// array and complete.
  393. ///
  394. /// - note: When the count is reached the array is sent and the signal
  395. /// starts over yielding a new array of values.
  396. ///
  397. /// - note: When `self` completes any remaining values will be sent, the
  398. /// last array may not have `count` values. Alternatively, if were
  399. /// not collected any values will sent an empty array of values.
  400. ///
  401. /// - precondition: `count` should be greater than zero.
  402. ///
  403. /// - returns: A signal that collects at most `count` values from `self`,
  404. /// forwards them as a single array and completes.
  405. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  406. public func collect(count count: Int) -> Signal<[Value], Error> {
  407. precondition(count > 0)
  408. return collect { values in values.count == count }
  409. }
  410. /// Collect values that pass the given predicate then forward them as a
  411. /// single array and complete.
  412. ///
  413. /// - note: When `self` completes any remaining values will be sent, the
  414. /// last array may not match `predicate`. Alternatively, if were not
  415. /// collected any values will sent an empty array of values.
  416. ///
  417. /// ````
  418. /// let (signal, observer) = Signal<Int, NoError>.pipe()
  419. ///
  420. /// signal
  421. /// .collect { values in values.reduce(0, combine: +) == 8 }
  422. /// .observeNext { print($0) }
  423. ///
  424. /// observer.sendNext(1)
  425. /// observer.sendNext(3)
  426. /// observer.sendNext(4)
  427. /// observer.sendNext(7)
  428. /// observer.sendNext(1)
  429. /// observer.sendNext(5)
  430. /// observer.sendNext(6)
  431. /// observer.sendCompleted()
  432. ///
  433. /// // Output:
  434. /// // [1, 3, 4]
  435. /// // [7, 1]
  436. /// // [5, 6]
  437. /// ````
  438. ///
  439. /// - parameters:
  440. /// - predicate: Predicate to match when values should be sent (returning
  441. /// `true`) or alternatively when they should be collected
  442. /// (where it should return `false`). The most recent value
  443. /// (`next`) is included in `values` and will be the end of
  444. /// the current array of values if the predicate returns
  445. /// `true`.
  446. ///
  447. /// - returns: A signal that collects values passing the predicate and, when
  448. /// `self` completes, forwards them as a single array and
  449. /// complets.
  450. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  451. public func collect(predicate: (values: [Value]) -> Bool) -> Signal<[Value], Error> {
  452. return Signal { observer in
  453. let state = CollectState<Value>()
  454. return self.observe { event in
  455. switch event {
  456. case let .Next(value):
  457. state.append(value)
  458. if predicate(values: state.values) {
  459. observer.sendNext(state.values)
  460. state.flush()
  461. }
  462. case .Completed:
  463. if !state.isEmpty {
  464. observer.sendNext(state.values)
  465. }
  466. observer.sendCompleted()
  467. case let .Failed(error):
  468. observer.sendFailed(error)
  469. case .Interrupted:
  470. observer.sendInterrupted()
  471. }
  472. }
  473. }
  474. }
  475. /// Repeatedly collect an array of values up to a matching `Next` value.
  476. /// Then forward them as single array and wait for next events.
  477. ///
  478. /// - note: When `self` completes any remaining values will be sent, the
  479. /// last array may not match `predicate`. Alternatively, if no
  480. /// values were collected an empty array will be sent.
  481. ///
  482. /// ````
  483. /// let (signal, observer) = Signal<Int, NoError>.pipe()
  484. ///
  485. /// signal
  486. /// .collect { values, next in next == 7 }
  487. /// .observeNext { print($0) }
  488. ///
  489. /// observer.sendNext(1)
  490. /// observer.sendNext(1)
  491. /// observer.sendNext(7)
  492. /// observer.sendNext(7)
  493. /// observer.sendNext(5)
  494. /// observer.sendNext(6)
  495. /// observer.sendCompleted()
  496. ///
  497. /// // Output:
  498. /// // [1, 1]
  499. /// // [7]
  500. /// // [7, 5, 6]
  501. /// ````
  502. ///
  503. /// - parameters:
  504. /// - predicate: Predicate to match when values should be sent (returning
  505. /// `true`) or alternatively when they should be collected
  506. /// (where it should return `false`). The most recent value
  507. /// (`next`) is not included in `values` and will be the
  508. /// start of the next array of values if the predicate
  509. /// returns `true`.
  510. ///
  511. /// - returns: A signal that will yield an array of values based on a
  512. /// predicate which matches the values collected and the next
  513. /// value.
  514. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  515. public func collect(predicate: (values: [Value], next: Value) -> Bool) -> Signal<[Value], Error> {
  516. return Signal { observer in
  517. let state = CollectState<Value>()
  518. return self.observe { event in
  519. switch event {
  520. case let .Next(value):
  521. if predicate(values: state.values, next: value) {
  522. observer.sendNext(state.values)
  523. state.flush()
  524. }
  525. state.append(value)
  526. case .Completed:
  527. if !state.isEmpty {
  528. observer.sendNext(state.values)
  529. }
  530. observer.sendCompleted()
  531. case let .Failed(error):
  532. observer.sendFailed(error)
  533. case .Interrupted:
  534. observer.sendInterrupted()
  535. }
  536. }
  537. }
  538. }
  539. /// Forward all events onto the given scheduler, instead of whichever
  540. /// scheduler they originally arrived upon.
  541. ///
  542. /// - parameters:
  543. /// - scheduler: A scheduler to deliver events on.
  544. ///
  545. /// - returns: A signal that will yield `self` values on provided scheduler.
  546. public func observeOn(scheduler: SchedulerType) -> Signal<Value, Error> {
  547. return Signal { observer in
  548. return self.observe { event in
  549. scheduler.schedule {
  550. observer.action(event)
  551. }
  552. }
  553. }
  554. }
  555. }
  556. private final class CombineLatestState<Value> {
  557. var latestValue: Value?
  558. var completed = false
  559. }
  560. extension SignalType {
  561. private func observeWithStates<U>(signalState: CombineLatestState<Value>, _ otherState: CombineLatestState<U>, _ lock: NSLock, _ observer: Signal<(), Error>.Observer) -> Disposable? {
  562. return self.observe { event in
  563. switch event {
  564. case let .Next(value):
  565. lock.lock()
  566. signalState.latestValue = value
  567. if otherState.latestValue != nil {
  568. observer.sendNext()
  569. }
  570. lock.unlock()
  571. case let .Failed(error):
  572. observer.sendFailed(error)
  573. case .Completed:
  574. lock.lock()
  575. signalState.completed = true
  576. if otherState.completed {
  577. observer.sendCompleted()
  578. }
  579. lock.unlock()
  580. case .Interrupted:
  581. observer.sendInterrupted()
  582. }
  583. }
  584. }
  585. /// Combine the latest value of the receiver with the latest value from the
  586. /// given signal.
  587. ///
  588. /// - note: The returned signal will not send a value until both inputs have
  589. /// sent at least one value each.
  590. ///
  591. /// - note: If either signal is interrupted, the returned signal will also
  592. /// be interrupted.
  593. ///
  594. /// - parameters:
  595. /// - otherSignal: A signal to combine `self`'s value with.
  596. ///
  597. /// - returns: A signal that will yield a tuple containing values of `self`
  598. /// and given signal.
  599. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  600. public func combineLatestWith<U>(otherSignal: Signal<U, Error>) -> Signal<(Value, U), Error> {
  601. return Signal { observer in
  602. let lock = NSLock()
  603. lock.name = "org.reactivecocoa.ReactiveCocoa.combineLatestWith"
  604. let signalState = CombineLatestState<Value>()
  605. let otherState = CombineLatestState<U>()
  606. let onBothNext = {
  607. observer.sendNext((signalState.latestValue!, otherState.latestValue!))
  608. }
  609. let observer = Signal<(), Error>.Observer(next: onBothNext, failed: observer.sendFailed, completed: observer.sendCompleted, interrupted: observer.sendInterrupted)
  610. let disposable = CompositeDisposable()
  611. disposable += self.observeWithStates(signalState, otherState, lock, observer)
  612. disposable += otherSignal.observeWithStates(otherState, signalState, lock, observer)
  613. return disposable
  614. }
  615. }
  616. /// Delay `Next` and `Completed` events by the given interval, forwarding
  617. /// them on the given scheduler.
  618. ///
  619. /// - note: `Failed` and `Interrupted` events are always scheduled
  620. /// immediately.
  621. ///
  622. /// - parameters:
  623. /// - interval: Interval to delay `Next` and `Completed` events by.
  624. /// - scheduler: A scheduler to deliver delayed events on.
  625. ///
  626. /// - returns: A signal that will delay `Next` and `Completed` events and
  627. /// will yield them on given scheduler.
  628. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  629. public func delay(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> Signal<Value, Error> {
  630. precondition(interval >= 0)
  631. return Signal { observer in
  632. return self.observe { event in
  633. switch event {
  634. case .Failed, .Interrupted:
  635. scheduler.schedule {
  636. observer.action(event)
  637. }
  638. case .Next, .Completed:
  639. let date = scheduler.currentDate.dateByAddingTimeInterval(interval)
  640. scheduler.scheduleAfter(date) {
  641. observer.action(event)
  642. }
  643. }
  644. }
  645. }
  646. }
  647. /// Skip first `count` number of values then act as usual.
  648. ///
  649. /// - parameters:
  650. /// - count: A number of values to skip.
  651. ///
  652. /// - returns: A signal that will skip the first `count` values, then
  653. /// forward everything afterward.
  654. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  655. public func skip(count: Int) -> Signal<Value, Error> {
  656. precondition(count >= 0)
  657. if count == 0 {
  658. return signal
  659. }
  660. return Signal { observer in
  661. var skipped = 0
  662. return self.observe { event in
  663. if case .Next = event where skipped < count {
  664. skipped += 1
  665. } else {
  666. observer.action(event)
  667. }
  668. }
  669. }
  670. }
  671. /// Treat all Events from `self` as plain values, allowing them to be
  672. /// manipulated just like any other value.
  673. ///
  674. /// In other words, this brings Events “into the monad”.
  675. ///
  676. /// - note: When a Completed or Failed event is received, the resulting
  677. /// signal will send the Event itself and then complete. When an
  678. /// Interrupted event is received, the resulting signal will send
  679. /// the Event itself and then interrupt.
  680. ///
  681. /// - returns: A signal that sends events as its values.
  682. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  683. public func materialize() -> Signal<Event<Value, Error>, NoError> {
  684. return Signal { observer in
  685. return self.observe { event in
  686. observer.sendNext(event)
  687. switch event {
  688. case .Interrupted:
  689. observer.sendInterrupted()
  690. case .Completed, .Failed:
  691. observer.sendCompleted()
  692. case .Next:
  693. break
  694. }
  695. }
  696. }
  697. }
  698. }
  699. extension SignalType where Value: EventType, Error == NoError {
  700. /// Translate a signal of `Event` _values_ into a signal of those events
  701. /// themselves.
  702. ///
  703. /// - returns: A signal that sends values carried by `self` events.
  704. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  705. public func dematerialize() -> Signal<Value.Value, Value.Error> {
  706. return Signal<Value.Value, Value.Error> { observer in
  707. return self.observe { event in
  708. switch event {
  709. case let .Next(innerEvent):
  710. observer.action(innerEvent.event)
  711. case .Failed:
  712. fatalError("NoError is impossible to construct")
  713. case .Completed:
  714. observer.sendCompleted()
  715. case .Interrupted:
  716. observer.sendInterrupted()
  717. }
  718. }
  719. }
  720. }
  721. }
  722. extension SignalType {
  723. /// Inject side effects to be performed upon the specified signal events.
  724. ///
  725. /// - parameters:
  726. /// - event: A closure that accepts an event and is invoked on every
  727. /// received event.
  728. /// - failed: A closure that accepts error object and is invoked for
  729. /// `Failed` event.
  730. /// - completed: A closure that is invoked for `Completed` event.
  731. /// - interrupted: A closure that is invoked for `Interrupted` event.
  732. /// - terminated: A closure that is invoked for any terminating event.
  733. /// - disposed: A closure added as disposable when signal completes.
  734. /// - next: A closure that accepts a value from `Next` event.
  735. ///
  736. /// - returns: A signal with attached side-effects for given event cases.
  737. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  738. public func on(event event: (Event<Value, Error> -> Void)? = nil, failed: (Error -> Void)? = nil, completed: (() -> Void)? = nil, interrupted: (() -> Void)? = nil, terminated: (() -> Void)? = nil, disposed: (() -> Void)? = nil, next: (Value -> Void)? = nil) -> Signal<Value, Error> {
  739. return Signal { observer in
  740. let disposable = CompositeDisposable()
  741. _ = disposed.map(disposable.addDisposable)
  742. disposable += signal.observe { receivedEvent in
  743. event?(receivedEvent)
  744. switch receivedEvent {
  745. case let .Next(value):
  746. next?(value)
  747. case let .Failed(error):
  748. failed?(error)
  749. case .Completed:
  750. completed?()
  751. case .Interrupted:
  752. interrupted?()
  753. }
  754. if receivedEvent.isTerminating {
  755. terminated?()
  756. }
  757. observer.action(receivedEvent)
  758. }
  759. return disposable
  760. }
  761. }
  762. }
  763. private struct SampleState<Value> {
  764. var latestValue: Value? = nil
  765. var signalCompleted: Bool = false
  766. var samplerCompleted: Bool = false
  767. }
  768. extension SignalType {
  769. /// Forward the latest value from `self` with the value from `sampler` as a
  770. /// tuple, only when`sampler` sends a `Next` event.
  771. ///
  772. /// - note: If `sampler` fires before a value has been observed on `self`,
  773. /// nothing happens.
  774. ///
  775. /// - parameters:
  776. /// - sampler: A signal that will trigger the delivery of `Next` event
  777. /// from `self`.
  778. ///
  779. /// - returns: A signal that will send values from `self` and `sampler`,
  780. /// sampled (possibly multiple times) by `sampler`, then complete
  781. /// once both input signals have completed, or interrupt if
  782. /// either input signal is interrupted.
  783. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  784. public func sampleWith<T>(sampler: Signal<T, NoError>) -> Signal<(Value, T), Error> {
  785. return Signal { observer in
  786. let state = Atomic(SampleState<Value>())
  787. let disposable = CompositeDisposable()
  788. disposable += self.observe { event in
  789. switch event {
  790. case let .Next(value):
  791. state.modify { st in
  792. var st = st
  793. st.latestValue = value
  794. return st
  795. }
  796. case let .Failed(error):
  797. observer.sendFailed(error)
  798. case .Completed:
  799. let oldState = state.modify { st in
  800. var st = st
  801. st.signalCompleted = true
  802. return st
  803. }
  804. if oldState.samplerCompleted {
  805. observer.sendCompleted()
  806. }
  807. case .Interrupted:
  808. observer.sendInterrupted()
  809. }
  810. }
  811. disposable += sampler.observe { event in
  812. switch event {
  813. case .Next(let samplerValue):
  814. if let value = state.value.latestValue {
  815. observer.sendNext((value, samplerValue))
  816. }
  817. case .Completed:
  818. let oldState = state.modify { st in
  819. var st = st
  820. st.samplerCompleted = true
  821. return st
  822. }
  823. if oldState.signalCompleted {
  824. observer.sendCompleted()
  825. }
  826. case .Interrupted:
  827. observer.sendInterrupted()
  828. case .Failed:
  829. break
  830. }
  831. }
  832. return disposable
  833. }
  834. }
  835. /// Forward the latest value from `self` whenever `sampler` sends a `Next`
  836. /// event.
  837. ///
  838. /// - note: If `sampler` fires before a value has been observed on `self`,
  839. /// nothing happens.
  840. ///
  841. /// - parameters:
  842. /// - sampler: A signal that will trigger the delivery of `Next` event
  843. /// from `self`.
  844. ///
  845. /// - returns: A signal that will send values from `self`, sampled (possibly
  846. /// multiple times) by `sampler`, then complete once both input
  847. /// signals have completed, or interrupt if either input signal
  848. /// is interrupted.
  849. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  850. public func sampleOn(sampler: Signal<(), NoError>) -> Signal<Value, Error> {
  851. return sampleWith(sampler)
  852. .map { $0.0 }
  853. }
  854. /// Forward events from `self` until `trigger` sends a `Next` or
  855. /// `Completed` event, at which point the returned signal will complete.
  856. ///
  857. /// - parameters:
  858. /// - trigger: A signal whose `Next` or `Completed` events will stop the
  859. /// delivery of `Next` events from `self`.
  860. ///
  861. /// - returns: A signal that will deliver events until `trigger` sends
  862. /// `Next` or `Completed` events.
  863. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  864. public func takeUntil(trigger: Signal<(), NoError>) -> Signal<Value, Error> {
  865. return Signal { observer in
  866. let disposable = CompositeDisposable()
  867. disposable += self.observe(observer)
  868. disposable += trigger.observe { event in
  869. switch event {
  870. case .Next, .Completed:
  871. observer.sendCompleted()
  872. case .Failed, .Interrupted:
  873. break
  874. }
  875. }
  876. return disposable
  877. }
  878. }
  879. /// Do not forward any values from `self` until `trigger` sends a `Next` or
  880. /// `Completed` event, at which point the returned signal behaves exactly
  881. /// like `signal`.
  882. ///
  883. /// - parameters:
  884. /// - trigger: A signal whose `Next` or `Completed` events will start the
  885. /// deliver of events on `self`.
  886. ///
  887. /// - returns: A signal that will deliver events once the `trigger` sends
  888. /// `Next` or `Completed` events.
  889. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  890. public func skipUntil(trigger: Signal<(), NoError>) -> Signal<Value, Error> {
  891. return Signal { observer in
  892. let disposable = SerialDisposable()
  893. disposable.innerDisposable = trigger.observe { event in
  894. switch event {
  895. case .Next, .Completed:
  896. disposable.innerDisposable = self.observe(observer)
  897. case .Failed, .Interrupted:
  898. break
  899. }
  900. }
  901. return disposable
  902. }
  903. }
  904. /// Forward events from `self` with history: values of the returned signal
  905. /// are a tuples whose first member is the previous value and whose second member
  906. /// is the current value. `initial` is supplied as the first member when `self`
  907. /// sends its first value.
  908. ///
  909. /// - parameters:
  910. /// - initial: A value that will be combined with the first value sent by
  911. /// `self`.
  912. ///
  913. /// - returns: A signal that sends tuples that contain previous and current
  914. /// sent values of `self`.
  915. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  916. public func combinePrevious(initial: Value) -> Signal<(Value, Value), Error> {
  917. return scan((initial, initial)) { previousCombinedValues, newValue in
  918. return (previousCombinedValues.1, newValue)
  919. }
  920. }
  921. /// Send only the final value and then immediately completes.
  922. ///
  923. /// - parameters:
  924. /// - initial: Initial value for the accumulator.
  925. /// - combine: A closure that accepts accumulator and sent value of
  926. /// `self`.
  927. ///
  928. /// - returns: A signal that sends accumulated value after `self` completes.
  929. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  930. public func reduce<U>(initial: U, _ combine: (U, Value) -> U) -> Signal<U, Error> {
  931. // We need to handle the special case in which `signal` sends no values.
  932. // We'll do that by sending `initial` on the output signal (before
  933. // taking the last value).
  934. let (scannedSignalWithInitialValue, outputSignalObserver) = Signal<U, Error>.pipe()
  935. let outputSignal = scannedSignalWithInitialValue.takeLast(1)
  936. // Now that we've got takeLast() listening to the piped signal, send
  937. // that initial value.
  938. outputSignalObserver.sendNext(initial)
  939. // Pipe the scanned input signal into the output signal.
  940. scan(initial, combine).observe(outputSignalObserver)
  941. return outputSignal
  942. }
  943. /// Aggregate values into a single combined value. When `self` emits its
  944. /// first value, `combine` is invoked with `initial` as the first argument
  945. /// and that emitted value as the second argument. The result is emitted
  946. /// from the signal returned from `scan`. That result is then passed to
  947. /// `combine` as the first argument when the next value is emitted, and so
  948. /// on.
  949. ///
  950. /// - parameters:
  951. /// - initial: Initial value for the accumulator.
  952. /// - combine: A closure that accepts accumulator and sent value of
  953. /// `self`.
  954. ///
  955. /// - returns: A signal that sends accumulated value each time `self` emits
  956. /// own value.
  957. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  958. public func scan<U>(initial: U, _ combine: (U, Value) -> U) -> Signal<U, Error> {
  959. return Signal { observer in
  960. var accumulator = initial
  961. return self.observe { event in
  962. observer.action(event.map { value in
  963. accumulator = combine(accumulator, value)
  964. return accumulator
  965. })
  966. }
  967. }
  968. }
  969. }
  970. extension SignalType where Value: Equatable {
  971. /// Forward only those values from `self` which are not duplicates of the
  972. /// immedately preceding value.
  973. ///
  974. /// - note: The first value is always forwarded.
  975. ///
  976. /// - returns: A signal that does not send two equal values sequentially.
  977. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  978. public func skipRepeats() -> Signal<Value, Error> {
  979. return skipRepeats(==)
  980. }
  981. }
  982. extension SignalType {
  983. /// Forward only those values from `self` which do not pass `isRepeat` with
  984. /// respect to the previous value.
  985. ///
  986. /// - note: The first value is always forwarded.
  987. ///
  988. /// - parameters:
  989. /// - isRepeate: A closure that accepts previous and current values of
  990. /// `self` and returns `Bool` whether these values are
  991. /// repeating.
  992. ///
  993. /// - returns: A signal that forwards only those values that fail given
  994. /// `isRepeat` predicate.
  995. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  996. public func skipRepeats(isRepeat: (Value, Value) -> Bool) -> Signal<Value, Error> {
  997. return self
  998. .scan((nil, false)) { (accumulated: (Value?, Bool), next: Value) -> (value: Value?, repeated: Bool) in
  999. switch accumulated.0 {
  1000. case nil:
  1001. return (next, false)
  1002. case let prev? where isRepeat(prev, next):
  1003. return (prev, true)
  1004. case _?:
  1005. return (Optional(next), false)
  1006. }
  1007. }
  1008. .filter { !$0.repeated }
  1009. .map { $0.value }
  1010. .ignoreNil()
  1011. }
  1012. /// Do not forward any values from `self` until `predicate` returns false,
  1013. /// at which point the returned signal behaves exactly like `signal`.
  1014. ///
  1015. /// - parameters:
  1016. /// - predicate: A closure that accepts a value and returns whether `self`
  1017. /// should still not forward that value to a `signal`.
  1018. ///
  1019. /// - returns: A signal that sends only forwarded values from `self`.
  1020. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1021. public func skipWhile(predicate: Value -> Bool) -> Signal<Value, Error> {
  1022. return Signal { observer in
  1023. var shouldSkip = true
  1024. return self.observe { event in
  1025. switch event {
  1026. case let .Next(value):
  1027. shouldSkip = shouldSkip && predicate(value)
  1028. if !shouldSkip {
  1029. fallthrough
  1030. }
  1031. case .Failed, .Completed, .Interrupted:
  1032. observer.action(event)
  1033. }
  1034. }
  1035. }
  1036. }
  1037. /// Forward events from `self` until `replacement` begins sending events.
  1038. ///
  1039. /// - parameters:
  1040. /// - replacement: A signal to wait to wait for values from and start
  1041. /// sending them as a replacement to `self`'s values.
  1042. ///
  1043. /// - returns: A signal which passes through `Next`, `Failed`, and
  1044. /// `Interrupted` events from `self` until `replacement` sends
  1045. /// an event, at which point the returned signal will send that
  1046. /// event and switch to passing through events from `replacement`
  1047. /// instead, regardless of whether `self` has sent events
  1048. /// already.
  1049. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1050. public func takeUntilReplacement(replacement: Signal<Value, Error>) -> Signal<Value, Error> {
  1051. return Signal { observer in
  1052. let disposable = CompositeDisposable()
  1053. let signalDisposable = self.observe { event in
  1054. switch event {
  1055. case .Completed:
  1056. break
  1057. case .Next, .Failed, .Interrupted:
  1058. observer.action(event)
  1059. }
  1060. }
  1061. disposable += signalDisposable
  1062. disposable += replacement.observe { event in
  1063. signalDisposable?.dispose()
  1064. observer.action(event)
  1065. }
  1066. return disposable
  1067. }
  1068. }
  1069. /// Wait until `self` completes and then forward the final `count` values
  1070. /// on the returned signal.
  1071. ///
  1072. /// - parameters:
  1073. /// - count: Number of last events to send after `self` completes.
  1074. ///
  1075. /// - returns: A signal that receives up to `count` values from `self`
  1076. /// after `self` completes.
  1077. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1078. public func takeLast(count: Int) -> Signal<Value, Error> {
  1079. return Signal { observer in
  1080. var buffer: [Value] = []
  1081. buffer.reserveCapacity(count)
  1082. return self.observe { event in
  1083. switch event {
  1084. case let .Next(value):
  1085. // To avoid exceeding the reserved capacity of the buffer,
  1086. // we remove then add. Remove elements until we have room to
  1087. // add one more.
  1088. while (buffer.count + 1) > count {
  1089. buffer.removeAtIndex(0)
  1090. }
  1091. buffer.append(value)
  1092. case let .Failed(error):
  1093. observer.sendFailed(error)
  1094. case .Completed:
  1095. buffer.forEach(observer.sendNext)
  1096. observer.sendCompleted()
  1097. case .Interrupted:
  1098. observer.sendInterrupted()
  1099. }
  1100. }
  1101. }
  1102. }
  1103. /// Forward any values from `self` until `predicate` returns false, at which
  1104. /// point the returned signal will complete.
  1105. ///
  1106. /// - parameters:
  1107. /// - predicate: A closure that accepts value and returns `Bool` value
  1108. /// whether `self` should forward it to `signal` and continue
  1109. /// sending other events.
  1110. ///
  1111. /// - returns: A signal that sends events until the values sent by `self`
  1112. /// pass the given `predicate`.
  1113. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1114. public func takeWhile(predicate: Value -> Bool) -> Signal<Value, Error> {
  1115. return Signal { observer in
  1116. return self.observe { event in
  1117. if let value = event.value where !predicate(value) {
  1118. observer.sendCompleted()
  1119. } else {
  1120. observer.action(event)
  1121. }
  1122. }
  1123. }
  1124. }
  1125. }
  1126. private struct ZipState<Left, Right> {
  1127. var values: (left: [Left], right: [Right]) = ([], [])
  1128. var isCompleted: (left: Bool, right: Bool) = (false, false)
  1129. var isFinished: Bool {
  1130. return (isCompleted.left && values.left.isEmpty) || (isCompleted.right && values.right.isEmpty)
  1131. }
  1132. }
  1133. extension SignalType {
  1134. /// Zip elements of two signals into pairs. The elements of any Nth pair
  1135. /// are the Nth elements of the two input signals.
  1136. ///
  1137. /// - parameters:
  1138. /// - otherSignal: A signal to zip values with.
  1139. ///
  1140. /// - returns: A signal that sends tuples of `self` and `otherSignal`.
  1141. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1142. public func zipWith<U>(otherSignal: Signal<U, Error>) -> Signal<(Value, U), Error> {
  1143. return Signal { observer in
  1144. let state = Atomic(ZipState<Value, U>())
  1145. let disposable = CompositeDisposable()
  1146. let flush = {
  1147. var tuple: (Value, U)?
  1148. var isFinished = false
  1149. state.modify { state in
  1150. guard !state.values.left.isEmpty && !state.values.right.isEmpty else {
  1151. isFinished = state.isFinished
  1152. return state
  1153. }
  1154. var state = state
  1155. tuple = (state.values.left.removeFirst(), state.values.right.removeFirst())
  1156. isFinished = state.isFinished
  1157. return state
  1158. }
  1159. if let tuple = tuple {
  1160. observer.sendNext(tuple)
  1161. }
  1162. if isFinished {
  1163. observer.sendCompleted()
  1164. }
  1165. }
  1166. let onFailed = observer.sendFailed
  1167. let onInterrupted = observer.sendInterrupted
  1168. disposable += self.observe { event in
  1169. switch event {
  1170. case let .Next(value):
  1171. state.modify { state in
  1172. var state = state
  1173. state.values.left.append(value)
  1174. return state
  1175. }
  1176. flush()
  1177. case let .Failed(error):
  1178. onFailed(error)
  1179. case .Completed:
  1180. state.modify { state in
  1181. var state = state
  1182. state.isCompleted.left = true
  1183. return state
  1184. }
  1185. flush()
  1186. case .Interrupted:
  1187. onInterrupted()
  1188. }
  1189. }
  1190. disposable += otherSignal.observe { event in
  1191. switch event {
  1192. case let .Next(value):
  1193. state.modify { state in
  1194. var state = state
  1195. state.values.right.append(value)
  1196. return state
  1197. }
  1198. flush()
  1199. case let .Failed(error):
  1200. onFailed(error)
  1201. case .Completed:
  1202. state.modify { state in
  1203. var state = state
  1204. state.isCompleted.right = true
  1205. return state
  1206. }
  1207. flush()
  1208. case .Interrupted:
  1209. onInterrupted()
  1210. }
  1211. }
  1212. return disposable
  1213. }
  1214. }
  1215. /// Apply `operation` to values from `self` with `Success`ful results
  1216. /// forwarded on the returned signal and `Failure`s sent as `Failed` events.
  1217. ///
  1218. /// - parameters:
  1219. /// - operation: A closure that accepts a value and returns a `Result`.
  1220. ///
  1221. /// - returns: A signal that receives `Success`ful `Result` as `Next` event
  1222. /// and `Failure` as `Failed` event.
  1223. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1224. public func attempt(operation: Value -> Result<(), Error>) -> Signal<Value, Error> {
  1225. return attemptMap { value in
  1226. return operation(value).map {
  1227. return value
  1228. }
  1229. }
  1230. }
  1231. /// Apply `operation` to values from `self` with `Success`ful results mapped
  1232. /// on the returned signal and `Failure`s sent as `Failed` events.
  1233. ///
  1234. /// - parameters:
  1235. /// - operation: A closure that accepts a value and returns a result of
  1236. /// a mapped value as `Success`.
  1237. ///
  1238. /// - returns: A signal that sends mapped values from `self` if returned
  1239. /// `Result` is `Success`ful, `Failed` events otherwise.
  1240. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1241. public func attemptMap<U>(operation: Value -> Result<U, Error>) -> Signal<U, Error> {
  1242. return Signal { observer in
  1243. self.observe { event in
  1244. switch event {
  1245. case let .Next(value):
  1246. operation(value).analysis(
  1247. ifSuccess: observer.sendNext,
  1248. ifFailure: observer.sendFailed
  1249. )
  1250. case let .Failed(error):
  1251. observer.sendFailed(error)
  1252. case .Completed:
  1253. observer.sendCompleted()
  1254. case .Interrupted:
  1255. observer.sendInterrupted()
  1256. }
  1257. }
  1258. }
  1259. }
  1260. /// Throttle values sent by the receiver, so that at least `interval`
  1261. /// seconds pass between each, then forwards them on the given scheduler.
  1262. ///
  1263. /// - note: If multiple values are received before the interval has elapsed,
  1264. /// the latest value is the one that will be passed on.
  1265. ///
  1266. /// - note: If the input signal terminates while a value is being throttled,
  1267. /// that value will be discarded and the returned signal will
  1268. /// terminate immediately.
  1269. ///
  1270. /// - parameters:
  1271. /// - interval: Number of seconds to wait between sent values.
  1272. /// - scheduler: A scheduler to deliver events on.
  1273. ///
  1274. /// - returns: A signal that sends values at least `interval` seconds
  1275. /// appart on a given scheduler.
  1276. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1277. public func throttle(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> Signal<Value, Error> {
  1278. precondition(interval >= 0)
  1279. return Signal { observer in
  1280. let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState())
  1281. let schedulerDisposable = SerialDisposable()
  1282. let disposable = CompositeDisposable()
  1283. disposable.addDisposable(schedulerDisposable)
  1284. disposable += self.observe { event in
  1285. guard let value = event.value else {
  1286. schedulerDisposable.innerDisposable = scheduler.schedule {
  1287. observer.action(event)
  1288. }
  1289. return
  1290. }
  1291. var scheduleDate: NSDate!
  1292. state.modify { state in
  1293. var state = state
  1294. state.pendingValue = value
  1295. let proposedScheduleDate = state.previousDate?.dateByAddingTimeInterval(interval) ?? scheduler.currentDate
  1296. scheduleDate = proposedScheduleDate.laterDate(scheduler.currentDate)
  1297. return state
  1298. }
  1299. schedulerDisposable.innerDisposable = scheduler.scheduleAfter(scheduleDate) {
  1300. let previousState = state.modify { state in
  1301. var state = state
  1302. if state.pendingValue != nil {
  1303. state.pendingValue = nil
  1304. state.previousDate = scheduleDate
  1305. }
  1306. return state
  1307. }
  1308. if let pendingValue = previousState.pendingValue {
  1309. observer.sendNext(pendingValue)
  1310. }
  1311. }
  1312. }
  1313. return disposable
  1314. }
  1315. }
  1316. /// Debounce values sent by the receiver, such that at least `interval`
  1317. /// seconds pass after the receiver has last sent a value, then forward the
  1318. /// latest value on the given scheduler.
  1319. ///
  1320. /// - note: If multiple values are received before the interval has elapsed,
  1321. /// the latest value is the one that will be passed on.
  1322. ///
  1323. /// - note: If the input signal terminates while a value is being debounced,
  1324. /// that value will be discarded and the returned signal will
  1325. /// terminate immediately.
  1326. ///
  1327. /// - parameters:
  1328. /// - interval: A number of seconds to wait before sending a value.
  1329. /// - scheduler: A scheduler to send values on.
  1330. ///
  1331. /// - returns: A signal that sends values that are sent from `self` at least
  1332. /// `interval` seconds apart.
  1333. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1334. public func debounce(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> Signal<Value, Error> {
  1335. precondition(interval >= 0)
  1336. return self
  1337. .materialize()
  1338. .flatMap(.Latest) { event -> SignalProducer<Event<Value, Error>, NoError> in
  1339. if event.isTerminating {
  1340. return SignalProducer(value: event).observeOn(scheduler)
  1341. } else {
  1342. return SignalProducer(value: event).delay(interval, onScheduler: scheduler)
  1343. }
  1344. }
  1345. .dematerialize()
  1346. }
  1347. }
  1348. extension SignalType {
  1349. /// Forward only those values from `self` that have unique identities across
  1350. /// the set of all values that have been seen.
  1351. ///
  1352. /// - note: This causes the identities to be retained to check for
  1353. /// uniqueness.
  1354. ///
  1355. /// - parameters:
  1356. /// - transform: A closure that accepts a value and returns identity
  1357. /// value.
  1358. ///
  1359. /// - returns: A signal that sends unique values during its lifetime.
  1360. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1361. public func uniqueValues<Identity: Hashable>(transform: Value -> Identity) -> Signal<Value, Error> {
  1362. return Signal { observer in
  1363. var seenValues: Set<Identity> = []
  1364. return self
  1365. .observe { event in
  1366. switch event {
  1367. case let .Next(value):
  1368. let identity = transform(value)
  1369. if !seenValues.contains(identity) {
  1370. seenValues.insert(identity)
  1371. fallthrough
  1372. }
  1373. case .Failed, .Completed, .Interrupted:
  1374. observer.action(event)
  1375. }
  1376. }
  1377. }
  1378. }
  1379. }
  1380. extension SignalType where Value: Hashable {
  1381. /// Forward only those values from `self` that are unique across the set of
  1382. /// all values that have been seen.
  1383. ///
  1384. /// - note: This causes the values to be retained to check for uniqueness.
  1385. /// Providing a function that returns a unique value for each sent
  1386. /// value can help you reduce the memory footprint.
  1387. ///
  1388. /// - returns: A signal that sends unique values during its lifetime.
  1389. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1390. public func uniqueValues() -> Signal<Value, Error> {
  1391. return uniqueValues { $0 }
  1392. }
  1393. }
  1394. private struct ThrottleState<Value> {
  1395. var previousDate: NSDate? = nil
  1396. var pendingValue: Value? = nil
  1397. }
  1398. /// Combine the values of all the given signals, in the manner described by
  1399. /// `combineLatestWith`.
  1400. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1401. public func combineLatest<A, B, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>) -> Signal<(A, B), Error> {
  1402. return a.combineLatestWith(b)
  1403. }
  1404. /// Combines the values of all the given signals, in the manner described by
  1405. /// `combineLatestWith`.
  1406. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1407. public func combineLatest<A, B, C, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>) -> Signal<(A, B, C), Error> {
  1408. return combineLatest(a, b)
  1409. .combineLatestWith(c)
  1410. .map(repack)
  1411. }
  1412. /// Combines the values of all the given signals, in the manner described by
  1413. /// `combineLatestWith`.
  1414. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1415. public func combineLatest<A, B, C, D, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>) -> Signal<(A, B, C, D), Error> {
  1416. return combineLatest(a, b, c)
  1417. .combineLatestWith(d)
  1418. .map(repack)
  1419. }
  1420. /// Combines the values of all the given signals, in the manner described by
  1421. /// `combineLatestWith`.
  1422. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1423. public func combineLatest<A, B, C, D, E, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>) -> Signal<(A, B, C, D, E), Error> {
  1424. return combineLatest(a, b, c, d)
  1425. .combineLatestWith(e)
  1426. .map(repack)
  1427. }
  1428. /// Combines the values of all the given signals, in the manner described by
  1429. /// `combineLatestWith`.
  1430. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1431. public func combineLatest<A, B, C, D, E, F, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>) -> Signal<(A, B, C, D, E, F), Error> {
  1432. return combineLatest(a, b, c, d, e)
  1433. .combineLatestWith(f)
  1434. .map(repack)
  1435. }
  1436. /// Combines the values of all the given signals, in the manner described by
  1437. /// `combineLatestWith`.
  1438. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1439. public func combineLatest<A, B, C, D, E, F, G, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>) -> Signal<(A, B, C, D, E, F, G), Error> {
  1440. return combineLatest(a, b, c, d, e, f)
  1441. .combineLatestWith(g)
  1442. .map(repack)
  1443. }
  1444. /// Combines the values of all the given signals, in the manner described by
  1445. /// `combineLatestWith`.
  1446. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1447. public func combineLatest<A, B, C, D, E, F, G, H, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>) -> Signal<(A, B, C, D, E, F, G, H), Error> {
  1448. return combineLatest(a, b, c, d, e, f, g)
  1449. .combineLatestWith(h)
  1450. .map(repack)
  1451. }
  1452. /// Combines the values of all the given signals, in the manner described by
  1453. /// `combineLatestWith`.
  1454. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1455. public func combineLatest<A, B, C, D, E, F, G, H, I, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>) -> Signal<(A, B, C, D, E, F, G, H, I), Error> {
  1456. return combineLatest(a, b, c, d, e, f, g, h)
  1457. .combineLatestWith(i)
  1458. .map(repack)
  1459. }
  1460. /// Combines the values of all the given signals, in the manner described by
  1461. /// `combineLatestWith`.
  1462. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1463. public func combineLatest<A, B, C, D, E, F, G, H, I, J, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>, _ j: Signal<J, Error>) -> Signal<(A, B, C, D, E, F, G, H, I, J), Error> {
  1464. return combineLatest(a, b, c, d, e, f, g, h, i)
  1465. .combineLatestWith(j)
  1466. .map(repack)
  1467. }
  1468. /// Combines the values of all the given signals, in the manner described by
  1469. /// `combineLatestWith`. No events will be sent if the sequence is empty.
  1470. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1471. public func combineLatest<S: SequenceType, Value, Error where S.Generator.Element == Signal<Value, Error>>(signals: S) -> Signal<[Value], Error> {
  1472. var generator = signals.generate()
  1473. if let first = generator.next() {
  1474. let initial = first.map { [$0] }
  1475. return GeneratorSequence(generator).reduce(initial) { signal, next in
  1476. signal.combineLatestWith(next).map { $0.0 + [$0.1] }
  1477. }
  1478. }
  1479. return .never
  1480. }
  1481. /// Zips the values of all the given signals, in the manner described by
  1482. /// `zipWith`.
  1483. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1484. public func zip<A, B, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>) -> Signal<(A, B), Error> {
  1485. return a.zipWith(b)
  1486. }
  1487. /// Zips the values of all the given signals, in the manner described by
  1488. /// `zipWith`.
  1489. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1490. public func zip<A, B, C, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>) -> Signal<(A, B, C), Error> {
  1491. return zip(a, b)
  1492. .zipWith(c)
  1493. .map(repack)
  1494. }
  1495. /// Zips the values of all the given signals, in the manner described by
  1496. /// `zipWith`.
  1497. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1498. public func zip<A, B, C, D, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>) -> Signal<(A, B, C, D), Error> {
  1499. return zip(a, b, c)
  1500. .zipWith(d)
  1501. .map(repack)
  1502. }
  1503. /// Zips the values of all the given signals, in the manner described by
  1504. /// `zipWith`.
  1505. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1506. public func zip<A, B, C, D, E, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>) -> Signal<(A, B, C, D, E), Error> {
  1507. return zip(a, b, c, d)
  1508. .zipWith(e)
  1509. .map(repack)
  1510. }
  1511. /// Zips the values of all the given signals, in the manner described by
  1512. /// `zipWith`.
  1513. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1514. public func zip<A, B, C, D, E, F, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>) -> Signal<(A, B, C, D, E, F), Error> {
  1515. return zip(a, b, c, d, e)
  1516. .zipWith(f)
  1517. .map(repack)
  1518. }
  1519. /// Zips the values of all the given signals, in the manner described by
  1520. /// `zipWith`.
  1521. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1522. public func zip<A, B, C, D, E, F, G, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>) -> Signal<(A, B, C, D, E, F, G), Error> {
  1523. return zip(a, b, c, d, e, f)
  1524. .zipWith(g)
  1525. .map(repack)
  1526. }
  1527. /// Zips the values of all the given signals, in the manner described by
  1528. /// `zipWith`.
  1529. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1530. public func zip<A, B, C, D, E, F, G, H, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>) -> Signal<(A, B, C, D, E, F, G, H), Error> {
  1531. return zip(a, b, c, d, e, f, g)
  1532. .zipWith(h)
  1533. .map(repack)
  1534. }
  1535. /// Zips the values of all the given signals, in the manner described by
  1536. /// `zipWith`.
  1537. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1538. public func zip<A, B, C, D, E, F, G, H, I, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>) -> Signal<(A, B, C, D, E, F, G, H, I), Error> {
  1539. return zip(a, b, c, d, e, f, g, h)
  1540. .zipWith(i)
  1541. .map(repack)
  1542. }
  1543. /// Zips the values of all the given signals, in the manner described by
  1544. /// `zipWith`.
  1545. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1546. public func zip<A, B, C, D, E, F, G, H, I, J, Error>(a: Signal<A, Error>, _ b: Signal<B, Error>, _ c: Signal<C, Error>, _ d: Signal<D, Error>, _ e: Signal<E, Error>, _ f: Signal<F, Error>, _ g: Signal<G, Error>, _ h: Signal<H, Error>, _ i: Signal<I, Error>, _ j: Signal<J, Error>) -> Signal<(A, B, C, D, E, F, G, H, I, J), Error> {
  1547. return zip(a, b, c, d, e, f, g, h, i)
  1548. .zipWith(j)
  1549. .map(repack)
  1550. }
  1551. /// Zips the values of all the given signals, in the manner described by
  1552. /// `zipWith`. No events will be sent if the sequence is empty.
  1553. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1554. public func zip<S: SequenceType, Value, Error where S.Generator.Element == Signal<Value, Error>>(signals: S) -> Signal<[Value], Error> {
  1555. var generator = signals.generate()
  1556. if let first = generator.next() {
  1557. let initial = first.map { [$0] }
  1558. return GeneratorSequence(generator).reduce(initial) { signal, next in
  1559. signal.zipWith(next).map { $0.0 + [$0.1] }
  1560. }
  1561. }
  1562. return .never
  1563. }
  1564. extension SignalType {
  1565. /// Forward events from `self` until `interval`. Then if signal isn't
  1566. /// completed yet, fails with `error` on `scheduler`.
  1567. ///
  1568. /// - note: If the interval is 0, the timeout will be scheduled immediately.
  1569. /// The signal must complete synchronously (or on a faster
  1570. /// scheduler) to avoid the timeout.
  1571. ///
  1572. /// - parameters:
  1573. /// - error: Error to send with `Failed` event if `self` is not completed
  1574. /// when `interval` passes.
  1575. /// - interval: Number of seconds to wait for `self` to complete.
  1576. /// - scheudler: A scheduler to deliver error on.
  1577. ///
  1578. /// - returns: A signal that sends events for at most `interval` seconds,
  1579. /// then, if not `Completed` - sends `error` with `Failed` event
  1580. /// on `scheduler`.
  1581. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1582. public func timeoutWithError(error: Error, afterInterval interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> Signal<Value, Error> {
  1583. precondition(interval >= 0)
  1584. return Signal { observer in
  1585. let disposable = CompositeDisposable()
  1586. let date = scheduler.currentDate.dateByAddingTimeInterval(interval)
  1587. disposable += scheduler.scheduleAfter(date) {
  1588. observer.sendFailed(error)
  1589. }
  1590. disposable += self.observe(observer)
  1591. return disposable
  1592. }
  1593. }
  1594. }
  1595. extension SignalType where Error == NoError {
  1596. /// Promote a signal that does not generate failures into one that can.
  1597. ///
  1598. /// - note: This does not actually cause failures to be generated for the
  1599. /// given signal, but makes it easier to combine with other signals
  1600. /// that may fail; for example, with operators like
  1601. /// `combineLatestWith`, `zipWith`, `flatten`, etc.
  1602. ///
  1603. /// - parameters:
  1604. /// - _ An `ErrorType`.
  1605. ///
  1606. /// - returns: A signal that has an instantiatable `ErrorType`.
  1607. @warn_unused_result(message="Did you forget to call `observe` on the signal?")
  1608. public func promoteErrors<F: ErrorType>(_: F.Type) -> Signal<Value, F> {
  1609. return Signal { observer in
  1610. return self.observe { event in
  1611. switch event {
  1612. case let .Next(value):
  1613. observer.sendNext(value)
  1614. case .Failed:
  1615. fatalError("NoError is impossible to construct")
  1616. case .Completed:
  1617. observer.sendCompleted()
  1618. case .Interrupted:
  1619. observer.sendInterrupted()
  1620. }
  1621. }
  1622. }
  1623. }
  1624. }