RACChannel.m 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. //
  2. // RACChannel.m
  3. // ReactiveCocoa
  4. //
  5. // Created by Uri Baghin on 01/01/2013.
  6. // Copyright (c) 2013 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACChannel.h"
  9. #import "RACDisposable.h"
  10. #import "RACReplaySubject.h"
  11. #import "RACSignal+Operations.h"
  // Private interface of RACChannelTerminal: the two collaborators each
  // terminal is constructed with, and the initializer RACChannel uses to
  // create its terminals.
  @interface RACChannelTerminal ()

  // The signal that subscriptions to this terminal are forwarded to.
  @property (nonatomic, readonly, strong) RACSignal *values;

  // The subscriber that receives every event sent to this terminal, so that
  // the events reach the other terminal of the channel.
  @property (nonatomic, readonly, strong) id<RACSubscriber> otherTerminal;

  // Creates a terminal backed by `values` that forwards the events it is sent
  // to `otherTerminal`. Both arguments are asserted to be non-nil.
  - (id)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal;

  @end
  @implementation RACChannel

  // Builds the two subjects that back the channel, cross-wires their
  // error/completion events, and wraps them in the leading and following
  // terminals.
  - (id)init {
      self = [super init];
      if (!self) {
          return nil;
      }

      // The leading subject is created with a replay capacity of 0: no
      // starting value should come from it, but its error and completion
      // should still be replayed.
      RACReplaySubject *unnamedLeadingSubject = [RACReplaySubject replaySubjectWithCapacity:0];
      RACReplaySubject *leadingSubject = [unnamedLeadingSubject setNameWithFormat:@"leadingSubject"];

      // The following subject is created with a replay capacity of 1.
      RACReplaySubject *unnamedFollowingSubject = [RACReplaySubject replaySubjectWithCapacity:1];
      RACReplaySubject *followingSubject = [unnamedFollowingSubject setNameWithFormat:@"followingSubject"];

      // Values are filtered out here, so only error and completion cross
      // from one subject to the other.
      RACSignal *leadingTermination = [leadingSubject ignoreValues];
      [leadingTermination subscribe:followingSubject];

      RACSignal *followingTermination = [followingSubject ignoreValues];
      [followingTermination subscribe:leadingSubject];

      // Each terminal exposes one subject as its values and sends into the
      // opposite subject.
      RACChannelTerminal *leadingTerminal = [[RACChannelTerminal alloc] initWithValues:leadingSubject otherTerminal:followingSubject];
      _leadingTerminal = [leadingTerminal setNameWithFormat:@"leadingTerminal"];

      RACChannelTerminal *followingTerminal = [[RACChannelTerminal alloc] initWithValues:followingSubject otherTerminal:leadingSubject];
      _followingTerminal = [followingTerminal setNameWithFormat:@"followingTerminal"];

      return self;
  }

  @end
  // One end of a channel: subscriptions are served by `values`, while every
  // subscriber event sent to the terminal is handed to `otherTerminal`.
  @implementation RACChannelTerminal

  #pragma mark - Lifecycle

  // Stores the signal backing this terminal and the subscriber that events are
  // forwarded to. Both arguments are asserted to be non-nil.
  - (id)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal {
      NSCParameterAssert(values != nil);
      NSCParameterAssert(otherTerminal != nil);

      self = [super init];
      if (self != nil) {
          _values = values;
          _otherTerminal = otherTerminal;
      }
      return self;
  }

  #pragma mark - RACSignal

  // Subscribing to the terminal subscribes to its underlying `values` signal.
  - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
      RACSignal *source = self.values;
      return [source subscribe:subscriber];
  }

  #pragma mark - <RACSubscriber>

  // Passes `value` on to the other terminal.
  - (void)sendNext:(id)value {
      id<RACSubscriber> destination = self.otherTerminal;
      [destination sendNext:value];
  }

  // Passes `error` on to the other terminal.
  - (void)sendError:(NSError *)error {
      id<RACSubscriber> destination = self.otherTerminal;
      [destination sendError:error];
  }

  // Passes completion on to the other terminal.
  - (void)sendCompleted {
      id<RACSubscriber> destination = self.otherTerminal;
      [destination sendCompleted];
  }

  // Passes the subscription's disposable on to the other terminal.
  - (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
      id<RACSubscriber> destination = self.otherTerminal;
      [destination didSubscribeWithDisposable:disposable];
  }

  @end