  //
  //  RACCommand.m
  //  ReactiveObjC
  //
  //  Created by Josh Abernathy on 3/3/12.
  //  Copyright (c) 2012 GitHub, Inc. All rights reserved.
  //
  8. #import "RACCommand.h"
  9. #import <ReactiveObjC/RACEXTScope.h>
  10. #import "NSArray+RACSequenceAdditions.h"
  11. #import "NSObject+RACDeallocating.h"
  12. #import "NSObject+RACDescription.h"
  13. #import "NSObject+RACPropertySubscribing.h"
  14. #import "RACMulticastConnection.h"
  15. #import "RACReplaySubject.h"
  16. #import "RACScheduler.h"
  17. #import "RACSequence.h"
  18. #import "RACSignal+Operations.h"
  19. #import <libkern/OSAtomic.h>
  // Error domain used for the NSError that -execute: produces when the command
  // is disabled.
  NSString * const RACCommandErrorDomain = @"RACCommandErrorDomain";

  // userInfo key under which that NSError stores the command that refused to
  // execute.
  NSString * const RACUnderlyingCommandErrorKey = @"RACUnderlyingCommandErrorKey";

  // Error code used, within RACCommandErrorDomain, when -execute: is invoked
  // while the command is not enabled.
  const NSInteger RACCommandErrorNotEnabled = 1;
  @interface RACCommand () {
      // Backing storage for `allowsConcurrentExecution`. The getter reads it
      // directly; the setter modifies it with OSAtomic barrier operations.
      volatile uint32_t _allowsConcurrentExecution;
  }

  // Receives the signal of every execution started through -execute:. All of
  // the command's derived signals are built on top of this subject.
  @property (nonatomic, readonly, strong) RACSubject *addedExecutionSignalsSubject;

  // Receives the current value of `allowsConcurrentExecution` each time the
  // setter is invoked.
  @property (nonatomic, readonly, strong) RACSubject *allowsConcurrentExecutionSubject;

  // Same information as `enabled`, but without the hop to the main thread.
  //
  // Values from this signal may arrive on any thread.
  @property (nonatomic, readonly, strong) RACSignal *immediateEnabled;

  // The block the receiver was initialized with. -execute: invokes it with the
  // input value to obtain the signal for that execution.
  @property (nonatomic, readonly, copy) RACSignal * (^signalBlock)(id input);

  @end
  @implementation RACCommand

  #pragma mark Properties

  // Returns YES when the atomic backing flag is nonzero.
  - (BOOL)allowsConcurrentExecution {
      return _allowsConcurrentExecution != 0;
  }

  // Atomically sets (allowed == YES) or clears (allowed == NO) the backing
  // flag, then sends the flag's current value on
  // `allowsConcurrentExecutionSubject` so that `immediateEnabled` is
  // recomputed.
  - (void)setAllowsConcurrentExecution:(BOOL)allowed {
      if (allowed) {
          OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
      } else {
          OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
      }

      [self.allowsConcurrentExecutionSubject sendNext:@(_allowsConcurrentExecution)];
  }

  #pragma mark Lifecycle

  // Unsupported initializer. Trips an assertion (when assertions are enabled)
  // and returns nil; use -initWithSignalBlock: or
  // -initWithEnabled:signalBlock: instead.
  - (instancetype)init {
      NSCAssert(NO, @"Use -initWithSignalBlock: instead");
      return nil;
  }

  // Convenience initializer: forwards to -initWithEnabled:signalBlock: with a
  // nil `enabledSignal`.
  - (instancetype)initWithSignalBlock:(RACSignal<id> * (^)(id input))signalBlock {
      return [self initWithEnabled:nil signalBlock:signalBlock];
  }

  // Completes both internal subjects when the command is deallocated.
  - (void)dealloc {
      [_addedExecutionSignalsSubject sendCompleted];
      [_allowsConcurrentExecutionSubject sendCompleted];
  }

  // Initializes the command and wires up all of its derived signals.
  //
  // enabledSignal - A signal of boolean NSNumbers. May be nil, in which case a
  //                 signal that just sends @YES is used. A non-nil signal is
  //                 prefixed with @YES so a value is available immediately.
  // signalBlock   - Invoked by -execute: with the input value to create the
  //                 signal for that execution. Must not be nil (asserted). The
  //                 block is copied.
  //
  // Returns the initialized command, or nil if the superclass initializer
  // fails.
  - (instancetype)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal<id> * (^)(id input))signalBlock {
      NSCParameterAssert(signalBlock != nil);

      self = [super init];
      // Bail out before touching any ivar: writing instance variables through
      // a nil `self` is invalid.
      if (self == nil) return nil;

      _addedExecutionSignalsSubject = [RACSubject new];
      _allowsConcurrentExecutionSubject = [RACSubject new];
      _signalBlock = [signalBlock copy];

      // Every added execution signal, with its errors replaced by completion,
      // delivered on the main thread.
      _executionSignals = [[[_addedExecutionSignalsSubject
          map:^(RACSignal *signal) {
              return [signal catchTo:[RACSignal empty]];
          }]
          deliverOn:RACScheduler.mainThreadScheduler]
          setNameWithFormat:@"%@ -executionSignals", self];

      // `errors` needs to be multicasted so that it picks up all execution
      // signals that are added.
      //
      // In other words, if someone subscribes to `errors` _after_ an execution
      // has started, it should still receive any error from that execution.
      RACMulticastConnection *errorsConnection = [[[_addedExecutionSignalsSubject
          flattenMap:^(RACSignal *signal) {
              // Drop the execution's values and turn its error (if any) into
              // a `next` value.
              return [[signal
                  ignoreValues]
                  catch:^(NSError *error) {
                      return [RACSignal return:error];
                  }];
          }]
          deliverOn:RACScheduler.mainThreadScheduler]
          publish];

      _errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
      [errorsConnection connect];

      // Each execution contributes +1 when it is added and -1 once it
      // completes (errors are treated as completion). The running sum is the
      // number of in-flight executions; "executing" means that sum is positive.
      RACSignal *immediateExecuting = [[[[_addedExecutionSignalsSubject
          flattenMap:^(RACSignal *signal) {
              return [[[signal
                  catchTo:[RACSignal empty]]
                  then:^{
                      return [RACSignal return:@-1];
                  }]
                  startWith:@1];
          }]
          scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) {
              return @(running.integerValue + next.integerValue);
          }]
          map:^(NSNumber *count) {
              return @(count.integerValue > 0);
          }]
          startWith:@NO];

      _executing = [[[[[immediateExecuting
          deliverOn:RACScheduler.mainThreadScheduler]
          // This is useful before the first value arrives on the main thread.
          startWith:@NO]
          distinctUntilChanged]
          replayLast]
          setNameWithFormat:@"%@ -executing", self];

      // While concurrent execution is allowed, another execution is always
      // permitted; otherwise only when nothing is currently executing.
      RACSignal *moreExecutionsAllowed = [RACSignal
          if:[_allowsConcurrentExecutionSubject startWith:@NO]
          then:[RACSignal return:@YES]
          else:[immediateExecuting not]];

      if (enabledSignal == nil) {
          enabledSignal = [RACSignal return:@YES];
      } else {
          enabledSignal = [enabledSignal startWith:@YES];
      }

      // Enabled only when both the caller's signal and the concurrency rule
      // agree. Terminates when the command deallocates.
      _immediateEnabled = [[[[RACSignal
          combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
          and]
          takeUntil:self.rac_willDeallocSignal]
          replayLast];

      // The first value is forwarded without a scheduler hop; every later
      // value is delivered on the main thread.
      _enabled = [[[[[_immediateEnabled
          take:1]
          concat:[[_immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
          distinctUntilChanged]
          replayLast]
          setNameWithFormat:@"%@ -enabled", self];

      return self;
  }

  #pragma mark Execution

  // Starts one execution of the command.
  //
  // input - The value passed to the receiver's signal block.
  //
  // Returns a signal that replays the events of the execution. If the command
  // is currently disabled, the signal block is not invoked and the returned
  // signal immediately errors with RACCommandErrorNotEnabled in
  // RACCommandErrorDomain; the error's userInfo holds the receiver under
  // RACUnderlyingCommandErrorKey.
  - (RACSignal *)execute:(id)input {
      // `immediateEnabled` is guaranteed to send a value upon subscription, so
      // -first is acceptable here.
      BOOL enabled = [[self.immediateEnabled first] boolValue];
      if (!enabled) {
          NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
              NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
              RACUnderlyingCommandErrorKey: self
          }];

          return [RACSignal error:error];
      }

      RACSignal *signal = self.signalBlock(input);
      NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);

      // We subscribe to the signal on the main thread so that it occurs _after_
      // the signal has been sent on `addedExecutionSignalsSubject` below.
      //
      // This means that `executing` and `enabled` will send updated values before
      // the signal actually starts performing work.
      RACMulticastConnection *connection = [[signal
          subscribeOn:RACScheduler.mainThreadScheduler]
          multicast:[RACReplaySubject subject]];

      [self.addedExecutionSignalsSubject sendNext:connection.signal];

      [connection connect];
      return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
  }

  @end