SignalProducerSpec.swift 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297
  1. //
  2. // SignalProducerSpec.swift
  3. // ReactiveCocoa
  4. //
  5. // Created by Justin Spahr-Summers on 2015-01-23.
  6. // Copyright (c) 2015 GitHub. All rights reserved.
  7. //
  8. import Foundation
  9. import Result
  10. import Nimble
  11. import Quick
  12. import ReactiveCocoa
  13. class SignalProducerSpec: QuickSpec {
  14. override func spec() {
  15. describe("init") {
  16. it("should run the handler once per start()") {
  17. var handlerCalledTimes = 0
  18. let signalProducer = SignalProducer<String, NSError>() { observer, disposable in
  19. handlerCalledTimes += 1
  20. return
  21. }
  22. signalProducer.start()
  23. signalProducer.start()
  24. expect(handlerCalledTimes) == 2
  25. }
  26. it("should release signal observers when given disposable is disposed") {
  27. var disposable: Disposable!
  28. let producer = SignalProducer<Int, NoError> { observer, innerDisposable in
  29. disposable = innerDisposable
  30. innerDisposable.addDisposable {
  31. // This is necessary to keep the observer long enough to
  32. // even test the memory management.
  33. observer.sendNext(0)
  34. }
  35. }
  36. weak var objectRetainedByObserver: NSObject?
  37. producer.startWithSignal { signal, _ in
  38. let object = NSObject()
  39. objectRetainedByObserver = object
  40. signal.observeNext { _ in object }
  41. }
  42. expect(objectRetainedByObserver).toNot(beNil())
  43. disposable.dispose()
  44. expect(objectRetainedByObserver).to(beNil())
  45. }
  46. it("should dispose of added disposables upon completion") {
  47. let addedDisposable = SimpleDisposable()
  48. var observer: Signal<(), NoError>.Observer!
  49. let producer = SignalProducer<(), NoError>() { incomingObserver, disposable in
  50. disposable.addDisposable(addedDisposable)
  51. observer = incomingObserver
  52. }
  53. producer.start()
  54. expect(addedDisposable.disposed) == false
  55. observer.sendCompleted()
  56. expect(addedDisposable.disposed) == true
  57. }
  58. it("should dispose of added disposables upon error") {
  59. let addedDisposable = SimpleDisposable()
  60. var observer: Signal<(), TestError>.Observer!
  61. let producer = SignalProducer<(), TestError>() { incomingObserver, disposable in
  62. disposable.addDisposable(addedDisposable)
  63. observer = incomingObserver
  64. }
  65. producer.start()
  66. expect(addedDisposable.disposed) == false
  67. observer.sendFailed(.Default)
  68. expect(addedDisposable.disposed) == true
  69. }
  70. it("should dispose of added disposables upon interruption") {
  71. let addedDisposable = SimpleDisposable()
  72. var observer: Signal<(), NoError>.Observer!
  73. let producer = SignalProducer<(), NoError>() { incomingObserver, disposable in
  74. disposable.addDisposable(addedDisposable)
  75. observer = incomingObserver
  76. }
  77. producer.start()
  78. expect(addedDisposable.disposed) == false
  79. observer.sendInterrupted()
  80. expect(addedDisposable.disposed) == true
  81. }
  82. it("should dispose of added disposables upon start() disposal") {
  83. let addedDisposable = SimpleDisposable()
  84. let producer = SignalProducer<(), TestError>() { _, disposable in
  85. disposable.addDisposable(addedDisposable)
  86. return
  87. }
  88. let startDisposable = producer.start()
  89. expect(addedDisposable.disposed) == false
  90. startDisposable.dispose()
  91. expect(addedDisposable.disposed) == true
  92. }
  93. }
  94. describe("init(signal:)") {
  95. var signal: Signal<Int, TestError>!
  96. var observer: Signal<Int, TestError>.Observer!
  97. beforeEach {
  98. // Cannot directly assign due to compiler crash on Xcode 7.0.1
  99. let (signalTemp, observerTemp) = Signal<Int, TestError>.pipe()
  100. signal = signalTemp
  101. observer = observerTemp
  102. }
  103. it("should emit values then complete") {
  104. let producer = SignalProducer<Int, TestError>(signal: signal)
  105. var values: [Int] = []
  106. var error: TestError?
  107. var completed = false
  108. producer.start { event in
  109. switch event {
  110. case let .Next(value):
  111. values.append(value)
  112. case let .Failed(err):
  113. error = err
  114. case .Completed:
  115. completed = true
  116. default:
  117. break
  118. }
  119. }
  120. expect(values) == []
  121. expect(error).to(beNil())
  122. expect(completed) == false
  123. observer.sendNext(1)
  124. expect(values) == [ 1 ]
  125. observer.sendNext(2)
  126. observer.sendNext(3)
  127. expect(values) == [ 1, 2, 3 ]
  128. observer.sendCompleted()
  129. expect(completed) == true
  130. }
  131. it("should emit error") {
  132. let producer = SignalProducer<Int, TestError>(signal: signal)
  133. var error: TestError?
  134. let sentError = TestError.Default
  135. producer.start { event in
  136. switch event {
  137. case let .Failed(err):
  138. error = err
  139. default:
  140. break
  141. }
  142. }
  143. expect(error).to(beNil())
  144. observer.sendFailed(sentError)
  145. expect(error) == sentError
  146. }
  147. }
  148. describe("init(value:)") {
  149. it("should immediately send the value then complete") {
  150. let producerValue = "StringValue"
  151. let signalProducer = SignalProducer<String, NSError>(value: producerValue)
  152. expect(signalProducer).to(sendValue(producerValue, sendError: nil, complete: true))
  153. }
  154. }
  155. describe("init(error:)") {
  156. it("should immediately send the error") {
  157. let producerError = NSError(domain: "com.reactivecocoa.errordomain", code: 4815, userInfo: nil)
  158. let signalProducer = SignalProducer<Int, NSError>(error: producerError)
  159. expect(signalProducer).to(sendValue(nil, sendError: producerError, complete: false))
  160. }
  161. }
  162. describe("init(result:)") {
  163. it("should immediately send the value then complete") {
  164. let producerValue = "StringValue"
  165. let producerResult = .Success(producerValue) as Result<String, NSError>
  166. let signalProducer = SignalProducer(result: producerResult)
  167. expect(signalProducer).to(sendValue(producerValue, sendError: nil, complete: true))
  168. }
  169. it("should immediately send the error") {
  170. let producerError = NSError(domain: "com.reactivecocoa.errordomain", code: 4815, userInfo: nil)
  171. let producerResult = .Failure(producerError) as Result<String, NSError>
  172. let signalProducer = SignalProducer(result: producerResult)
  173. expect(signalProducer).to(sendValue(nil, sendError: producerError, complete: false))
  174. }
  175. }
  176. describe("init(values:)") {
  177. it("should immediately send the sequence of values") {
  178. let sequenceValues = [1, 2, 3]
  179. let signalProducer = SignalProducer<Int, NSError>(values: sequenceValues)
  180. expect(signalProducer).to(sendValues(sequenceValues, sendError: nil, complete: true))
  181. }
  182. }
  183. describe("SignalProducer.empty") {
  184. it("should immediately complete") {
  185. let signalProducer = SignalProducer<Int, NSError>.empty
  186. expect(signalProducer).to(sendValue(nil, sendError: nil, complete: true))
  187. }
  188. }
  189. describe("SignalProducer.never") {
  190. it("should not send any events") {
  191. let signalProducer = SignalProducer<Int, NSError>.never
  192. expect(signalProducer).to(sendValue(nil, sendError: nil, complete: false))
  193. }
  194. }
  195. describe("SignalProducer.buffer") {
  196. it("should replay buffered events when started, then forward events as added") {
  197. let (producer, observer) = SignalProducer<Int, NSError>.buffer(Int.max)
  198. observer.sendNext(1)
  199. observer.sendNext(2)
  200. observer.sendNext(3)
  201. var values: [Int] = []
  202. var completed = false
  203. producer.start { event in
  204. switch event {
  205. case let .Next(value):
  206. values.append(value)
  207. case .Completed:
  208. completed = true
  209. default:
  210. break
  211. }
  212. }
  213. expect(values) == [1, 2, 3]
  214. expect(completed) == false
  215. observer.sendNext(4)
  216. observer.sendNext(5)
  217. expect(values) == [1, 2, 3, 4, 5]
  218. expect(completed) == false
  219. observer.sendCompleted()
  220. expect(values) == [1, 2, 3, 4, 5]
  221. expect(completed) == true
  222. }
  223. it("should drop earliest events to maintain the capacity") {
  224. let (producer, observer) = SignalProducer<Int, TestError>.buffer(1)
  225. observer.sendNext(1)
  226. observer.sendNext(2)
  227. var values: [Int] = []
  228. var error: TestError?
  229. producer.start { event in
  230. switch event {
  231. case let .Next(value):
  232. values.append(value)
  233. case let .Failed(err):
  234. error = err
  235. default:
  236. break
  237. }
  238. }
  239. expect(values) == [2]
  240. expect(error).to(beNil())
  241. observer.sendNext(3)
  242. observer.sendNext(4)
  243. expect(values) == [2, 3, 4]
  244. expect(error).to(beNil())
  245. observer.sendFailed(.Default)
  246. expect(values) == [2, 3, 4]
  247. expect(error) == TestError.Default
  248. }
  249. it("should always replay termination event") {
  250. let (producer, observer) = SignalProducer<Int, TestError>.buffer(0)
  251. var completed = false
  252. observer.sendCompleted()
  253. producer.startWithCompleted {
  254. completed = true
  255. }
  256. expect(completed) == true
  257. }
  258. it("should replay values after being terminated") {
  259. let (producer, observer) = SignalProducer<Int, TestError>.buffer(1)
  260. var value: Int?
  261. var completed = false
  262. observer.sendNext(123)
  263. observer.sendCompleted()
  264. producer.start { event in
  265. switch event {
  266. case let .Next(val):
  267. value = val
  268. case .Completed:
  269. completed = true
  270. default:
  271. break
  272. }
  273. }
  274. expect(value) == 123
  275. expect(completed) == true
  276. }
  277. it("should not deadlock when started while sending") {
  278. let (producer, observer) = SignalProducer<Int, NoError>.buffer(Int.max)
  279. observer.sendNext(1)
  280. observer.sendNext(2)
  281. observer.sendNext(3)
  282. var values: [Int] = []
  283. producer.startWithCompleted {
  284. values = []
  285. producer.startWithNext { value in
  286. values.append(value)
  287. }
  288. }
  289. observer.sendCompleted()
  290. expect(values) == [ 1, 2, 3 ]
  291. }
  292. it("should not deadlock in pair when started while sending") {
  293. let (producer1, observer1) = SignalProducer<String, NoError>.buffer(Int.max)
  294. let (producer2, observer2) = SignalProducer<String, NoError>.buffer(Int.max)
  295. observer1.sendNext("A")
  296. observer1.sendNext("B")
  297. observer2.sendNext("1")
  298. observer2.sendNext("2")
  299. var valuePairs: [String] = []
  300. producer1.startWithCompleted {
  301. producer2.startWithCompleted {
  302. valuePairs = []
  303. producer1.startWithNext { value1 in
  304. producer2.startWithNext { value2 in
  305. valuePairs.append(value1 + value2)
  306. }
  307. }
  308. }
  309. }
  310. observer1.sendCompleted()
  311. observer2.sendCompleted()
  312. expect(valuePairs) == [ "A1", "A2", "B1", "B2" ]
  313. }
  314. it("should buffer values before sending recursively to new observers") {
  315. let (producer, observer) = SignalProducer<Int, NoError>.buffer(Int.max)
  316. var values: [Int] = []
  317. var lastBufferedValues: [Int] = []
  318. producer.startWithNext { newValue in
  319. values.append(newValue)
  320. var bufferedValues: [Int] = []
  321. producer.startWithNext { bufferedValue in
  322. bufferedValues.append(bufferedValue)
  323. }
  324. expect(bufferedValues) == values
  325. lastBufferedValues = bufferedValues
  326. }
  327. observer.sendNext(1)
  328. expect(values) == [ 1 ]
  329. expect(lastBufferedValues) == values
  330. observer.sendNext(2)
  331. expect(values) == [ 1, 2 ]
  332. expect(lastBufferedValues) == values
  333. observer.sendNext(3)
  334. expect(values) == [ 1, 2, 3 ]
  335. expect(lastBufferedValues) == values
  336. }
  337. }
  338. describe("trailing closure") {
  339. it("receives next values") {
  340. let (producer, observer) = SignalProducer<Int, NoError>.pipe()
  341. var values = [Int]()
  342. producer.startWithNext { next in
  343. values.append(next)
  344. }
  345. observer.sendNext(1)
  346. expect(values) == [1]
  347. }
  348. }
  349. describe("SignalProducer.attempt") {
  350. it("should run the operation once per start()") {
  351. var operationRunTimes = 0
  352. let operation: () -> Result<String, NSError> = {
  353. operationRunTimes += 1
  354. return .Success("OperationValue")
  355. }
  356. SignalProducer.attempt(operation).start()
  357. SignalProducer.attempt(operation).start()
  358. expect(operationRunTimes) == 2
  359. }
  360. it("should send the value then complete") {
  361. let operationReturnValue = "OperationValue"
  362. let operation: () -> Result<String, NSError> = {
  363. return .Success(operationReturnValue)
  364. }
  365. let signalProducer = SignalProducer.attempt(operation)
  366. expect(signalProducer).to(sendValue(operationReturnValue, sendError: nil, complete: true))
  367. }
  368. it("should send the error") {
  369. let operationError = NSError(domain: "com.reactivecocoa.errordomain", code: 4815, userInfo: nil)
  370. let operation: () -> Result<String, NSError> = {
  371. return .Failure(operationError)
  372. }
  373. let signalProducer = SignalProducer.attempt(operation)
  374. expect(signalProducer).to(sendValue(nil, sendError: operationError, complete: false))
  375. }
  376. }
  377. describe("startWithSignal") {
  378. it("should invoke the closure before any effects or events") {
  379. var started = false
  380. var value: Int?
  381. SignalProducer<Int, NoError>(value: 42)
  382. .on(started: {
  383. started = true
  384. }, next: {
  385. value = $0
  386. })
  387. .startWithSignal { _ in
  388. expect(started) == false
  389. expect(value).to(beNil())
  390. }
  391. expect(started) == true
  392. expect(value) == 42
  393. }
  394. it("should dispose of added disposables if disposed") {
  395. let addedDisposable = SimpleDisposable()
  396. var disposable: Disposable!
  397. let producer = SignalProducer<Int, NoError>() { _, disposable in
  398. disposable.addDisposable(addedDisposable)
  399. return
  400. }
  401. producer.startWithSignal { _, innerDisposable in
  402. disposable = innerDisposable
  403. }
  404. expect(addedDisposable.disposed) == false
  405. disposable.dispose()
  406. expect(addedDisposable.disposed) == true
  407. }
  408. it("should send interrupted if disposed") {
  409. var interrupted = false
  410. var disposable: Disposable!
  411. SignalProducer<Int, NoError>(value: 42)
  412. .startOn(TestScheduler())
  413. .startWithSignal { signal, innerDisposable in
  414. signal.observeInterrupted {
  415. interrupted = true
  416. }
  417. disposable = innerDisposable
  418. }
  419. expect(interrupted) == false
  420. disposable.dispose()
  421. expect(interrupted) == true
  422. }
  423. it("should release signal observers if disposed") {
  424. weak var objectRetainedByObserver: NSObject?
  425. var disposable: Disposable!
  426. let producer = SignalProducer<Int, NoError>.never
  427. producer.startWithSignal { signal, innerDisposable in
  428. let object = NSObject()
  429. objectRetainedByObserver = object
  430. signal.observeNext { _ in object.description }
  431. disposable = innerDisposable
  432. }
  433. expect(objectRetainedByObserver).toNot(beNil())
  434. disposable.dispose()
  435. expect(objectRetainedByObserver).to(beNil())
  436. }
  437. it("should not trigger effects if disposed before closure return") {
  438. var started = false
  439. var value: Int?
  440. SignalProducer<Int, NoError>(value: 42)
  441. .on(started: {
  442. started = true
  443. }, next: {
  444. value = $0
  445. })
  446. .startWithSignal { _, disposable in
  447. expect(started) == false
  448. expect(value).to(beNil())
  449. disposable.dispose()
  450. }
  451. expect(started) == false
  452. expect(value).to(beNil())
  453. }
  454. it("should send interrupted if disposed before closure return") {
  455. var interrupted = false
  456. SignalProducer<Int, NoError>(value: 42)
  457. .startWithSignal { signal, disposable in
  458. expect(interrupted) == false
  459. signal.observeInterrupted {
  460. interrupted = true
  461. }
  462. disposable.dispose()
  463. }
  464. expect(interrupted) == true
  465. }
  466. it("should dispose of added disposables upon completion") {
  467. let addedDisposable = SimpleDisposable()
  468. var observer: Signal<Int, TestError>.Observer!
  469. let producer = SignalProducer<Int, TestError>() { incomingObserver, disposable in
  470. disposable.addDisposable(addedDisposable)
  471. observer = incomingObserver
  472. }
  473. producer.startWithSignal { _ in }
  474. expect(addedDisposable.disposed) == false
  475. observer.sendCompleted()
  476. expect(addedDisposable.disposed) == true
  477. }
  478. it("should dispose of added disposables upon error") {
  479. let addedDisposable = SimpleDisposable()
  480. var observer: Signal<Int, TestError>.Observer!
  481. let producer = SignalProducer<Int, TestError>() { incomingObserver, disposable in
  482. disposable.addDisposable(addedDisposable)
  483. observer = incomingObserver
  484. }
  485. producer.startWithSignal { _ in }
  486. expect(addedDisposable.disposed) == false
  487. observer.sendFailed(.Default)
  488. expect(addedDisposable.disposed) == true
  489. }
  490. }
  491. describe("start") {
  492. it("should immediately begin sending events") {
  493. let producer = SignalProducer<Int, NoError>(values: [1, 2])
  494. var values: [Int] = []
  495. var completed = false
  496. producer.start { event in
  497. switch event {
  498. case let .Next(value):
  499. values.append(value)
  500. case .Completed:
  501. completed = true
  502. default:
  503. break
  504. }
  505. }
  506. expect(values) == [1, 2]
  507. expect(completed) == true
  508. }
  509. it("should send interrupted if disposed") {
  510. let producer = SignalProducer<(), NoError>.never
  511. var interrupted = false
  512. let disposable = producer.startWithInterrupted {
  513. interrupted = true
  514. }
  515. expect(interrupted) == false
  516. disposable.dispose()
  517. expect(interrupted) == true
  518. }
  519. it("should release observer when disposed") {
  520. weak var objectRetainedByObserver: NSObject?
  521. var disposable: Disposable!
  522. let test = {
  523. let producer = SignalProducer<Int, NoError>.never
  524. let object = NSObject()
  525. objectRetainedByObserver = object
  526. disposable = producer.startWithNext { _ in object }
  527. }
  528. test()
  529. expect(objectRetainedByObserver).toNot(beNil())
  530. disposable.dispose()
  531. expect(objectRetainedByObserver).to(beNil())
  532. }
  533. describe("trailing closure") {
  534. it("receives next values") {
  535. let (producer, observer) = SignalProducer<Int, NoError>.pipe()
  536. var values = [Int]()
  537. producer.startWithNext { next in
  538. values.append(next)
  539. }
  540. observer.sendNext(1)
  541. observer.sendNext(2)
  542. observer.sendNext(3)
  543. observer.sendCompleted()
  544. expect(values) == [1, 2, 3]
  545. }
  546. // TODO: remove when the method is marked unavailable.
  547. it("receives next values with erroring signal") {
  548. let (producer, observer) = SignalProducer<Int, TestError>.pipe()
  549. var values = [Int]()
  550. producer.startWithNext { next in
  551. values.append(next)
  552. }
  553. observer.sendNext(1)
  554. observer.sendNext(2)
  555. observer.sendNext(3)
  556. observer.sendCompleted()
  557. expect(values) == [1, 2, 3]
  558. }
  559. it("receives results") {
  560. let (producer, observer) = SignalProducer<Int, TestError>.pipe()
  561. var results: [Result<Int, TestError>] = []
  562. producer.startWithResult { results.append($0) }
  563. observer.sendNext(1)
  564. observer.sendNext(2)
  565. observer.sendNext(3)
  566. observer.sendFailed(.Default)
  567. observer.sendCompleted()
  568. expect(results).to(haveCount(4))
  569. expect(results[0].value) == 1
  570. expect(results[1].value) == 2
  571. expect(results[2].value) == 3
  572. expect(results[3].error) == .Default
  573. }
  574. }
  575. }
  576. describe("lift") {
  577. describe("over unary operators") {
  578. it("should invoke transformation once per started signal") {
  579. let baseProducer = SignalProducer<Int, NoError>(values: [1, 2])
  580. var counter = 0
  581. let transform = { (signal: Signal<Int, NoError>) -> Signal<Int, NoError> in
  582. counter += 1
  583. return signal
  584. }
  585. let producer = baseProducer.lift(transform)
  586. expect(counter) == 0
  587. producer.start()
  588. expect(counter) == 1
  589. producer.start()
  590. expect(counter) == 2
  591. }
  592. it("should not miss any events") {
  593. let baseProducer = SignalProducer<Int, NoError>(values: [1, 2, 3, 4])
  594. let producer = baseProducer.lift { signal in
  595. return signal.map { $0 * $0 }
  596. }
  597. let result = producer.collect().single()
  598. expect(result?.value) == [1, 4, 9, 16]
  599. }
  600. }
  601. describe("over binary operators") {
  602. it("should invoke transformation once per started signal") {
  603. let baseProducer = SignalProducer<Int, NoError>(values: [1, 2])
  604. let otherProducer = SignalProducer<Int, NoError>(values: [3, 4])
  605. var counter = 0
  606. let transform = { (signal: Signal<Int, NoError>) -> Signal<Int, NoError> -> Signal<(Int, Int), NoError> in
  607. return { otherSignal in
  608. counter += 1
  609. return zip(signal, otherSignal)
  610. }
  611. }
  612. let producer = baseProducer.lift(transform)(otherProducer)
  613. expect(counter) == 0
  614. producer.start()
  615. expect(counter) == 1
  616. producer.start()
  617. expect(counter) == 2
  618. }
  619. it("should not miss any events") {
  620. let baseProducer = SignalProducer<Int, NoError>(values: [1, 2, 3])
  621. let otherProducer = SignalProducer<Int, NoError>(values: [4, 5, 6])
  622. let transform = { (signal: Signal<Int, NoError>) -> Signal<Int, NoError> -> Signal<Int, NoError> in
  623. return { otherSignal in
  624. return zip(signal, otherSignal).map { first, second in first + second }
  625. }
  626. }
  627. let producer = baseProducer.lift(transform)(otherProducer)
  628. let result = producer.collect().single()
  629. expect(result?.value) == [5, 7, 9]
  630. }
  631. }
  632. describe("over binary operators with signal") {
  633. it("should invoke transformation once per started signal") {
  634. let baseProducer = SignalProducer<Int, NoError>(values: [1, 2])
  635. let (otherSignal, otherSignalObserver) = Signal<Int, NoError>.pipe()
  636. var counter = 0
  637. let transform = { (signal: Signal<Int, NoError>) -> Signal<Int, NoError> -> Signal<(Int, Int), NoError> in
  638. return { otherSignal in
  639. counter += 1
  640. return zip(signal, otherSignal)
  641. }
  642. }
  643. let producer = baseProducer.lift(transform)(otherSignal)
  644. expect(counter) == 0
  645. producer.start()
  646. otherSignalObserver.sendNext(1)
  647. expect(counter) == 1
  648. producer.start()
  649. otherSignalObserver.sendNext(2)
  650. expect(counter) == 2
  651. }
  652. it("should not miss any events") {
  653. let baseProducer = SignalProducer<Int, NoError>(values: [ 1, 2, 3 ])
  654. let (otherSignal, otherSignalObserver) = Signal<Int, NoError>.pipe()
  655. let transform = { (signal: Signal<Int, NoError>) -> Signal<Int, NoError> -> Signal<Int, NoError> in
  656. return { otherSignal in
  657. return zip(signal, otherSignal).map(+)
  658. }
  659. }
  660. let producer = baseProducer.lift(transform)(otherSignal)
  661. var result: [Int] = []
  662. var completed: Bool = false
  663. producer.start { event in
  664. switch event {
  665. case .Next(let value): result.append(value)
  666. case .Completed: completed = true
  667. default: break
  668. }
  669. }
  670. otherSignalObserver.sendNext(4)
  671. expect(result) == [ 5 ]
  672. otherSignalObserver.sendNext(5)
  673. expect(result) == [ 5, 7 ]
  674. otherSignalObserver.sendNext(6)
  675. expect(result) == [ 5, 7, 9 ]
  676. expect(completed) == true
  677. }
  678. }
  679. }
  680. describe("sequence operators") {
  681. var producerA: SignalProducer<Int, NoError>!
  682. var producerB: SignalProducer<Int, NoError>!
  683. beforeEach {
  684. producerA = SignalProducer<Int, NoError>(values: [ 1, 2 ])
  685. producerB = SignalProducer<Int, NoError>(values: [ 3, 4 ])
  686. }
  687. it("should combine the events to one array") {
  688. let producer = combineLatest([producerA, producerB])
  689. let result = producer.collect().single()
  690. expect(result?.value) == [[1, 4], [2, 4]]
  691. }
  692. it("should zip the events to one array") {
  693. let producer = zip([producerA, producerB])
  694. let result = producer.collect().single()
  695. expect(result?.value) == [[1, 3], [2, 4]]
  696. }
  697. }
  698. describe("timer") {
  699. it("should send the current date at the given interval") {
  700. let scheduler = TestScheduler()
  701. let producer = timer(1, onScheduler: scheduler, withLeeway: 0)
  702. let startDate = scheduler.currentDate
  703. let tick1 = startDate.dateByAddingTimeInterval(1)
  704. let tick2 = startDate.dateByAddingTimeInterval(2)
  705. let tick3 = startDate.dateByAddingTimeInterval(3)
  706. var dates: [NSDate] = []
  707. producer.startWithNext { dates.append($0) }
  708. scheduler.advanceByInterval(0.9)
  709. expect(dates) == []
  710. scheduler.advanceByInterval(1)
  711. expect(dates) == [tick1]
  712. scheduler.advance()
  713. expect(dates) == [tick1]
  714. scheduler.advanceByInterval(0.2)
  715. expect(dates) == [tick1, tick2]
  716. scheduler.advanceByInterval(1)
  717. expect(dates) == [tick1, tick2, tick3]
  718. }
  719. it("should release the signal when disposed") {
  720. let scheduler = TestScheduler()
  721. let producer = timer(1, onScheduler: scheduler, withLeeway: 0)
  722. weak var weakSignal: Signal<NSDate, NoError>?
  723. producer.startWithSignal { signal, disposable in
  724. weakSignal = signal
  725. scheduler.schedule {
  726. disposable.dispose()
  727. }
  728. }
  729. expect(weakSignal).toNot(beNil())
  730. scheduler.run()
  731. expect(weakSignal).to(beNil())
  732. }
  733. }
  734. describe("on") {
  735. it("should attach event handlers to each started signal") {
  736. let (baseProducer, observer) = SignalProducer<Int, TestError>.pipe()
  737. var started = 0
  738. var event = 0
  739. var next = 0
  740. var completed = 0
  741. var terminated = 0
  742. let producer = baseProducer
  743. .on(started: {
  744. started += 1
  745. }, event: { e in
  746. event += 1
  747. }, next: { n in
  748. next += 1
  749. }, completed: {
  750. completed += 1
  751. }, terminated: {
  752. terminated += 1
  753. })
  754. producer.start()
  755. expect(started) == 1
  756. producer.start()
  757. expect(started) == 2
  758. observer.sendNext(1)
  759. expect(event) == 2
  760. expect(next) == 2
  761. observer.sendCompleted()
  762. expect(event) == 4
  763. expect(completed) == 2
  764. expect(terminated) == 2
  765. }
  766. it("should attach event handlers for disposal") {
  767. let (baseProducer, _) = SignalProducer<Int, TestError>.pipe()
  768. var disposed: Bool = false
  769. let producer = baseProducer
  770. .on(disposed: { disposed = true })
  771. let disposable = producer.start()
  772. expect(disposed) == false
  773. disposable.dispose()
  774. expect(disposed) == true
  775. }
  776. }
  777. describe("startOn") {
  778. it("should invoke effects on the given scheduler") {
  779. let scheduler = TestScheduler()
  780. var invoked = false
  781. let producer = SignalProducer<Int, NoError>() { _ in
  782. invoked = true
  783. }
  784. producer.startOn(scheduler).start()
  785. expect(invoked) == false
  786. scheduler.advance()
  787. expect(invoked) == true
  788. }
  789. it("should forward events on their original scheduler") {
  790. let startScheduler = TestScheduler()
  791. let testScheduler = TestScheduler()
  792. let producer = timer(2, onScheduler: testScheduler, withLeeway: 0)
  793. var next: NSDate?
  794. producer.startOn(startScheduler).startWithNext { next = $0 }
  795. startScheduler.advanceByInterval(2)
  796. expect(next).to(beNil())
  797. testScheduler.advanceByInterval(1)
  798. expect(next).to(beNil())
  799. testScheduler.advanceByInterval(1)
  800. expect(next) == testScheduler.currentDate
  801. }
  802. }
  803. describe("flatMapError") {
  804. it("should invoke the handler and start new producer for an error") {
  805. let (baseProducer, baseObserver) = SignalProducer<Int, TestError>.pipe()
  806. var values: [Int] = []
  807. var completed = false
  808. baseProducer
  809. .flatMapError { (error: TestError) -> SignalProducer<Int, TestError> in
  810. expect(error) == TestError.Default
  811. expect(values) == [1]
  812. return .init(value: 2)
  813. }
  814. .start { event in
  815. switch event {
  816. case let .Next(value):
  817. values.append(value)
  818. case .Completed:
  819. completed = true
  820. default:
  821. break
  822. }
  823. }
  824. baseObserver.sendNext(1)
  825. baseObserver.sendFailed(.Default)
  826. expect(values) == [1, 2]
  827. expect(completed) == true
  828. }
  829. it("should interrupt the replaced producer on disposal") {
  830. let (baseProducer, baseObserver) = SignalProducer<Int, TestError>.pipe()
  831. var (disposed, interrupted) = (false, false)
  832. let disposable = baseProducer
  833. .flatMapError { (error: TestError) -> SignalProducer<Int, TestError> in
  834. return SignalProducer<Int, TestError> { _, disposable in
  835. disposable += ActionDisposable { disposed = true }
  836. }
  837. }
  838. .startWithInterrupted { interrupted = true }
  839. baseObserver.sendFailed(.Default)
  840. disposable.dispose()
  841. expect(interrupted) == true
  842. expect(disposed) == true
  843. }
  844. }
  845. describe("flatten") {
  846. describe("FlattenStrategy.Concat") {
  847. describe("sequencing") {
  848. var completePrevious: (() -> Void)!
  849. var sendSubsequent: (() -> Void)!
  850. var completeOuter: (() -> Void)!
  851. var subsequentStarted = false
  852. beforeEach {
  853. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, NoError>, NoError>.pipe()
  854. let (previousProducer, previousObserver) = SignalProducer<Int, NoError>.pipe()
  855. subsequentStarted = false
  856. let subsequentProducer = SignalProducer<Int, NoError> { _ in
  857. subsequentStarted = true
  858. }
  859. completePrevious = { previousObserver.sendCompleted() }
  860. sendSubsequent = { outerObserver.sendNext(subsequentProducer) }
  861. completeOuter = { outerObserver.sendCompleted() }
  862. outerProducer.flatten(.Concat).start()
  863. outerObserver.sendNext(previousProducer)
  864. }
  865. it("should immediately start subsequent inner producer if previous inner producer has already completed") {
  866. completePrevious()
  867. sendSubsequent()
  868. expect(subsequentStarted) == true
  869. }
  870. context("with queued producers") {
  871. beforeEach {
  872. // Place the subsequent producer into `concat`'s queue.
  873. sendSubsequent()
  874. expect(subsequentStarted) == false
  875. }
  876. it("should start subsequent inner producer upon completion of previous inner producer") {
  877. completePrevious()
  878. expect(subsequentStarted) == true
  879. }
  880. it("should start subsequent inner producer upon completion of previous inner producer and completion of outer producer") {
  881. completeOuter()
  882. completePrevious()
  883. expect(subsequentStarted) == true
  884. }
  885. }
  886. }
  887. it("should forward an error from an inner producer") {
  888. let errorProducer = SignalProducer<Int, TestError>(error: TestError.Default)
  889. let outerProducer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: errorProducer)
  890. var error: TestError?
  891. (outerProducer.flatten(.Concat)).startWithFailed { e in
  892. error = e
  893. }
  894. expect(error) == TestError.Default
  895. }
  896. it("should forward an error from the outer producer") {
  897. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, TestError>, TestError>.pipe()
  898. var error: TestError?
  899. outerProducer.flatten(.Concat).startWithFailed { e in
  900. error = e
  901. }
  902. outerObserver.sendFailed(TestError.Default)
  903. expect(error) == TestError.Default
  904. }
  905. describe("completion") {
  906. var completeOuter: (() -> Void)!
  907. var completeInner: (() -> Void)!
  908. var completed = false
  909. beforeEach {
  910. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, NoError>, NoError>.pipe()
  911. let (innerProducer, innerObserver) = SignalProducer<Int, NoError>.pipe()
  912. completeOuter = { outerObserver.sendCompleted() }
  913. completeInner = { innerObserver.sendCompleted() }
  914. completed = false
  915. outerProducer.flatten(.Concat).startWithCompleted {
  916. completed = true
  917. }
  918. outerObserver.sendNext(innerProducer)
  919. }
  920. it("should complete when inner producers complete, then outer producer completes") {
  921. completeInner()
  922. expect(completed) == false
  923. completeOuter()
  924. expect(completed) == true
  925. }
  926. it("should complete when outer producers completes, then inner producers complete") {
  927. completeOuter()
  928. expect(completed) == false
  929. completeInner()
  930. expect(completed) == true
  931. }
  932. }
  933. }
  934. describe("FlattenStrategy.Merge") {
  935. describe("behavior") {
  936. var completeA: (() -> Void)!
  937. var sendA: (() -> Void)!
  938. var completeB: (() -> Void)!
  939. var sendB: (() -> Void)!
  940. var outerCompleted = false
  941. var recv = [Int]()
  942. beforeEach {
  943. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, NoError>, NoError>.pipe()
  944. let (producerA, observerA) = SignalProducer<Int, NoError>.pipe()
  945. let (producerB, observerB) = SignalProducer<Int, NoError>.pipe()
  946. completeA = { observerA.sendCompleted() }
  947. completeB = { observerB.sendCompleted() }
  948. var a = 0
  949. sendA = { observerA.sendNext(a); a += 1 }
  950. var b = 100
  951. sendB = { observerB.sendNext(b); b += 1 }
  952. outerProducer.flatten(.Merge).start { event in
  953. switch event {
  954. case let .Next(i):
  955. recv.append(i)
  956. case .Completed:
  957. outerCompleted = true
  958. default:
  959. break
  960. }
  961. }
  962. outerObserver.sendNext(producerA)
  963. outerObserver.sendNext(producerB)
  964. outerObserver.sendCompleted()
  965. }
  966. it("should forward values from any inner signals") {
  967. sendA()
  968. sendA()
  969. sendB()
  970. sendA()
  971. sendB()
  972. expect(recv) == [0, 1, 100, 2, 101]
  973. }
  974. it("should complete when all signals have completed") {
  975. completeA()
  976. expect(outerCompleted) == false
  977. completeB()
  978. expect(outerCompleted) == true
  979. }
  980. }
  981. describe("error handling") {
  982. it("should forward an error from an inner signal") {
  983. let errorProducer = SignalProducer<Int, TestError>(error: TestError.Default)
  984. let outerProducer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: errorProducer)
  985. var error: TestError?
  986. outerProducer.flatten(.Merge).startWithFailed { e in
  987. error = e
  988. }
  989. expect(error) == TestError.Default
  990. }
  991. it("should forward an error from the outer signal") {
  992. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, TestError>, TestError>.pipe()
  993. var error: TestError?
  994. outerProducer.flatten(.Merge).startWithFailed { e in
  995. error = e
  996. }
  997. outerObserver.sendFailed(TestError.Default)
  998. expect(error) == TestError.Default
  999. }
  1000. }
  1001. }
  1002. describe("FlattenStrategy.Latest") {
  1003. it("should forward values from the latest inner signal") {
  1004. let (outer, outerObserver) = SignalProducer<SignalProducer<Int, TestError>, TestError>.pipe()
  1005. let (firstInner, firstInnerObserver) = SignalProducer<Int, TestError>.pipe()
  1006. let (secondInner, secondInnerObserver) = SignalProducer<Int, TestError>.pipe()
  1007. var receivedValues: [Int] = []
  1008. var errored = false
  1009. var completed = false
  1010. outer.flatten(.Latest).start { event in
  1011. switch event {
  1012. case let .Next(value):
  1013. receivedValues.append(value)
  1014. case .Completed:
  1015. completed = true
  1016. case .Failed:
  1017. errored = true
  1018. case .Interrupted:
  1019. break
  1020. }
  1021. }
  1022. outerObserver.sendNext(SignalProducer(value: 0))
  1023. outerObserver.sendNext(firstInner)
  1024. firstInnerObserver.sendNext(1)
  1025. outerObserver.sendNext(secondInner)
  1026. secondInnerObserver.sendNext(2)
  1027. outerObserver.sendCompleted()
  1028. expect(receivedValues) == [ 0, 1, 2 ]
  1029. expect(errored) == false
  1030. expect(completed) == false
  1031. firstInnerObserver.sendNext(3)
  1032. firstInnerObserver.sendCompleted()
  1033. secondInnerObserver.sendNext(4)
  1034. secondInnerObserver.sendCompleted()
  1035. expect(receivedValues) == [ 0, 1, 2, 4 ]
  1036. expect(errored) == false
  1037. expect(completed) == true
  1038. }
  1039. it("should forward an error from an inner signal") {
  1040. let inner = SignalProducer<Int, TestError>(error: .Default)
  1041. let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)
  1042. let result = outer.flatten(.Latest).first()
  1043. expect(result?.error) == TestError.Default
  1044. }
  1045. it("should forward an error from the outer signal") {
  1046. let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(error: .Default)
  1047. let result = outer.flatten(.Latest).first()
  1048. expect(result?.error) == TestError.Default
  1049. }
  1050. it("should complete when the original and latest signals have completed") {
  1051. let inner = SignalProducer<Int, TestError>.empty
  1052. let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)
  1053. var completed = false
  1054. outer.flatten(.Latest).startWithCompleted {
  1055. completed = true
  1056. }
  1057. expect(completed) == true
  1058. }
  1059. it("should complete when the outer signal completes before sending any signals") {
  1060. let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>.empty
  1061. var completed = false
  1062. outer.flatten(.Latest).startWithCompleted {
  1063. completed = true
  1064. }
  1065. expect(completed) == true
  1066. }
  1067. it("should not deadlock") {
  1068. let producer = SignalProducer<Int, NoError>(value: 1)
  1069. .flatMap(.Latest) { _ in SignalProducer(value: 10) }
  1070. let result = producer.take(1).last()
  1071. expect(result?.value) == 10
  1072. }
  1073. }
  1074. describe("interruption") {
  1075. var innerObserver: Signal<(), NoError>.Observer!
  1076. var outerObserver: Signal<SignalProducer<(), NoError>, NoError>.Observer!
  1077. var execute: (FlattenStrategy -> Void)!
  1078. var interrupted = false
  1079. var completed = false
  1080. beforeEach {
  1081. let (innerProducer, incomingInnerObserver) = SignalProducer<(), NoError>.pipe()
  1082. let (outerProducer, incomingOuterObserver) = SignalProducer<SignalProducer<(), NoError>, NoError>.pipe()
  1083. innerObserver = incomingInnerObserver
  1084. outerObserver = incomingOuterObserver
  1085. execute = { strategy in
  1086. interrupted = false
  1087. completed = false
  1088. outerProducer
  1089. .flatten(strategy)
  1090. .start { event in
  1091. switch event {
  1092. case .Interrupted:
  1093. interrupted = true
  1094. case .Completed:
  1095. completed = true
  1096. default:
  1097. break
  1098. }
  1099. }
  1100. }
  1101. incomingOuterObserver.sendNext(innerProducer)
  1102. }
  1103. describe("Concat") {
  1104. it("should drop interrupted from an inner producer") {
  1105. execute(.Concat)
  1106. innerObserver.sendInterrupted()
  1107. expect(interrupted) == false
  1108. expect(completed) == false
  1109. outerObserver.sendCompleted()
  1110. expect(completed) == true
  1111. }
  1112. it("should forward interrupted from the outer producer") {
  1113. execute(.Concat)
  1114. outerObserver.sendInterrupted()
  1115. expect(interrupted) == true
  1116. }
  1117. }
  1118. describe("Latest") {
  1119. it("should drop interrupted from an inner producer") {
  1120. execute(.Latest)
  1121. innerObserver.sendInterrupted()
  1122. expect(interrupted) == false
  1123. expect(completed) == false
  1124. outerObserver.sendCompleted()
  1125. expect(completed) == true
  1126. }
  1127. it("should forward interrupted from the outer producer") {
  1128. execute(.Latest)
  1129. outerObserver.sendInterrupted()
  1130. expect(interrupted) == true
  1131. }
  1132. }
  1133. describe("Merge") {
  1134. it("should drop interrupted from an inner producer") {
  1135. execute(.Merge)
  1136. innerObserver.sendInterrupted()
  1137. expect(interrupted) == false
  1138. expect(completed) == false
  1139. outerObserver.sendCompleted()
  1140. expect(completed) == true
  1141. }
  1142. it("should forward interrupted from the outer producer") {
  1143. execute(.Merge)
  1144. outerObserver.sendInterrupted()
  1145. expect(interrupted) == true
  1146. }
  1147. }
  1148. }
  1149. describe("disposal") {
  1150. var completeOuter: (() -> Void)!
  1151. var disposeOuter: (() -> Void)!
  1152. var execute: (FlattenStrategy -> Void)!
  1153. var innerDisposable = SimpleDisposable()
  1154. var interrupted = false
  1155. beforeEach {
  1156. execute = { strategy in
  1157. let (outerProducer, outerObserver) = SignalProducer<SignalProducer<Int, NoError>, NoError>.pipe()
  1158. innerDisposable = SimpleDisposable()
  1159. let innerProducer = SignalProducer<Int, NoError> { $1.addDisposable(innerDisposable) }
  1160. interrupted = false
  1161. let outerDisposable = outerProducer.flatten(strategy).startWithInterrupted {
  1162. interrupted = true
  1163. }
  1164. completeOuter = outerObserver.sendCompleted
  1165. disposeOuter = outerDisposable.dispose
  1166. outerObserver.sendNext(innerProducer)
  1167. }
  1168. }
  1169. describe("Concat") {
  1170. it("should cancel inner work when disposed before the outer producer completes") {
  1171. execute(.Concat)
  1172. expect(innerDisposable.disposed) == false
  1173. expect(interrupted) == false
  1174. disposeOuter()
  1175. expect(innerDisposable.disposed) == true
  1176. expect(interrupted) == true
  1177. }
  1178. it("should cancel inner work when disposed after the outer producer completes") {
  1179. execute(.Concat)
  1180. completeOuter()
  1181. expect(innerDisposable.disposed) == false
  1182. expect(interrupted) == false
  1183. disposeOuter()
  1184. expect(innerDisposable.disposed) == true
  1185. expect(interrupted) == true
  1186. }
  1187. }
  1188. describe("Latest") {
  1189. it("should cancel inner work when disposed before the outer producer completes") {
  1190. execute(.Latest)
  1191. expect(innerDisposable.disposed) == false
  1192. expect(interrupted) == false
  1193. disposeOuter()
  1194. expect(innerDisposable.disposed) == true
  1195. expect(interrupted) == true
  1196. }
  1197. it("should cancel inner work when disposed after the outer producer completes") {
  1198. execute(.Latest)
  1199. completeOuter()
  1200. expect(innerDisposable.disposed) == false
  1201. expect(interrupted) == false
  1202. disposeOuter()
  1203. expect(innerDisposable.disposed) == true
  1204. expect(interrupted) == true
  1205. }
  1206. }
  1207. describe("Merge") {
  1208. it("should cancel inner work when disposed before the outer producer completes") {
  1209. execute(.Merge)
  1210. expect(innerDisposable.disposed) == false
  1211. expect(interrupted) == false
  1212. disposeOuter()
  1213. expect(innerDisposable.disposed) == true
  1214. expect(interrupted) == true
  1215. }
  1216. it("should cancel inner work when disposed after the outer producer completes") {
  1217. execute(.Merge)
  1218. completeOuter()
  1219. expect(innerDisposable.disposed) == false
  1220. expect(interrupted) == false
  1221. disposeOuter()
  1222. expect(innerDisposable.disposed) == true
  1223. expect(interrupted) == true
  1224. }
  1225. }
  1226. }
  1227. }
  1228. describe("times") {
  1229. it("should start a signal N times upon completion") {
  1230. let original = SignalProducer<Int, NoError>(values: [ 1, 2, 3 ])
  1231. let producer = original.times(3)
  1232. let result = producer.collect().single()
  1233. expect(result?.value) == [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ]
  1234. }
  1235. it("should produce an equivalent signal producer if count is 1") {
  1236. let original = SignalProducer<Int, NoError>(value: 1)
  1237. let producer = original.times(1)
  1238. let result = producer.collect().single()
  1239. expect(result?.value) == [ 1 ]
  1240. }
  1241. it("should produce an empty signal if count is 0") {
  1242. let original = SignalProducer<Int, NoError>(value: 1)
  1243. let producer = original.times(0)
  1244. let result = producer.first()
  1245. expect(result).to(beNil())
  1246. }
  1247. it("should not repeat upon error") {
  1248. let results: [Result<Int, TestError>] = [
  1249. .Success(1),
  1250. .Success(2),
  1251. .Failure(.Default)
  1252. ]
  1253. let original = SignalProducer.attemptWithResults(results)
  1254. let producer = original.times(3)
  1255. let events = producer
  1256. .materialize()
  1257. .collect()
  1258. .single()
  1259. let result = events?.value
  1260. let expectedEvents: [Event<Int, TestError>] = [
  1261. .Next(1),
  1262. .Next(2),
  1263. .Failed(.Default)
  1264. ]
  1265. // TODO: if let result = result where result.count == expectedEvents.count
  1266. if result?.count != expectedEvents.count {
  1267. fail("Invalid result: \(result)")
  1268. } else {
  1269. // Can't test for equality because Array<T> is not Equatable,
  1270. // and neither is Event<Value, Error>.
  1271. expect(result![0] == expectedEvents[0]) == true
  1272. expect(result![1] == expectedEvents[1]) == true
  1273. expect(result![2] == expectedEvents[2]) == true
  1274. }
  1275. }
  1276. it("should evaluate lazily") {
  1277. let original = SignalProducer<Int, NoError>(value: 1)
  1278. let producer = original.times(Int.max)
  1279. let result = producer.take(1).single()
  1280. expect(result?.value) == 1
  1281. }
  1282. }
  1283. describe("retry") {
  1284. it("should start a signal N times upon error") {
  1285. let results: [Result<Int, TestError>] = [
  1286. .Failure(.Error1),
  1287. .Failure(.Error2),
  1288. .Success(1)
  1289. ]
  1290. let original = SignalProducer.attemptWithResults(results)
  1291. let producer = original.retry(2)
  1292. let result = producer.single()
  1293. expect(result?.value) == 1
  1294. }
  1295. it("should forward errors that occur after all retries") {
  1296. let results: [Result<Int, TestError>] = [
  1297. .Failure(.Default),
  1298. .Failure(.Error1),
  1299. .Failure(.Error2),
  1300. ]
  1301. let original = SignalProducer.attemptWithResults(results)
  1302. let producer = original.retry(2)
  1303. let result = producer.single()
  1304. expect(result?.error) == TestError.Error2
  1305. }
  1306. it("should not retry upon completion") {
  1307. let results: [Result<Int, TestError>] = [
  1308. .Success(1),
  1309. .Success(2),
  1310. .Success(3)
  1311. ]
  1312. let original = SignalProducer.attemptWithResults(results)
  1313. let producer = original.retry(2)
  1314. let result = producer.single()
  1315. expect(result?.value) == 1
  1316. }
  1317. }
  1318. describe("then") {
  1319. it("should start the subsequent producer after the completion of the original") {
  1320. let (original, observer) = SignalProducer<Int, NoError>.pipe()
  1321. var subsequentStarted = false
  1322. let subsequent = SignalProducer<Int, NoError> { observer, _ in
  1323. subsequentStarted = true
  1324. }
  1325. let producer = original.then(subsequent)
  1326. producer.start()
  1327. expect(subsequentStarted) == false
  1328. observer.sendCompleted()
  1329. expect(subsequentStarted) == true
  1330. }
  1331. it("should forward errors from the original producer") {
  1332. let original = SignalProducer<Int, TestError>(error: .Default)
  1333. let subsequent = SignalProducer<Int, TestError>.empty
  1334. let result = original.then(subsequent).first()
  1335. expect(result?.error) == TestError.Default
  1336. }
  1337. it("should forward errors from the subsequent producer") {
  1338. let original = SignalProducer<Int, TestError>.empty
  1339. let subsequent = SignalProducer<Int, TestError>(error: .Default)
  1340. let result = original.then(subsequent).first()
  1341. expect(result?.error) == TestError.Default
  1342. }
  1343. it("should forward interruptions from the original producer") {
  1344. let (original, observer) = SignalProducer<Int, NoError>.pipe()
  1345. var subsequentStarted = false
  1346. let subsequent = SignalProducer<Int, NoError> { observer, _ in
  1347. subsequentStarted = true
  1348. }
  1349. var interrupted = false
  1350. let producer = original.then(subsequent)
  1351. producer.startWithInterrupted {
  1352. interrupted = true
  1353. }
  1354. expect(subsequentStarted) == false
  1355. observer.sendInterrupted()
  1356. expect(interrupted) == true
  1357. }
  1358. it("should complete when both inputs have completed") {
  1359. let (original, originalObserver) = SignalProducer<Int, NoError>.pipe()
  1360. let (subsequent, subsequentObserver) = SignalProducer<String, NoError>.pipe()
  1361. let producer = original.then(subsequent)
  1362. var completed = false
  1363. producer.startWithCompleted {
  1364. completed = true
  1365. }
  1366. originalObserver.sendCompleted()
  1367. expect(completed) == false
  1368. subsequentObserver.sendCompleted()
  1369. expect(completed) == true
  1370. }
  1371. }
  1372. describe("first") {
  1373. it("should start a signal then block on the first value") {
  1374. let (_signal, observer) = Signal<Int, NoError>.pipe()
  1375. let queue = dispatch_queue_create("\(#file):\(#line)", DISPATCH_QUEUE_SERIAL)
  1376. let producer = SignalProducer(signal: _signal.delay(0.1, onScheduler: QueueScheduler(queue: queue)))
  1377. var result: Result<Int, NoError>?
  1378. let group = dispatch_group_create()
  1379. dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
  1380. result = producer.first()
  1381. }
  1382. expect(result).to(beNil())
  1383. observer.sendNext(1)
  1384. dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
  1385. expect(result?.value) == 1
  1386. }
  1387. it("should return a nil result if no values are sent before completion") {
  1388. let result = SignalProducer<Int, NoError>.empty.first()
  1389. expect(result).to(beNil())
  1390. }
  1391. it("should return the first value if more than one value is sent") {
  1392. let result = SignalProducer<Int, NoError>(values: [ 1, 2 ]).first()
  1393. expect(result?.value) == 1
  1394. }
  1395. it("should return an error if one occurs before the first value") {
  1396. let result = SignalProducer<Int, TestError>(error: .Default).first()
  1397. expect(result?.error) == TestError.Default
  1398. }
  1399. }
  1400. describe("single") {
  1401. it("should start a signal then block until completion") {
  1402. let (_signal, observer) = Signal<Int, NoError>.pipe()
  1403. let queue = dispatch_queue_create("\(#file):\(#line)", DISPATCH_QUEUE_SERIAL)
  1404. let producer = SignalProducer(signal: _signal.delay(0.1, onScheduler: QueueScheduler(queue: queue)))
  1405. var result: Result<Int, NoError>?
  1406. let group = dispatch_group_create()
  1407. dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
  1408. result = producer.single()
  1409. }
  1410. expect(result).to(beNil())
  1411. observer.sendNext(1)
  1412. expect(result).to(beNil())
  1413. observer.sendCompleted()
  1414. dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
  1415. expect(result?.value) == 1
  1416. }
  1417. it("should return a nil result if no values are sent before completion") {
  1418. let result = SignalProducer<Int, NoError>.empty.single()
  1419. expect(result).to(beNil())
  1420. }
  1421. it("should return a nil result if more than one value is sent before completion") {
  1422. let result = SignalProducer<Int, NoError>(values: [ 1, 2 ]).single()
  1423. expect(result).to(beNil())
  1424. }
  1425. it("should return an error if one occurs") {
  1426. let result = SignalProducer<Int, TestError>(error: .Default).single()
  1427. expect(result?.error) == TestError.Default
  1428. }
  1429. }
  1430. describe("last") {
  1431. it("should start a signal then block until completion") {
  1432. let (_signal, observer) = Signal<Int, NoError>.pipe()
  1433. let queue = dispatch_queue_create("\(#file):\(#line)", DISPATCH_QUEUE_SERIAL)
  1434. let producer = SignalProducer(signal: _signal.delay(0.1, onScheduler: QueueScheduler(queue: queue)))
  1435. var result: Result<Int, NoError>?
  1436. let group = dispatch_group_create()
  1437. dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
  1438. result = producer.last()
  1439. }
  1440. expect(result).to(beNil())
  1441. observer.sendNext(1)
  1442. observer.sendNext(2)
  1443. expect(result).to(beNil())
  1444. observer.sendCompleted()
  1445. dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
  1446. expect(result?.value) == 2
  1447. }
  1448. it("should return a nil result if no values are sent before completion") {
  1449. let result = SignalProducer<Int, NoError>.empty.last()
  1450. expect(result).to(beNil())
  1451. }
  1452. it("should return the last value if more than one value is sent") {
  1453. let result = SignalProducer<Int, NoError>(values: [ 1, 2 ]).last()
  1454. expect(result?.value) == 2
  1455. }
  1456. it("should return an error if one occurs") {
  1457. let result = SignalProducer<Int, TestError>(error: .Default).last()
  1458. expect(result?.error) == TestError.Default
  1459. }
  1460. }
  1461. describe("wait") {
  1462. it("should start a signal then block until completion") {
  1463. let (_signal, observer) = Signal<Int, NoError>.pipe()
  1464. let queue = dispatch_queue_create("\(#file):\(#line)", DISPATCH_QUEUE_SERIAL)
  1465. let producer = SignalProducer(signal: _signal.delay(0.1, onScheduler: QueueScheduler(queue: queue)))
  1466. var result: Result<(), NoError>?
  1467. let group = dispatch_group_create()
  1468. dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
  1469. result = producer.wait()
  1470. }
  1471. expect(result).to(beNil())
  1472. observer.sendCompleted()
  1473. dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
  1474. expect(result?.value).toNot(beNil())
  1475. }
  1476. it("should return an error if one occurs") {
  1477. let result = SignalProducer<Int, TestError>(error: .Default).wait()
  1478. expect(result.error) == TestError.Default
  1479. }
  1480. }
  1481. describe("observeOn") {
  1482. it("should immediately cancel upstream producer's work when disposed") {
  1483. var upstreamDisposable: Disposable!
  1484. let producer = SignalProducer<(), NoError>{ _, innerDisposable in
  1485. upstreamDisposable = innerDisposable
  1486. }
  1487. var downstreamDisposable: Disposable!
  1488. producer
  1489. .observeOn(TestScheduler())
  1490. .startWithSignal { signal, innerDisposable in
  1491. downstreamDisposable = innerDisposable
  1492. }
  1493. expect(upstreamDisposable.disposed) == false
  1494. downstreamDisposable.dispose()
  1495. expect(upstreamDisposable.disposed) == true
  1496. }
  1497. }
  1498. describe("take") {
  1499. it("Should not start concat'ed producer if the first one sends a value when using take(1)") {
  1500. let scheduler: QueueScheduler
  1501. if #available(OSX 10.10, *) {
  1502. scheduler = QueueScheduler()
  1503. } else {
  1504. scheduler = QueueScheduler(queue: dispatch_get_main_queue())
  1505. }
  1506. // Delaying producer1 from sending a value to test whether producer2 is started in the mean-time.
  1507. let producer1 = SignalProducer<Int, NoError>() { handler, _ in
  1508. handler.sendNext(1)
  1509. handler.sendCompleted()
  1510. }.startOn(scheduler)
  1511. var started = false
  1512. let producer2 = SignalProducer<Int, NoError>() { handler, _ in
  1513. started = true
  1514. handler.sendNext(2)
  1515. handler.sendCompleted()
  1516. }
  1517. let result = producer1.concat(producer2).take(1).collect().first()
  1518. expect(result?.value) == [1]
  1519. expect(started) == false
  1520. }
  1521. }
  1522. describe("replayLazily") {
  1523. var producer: SignalProducer<Int, TestError>!
  1524. var observer: SignalProducer<Int, TestError>.ProducedSignal.Observer!
  1525. var replayedProducer: SignalProducer<Int, TestError>!
  1526. beforeEach {
  1527. let (producerTemp, observerTemp) = SignalProducer<Int, TestError>.pipe()
  1528. producer = producerTemp
  1529. observer = observerTemp
  1530. replayedProducer = producer.replayLazily(2)
  1531. }
  1532. context("subscribing to underlying producer") {
  1533. it("emits new values") {
  1534. var last: Int?
  1535. replayedProducer
  1536. .assumeNoErrors()
  1537. .startWithNext { last = $0 }
  1538. expect(last).to(beNil())
  1539. observer.sendNext(1)
  1540. expect(last) == 1
  1541. observer.sendNext(2)
  1542. expect(last) == 2
  1543. }
  1544. it("emits errors") {
  1545. var error: TestError?
  1546. replayedProducer.startWithFailed { error = $0 }
  1547. expect(error).to(beNil())
  1548. observer.sendFailed(.Default)
  1549. expect(error) == TestError.Default
  1550. }
  1551. }
  1552. context("buffers past values") {
  1553. it("emits last value upon subscription") {
  1554. let disposable = replayedProducer
  1555. .start()
  1556. observer.sendNext(1)
  1557. disposable.dispose()
  1558. var last: Int?
  1559. replayedProducer
  1560. .assumeNoErrors()
  1561. .startWithNext { last = $0 }
  1562. expect(last) == 1
  1563. }
  1564. it("emits previous failure upon subscription") {
  1565. let disposable = replayedProducer
  1566. .start()
  1567. observer.sendFailed(.Default)
  1568. disposable.dispose()
  1569. var error: TestError?
  1570. replayedProducer
  1571. .startWithFailed { error = $0 }
  1572. expect(error) == TestError.Default
  1573. }
  1574. it("emits last n values upon subscription") {
  1575. var disposable = replayedProducer
  1576. .start()
  1577. observer.sendNext(1)
  1578. observer.sendNext(2)
  1579. observer.sendNext(3)
  1580. observer.sendNext(4)
  1581. disposable.dispose()
  1582. var values: [Int] = []
  1583. disposable = replayedProducer
  1584. .assumeNoErrors()
  1585. .startWithNext { values.append($0) }
  1586. expect(values) == [ 3, 4 ]
  1587. observer.sendNext(5)
  1588. expect(values) == [ 3, 4, 5 ]
  1589. disposable.dispose()
  1590. values = []
  1591. replayedProducer
  1592. .assumeNoErrors()
  1593. .startWithNext { values.append($0) }
  1594. expect(values) == [ 4, 5 ]
  1595. }
  1596. }
  1597. context("starting underying producer") {
  1598. it("starts lazily") {
  1599. var started = false
  1600. let producer = SignalProducer<Int, NoError>(value: 0)
  1601. .on(started: { started = true })
  1602. expect(started) == false
  1603. let replayedProducer = producer
  1604. .replayLazily(1)
  1605. expect(started) == false
  1606. replayedProducer.start()
  1607. expect(started) == true
  1608. }
  1609. it("shares a single subscription") {
  1610. var startedTimes = 0
  1611. let producer = SignalProducer<Int, NoError>.never
  1612. .on(started: { startedTimes += 1 })
  1613. expect(startedTimes) == 0
  1614. let replayedProducer = producer
  1615. .replayLazily(1)
  1616. expect(startedTimes) == 0
  1617. replayedProducer.start()
  1618. expect(startedTimes) == 1
  1619. replayedProducer.start()
  1620. expect(startedTimes) == 1
  1621. }
  1622. it("does not start multiple times when subscribing multiple times") {
  1623. var startedTimes = 0
  1624. let producer = SignalProducer<Int, NoError>(value: 0)
  1625. .on(started: { startedTimes += 1 })
  1626. let replayedProducer = producer
  1627. .replayLazily(1)
  1628. expect(startedTimes) == 0
  1629. replayedProducer.start().dispose()
  1630. expect(startedTimes) == 1
  1631. replayedProducer.start().dispose()
  1632. expect(startedTimes) == 1
  1633. }
  1634. it("does not start again if it finished") {
  1635. var startedTimes = 0
  1636. let producer = SignalProducer<Int, NoError>.empty
  1637. .on(started: { startedTimes += 1 })
  1638. expect(startedTimes) == 0
  1639. let replayedProducer = producer
  1640. .replayLazily(1)
  1641. expect(startedTimes) == 0
  1642. replayedProducer.start()
  1643. expect(startedTimes) == 1
  1644. replayedProducer.start()
  1645. expect(startedTimes) == 1
  1646. }
  1647. }
  1648. context("lifetime") {
  1649. it("does not dispose underlying subscription if the replayed producer is still in memory") {
  1650. var disposed = false
  1651. let producer = SignalProducer<Int, NoError>.never
  1652. .on(disposed: { disposed = true })
  1653. let replayedProducer = producer
  1654. .replayLazily(1)
  1655. expect(disposed) == false
  1656. let disposable = replayedProducer.start()
  1657. expect(disposed) == false
  1658. disposable.dispose()
  1659. expect(disposed) == false
  1660. }
  1661. it("does not dispose if it has active subscriptions") {
  1662. var disposed = false
  1663. let producer = SignalProducer<Int, NoError>.never
  1664. .on(disposed: { disposed = true })
  1665. var replayedProducer = ImplicitlyUnwrappedOptional(producer.replayLazily(1))
  1666. expect(disposed) == false
  1667. let disposable1 = replayedProducer.start()
  1668. let disposable2 = replayedProducer.start()
  1669. expect(disposed) == false
  1670. replayedProducer = nil
  1671. expect(disposed) == false
  1672. disposable1.dispose()
  1673. expect(disposed) == false
  1674. disposable2.dispose()
  1675. expect(disposed) == true
  1676. }
  1677. it("disposes underlying producer when the producer is deallocated") {
  1678. var disposed = false
  1679. let producer = SignalProducer<Int, NoError>.never
  1680. .on(disposed: { disposed = true })
  1681. var replayedProducer = ImplicitlyUnwrappedOptional(producer.replayLazily(1))
  1682. expect(disposed) == false
  1683. let disposable = replayedProducer.start()
  1684. expect(disposed) == false
  1685. disposable.dispose()
  1686. expect(disposed) == false
  1687. replayedProducer = nil
  1688. expect(disposed) == true
  1689. }
  1690. it("does not leak buffered values") {
  1691. final class Value {
  1692. private let deinitBlock: () -> Void
  1693. init(deinitBlock: () -> Void) {
  1694. self.deinitBlock = deinitBlock
  1695. }
  1696. deinit {
  1697. self.deinitBlock()
  1698. }
  1699. }
  1700. var deinitValues = 0
  1701. var producer: SignalProducer<Value, NoError>! = SignalProducer(value: Value {
  1702. deinitValues += 1
  1703. })
  1704. expect(deinitValues) == 0
  1705. var replayedProducer: SignalProducer<Value, NoError>! = producer
  1706. .replayLazily(1)
  1707. let disposable = replayedProducer
  1708. .start()
  1709. disposable.dispose()
  1710. expect(deinitValues) == 0
  1711. producer = nil
  1712. expect(deinitValues) == 0
  1713. replayedProducer = nil
  1714. expect(deinitValues) == 1
  1715. }
  1716. }
  1717. describe("log events") {
  1718. it("should output the correct event") {
  1719. let expectations: [String -> Void] = [
  1720. { event in expect(event) == "[] Started" },
  1721. { event in expect(event) == "[] Next 1" },
  1722. { event in expect(event) == "[] Completed" },
  1723. { event in expect(event) == "[] Terminated" },
  1724. { event in expect(event) == "[] Disposed" }
  1725. ]
  1726. let logger = TestLogger(expectations: expectations)
  1727. let (producer, observer) = SignalProducer<Int, TestError>.pipe()
  1728. producer
  1729. .logEvents(logger: logger.logEvent)
  1730. .start()
  1731. observer.sendNext(1)
  1732. observer.sendCompleted()
  1733. }
  1734. }
  1735. describe("init(values) ambiguity") {
  1736. it("should not be a SignalProducer<SignalProducer<Int, NoError>, NoError>") {
  1737. let producer1: SignalProducer<Int, NoError> = SignalProducer.empty
  1738. let producer2: SignalProducer<Int, NoError> = SignalProducer.empty
  1739. let producer = SignalProducer(values: [producer1, producer2])
  1740. .flatten(.Merge)
  1741. expect(producer is SignalProducer<Int, NoError>) == true
  1742. }
  1743. }
  1744. }
  1745. }
  1746. }
  1747. // MARK: - Helpers
  1748. extension SignalProducer {
  1749. internal static func pipe() -> (SignalProducer, ProducedSignal.Observer) {
  1750. let (signal, observer) = ProducedSignal.pipe()
  1751. let producer = SignalProducer(signal: signal)
  1752. return (producer, observer)
  1753. }
  1754. /// Creates a producer that can be started as many times as elements in `results`.
  1755. /// Each signal will immediately send either a value or an error.
  1756. private static func attemptWithResults<C: CollectionType where C.Generator.Element == Result<Value, Error>, C.Index.Distance == Int>(results: C) -> SignalProducer<Value, Error> {
  1757. let resultCount = results.count
  1758. var operationIndex = 0
  1759. precondition(resultCount > 0)
  1760. let operation: () -> Result<Value, Error> = {
  1761. if operationIndex < resultCount {
  1762. defer {
  1763. operationIndex += 1
  1764. }
  1765. return results[results.startIndex.advancedBy(operationIndex)]
  1766. } else {
  1767. fail("Operation started too many times")
  1768. return results[results.startIndex.advancedBy(0)]
  1769. }
  1770. }
  1771. return SignalProducer.attempt(operation)
  1772. }
  1773. }