RACSignal+Operations.m 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334
  1. //
  2. // RACSignal+Operations.m
  3. // ReactiveObjC
  4. //
  5. // Created by Justin Spahr-Summers on 2012-09-06.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACSignal+Operations.h"
  9. #import "NSObject+RACDeallocating.h"
  10. #import "NSObject+RACDescription.h"
  11. #import "RACBlockTrampoline.h"
  12. #import "RACCommand.h"
  13. #import "RACCompoundDisposable.h"
  14. #import "RACDisposable.h"
  15. #import "RACEvent.h"
  16. #import "RACGroupedSignal.h"
  17. #import "RACMulticastConnection+Private.h"
  18. #import "RACReplaySubject.h"
  19. #import "RACScheduler.h"
  20. #import "RACSerialDisposable.h"
  21. #import "RACSignalSequence.h"
  22. #import "RACStream+Private.h"
  23. #import "RACSubject.h"
  24. #import "RACSubscriber+Private.h"
  25. #import "RACSubscriber.h"
  26. #import "RACTuple.h"
  27. #import "RACUnit.h"
  28. #import <libkern/OSAtomic.h>
  29. #import <objc/runtime.h>
  30. NSString * const RACSignalErrorDomain = @"RACSignalErrorDomain";
  31. const NSInteger RACSignalErrorTimedOut = 1;
  32. const NSInteger RACSignalErrorNoMatchingCase = 2;
  33. // Subscribes to the given signal with the given blocks.
  34. //
  35. // If the signal errors or completes, the corresponding block is invoked. If the
  36. // disposable passed to the block is _not_ disposed, then the signal is
  37. // subscribed to again.
  38. static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
  39. next = [next copy];
  40. error = [error copy];
  41. completed = [completed copy];
  42. RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
  43. RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
  44. RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
  45. [compoundDisposable addDisposable:selfDisposable];
  46. __weak RACDisposable *weakSelfDisposable = selfDisposable;
  47. RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
  48. @autoreleasepool {
  49. error(e, compoundDisposable);
  50. [compoundDisposable removeDisposable:weakSelfDisposable];
  51. }
  52. recurse();
  53. } completed:^{
  54. @autoreleasepool {
  55. completed(compoundDisposable);
  56. [compoundDisposable removeDisposable:weakSelfDisposable];
  57. }
  58. recurse();
  59. }];
  60. [selfDisposable addDisposable:subscriptionDisposable];
  61. };
  62. // Subscribe once immediately, and then use recursive scheduling for any
  63. // further resubscriptions.
  64. recursiveBlock(^{
  65. RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
  66. RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
  67. [compoundDisposable addDisposable:schedulingDisposable];
  68. });
  69. return compoundDisposable;
  70. }
  71. @implementation RACSignal (Operations)
  72. - (RACSignal *)doNext:(void (^)(id x))block {
  73. NSCParameterAssert(block != NULL);
  74. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  75. return [self subscribeNext:^(id x) {
  76. block(x);
  77. [subscriber sendNext:x];
  78. } error:^(NSError *error) {
  79. [subscriber sendError:error];
  80. } completed:^{
  81. [subscriber sendCompleted];
  82. }];
  83. }] setNameWithFormat:@"[%@] -doNext:", self.name];
  84. }
  85. - (RACSignal *)doError:(void (^)(NSError *error))block {
  86. NSCParameterAssert(block != NULL);
  87. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  88. return [self subscribeNext:^(id x) {
  89. [subscriber sendNext:x];
  90. } error:^(NSError *error) {
  91. block(error);
  92. [subscriber sendError:error];
  93. } completed:^{
  94. [subscriber sendCompleted];
  95. }];
  96. }] setNameWithFormat:@"[%@] -doError:", self.name];
  97. }
  98. - (RACSignal *)doCompleted:(void (^)(void))block {
  99. NSCParameterAssert(block != NULL);
  100. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  101. return [self subscribeNext:^(id x) {
  102. [subscriber sendNext:x];
  103. } error:^(NSError *error) {
  104. [subscriber sendError:error];
  105. } completed:^{
  106. block();
  107. [subscriber sendCompleted];
  108. }];
  109. }] setNameWithFormat:@"[%@] -doCompleted:", self.name];
  110. }
  111. - (RACSignal *)throttle:(NSTimeInterval)interval {
  112. return [[self throttle:interval valuesPassingTest:^(id _) {
  113. return YES;
  114. }] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
  115. }
  116. - (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
  117. NSCParameterAssert(interval >= 0);
  118. NSCParameterAssert(predicate != nil);
  119. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  120. RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
  121. // We may never use this scheduler, but we need to set it up ahead of
  122. // time so that our scheduled blocks are run serially if we do.
  123. RACScheduler *scheduler = [RACScheduler scheduler];
  124. // Information about any currently-buffered `next` event.
  125. __block id nextValue = nil;
  126. __block BOOL hasNextValue = NO;
  127. RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
  128. void (^flushNext)(BOOL send) = ^(BOOL send) {
  129. @synchronized (compoundDisposable) {
  130. [nextDisposable.disposable dispose];
  131. if (!hasNextValue) return;
  132. if (send) [subscriber sendNext:nextValue];
  133. nextValue = nil;
  134. hasNextValue = NO;
  135. }
  136. };
  137. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  138. RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
  139. BOOL shouldThrottle = predicate(x);
  140. @synchronized (compoundDisposable) {
  141. flushNext(NO);
  142. if (!shouldThrottle) {
  143. [subscriber sendNext:x];
  144. return;
  145. }
  146. nextValue = x;
  147. hasNextValue = YES;
  148. nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
  149. flushNext(YES);
  150. }];
  151. }
  152. } error:^(NSError *error) {
  153. [compoundDisposable dispose];
  154. [subscriber sendError:error];
  155. } completed:^{
  156. flushNext(YES);
  157. [subscriber sendCompleted];
  158. }];
  159. [compoundDisposable addDisposable:subscriptionDisposable];
  160. return compoundDisposable;
  161. }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
  162. }
  163. - (RACSignal *)delay:(NSTimeInterval)interval {
  164. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  165. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  166. // We may never use this scheduler, but we need to set it up ahead of
  167. // time so that our scheduled blocks are run serially if we do.
  168. RACScheduler *scheduler = [RACScheduler scheduler];
  169. void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
  170. RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
  171. RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
  172. [disposable addDisposable:schedulerDisposable];
  173. };
  174. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  175. schedule(^{
  176. [subscriber sendNext:x];
  177. });
  178. } error:^(NSError *error) {
  179. [subscriber sendError:error];
  180. } completed:^{
  181. schedule(^{
  182. [subscriber sendCompleted];
  183. });
  184. }];
  185. [disposable addDisposable:subscriptionDisposable];
  186. return disposable;
  187. }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
  188. }
  189. - (RACSignal *)repeat {
  190. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  191. return subscribeForever(self,
  192. ^(id x) {
  193. [subscriber sendNext:x];
  194. },
  195. ^(NSError *error, RACDisposable *disposable) {
  196. [disposable dispose];
  197. [subscriber sendError:error];
  198. },
  199. ^(RACDisposable *disposable) {
  200. // Resubscribe.
  201. });
  202. }] setNameWithFormat:@"[%@] -repeat", self.name];
  203. }
  204. - (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
  205. NSCParameterAssert(catchBlock != NULL);
  206. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  207. RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
  208. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  209. [subscriber sendNext:x];
  210. } error:^(NSError *error) {
  211. RACSignal *signal = catchBlock(error);
  212. NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
  213. catchDisposable.disposable = [signal subscribe:subscriber];
  214. } completed:^{
  215. [subscriber sendCompleted];
  216. }];
  217. return [RACDisposable disposableWithBlock:^{
  218. [catchDisposable dispose];
  219. [subscriptionDisposable dispose];
  220. }];
  221. }] setNameWithFormat:@"[%@] -catch:", self.name];
  222. }
  223. - (RACSignal *)catchTo:(RACSignal *)signal {
  224. return [[self catch:^(NSError *error) {
  225. return signal;
  226. }] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
  227. }
  228. + (RACSignal *)try:(id (^)(NSError **errorPtr))tryBlock {
  229. NSCParameterAssert(tryBlock != NULL);
  230. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  231. NSError *error;
  232. id value = tryBlock(&error);
  233. RACSignal *signal = (value == nil ? [RACSignal error:error] : [RACSignal return:value]);
  234. return [signal subscribe:subscriber];
  235. }] setNameWithFormat:@"+try:"];
  236. }
  237. - (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
  238. NSCParameterAssert(tryBlock != NULL);
  239. return [[self flattenMap:^(id value) {
  240. NSError *error = nil;
  241. BOOL passed = tryBlock(value, &error);
  242. return (passed ? [RACSignal return:value] : [RACSignal error:error]);
  243. }] setNameWithFormat:@"[%@] -try:", self.name];
  244. }
  245. - (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
  246. NSCParameterAssert(mapBlock != NULL);
  247. return [[self flattenMap:^(id value) {
  248. NSError *error = nil;
  249. id mappedValue = mapBlock(value, &error);
  250. return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
  251. }] setNameWithFormat:@"[%@] -tryMap:", self.name];
  252. }
  253. - (RACSignal *)initially:(void (^)(void))block {
  254. NSCParameterAssert(block != NULL);
  255. return [[RACSignal defer:^{
  256. block();
  257. return self;
  258. }] setNameWithFormat:@"[%@] -initially:", self.name];
  259. }
  260. - (RACSignal *)finally:(void (^)(void))block {
  261. NSCParameterAssert(block != NULL);
  262. return [[[self
  263. doError:^(NSError *error) {
  264. block();
  265. }]
  266. doCompleted:^{
  267. block();
  268. }]
  269. setNameWithFormat:@"[%@] -finally:", self.name];
  270. }
  271. - (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  272. NSCParameterAssert(scheduler != nil);
  273. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  274. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  275. RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
  276. NSMutableArray *values = [NSMutableArray array];
  277. void (^flushValues)() = ^{
  278. @synchronized (values) {
  279. [timerDisposable.disposable dispose];
  280. if (values.count == 0) return;
  281. RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
  282. [values removeAllObjects];
  283. [subscriber sendNext:tuple];
  284. }
  285. };
  286. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  287. @synchronized (values) {
  288. if (values.count == 0) {
  289. timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
  290. }
  291. [values addObject:x ?: RACTupleNil.tupleNil];
  292. }
  293. } error:^(NSError *error) {
  294. [subscriber sendError:error];
  295. } completed:^{
  296. flushValues();
  297. [subscriber sendCompleted];
  298. }];
  299. return [RACDisposable disposableWithBlock:^{
  300. [selfDisposable dispose];
  301. [timerDisposable dispose];
  302. }];
  303. }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
  304. }
  305. - (RACSignal *)collect {
  306. return [[self aggregateWithStartFactory:^{
  307. return [[NSMutableArray alloc] init];
  308. } reduce:^(NSMutableArray *collectedValues, id x) {
  309. [collectedValues addObject:(x ?: NSNull.null)];
  310. return collectedValues;
  311. }] setNameWithFormat:@"[%@] -collect", self.name];
  312. }
  313. - (RACSignal *)takeLast:(NSUInteger)count {
  314. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  315. NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
  316. return [self subscribeNext:^(id x) {
  317. [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
  318. while (valuesTaken.count > count) {
  319. [valuesTaken removeObjectAtIndex:0];
  320. }
  321. } error:^(NSError *error) {
  322. [subscriber sendError:error];
  323. } completed:^{
  324. for (id value in valuesTaken) {
  325. [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
  326. }
  327. [subscriber sendCompleted];
  328. }];
  329. }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
  330. }
  331. - (RACSignal *)combineLatestWith:(RACSignal *)signal {
  332. NSCParameterAssert(signal != nil);
  333. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  334. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  335. __block id lastSelfValue = nil;
  336. __block BOOL selfCompleted = NO;
  337. __block id lastOtherValue = nil;
  338. __block BOOL otherCompleted = NO;
  339. void (^sendNext)(void) = ^{
  340. @synchronized (disposable) {
  341. if (lastSelfValue == nil || lastOtherValue == nil) return;
  342. [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
  343. }
  344. };
  345. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  346. @synchronized (disposable) {
  347. lastSelfValue = x ?: RACTupleNil.tupleNil;
  348. sendNext();
  349. }
  350. } error:^(NSError *error) {
  351. [subscriber sendError:error];
  352. } completed:^{
  353. @synchronized (disposable) {
  354. selfCompleted = YES;
  355. if (otherCompleted) [subscriber sendCompleted];
  356. }
  357. }];
  358. [disposable addDisposable:selfDisposable];
  359. RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
  360. @synchronized (disposable) {
  361. lastOtherValue = x ?: RACTupleNil.tupleNil;
  362. sendNext();
  363. }
  364. } error:^(NSError *error) {
  365. [subscriber sendError:error];
  366. } completed:^{
  367. @synchronized (disposable) {
  368. otherCompleted = YES;
  369. if (selfCompleted) [subscriber sendCompleted];
  370. }
  371. }];
  372. [disposable addDisposable:otherDisposable];
  373. return disposable;
  374. }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
  375. }
  376. + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
  377. return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
  378. return [left combineLatestWith:right];
  379. }] setNameWithFormat:@"+combineLatest: %@", signals];
  380. }
  381. + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(id (^)())reduceBlock {
  382. NSCParameterAssert(reduceBlock != nil);
  383. RACSignal *result = [self combineLatest:signals];
  384. // Although we assert this condition above, older versions of this method
  385. // supported this argument being nil. Avoid crashing Release builds of
  386. // apps that depended on that.
  387. if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
  388. return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
  389. }
  390. - (RACSignal *)merge:(RACSignal *)signal {
  391. return [[RACSignal
  392. merge:@[ self, signal ]]
  393. setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
  394. }
  395. + (RACSignal *)merge:(id<NSFastEnumeration>)signals {
  396. NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
  397. for (RACSignal *signal in signals) {
  398. [copiedSignals addObject:signal];
  399. }
  400. return [[[RACSignal
  401. createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
  402. for (RACSignal *signal in copiedSignals) {
  403. [subscriber sendNext:signal];
  404. }
  405. [subscriber sendCompleted];
  406. return nil;
  407. }]
  408. flatten]
  409. setNameWithFormat:@"+merge: %@", copiedSignals];
  410. }
  411. - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
  412. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  413. RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
  414. // Contains disposables for the currently active subscriptions.
  415. //
  416. // This should only be used while synchronized on `subscriber`.
  417. NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
  418. // Whether the signal-of-signals has completed yet.
  419. //
  420. // This should only be used while synchronized on `subscriber`.
  421. __block BOOL selfCompleted = NO;
  422. // Subscribes to the given signal.
  423. __block void (^subscribeToSignal)(RACSignal *);
  424. // Weak reference to the above, to avoid a leak.
  425. __weak __block void (^recur)(RACSignal *);
  426. // Sends completed to the subscriber if all signals are finished.
  427. //
  428. // This should only be used while synchronized on `subscriber`.
  429. void (^completeIfAllowed)(void) = ^{
  430. if (selfCompleted && activeDisposables.count == 0) {
  431. [subscriber sendCompleted];
  432. }
  433. };
  434. // The signals waiting to be started.
  435. //
  436. // This array should only be used while synchronized on `subscriber`.
  437. NSMutableArray *queuedSignals = [NSMutableArray array];
  438. recur = subscribeToSignal = ^(RACSignal *signal) {
  439. RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
  440. @synchronized (subscriber) {
  441. [compoundDisposable addDisposable:serialDisposable];
  442. [activeDisposables addObject:serialDisposable];
  443. }
  444. serialDisposable.disposable = [signal subscribeNext:^(id x) {
  445. [subscriber sendNext:x];
  446. } error:^(NSError *error) {
  447. [subscriber sendError:error];
  448. } completed:^{
  449. __strong void (^subscribeToSignal)(RACSignal *) = recur;
  450. RACSignal *nextSignal;
  451. @synchronized (subscriber) {
  452. [compoundDisposable removeDisposable:serialDisposable];
  453. [activeDisposables removeObjectIdenticalTo:serialDisposable];
  454. if (queuedSignals.count == 0) {
  455. completeIfAllowed();
  456. return;
  457. }
  458. nextSignal = queuedSignals[0];
  459. [queuedSignals removeObjectAtIndex:0];
  460. }
  461. subscribeToSignal(nextSignal);
  462. }];
  463. };
  464. [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
  465. if (signal == nil) return;
  466. NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
  467. @synchronized (subscriber) {
  468. if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
  469. [queuedSignals addObject:signal];
  470. // If we need to wait, skip subscribing to this
  471. // signal.
  472. return;
  473. }
  474. }
  475. subscribeToSignal(signal);
  476. } error:^(NSError *error) {
  477. [subscriber sendError:error];
  478. } completed:^{
  479. @synchronized (subscriber) {
  480. selfCompleted = YES;
  481. completeIfAllowed();
  482. }
  483. }]];
  484. [compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
  485. // A strong reference is held to `subscribeToSignal` until we're
  486. // done, preventing it from deallocating early.
  487. subscribeToSignal = nil;
  488. }]];
  489. return compoundDisposable;
  490. }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
  491. }
  492. - (RACSignal *)then:(RACSignal * (^)(void))block {
  493. NSCParameterAssert(block != nil);
  494. return [[[self
  495. ignoreValues]
  496. concat:[RACSignal defer:block]]
  497. setNameWithFormat:@"[%@] -then:", self.name];
  498. }
  499. - (RACSignal *)concat {
  500. return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
  501. }
  502. - (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
  503. NSCParameterAssert(startFactory != NULL);
  504. NSCParameterAssert(reduceBlock != NULL);
  505. return [[RACSignal defer:^{
  506. return [self aggregateWithStart:startFactory() reduce:reduceBlock];
  507. }] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
  508. }
  509. - (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
  510. return [[self
  511. aggregateWithStart:start
  512. reduceWithIndex:^(id running, id next, NSUInteger index) {
  513. return reduceBlock(running, next);
  514. }]
  515. setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, RACDescription(start)];
  516. }
  517. - (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
  518. return [[[[self
  519. scanWithStart:start reduceWithIndex:reduceBlock]
  520. startWith:start]
  521. takeLast:1]
  522. setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, RACDescription(start)];
  523. }
  524. - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
  525. return [self setKeyPath:keyPath onObject:object nilValue:nil];
  526. }
  527. - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
  528. NSCParameterAssert(keyPath != nil);
  529. NSCParameterAssert(object != nil);
  530. keyPath = [keyPath copy];
  531. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  532. // Purposely not retaining 'object', since we want to tear down the binding
  533. // when it deallocates normally.
  534. __block void * volatile objectPtr = (__bridge void *)object;
  535. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  536. // Possibly spec, possibly compiler bug, but this __bridge cast does not
  537. // result in a retain here, effectively an invisible __unsafe_unretained
  538. // qualifier. Using objc_precise_lifetime gives the __strong reference
  539. // desired. The explicit use of __strong is strictly defensive.
  540. __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
  541. [object setValue:x ?: nilValue forKeyPath:keyPath];
  542. } error:^(NSError *error) {
  543. __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
  544. NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
  545. // Log the error if we're running with assertions disabled.
  546. NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
  547. [disposable dispose];
  548. } completed:^{
  549. [disposable dispose];
  550. }];
  551. [disposable addDisposable:subscriptionDisposable];
  552. #if DEBUG
  553. static void *bindingsKey = &bindingsKey;
  554. NSMutableDictionary *bindings;
  555. @synchronized (object) {
  556. bindings = objc_getAssociatedObject(object, bindingsKey);
  557. if (bindings == nil) {
  558. bindings = [NSMutableDictionary dictionary];
  559. objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
  560. }
  561. }
  562. @synchronized (bindings) {
  563. NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
  564. bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
  565. }
  566. #endif
  567. RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
  568. #if DEBUG
  569. @synchronized (bindings) {
  570. [bindings removeObjectForKey:keyPath];
  571. }
  572. #endif
  573. while (YES) {
  574. void *ptr = objectPtr;
  575. if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
  576. break;
  577. }
  578. }
  579. }];
  580. [disposable addDisposable:clearPointerDisposable];
  581. [object.rac_deallocDisposable addDisposable:disposable];
  582. RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
  583. return [RACDisposable disposableWithBlock:^{
  584. [objectDisposable removeDisposable:disposable];
  585. [disposable dispose];
  586. }];
  587. }
  588. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  589. return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
  590. }
  591. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
  592. NSCParameterAssert(scheduler != nil);
  593. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  594. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  595. return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
  596. [subscriber sendNext:[NSDate date]];
  597. }];
  598. }] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
  599. }
  600. - (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
  601. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  602. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  603. void (^triggerCompletion)(void) = ^{
  604. [disposable dispose];
  605. [subscriber sendCompleted];
  606. };
  607. RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
  608. triggerCompletion();
  609. } completed:^{
  610. triggerCompletion();
  611. }];
  612. [disposable addDisposable:triggerDisposable];
  613. if (!disposable.disposed) {
  614. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  615. [subscriber sendNext:x];
  616. } error:^(NSError *error) {
  617. [subscriber sendError:error];
  618. } completed:^{
  619. [disposable dispose];
  620. [subscriber sendCompleted];
  621. }];
  622. [disposable addDisposable:selfDisposable];
  623. }
  624. return disposable;
  625. }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
  626. }
  627. - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
  628. return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  629. RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
  630. RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
  631. [selfDisposable dispose];
  632. [subscriber sendNext:x];
  633. } error:^(NSError *error) {
  634. [selfDisposable dispose];
  635. [subscriber sendError:error];
  636. } completed:^{
  637. [selfDisposable dispose];
  638. [subscriber sendCompleted];
  639. }];
  640. if (!selfDisposable.disposed) {
  641. selfDisposable.disposable = [[self
  642. concat:[RACSignal never]]
  643. subscribe:subscriber];
  644. }
  645. return [RACDisposable disposableWithBlock:^{
  646. [selfDisposable dispose];
  647. [replacementDisposable dispose];
  648. }];
  649. }];
  650. }
  651. - (RACSignal *)switchToLatest {
  652. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  653. RACMulticastConnection *connection = [self publish];
  654. RACDisposable *subscriptionDisposable = [[connection.signal
  655. flattenMap:^(RACSignal *x) {
  656. NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
  657. // -concat:[RACSignal never] prevents completion of the receiver from
  658. // prematurely terminating the inner signal.
  659. return [x takeUntil:[connection.signal concat:[RACSignal never]]];
  660. }]
  661. subscribe:subscriber];
  662. RACDisposable *connectionDisposable = [connection connect];
  663. return [RACDisposable disposableWithBlock:^{
  664. [subscriptionDisposable dispose];
  665. [connectionDisposable dispose];
  666. }];
  667. }] setNameWithFormat:@"[%@] -switchToLatest", self.name];
  668. }
  669. + (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
  670. NSCParameterAssert(signal != nil);
  671. NSCParameterAssert(cases != nil);
  672. for (id key in cases) {
  673. id value __attribute__((unused)) = cases[key];
  674. NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
  675. }
  676. NSDictionary *copy = [cases copy];
  677. return [[[signal
  678. map:^(id key) {
  679. if (key == nil) key = RACTupleNil.tupleNil;
  680. RACSignal *signal = copy[key] ?: defaultSignal;
  681. if (signal == nil) {
  682. NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
  683. return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
  684. }
  685. return signal;
  686. }]
  687. switchToLatest]
  688. setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
  689. }
  690. + (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
  691. NSCParameterAssert(boolSignal != nil);
  692. NSCParameterAssert(trueSignal != nil);
  693. NSCParameterAssert(falseSignal != nil);
  694. return [[[boolSignal
  695. map:^(NSNumber *value) {
  696. NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
  697. return (value.boolValue ? trueSignal : falseSignal);
  698. }]
  699. switchToLatest]
  700. setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
  701. }
  702. - (id)first {
  703. return [self firstOrDefault:nil];
  704. }
  705. - (id)firstOrDefault:(id)defaultValue {
  706. return [self firstOrDefault:defaultValue success:NULL error:NULL];
  707. }
  708. - (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
  709. NSCondition *condition = [[NSCondition alloc] init];
  710. condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
  711. __block id value = defaultValue;
  712. __block BOOL done = NO;
  713. // Ensures that we don't pass values across thread boundaries by reference.
  714. __block NSError *localError;
  715. __block BOOL localSuccess;
  716. [[self take:1] subscribeNext:^(id x) {
  717. [condition lock];
  718. value = x;
  719. localSuccess = YES;
  720. done = YES;
  721. [condition broadcast];
  722. [condition unlock];
  723. } error:^(NSError *e) {
  724. [condition lock];
  725. if (!done) {
  726. localSuccess = NO;
  727. localError = e;
  728. done = YES;
  729. [condition broadcast];
  730. }
  731. [condition unlock];
  732. } completed:^{
  733. [condition lock];
  734. localSuccess = YES;
  735. done = YES;
  736. [condition broadcast];
  737. [condition unlock];
  738. }];
  739. [condition lock];
  740. while (!done) {
  741. [condition wait];
  742. }
  743. if (success != NULL) *success = localSuccess;
  744. if (error != NULL) *error = localError;
  745. [condition unlock];
  746. return value;
  747. }
  748. - (BOOL)waitUntilCompleted:(NSError **)error {
  749. BOOL success = NO;
  750. [[[self
  751. ignoreValues]
  752. setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
  753. firstOrDefault:nil success:&success error:error];
  754. return success;
  755. }
  756. + (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
  757. NSCParameterAssert(block != NULL);
  758. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  759. return [block() subscribe:subscriber];
  760. }] setNameWithFormat:@"+defer:"];
  761. }
  762. - (NSArray *)toArray {
  763. return [[[self collect] first] copy];
  764. }
  765. - (RACSequence *)sequence {
  766. return [[RACSignalSequence sequenceWithSignal:self] setNameWithFormat:@"[%@] -sequence", self.name];
  767. }
  768. - (RACMulticastConnection *)publish {
  769. RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
  770. RACMulticastConnection *connection = [self multicast:subject];
  771. return connection;
  772. }
  773. - (RACMulticastConnection *)multicast:(RACSubject *)subject {
  774. [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
  775. RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
  776. return connection;
  777. }
  778. - (RACSignal *)replay {
  779. RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
  780. RACMulticastConnection *connection = [self multicast:subject];
  781. [connection connect];
  782. return connection.signal;
  783. }
  784. - (RACSignal *)replayLast {
  785. RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
  786. RACMulticastConnection *connection = [self multicast:subject];
  787. [connection connect];
  788. return connection.signal;
  789. }
  790. - (RACSignal *)replayLazily {
  791. RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
  792. return [[RACSignal
  793. defer:^{
  794. [connection connect];
  795. return connection.signal;
  796. }]
  797. setNameWithFormat:@"[%@] -replayLazily", self.name];
  798. }
  799. - (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  800. NSCParameterAssert(scheduler != nil);
  801. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  802. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  803. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  804. RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
  805. [disposable dispose];
  806. [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
  807. }];
  808. [disposable addDisposable:timeoutDisposable];
  809. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  810. [subscriber sendNext:x];
  811. } error:^(NSError *error) {
  812. [disposable dispose];
  813. [subscriber sendError:error];
  814. } completed:^{
  815. [disposable dispose];
  816. [subscriber sendCompleted];
  817. }];
  818. [disposable addDisposable:subscriptionDisposable];
  819. return disposable;
  820. }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
  821. }
  822. - (RACSignal *)deliverOn:(RACScheduler *)scheduler {
  823. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  824. return [self subscribeNext:^(id x) {
  825. [scheduler schedule:^{
  826. [subscriber sendNext:x];
  827. }];
  828. } error:^(NSError *error) {
  829. [scheduler schedule:^{
  830. [subscriber sendError:error];
  831. }];
  832. } completed:^{
  833. [scheduler schedule:^{
  834. [subscriber sendCompleted];
  835. }];
  836. }];
  837. }] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
  838. }
  839. - (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
  840. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  841. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  842. RACDisposable *schedulingDisposable = [scheduler schedule:^{
  843. RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
  844. [disposable addDisposable:subscriptionDisposable];
  845. }];
  846. [disposable addDisposable:schedulingDisposable];
  847. return disposable;
  848. }] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
  849. }
  850. - (RACSignal *)deliverOnMainThread {
  851. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  852. __block volatile int32_t queueLength = 0;
  853. void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
  854. int32_t queued = OSAtomicIncrement32(&queueLength);
  855. if (NSThread.isMainThread && queued == 1) {
  856. block();
  857. OSAtomicDecrement32(&queueLength);
  858. } else {
  859. dispatch_async(dispatch_get_main_queue(), ^{
  860. block();
  861. OSAtomicDecrement32(&queueLength);
  862. });
  863. }
  864. };
  865. return [self subscribeNext:^(id x) {
  866. performOnMainThread(^{
  867. [subscriber sendNext:x];
  868. });
  869. } error:^(NSError *error) {
  870. performOnMainThread(^{
  871. [subscriber sendError:error];
  872. });
  873. } completed:^{
  874. performOnMainThread(^{
  875. [subscriber sendCompleted];
  876. });
  877. }];
  878. }] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
  879. }
  880. - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
  881. NSCParameterAssert(keyBlock != NULL);
  882. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  883. NSMutableDictionary *groups = [NSMutableDictionary dictionary];
  884. NSMutableArray *orderedGroups = [NSMutableArray array];
  885. return [self subscribeNext:^(id x) {
  886. id<NSCopying> key = keyBlock(x);
  887. RACGroupedSignal *groupSubject = nil;
  888. @synchronized(groups) {
  889. groupSubject = groups[key];
  890. if (groupSubject == nil) {
  891. groupSubject = [RACGroupedSignal signalWithKey:key];
  892. groups[key] = groupSubject;
  893. [orderedGroups addObject:groupSubject];
  894. [subscriber sendNext:groupSubject];
  895. }
  896. }
  897. [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
  898. } error:^(NSError *error) {
  899. [subscriber sendError:error];
  900. [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
  901. } completed:^{
  902. [subscriber sendCompleted];
  903. [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
  904. }];
  905. }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
  906. }
  907. - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
  908. return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
  909. }
  910. - (RACSignal *)any {
  911. return [[self any:^(id x) {
  912. return YES;
  913. }] setNameWithFormat:@"[%@] -any", self.name];
  914. }
  915. - (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
  916. NSCParameterAssert(predicateBlock != NULL);
  917. return [[[self materialize] bind:^{
  918. return ^(RACEvent *event, BOOL *stop) {
  919. if (event.finished) {
  920. *stop = YES;
  921. return [RACSignal return:@NO];
  922. }
  923. if (predicateBlock(event.value)) {
  924. *stop = YES;
  925. return [RACSignal return:@YES];
  926. }
  927. return [RACSignal empty];
  928. };
  929. }] setNameWithFormat:@"[%@] -any:", self.name];
  930. }
  931. - (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
  932. NSCParameterAssert(predicateBlock != NULL);
  933. return [[[self materialize] bind:^{
  934. return ^(RACEvent *event, BOOL *stop) {
  935. if (event.eventType == RACEventTypeCompleted) {
  936. *stop = YES;
  937. return [RACSignal return:@YES];
  938. }
  939. if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
  940. *stop = YES;
  941. return [RACSignal return:@NO];
  942. }
  943. return [RACSignal empty];
  944. };
  945. }] setNameWithFormat:@"[%@] -all:", self.name];
  946. }
  947. - (RACSignal *)retry:(NSInteger)retryCount {
  948. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  949. __block NSInteger currentRetryCount = 0;
  950. return subscribeForever(self,
  951. ^(id x) {
  952. [subscriber sendNext:x];
  953. },
  954. ^(NSError *error, RACDisposable *disposable) {
  955. if (retryCount == 0 || currentRetryCount < retryCount) {
  956. // Resubscribe.
  957. currentRetryCount++;
  958. return;
  959. }
  960. [disposable dispose];
  961. [subscriber sendError:error];
  962. },
  963. ^(RACDisposable *disposable) {
  964. [disposable dispose];
  965. [subscriber sendCompleted];
  966. });
  967. }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
  968. }
  969. - (RACSignal *)retry {
  970. return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
  971. }
  972. - (RACSignal *)sample:(RACSignal *)sampler {
  973. NSCParameterAssert(sampler != nil);
  974. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  975. NSLock *lock = [[NSLock alloc] init];
  976. __block id lastValue;
  977. __block BOOL hasValue = NO;
  978. RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
  979. RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
  980. [lock lock];
  981. hasValue = YES;
  982. lastValue = x;
  983. [lock unlock];
  984. } error:^(NSError *error) {
  985. [samplerDisposable dispose];
  986. [subscriber sendError:error];
  987. } completed:^{
  988. [samplerDisposable dispose];
  989. [subscriber sendCompleted];
  990. }];
  991. samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
  992. BOOL shouldSend = NO;
  993. id value;
  994. [lock lock];
  995. shouldSend = hasValue;
  996. value = lastValue;
  997. [lock unlock];
  998. if (shouldSend) {
  999. [subscriber sendNext:value];
  1000. }
  1001. } error:^(NSError *error) {
  1002. [sourceDisposable dispose];
  1003. [subscriber sendError:error];
  1004. } completed:^{
  1005. [sourceDisposable dispose];
  1006. [subscriber sendCompleted];
  1007. }];
  1008. return [RACDisposable disposableWithBlock:^{
  1009. [samplerDisposable dispose];
  1010. [sourceDisposable dispose];
  1011. }];
  1012. }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
  1013. }
  1014. - (RACSignal *)ignoreValues {
  1015. return [[self filter:^(id _) {
  1016. return NO;
  1017. }] setNameWithFormat:@"[%@] -ignoreValues", self.name];
  1018. }
  1019. - (RACSignal *)materialize {
  1020. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  1021. return [self subscribeNext:^(id x) {
  1022. [subscriber sendNext:[RACEvent eventWithValue:x]];
  1023. } error:^(NSError *error) {
  1024. [subscriber sendNext:[RACEvent eventWithError:error]];
  1025. [subscriber sendCompleted];
  1026. } completed:^{
  1027. [subscriber sendNext:RACEvent.completedEvent];
  1028. [subscriber sendCompleted];
  1029. }];
  1030. }] setNameWithFormat:@"[%@] -materialize", self.name];
  1031. }
  1032. - (RACSignal *)dematerialize {
  1033. return [[self bind:^{
  1034. return ^(RACEvent *event, BOOL *stop) {
  1035. switch (event.eventType) {
  1036. case RACEventTypeCompleted:
  1037. *stop = YES;
  1038. return [RACSignal empty];
  1039. case RACEventTypeError:
  1040. *stop = YES;
  1041. return [RACSignal error:event.error];
  1042. case RACEventTypeNext:
  1043. return [RACSignal return:event.value];
  1044. }
  1045. };
  1046. }] setNameWithFormat:@"[%@] -dematerialize", self.name];
  1047. }
  1048. - (RACSignal *)not {
  1049. return [[self map:^(NSNumber *value) {
  1050. NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
  1051. return @(!value.boolValue);
  1052. }] setNameWithFormat:@"[%@] -not", self.name];
  1053. }
  1054. - (RACSignal *)and {
  1055. return [[self map:^(RACTuple *tuple) {
  1056. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
  1057. NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
  1058. return @([tuple.rac_sequence all:^(NSNumber *number) {
  1059. NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
  1060. return number.boolValue;
  1061. }]);
  1062. }] setNameWithFormat:@"[%@] -and", self.name];
  1063. }
  1064. - (RACSignal *)or {
  1065. return [[self map:^(RACTuple *tuple) {
  1066. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
  1067. NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
  1068. return @([tuple.rac_sequence any:^(NSNumber *number) {
  1069. NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
  1070. return number.boolValue;
  1071. }]);
  1072. }] setNameWithFormat:@"[%@] -or", self.name];
  1073. }
  1074. - (RACSignal *)reduceApply {
  1075. return [[self map:^(RACTuple *tuple) {
  1076. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
  1077. NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
  1078. // We can't use -array, because we need to preserve RACTupleNil
  1079. NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
  1080. for (id val in tuple) {
  1081. [tupleArray addObject:val];
  1082. }
  1083. RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
  1084. return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
  1085. }] setNameWithFormat:@"[%@] -reduceApply", self.name];
  1086. }
  1087. @end