| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- //
- // PINOperationQueue.m
- // Pods
- //
- // Created by Garrett Moon on 8/23/16.
- //
- //
- #import "PINOperationQueue.h"
- #import <pthread.h>
- @class PINOperation;
// Declares that NSNumber conforms to PINOperationReference. The queue hands out
// boxed counters (see -nextOperationReference) as its operation references.
@interface NSNumber (PINOperationQueue) <PINOperationReference>

@end
// Private state of the queue. Everything below is read and written while
// holding _lock (see -lock / -unlock).
@interface PINOperationQueue () {
  // Recursive mutex, so operations can schedule further operations from within operations.
  pthread_mutex_t _lock;
  // Incremented for every reference handed out; the boxed value is what callers use to cancel.
  NSUInteger _operationReferenceCount;
  NSUInteger _maxConcurrentOperations;

  // Entered once per added operation, left when the operation finishes or is cancelled.
  dispatch_group_t _group;

  // Runs operations one at a time in the order they were added, ignoring priority.
  dispatch_queue_t _serialQueue;
  // YES while an operation is scheduled or running on _serialQueue.
  BOOL _serialQueueBusy;

  // Counts the free slots on _concurrentQueue (created with max - 1 slots).
  dispatch_semaphore_t _concurrentSemaphore;
  dispatch_queue_t _concurrentQueue;
  // Serial queue on which every wait on _concurrentSemaphore happens.
  dispatch_queue_t _semaphoreQueue;

  // All pending operations in the order they were added.
  NSMutableOrderedSet<PINOperation *> *_queuedOperations;
  // The same pending operations, bucketed by priority.
  NSMutableOrderedSet<PINOperation *> *_lowPriorityOperations;
  NSMutableOrderedSet<PINOperation *> *_defaultPriorityOperations;
  NSMutableOrderedSet<PINOperation *> *_highPriorityOperations;

  // Weak-to-weak lookup tables; they do not keep operations alive.
  NSMapTable<id<PINOperationReference>, PINOperation *> *_referenceToOperations;
  NSMapTable<NSString *, PINOperation *> *_identifierToOperations;
}
@end
/**
 A single unit of work tracked by PINOperationQueue: the block to run, the data
 passed to it, the reference and identifier it can be looked up by, its
 priority, and the completion blocks to call once the block has run.
 */
@interface PINOperation : NSObject

// copy (not strong): the conventional ownership for block properties.
@property (nonatomic, copy) PINOperationBlock block;
@property (nonatomic, strong) id <PINOperationReference> reference;
@property (nonatomic, assign) PINOperationQueuePriority priority;
// Created lazily by -addCompletion:, so it stays nil until a completion is added.
@property (nonatomic, strong) NSMutableArray<dispatch_block_t> *completions;
// copy (not strong): the identifier is used as a map table key, so a caller
// mutating an NSMutableString after the fact must not be able to change it.
@property (nonatomic, copy) NSString *identifier;
@property (nonatomic, strong) id data;

/**
 Creates an operation with the given values.

 @param block The work to perform; it is invoked with the operation's data.
 @param reference The reference the operation can be looked up by.
 @param priority The priority bucket the operation belongs to.
 @param identifier Optional identifier used to coalesce operations.
 @param data Optional data handed to the block.
 @param completion Optional completion block; nil is ignored.
 @return A new operation.
 */
+ (instancetype)operationWithBlock:(PINOperationBlock)block reference:(id <PINOperationReference>)reference priority:(PINOperationQueuePriority)priority identifier:(nullable NSString *)identifier data:(nullable id)data completion:(nullable dispatch_block_t)completion;

/**
 Appends a completion block to the operation. Passing nil does nothing.
 */
- (void)addCompletion:(nullable dispatch_block_t)completion;

@end

@implementation PINOperation

+ (instancetype)operationWithBlock:(PINOperationBlock)block reference:(id<PINOperationReference>)reference priority:(PINOperationQueuePriority)priority identifier:(NSString *)identifier data:(id)data completion:(dispatch_block_t)completion
{
  PINOperation *operation = [[self alloc] init];
  operation.block = block;
  operation.reference = reference;
  operation.priority = priority;
  operation.identifier = identifier;
  operation.data = data;
  [operation addCompletion:completion];

  return operation;
}

- (void)addCompletion:(dispatch_block_t)completion
{
  if (completion == nil) {
    return;
  }
  // The array is allocated on first use only.
  if (_completions == nil) {
    _completions = [NSMutableArray array];
  }
  [_completions addObject:completion];
}

@end
- @implementation PINOperationQueue
/**
 Convenience initializer: creates a private concurrent dispatch queue and
 forwards to -initWithMaxConcurrentOperations:concurrentQueue:.
 */
- (instancetype)initWithMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations
{
  dispatch_queue_t privateConcurrentQueue = dispatch_queue_create("PINOperationQueue Concurrent Queue", DISPATCH_QUEUE_CONCURRENT);
  return [self initWithMaxConcurrentOperations:maxConcurrentOperations
                               concurrentQueue:privateConcurrentQueue];
}
/**
 Initializes a queue that runs at most maxConcurrentOperations operations at
 once: one on an internal serial queue and the rest on concurrentQueue.

 @param maxConcurrentOperations Must be greater than 1 (asserted).
 @param concurrentQueue The dispatch queue used for the concurrent operations.
 @return The initialized queue.
 */
- (instancetype)initWithMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations concurrentQueue:(dispatch_queue_t)concurrentQueue
{
  self = [super init];
  if (self == nil) {
    return nil;
  }

  NSAssert(maxConcurrentOperations > 1, @"Max concurrent operations must be greater than 1. If it's one, just use a serial queue!");
  _maxConcurrentOperations = maxConcurrentOperations;

  pthread_mutexattr_t attr;
  pthread_mutexattr_init(&attr);
  //mutex must be recursive to allow scheduling of operations from within operations
  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  pthread_mutex_init(&_lock, &attr);
  // The attribute object is only needed while initializing the mutex; destroying it
  // afterwards does not affect the mutex and releases whatever the attribute holds.
  pthread_mutexattr_destroy(&attr);

  _group = dispatch_group_create();

  _serialQueue = dispatch_queue_create("PINOperationQueue Serial Queue", DISPATCH_QUEUE_SERIAL);

  _concurrentQueue = concurrentQueue;

  //Create a semaphore with max - 1 because this plus the serial queue add up to max.
  _concurrentSemaphore = dispatch_semaphore_create(_maxConcurrentOperations - 1);
  _semaphoreQueue = dispatch_queue_create("PINOperationQueue Serial Semaphore Queue", DISPATCH_QUEUE_SERIAL);

  _queuedOperations = [[NSMutableOrderedSet alloc] init];
  _lowPriorityOperations = [[NSMutableOrderedSet alloc] init];
  _defaultPriorityOperations = [[NSMutableOrderedSet alloc] init];
  _highPriorityOperations = [[NSMutableOrderedSet alloc] init];

  // Weak-to-weak: the lookup tables must not keep operations alive.
  _referenceToOperations = [NSMapTable weakToWeakObjectsMapTable];
  _identifierToOperations = [NSMapTable weakToWeakObjectsMapTable];

  return self;
}
- (void)dealloc
{
  // Tear down the mutex that the initializer set up with pthread_mutex_init.
  pthread_mutex_destroy(&_lock);
}
/**
 Returns the process-wide queue, created on first use. Its concurrency limit is
 the active processor count, but never less than 2.
 */
+ (instancetype)sharedOperationQueue
{
  static dispatch_once_t onceToken;
  static PINOperationQueue *sharedOperationQueue = nil;
  dispatch_once(&onceToken, ^{
    NSUInteger processorCount = [[NSProcessInfo processInfo] activeProcessorCount];
    NSUInteger concurrencyLimit = (processorCount < 2) ? 2 : processorCount;
    sharedOperationQueue = [[PINOperationQueue alloc] initWithMaxConcurrentOperations:concurrencyLimit];
  });
  return sharedOperationQueue;
}
/**
 Produces a new operation reference by incrementing the reference counter
 under the lock and boxing the new value in an NSNumber.
 */
- (id <PINOperationReference>)nextOperationReference
{
  [self lock];
    _operationReferenceCount += 1;
    id <PINOperationReference> boxedCount = [NSNumber numberWithUnsignedInteger:_operationReferenceCount];
  [self unlock];

  return boxedCount;
}
/**
 Adds an operation at the default priority.

 @param block The work to perform.
 @return A reference identifying the added operation.
 */
- (id <PINOperationReference>)addOperation:(dispatch_block_t)block
{
  const PINOperationQueuePriority priority = PINOperationQueuePriorityDefault;
  return [self addOperation:block withPriority:priority];
}
/**
 Adds an operation with the given priority and kicks the scheduler.

 @param block The work to perform.
 @param priority The priority bucket to queue the operation in.
 @return A reference identifying the added operation.
 */
- (id <PINOperationReference>)addOperation:(dispatch_block_t)block withPriority:(PINOperationQueuePriority)priority
{
  // Adapt the argument-less block to the operation block signature; the data argument is unused.
  PINOperationBlock wrappedBlock = ^(id data) {
    block();
  };
  id <PINOperationReference> reference = [self nextOperationReference];
  PINOperation *newOperation = [PINOperation operationWithBlock:wrappedBlock
                                                      reference:reference
                                                       priority:priority
                                                     identifier:nil
                                                           data:nil
                                                     completion:nil];

  [self lock];
    [self locked_addOperation:newOperation];
  [self unlock];

  [self scheduleNextOperations:NO];

  return reference;
}
/**
 Adds an operation, or coalesces it into an operation already registered under
 the same identifier.

 When an operation is found for identifier, no new operation is created: its
 data is replaced with the result of dataCoalescingBlock (if one is given) and
 completion is appended to its completions. Otherwise a new operation is
 created, queued and the scheduler is kicked.

 @param block The work to perform; only used when a new operation is created.
 @param priority The priority of a newly created operation.
 @param identifier Optional identifier used to find an operation to coalesce with.
 @param coalescingData Data for the new operation, or the second argument to dataCoalescingBlock.
 @param dataCoalescingBlock Optional block combining the existing data with coalescingData.
 @param completion Optional block to call once the operation has run.
 @return The reference of the operation that will perform the work.
 */
- (id<PINOperationReference>)addOperation:(PINOperationBlock)block
                             withPriority:(PINOperationQueuePriority)priority
                               identifier:(NSString *)identifier
                           coalescingData:(id)coalescingData
                      dataCoalescingBlock:(PINOperationDataCoalescingBlock)dataCoalescingBlock
                               completion:(dispatch_block_t)completion
{
  [self lock];

  PINOperation *existing = nil;
  if (identifier != nil) {
    existing = [_identifierToOperations objectForKey:identifier];
  }

  if (existing != nil) {
    // There is an existing operation with the provided identifier, so coalesce into it.
    if (dataCoalescingBlock != nil) {
      existing.data = dataCoalescingBlock(existing.data, coalescingData);
    }
    [existing addCompletion:completion];

    id<PINOperationReference> existingReference = existing.reference;
    [self unlock];
    return existingReference;
  }

  PINOperation *created = [PINOperation operationWithBlock:block
                                                 reference:[self nextOperationReference]
                                                  priority:priority
                                                identifier:identifier
                                                      data:coalescingData
                                                completion:completion];
  [self locked_addOperation:created];
  id<PINOperationReference> createdReference = created.reference;

  [self unlock];

  // Only a newly queued operation requires another scheduling pass.
  [self scheduleNextOperations:NO];

  return createdReference;
}
//Call with lock held
/**
 Registers a new operation: enters the dispatch group, appends the operation
 to its priority set and to the ordered set of all queued operations, and
 records it in the reference (and, when it has one, identifier) lookup tables.
 */
- (void)locked_addOperation:(PINOperation *)operation
{
  dispatch_group_enter(_group);

  NSMutableOrderedSet *prioritySet = [self operationQueueWithPriority:operation.priority];
  [prioritySet addObject:operation];
  [_queuedOperations addObject:operation];

  [_referenceToOperations setObject:operation forKey:operation.reference];

  NSString *identifier = operation.identifier;
  if (identifier == nil) {
    return;
  }
  [_identifierToOperations setObject:operation forKey:identifier];
}
/**
 Attempts to cancel every operation currently present in the reference lookup
 table. Operations that are no longer queued are left untouched by
 -locked_cancelOperation:.
 */
- (void)cancelAllOperations
{
  [self lock];
    // Walk a copy of the table rather than the table itself.
    NSMapTable *snapshot = [_referenceToOperations copy];
    NSEnumerator *operations = [snapshot objectEnumerator];
    PINOperation *candidate = nil;
    while ((candidate = [operations nextObject]) != nil) {
      [self locked_cancelOperation:candidate.reference];
    }
  [self unlock];
}
/**
 Cancels the operation identified by operationReference if it is still queued.

 @param operationReference The reference returned when the operation was added.
 @return YES if the operation was removed from the queue, NO otherwise.
 */
- (BOOL)cancelOperation:(id <PINOperationReference>)operationReference
{
  [self lock];
    const BOOL didCancel = [self locked_cancelOperation:operationReference];
  [self unlock];

  return didCancel;
}
/**
 Returns the current concurrency limit, read under the lock.
 */
- (NSUInteger)maxConcurrentOperations
{
  NSUInteger currentLimit;

  [self lock];
    currentLimit = _maxConcurrentOperations;
  [self unlock];

  return currentLimit;
}
/**
 Changes the concurrency limit.

 The stored limit is updated under the lock. The concurrent semaphore is then
 adjusted asynchronously on the semaphore queue: it is signalled once per added
 slot, or waited on once per removed slot (each wait blocks until a slot is free).

 @param maxConcurrentOperations The new limit; must be greater than 1 (asserted).
 */
- (void)setMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations
{
  NSAssert(maxConcurrentOperations > 1, @"Max concurrent operations must be greater than 1. If it's one, just use a serial queue!");

  [self lock];
    // Positive when slots are added, negative when slots are taken away.
    const NSInteger delta = maxConcurrentOperations - _maxConcurrentOperations;
    _maxConcurrentOperations = maxConcurrentOperations;
  [self unlock];

  if (delta == 0) {
    return;
  }

  dispatch_async(_semaphoreQueue, ^{
    if (delta > 0) {
      for (NSInteger added = 0; added < delta; added++) {
        dispatch_semaphore_signal(_concurrentSemaphore);
      }
    } else {
      for (NSInteger removed = delta; removed < 0; removed++) {
        dispatch_semaphore_wait(_concurrentSemaphore, DISPATCH_TIME_FOREVER);
      }
    }
  });
}
- #pragma mark - private methods
//Call with lock held
/**
 Cancels the operation identified by operationReference if it is still waiting
 in its priority set.

 A cancelled operation is removed from both ordered sets and from the lookup
 tables, and the dispatch group entered when it was added is left. The lookup
 tables only hold weak references, so without the explicit removal the
 operation would stay reachable for as long as anything else keeps it alive.
 A coalescing add with the same identifier could then attach its completion to
 an operation that will never run.

 @param operationReference The reference returned when the operation was added.
 @return YES if the operation was queued and has been removed, NO otherwise.
 */
- (BOOL)locked_cancelOperation:(id <PINOperationReference>)operationReference
{
  PINOperation *operation = [_referenceToOperations objectForKey:operationReference];
  if (operation == nil) {
    return NO;
  }

  NSMutableOrderedSet *queue = [self operationQueueWithPriority:operation.priority];
  if ([queue containsObject:operation] == NO) {
    // Already handed to a dispatch queue (or never queued): nothing to cancel.
    return NO;
  }

  [queue removeObject:operation];
  [_queuedOperations removeObject:operation];

  // Make the cancelled operation unreachable through the lookup tables.
  [_referenceToOperations removeObjectForKey:operationReference];
  NSString *identifier = operation.identifier;
  if (identifier != nil && [_identifierToOperations objectForKey:identifier] == operation) {
    [_identifierToOperations removeObjectForKey:identifier];
  }

  dispatch_group_leave(_group);
  return YES;
}
/**
 Moves a queued operation to a different priority.

 Only operations that are still waiting in their priority set are moved. An
 operation that has already been dequeued can still be found through the weak
 reference table while it runs. Inserting it into a priority set at that point
 would make it run a second time and leave the dispatch group without a
 matching enter.

 @param priority The new priority.
 @param operationReference The reference returned when the operation was added.
 */
- (void)setOperationPriority:(PINOperationQueuePriority)priority withReference:(id <PINOperationReference>)operationReference
{
  [self lock];
    PINOperation *operation = [_referenceToOperations objectForKey:operationReference];
    if (operation != nil && operation.priority != priority) {
      NSMutableOrderedSet *oldQueue = [self operationQueueWithPriority:operation.priority];
      if ([oldQueue containsObject:operation]) {
        [oldQueue removeObject:operation];

        operation.priority = priority;

        // The operation is appended, so it goes behind the operations already at the new priority.
        NSMutableOrderedSet *newQueue = [self operationQueueWithPriority:priority];
        [newQueue addObject:operation];
      }
    }
  [self unlock];
}
/**
 Schedules the next operation in queue order onto the serial queue if that
 queue is idle and, unless onlyCheckSerial is YES, one operation in priority
 order onto the concurrent queue.

 The concurrent half runs on the semaphore queue: it waits for a free slot on
 the concurrent semaphore, dequeues the next operation by priority and either
 dispatches it (signalling the semaphore when it finishes) or, when nothing is
 queued, gives the slot back immediately.

 @param onlyCheckSerial Pass YES to skip the concurrent queue. The serial queue
 does this when it finishes an operation and looks for the next one.
 */
- (void)scheduleNextOperations:(BOOL)onlyCheckSerial
{
  [self lock];
    //get next available operation in order, ignoring priority and run it on the serial queue
    if (!_serialQueueBusy) {
      PINOperation *operation = [self locked_nextOperationByQueue];
      if (operation) {
        _serialQueueBusy = YES;
        dispatch_async(_serialQueue, ^{
          [self runOperationAndCompletions:operation];

          [self lock];
            _serialQueueBusy = NO;
          [self unlock];

          //see if there are any other operations
          [self scheduleNextOperations:YES];
        });
      }
    }
  [self unlock];

  if (onlyCheckSerial) {
    return;
  }

  dispatch_async(_semaphoreQueue, ^{
    dispatch_semaphore_wait(_concurrentSemaphore, DISPATCH_TIME_FOREVER);
    [self lock];
      PINOperation *operation = [self locked_nextOperationByPriority];
    [self unlock];

    if (operation) {
      dispatch_async(_concurrentQueue, ^{
        [self runOperationAndCompletions:operation];
        dispatch_semaphore_signal(_concurrentSemaphore);
      });
    } else {
      // Nothing to run: return the slot taken by the wait above.
      dispatch_semaphore_signal(_concurrentSemaphore);
    }
  });
}

/**
 Runs a dequeued operation: invokes its block with its data, calls its
 completions and leaves the dispatch group entered when it was added.

 The data and the completions are read while holding the lock because the
 coalescing add method changes both under the lock. Reading them unlocked races
 with that method and can mutate the completions array while it is enumerated.
 Completions added after the snapshot below is taken are not called.
 */
- (void)runOperationAndCompletions:(PINOperation *)operation
{
  [self lock];
    id data = operation.data;
  [self unlock];

  operation.block(data);

  [self lock];
    NSArray<dispatch_block_t> *completions = [operation.completions copy];
  [self unlock];

  for (dispatch_block_t completion in completions) {
    completion();
  }
  dispatch_group_leave(_group);
}
/**
 Returns the ordered set that holds queued operations of the given priority.
 An unrecognised priority trips an assertion and falls back to the default
 priority set.
 */
- (NSMutableOrderedSet *)operationQueueWithPriority:(PINOperationQueuePriority)priority
{
  if (priority == PINOperationQueuePriorityLow) {
    return _lowPriorityOperations;
  }
  if (priority == PINOperationQueuePriorityHigh) {
    return _highPriorityOperations;
  }

  // Anything left is either the default priority or an invalid value.
  NSAssert(priority == PINOperationQueuePriorityDefault, @"Invalid priority set");
  return _defaultPriorityOperations;
}
//Call with lock held
/**
 Dequeues the first operation of the highest non-empty priority set
 (high, then default, then low).

 @return The dequeued operation, or nil when every priority set is empty.
 */
- (PINOperation *)locked_nextOperationByPriority
{
  PINOperation *next = [_highPriorityOperations firstObject]
                    ?: [_defaultPriorityOperations firstObject]
                    ?: [_lowPriorityOperations firstObject];
  if (next == nil) {
    return nil;
  }

  [self locked_removeOperation:next];
  return next;
}
//Call with lock held
/**
 Dequeues the operation that has been queued the longest, regardless of priority.

 @return The dequeued operation, or nil when nothing is queued.
 */
- (PINOperation *)locked_nextOperationByQueue
{
  PINOperation *oldest = [_queuedOperations firstObject];
  if (oldest != nil) {
    [self locked_removeOperation:oldest];
  }
  return oldest;
}
/**
 Blocks the calling thread until the dispatch group is empty, i.e. until every
 added operation has either finished or been cancelled.
 */
- (void)waitUntilAllOperationsAreFinished
{
  // Run a scheduling pass first, then wait on the group.
  [self scheduleNextOperations:NO];
  (void)dispatch_group_wait(_group, DISPATCH_TIME_FOREVER);
}
//Call with lock held
/**
 Removes an operation from the queue's bookkeeping once it has been dequeued.

 Besides the two ordered sets, the operation is also removed from the
 reference and identifier lookup tables. Those tables hold weak references
 only, so an operation that is running (and therefore still alive) would
 otherwise remain reachable. A coalescing add could then attach data and
 completions to an operation that has already started, and
 -setOperationPriority:withReference: could put it back into a priority set.

 @param operation The operation to remove; nil is ignored.
 */
- (void)locked_removeOperation:(PINOperation *)operation
{
  if (operation == nil) {
    return;
  }

  NSMutableOrderedSet *priorityQueue = [self operationQueueWithPriority:operation.priority];
  [priorityQueue removeObject:operation];
  [_queuedOperations removeObject:operation];

  id <PINOperationReference> reference = operation.reference;
  if (reference != nil) {
    [_referenceToOperations removeObjectForKey:reference];
  }

  // Only drop the identifier entry if it still points at this operation.
  NSString *identifier = operation.identifier;
  if (identifier != nil && [_identifierToOperations objectForKey:identifier] == operation) {
    [_identifierToOperations removeObjectForKey:identifier];
  }
}
/**
 Acquires the queue's mutex. The mutex is recursive, so a thread that already
 holds it may lock again; every call must be balanced by -unlock.
 */
- (void)lock
{
  (void)pthread_mutex_lock(&_lock);
}
/**
 Releases the queue's mutex once; balances a previous call to -lock.
 */
- (void)unlock
{
  (void)pthread_mutex_unlock(&_lock);
}
- @end
|