RACStream.m 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. //
  2. // RACStream.m
  3. // ReactiveCocoa
  4. //
  5. // Created by Justin Spahr-Summers on 2012-10-31.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACStream.h"
  9. #import "NSObject+RACDescription.h"
  10. #import "RACBlockTrampoline.h"
  11. #import "RACTuple.h"
  /// Base implementation of RACStream.
  ///
  /// The primitives grouped under "Abstract methods" all return nil here;
  /// presumably concrete subclasses override them (confirm against the header).
  @implementation RACStream

  #pragma mark Lifecycle

  /// Initializes the stream and gives it an empty, non-nil name.
  ///
  /// @return The initialized receiver, or nil if the superclass initializer failed.
  - (instancetype)init {
      self = [super init];
      if (self == nil) return nil;

      self.name = @"";
      return self;
  }

  #pragma mark Abstract methods

  /// Base implementation; always returns nil.
  + (instancetype)empty {
      return nil;
  }

  /// Base implementation; ignores `block` and always returns nil.
  - (instancetype)bind:(RACStreamBindBlock (^)(void))block {
      return nil;
  }

  /// Base implementation; ignores `value` and always returns nil.
  + (instancetype)return:(id)value {
      return nil;
  }

  /// Base implementation; ignores `stream` and always returns nil.
  - (instancetype)concat:(RACStream *)stream {
      return nil;
  }

  /// Base implementation; ignores `stream` and always returns nil.
  - (instancetype)zipWith:(RACStream *)stream {
      return nil;
  }

  #pragma mark Naming

  /// Sets the receiver's name from a printf-style format string.
  ///
  /// When the RAC_DEBUG_SIGNAL_NAMES environment variable is not set, this
  /// returns the receiver immediately without formatting or changing the name.
  ///
  /// @param format The format string. Asserted to be non-nil, but only once the
  ///               environment check above has passed.
  /// @return The receiver, so calls can be chained.
  - (instancetype)setNameWithFormat:(NSString *)format, ... {
      if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

      NSCParameterAssert(format != nil);

      va_list args;
      va_start(args, format);
      NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
      va_end(args);

      self.name = str;
      return self;
  }

  @end
  48. @implementation RACStream (Operations)
  /// Binds the receiver with a block that turns each value into a stream.
  ///
  /// @param block Invoked with each value. If it returns nil, the empty stream
  ///              of the receiver's class is used instead. Must not be nil.
  /// @return The result of -bind:, named "[<receiver name>] -flattenMap:".
  - (instancetype)flattenMap:(RACStream * (^)(id value))block {
      // Fail fast on a nil block, as -map: and -filter: already do. Without this
      // check the nil block is only invoked (and crashes) when a value arrives.
      NSCParameterAssert(block != nil);

      Class class = self.class;

      return [[self bind:^{
          return ^(id value, BOOL *stop) {
              // A nil result from the block is replaced with an empty stream.
              id stream = block(value) ?: [class empty];
              NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

              return stream;
          };
      }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
  }
  /// Flattens the receiver by passing an identity block to -flattenMap:, so each
  /// value of the receiver is itself used as the mapped stream.
  ///
  /// @return The flattened stream, named "[<receiver name>] -flatten".
  - (instancetype)flatten {
      RACStream *flattened = [self flattenMap:^(id innerStream) {
          return innerStream;
      }];

      return [flattened setNameWithFormat:@"[%@] -flatten", self.name];
  }
  /// Transforms each value of the receiver with `block`.
  ///
  /// Implemented on top of -flattenMap:, wrapping each transformed value with
  /// +return: on the receiver's class.
  ///
  /// @param block The transformation to apply. Must not be nil.
  /// @return The mapped stream, named "[<receiver name>] -map:".
  - (instancetype)map:(id (^)(id value))block {
      NSCParameterAssert(block != nil);

      Class streamClass = self.class;

      RACStream *mapped = [self flattenMap:^(id value) {
          id transformed = block(value);
          return [streamClass return:transformed];
      }];

      return [mapped setNameWithFormat:@"[%@] -map:", self.name];
  }
  /// Replaces every value of the receiver with `object`.
  ///
  /// @param object The constant that each value is mapped to.
  /// @return The mapped stream, named "[<receiver name>] -mapReplace: <object>".
  - (instancetype)mapReplace:(id)object {
      // The incoming value is deliberately ignored.
      id (^constantBlock)(id) = ^(id ignoredValue) {
          return object;
      };

      return [[self map:constantBlock] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
  }
  /// Combines each value with the one before it using `reduceBlock`.
  ///
  /// The scan state is a tuple whose first element is the previous raw value
  /// (initially `start`). After each step the state becomes (next, combined),
  /// and the final -map: extracts the combined value at index 1.
  ///
  /// @param start       The value passed as `previous` for the first value.
  /// @param reduceBlock Combines the previous and next values. Must not be nil.
  /// @return The combined stream, named
  ///         "[<receiver name>] -combinePreviousWithStart: <start> reduce:".
  - (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
      NSCParameterAssert(reduceBlock != NULL);

      RACStream *pairs = [self scanWithStart:RACTuplePack(start) reduce:^(RACTuple *state, id next) {
          id combined = reduceBlock(state[0], next);
          return RACTuplePack(next, combined);
      }];

      RACStream *combinedValues = [pairs map:^(RACTuple *pair) {
          return pair[1];
      }];

      return [combinedValues setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, RACDescription(start)];
  }
  /// Keeps only the values for which `block` returns YES.
  ///
  /// Passing values are wrapped with +return:; failing values are mapped to the
  /// empty stream of the receiver's class.
  ///
  /// @param block The predicate to test each value with. Must not be nil.
  /// @return The filtered stream, named "[<receiver name>] -filter:".
  - (instancetype)filter:(BOOL (^)(id value))block {
      NSCParameterAssert(block != nil);

      Class streamClass = self.class;

      return [[self flattenMap:^ id (id value) {
          return block(value) ? [streamClass return:value] : [streamClass empty];
      }] setNameWithFormat:@"[%@] -filter:", self.name];
  }
  /// Drops every value that is pointer-identical to `value` or -isEqual: to it.
  ///
  /// @param value The value to drop. May be nil, in which case nil values are
  ///              dropped via the pointer comparison.
  /// @return The filtered stream, named "[<receiver name>] -ignore: <value>".
  - (instancetype)ignore:(id)value {
      return [[self filter:^ BOOL (id candidate) {
          // Identity is checked first so -isEqual: is skipped for the same pointer.
          BOOL matchesIgnored = (candidate == value || [candidate isEqual:value]);
          return !matchesIgnored;
      }] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)];
  }
  /// Maps each tuple value by invoking `reduceBlock` with the tuple's contents
  /// as arguments, through RACBlockTrampoline.
  ///
  /// @param reduceBlock The block to invoke for each tuple. Must not be nil.
  /// @return The mapped stream, named "[<receiver name>] -reduceEach:".
  - (instancetype)reduceEach:(id (^)())reduceBlock {
      NSCParameterAssert(reduceBlock != nil);

      // Referenced only by the assertion message below, hence the weak reference
      // and the `unused` attribute for builds where assertions are compiled out.
      __weak RACStream *weakStream __attribute__((unused)) = self;

      RACStream *reduced = [self map:^(RACTuple *tuple) {
          NSCAssert([tuple isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", weakStream, tuple);
          return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:tuple];
      }];

      return [reduced setNameWithFormat:@"[%@] -reduceEach:", self.name];
  }
  /// Builds a stream from `value` with +return: and concatenates the receiver
  /// onto it.
  ///
  /// @param value The value to place ahead of the receiver.
  /// @return The concatenated stream, named "[<receiver name>] -startWith: <value>".
  - (instancetype)startWith:(id)value {
      RACStream *head = [self.class return:value];
      RACStream *combined = [head concat:self];

      return [combined setNameWithFormat:@"[%@] -startWith: %@", self.name, RACDescription(value)];
  }
  /// Replaces the first `skipCount` values with the empty stream and passes
  /// every later value through via +return:.
  ///
  /// @param skipCount The number of leading values to drop.
  /// @return The resulting stream, named "[<receiver name>] -skip: <skipCount>".
  - (instancetype)skip:(NSUInteger)skipCount {
      Class streamClass = self.class;

      return [[self bind:^{
          // Counts down the values still to be dropped. This state lives in the
          // outer bind block, so each invocation of it starts a fresh count.
          __block NSUInteger remaining = skipCount;

          return ^ id (id value, BOOL *stop) {
              if (remaining > 0) {
                  remaining--;
                  return [streamClass empty];
              }

              return [streamClass return:value];
          };
      }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
  }
  /// Passes through at most the first `count` values.
  ///
  /// When `count` is zero the empty stream of the receiver's class is returned
  /// directly, without binding and without a debug name. Otherwise `*stop` is
  /// set to YES as the `count`-th value is returned.
  ///
  /// @param count The maximum number of values to take.
  /// @return The resulting stream, named "[<receiver name>] -take: <count>"
  ///         (unnamed in the zero case).
  - (instancetype)take:(NSUInteger)count {
      Class streamClass = self.class;

      if (count == 0) return [streamClass empty];

      return [[self bind:^{
          __block NSUInteger taken = 0;

          return ^ id (id value, BOOL *stop) {
              // The quota is already used up: return nil instead of a stream.
              if (taken >= count) return nil;

              taken += 1;
              if (taken == count) *stop = YES;

              return [streamClass return:value];
          };
      }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
  }
  /// Folds `streams` into a single stream of flat tuples.
  ///
  /// The first stream's values are each wrapped in a one-element tuple; every
  /// later stream is merged into the running result by `block`. The nested
  /// tuples this produces are then flattened into one tuple per value.
  ///
  /// @param streams The streams to combine, in order.
  /// @param block   Combines the running result (first argument) with the next
  ///                stream (second argument).
  /// @return The empty stream if no running result was produced (for example
  ///         when `streams` is empty); otherwise a stream of flattened tuples.
  + (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
      RACStream *combined = nil;

      for (RACStream *stream in streams) {
          if (combined != nil) {
              // Each further stream is merged into the accumulated result.
              combined = block(combined, stream);
          } else {
              // Seed the fold: wrapping the first stream's values in tuples
              // means a single input stream still yields a stream of tuples.
              combined = [stream map:^(id x) {
                  return RACTuplePack(x);
              }];
          }
      }

      if (combined == nil) return [self empty];

      return [combined map:^(RACTuple *nested) {
          // Values arrive nested to the left, e.g. (((1), 2), 3). Walk inwards,
          // taking the last element of each layer and prepending it, so the
          // collected values end up in their original order.
          NSMutableArray *flattened = [[NSMutableArray alloc] init];

          for (RACTuple *layer = nested; layer != nil; layer = (layer.count > 1 ? layer.first : nil)) {
              [flattened insertObject:layer.last ?: RACTupleNil.tupleNil atIndex:0];
          }

          return [RACTuple tupleWithObjectsFromArray:flattened];
      }];
  }
  /// Zips `streams` together by folding them pairwise with -zipWith: through
  /// +join:block:.
  ///
  /// @param streams The streams to zip.
  /// @return The zipped stream, named "+zip: <streams>".
  + (instancetype)zip:(id<NSFastEnumeration>)streams {
      RACStream *zipped = [self join:streams block:^(RACStream *left, RACStream *right) {
          return [left zipWith:right];
      }];

      return [zipped setNameWithFormat:@"+zip: %@", streams];
  }
  /// Zips `streams` and then reduces each resulting tuple with `reduceBlock`.
  ///
  /// @param streams     The streams to zip.
  /// @param reduceBlock The block applied to each tuple via -reduceEach:.
  ///                    Asserted to be non-nil.
  /// @return The resulting stream, named "+zip: <streams> reduce:".
  + (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
      NSCParameterAssert(reduceBlock != nil);

      RACStream *zipped = [self zip:streams];

      // Older versions of this method accepted a nil block. Where assertions are
      // compiled out, keep returning the plain zipped stream in that case so
      // apps that depended on it do not crash.
      RACStream *reduced = (reduceBlock == nil ? zipped : [zipped reduceEach:reduceBlock]);

      return [reduced setNameWithFormat:@"+zip: %@ reduce:", streams];
  }
  /// Concatenates `streams` in enumeration order, starting from the empty
  /// stream of the receiving class.
  ///
  /// @param streams The streams to concatenate.
  /// @return The concatenated stream, named "+concat: <streams>".
  + (instancetype)concat:(id<NSFastEnumeration>)streams {
      RACStream *accumulated = [self empty];

      for (RACStream *next in streams) {
          accumulated = [accumulated concat:next];
      }

      return [accumulated setNameWithFormat:@"+concat: %@", streams];
  }
  /// Index-free convenience over -scanWithStart:reduceWithIndex:.
  ///
  /// @param startingValue The initial running value.
  /// @param reduceBlock   Combines the running value with the next value.
  ///                      Must not be nil.
  /// @return The scanned stream, named
  ///         "[<receiver name>] -scanWithStart: <startingValue> reduce:".
  - (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {
      NSCParameterAssert(reduceBlock != nil);

      // Adapt the two-argument block to the indexed variant, dropping the index.
      RACStream *scanned = [self scanWithStart:startingValue reduceWithIndex:^(id running, id next, NSUInteger index) {
          return reduceBlock(running, next);
      }];

      return [scanned setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, RACDescription(startingValue)];
  }
  /// Keeps a running value across the receiver's values and returns each
  /// intermediate result.
  ///
  /// For every value, `reduceBlock` receives the current running value, the new
  /// value and a zero-based index. Its result becomes the new running value and
  /// is wrapped with +return:.
  ///
  /// @param startingValue The initial running value.
  /// @param reduceBlock   The reduction to apply. Must not be nil.
  /// @return The scanned stream, named
  ///         "[<receiver name>] -scanWithStart: <startingValue> reduceWithIndex:".
  - (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
      NSCParameterAssert(reduceBlock != nil);

      Class streamClass = self.class;

      return [[self bind:^{
          __block id accumulator = startingValue;
          __block NSUInteger nextIndex = 0;

          return ^(id value, BOOL *stop) {
              // Advance the counter before calling out, so the stored index is
              // already incremented while reduceBlock runs.
              NSUInteger currentIndex = nextIndex++;
              accumulator = reduceBlock(accumulator, value, currentIndex);

              return [streamClass return:accumulator];
          };
      }] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, RACDescription(startingValue)];
  }
  /// Passes values through until `predicate` returns YES for one.
  ///
  /// For a value that satisfies the predicate the bind block returns nil
  /// instead of a stream; other values are wrapped with +return:.
  ///
  /// @param predicate Tested against each value. Must not be nil.
  /// @return The resulting stream, named "[<receiver name>] -takeUntilBlock:".
  - (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {
      NSCParameterAssert(predicate != nil);

      Class streamClass = self.class;

      return [[self bind:^{
          return ^ id (id value, BOOL *stop) {
              return predicate(value) ? nil : [streamClass return:value];
          };
      }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
  }
  /// Passes values through for as long as `predicate` returns YES.
  ///
  /// Implemented as -takeUntilBlock: with the predicate negated.
  ///
  /// @param predicate Tested against each value. Must not be nil.
  /// @return The resulting stream, named "[<receiver name>] -takeWhileBlock:".
  - (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {
      NSCParameterAssert(predicate != nil);

      RACStream *taken = [self takeUntilBlock:^ BOOL (id x) {
          BOOL shouldKeep = predicate(x);
          return !shouldKeep;
      }];

      return [taken setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
  }
  /// Drops values until `predicate` returns YES, then passes through that value
  /// and every value after it.
  ///
  /// Once the predicate has matched it is not called again.
  ///
  /// @param predicate Tested against each value while still skipping.
  ///                  Must not be nil.
  /// @return The resulting stream, named "[<receiver name>] -skipUntilBlock:".
  - (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {
      NSCParameterAssert(predicate != nil);

      Class streamClass = self.class;

      return [[self bind:^{
          // Flips to YES the first time the predicate matches.
          __block BOOL passingThrough = NO;

          return ^ id (id value, BOOL *stop) {
              if (!passingThrough) {
                  if (!predicate(value)) return [streamClass empty];

                  passingThrough = YES;
              }

              return [streamClass return:value];
          };
      }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
  }
  /// Drops values for as long as `predicate` returns YES.
  ///
  /// Implemented as -skipUntilBlock: with the predicate negated.
  ///
  /// @param predicate Tested against each value. Must not be nil.
  /// @return The resulting stream, named "[<receiver name>] -skipWhileBlock:".
  - (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {
      NSCParameterAssert(predicate != nil);

      RACStream *remainder = [self skipUntilBlock:^ BOOL (id x) {
          BOOL shouldSkip = predicate(x);
          return !shouldSkip;
      }];

      return [remainder setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
  }
  /// Suppresses a value that is pointer-identical or -isEqual: to the value
  /// immediately before it.
  ///
  /// The first value is always passed through. A separate flag tracks whether a
  /// previous value exists, so a leading nil is not mistaken for a repeat.
  ///
  /// @return The resulting stream, named "[<receiver name>] -distinctUntilChanged".
  - (instancetype)distinctUntilChanged {
      Class streamClass = self.class;

      return [[self bind:^{
          __block id previousValue = nil;
          __block BOOL hasPrevious = NO;

          return ^(id x, BOOL *stop) {
              // Identity is checked before -isEqual:, and neither is evaluated
              // for the very first value.
              BOOL isRepeat = hasPrevious && (previousValue == x || [x isEqual:previousValue]);
              if (isRepeat) return [streamClass empty];

              hasPrevious = YES;
              previousValue = x;

              return [streamClass return:x];
          };
      }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
  }
  269. @end