RACMulticastConnection.m 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. //
  2. // RACMulticastConnection.m
  3. // ReactiveCocoa
  4. //
  5. // Created by Josh Abernathy on 4/11/12.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACMulticastConnection.h"
  9. #import "RACMulticastConnection+Private.h"
  10. #import "RACDisposable.h"
  11. #import "RACSerialDisposable.h"
  12. #import "RACSubject.h"
  13. #import <libkern/OSAtomic.h>
  @interface RACMulticastConnection () {
      // The subject handed to the initializer. `-connect` subscribes it to
      // `sourceSignal`, and `-autoconnect` reads it through `self.signal`
      // (presumably the public `signal` property is backed by this ivar —
      // confirm against the header).
      RACSubject *_signal;

      // One-shot connection flag: `0` until the first `-connect`, `1` afterwards.
      //
      // A caller that wants to connect must try to atomically swap this value
      // from `0` to `1`:
      //
      //  - If the swap succeeds, that caller is responsible for subscribing
      //    `_signal` to `sourceSignal` and for storing the resulting disposable
      //    in `serialDisposable`.
      //
      //  - If the swap fails, `sourceSignal` has already been connected and the
      //    caller has nothing further to do.
      int32_t volatile _hasConnected;
  }

  // The upstream signal that `_signal` is subscribed to when connecting.
  @property (strong, nonatomic, readonly) RACSignal *sourceSignal;

  // Holds the disposable for the subscription to `sourceSignal`; this same
  // object is what `-connect` returns to its callers.
  @property (strong) RACSerialDisposable *serialDisposable;

  @end
  @implementation RACMulticastConnection

  #pragma mark Lifecycle

  // Creates a connection that, once connected, subscribes `subject` to `source`.
  //
  // source  - The signal to multicast. Must not be nil (asserted).
  // subject - The subject that receives the events of `source`. Must not be
  //           nil (asserted).
  //
  // Returns the initialized connection, or nil if `[super init]` returned nil.
  // No subscription is made here; that only happens in -connect.
  - (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
      NSCParameterAssert(source != nil);
      NSCParameterAssert(subject != nil);

      self = [super init];
      if (self != nil) {
          _sourceSignal = source;
          _serialDisposable = [[RACSerialDisposable alloc] init];
          _signal = subject;
      }

      return self;
  }

  #pragma mark Connecting

  // Subscribes the subject to the source signal, at most once per connection.
  //
  // Only the caller that wins the atomic 0 -> 1 swap of `_hasConnected` performs
  // the subscription; nothing in this file ever resets that flag. Every call,
  // winning or not, returns the same serial disposable.
  //
  // Returns the serial disposable that holds the subscription to the source.
  - (RACDisposable *)connect {
      if (OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected)) {
          RACDisposable *sourceSubscription = [self.sourceSignal subscribe:_signal];
          self.serialDisposable.disposable = sourceSubscription;
      }

      return self.serialDisposable;
  }

  // Builds a signal that connects this connection whenever it is subscribed to.
  //
  // Each subscription to the returned signal bumps a shared subscriber count,
  // subscribes the new subscriber to `self.signal`, and then calls -connect.
  // Disposing of a subscription disposes of that subscriber's own subscription
  // and, when the count drops back to zero, of the connection's disposable too.
  //
  // Returns a signal named "[<name of self.signal>] -autoconnect".
  - (RACSignal *)autoconnect {
      // Shared by every subscription made to the signal returned below.
      __block volatile int32_t activeSubscribers = 0;

      RACSignal *autoconnecting = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
          OSAtomicIncrement32Barrier(&activeSubscribers);

          // Attach the subscriber to the subject before connecting.
          RACDisposable *subjectSubscription = [self.signal subscribe:subscriber];
          RACDisposable *connection = [self connect];

          return [RACDisposable disposableWithBlock:^{
              [subjectSubscription dispose];

              int32_t remaining = OSAtomicDecrement32Barrier(&activeSubscribers);
              if (remaining != 0) return;

              // The count reached zero, so tear down the source subscription.
              [connection dispose];
          }];
      }];

      return [autoconnecting setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
  }

  @end