PINOperationQueue.m 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. //
  2. // PINOperationQueue.m
  3. // Pods
  4. //
  5. // Created by Garrett Moon on 8/23/16.
  6. //
  7. //
  8. #import "PINOperationQueue.h"
  9. #import <pthread.h>
  10. @class PINOperation;
  11. @interface NSNumber (PINOperationQueue) <PINOperationReference>
  12. @end
  13. @interface PINOperationQueue () {
  14. pthread_mutex_t _lock;
  15. //increments with every operation to allow cancelation
  16. NSUInteger _operationReferenceCount;
  17. NSUInteger _maxConcurrentOperations;
  18. dispatch_group_t _group;
  19. dispatch_queue_t _serialQueue;
  20. BOOL _serialQueueBusy;
  21. dispatch_semaphore_t _concurrentSemaphore;
  22. dispatch_queue_t _concurrentQueue;
  23. dispatch_queue_t _semaphoreQueue;
  24. NSMutableOrderedSet<PINOperation *> *_queuedOperations;
  25. NSMutableOrderedSet<PINOperation *> *_lowPriorityOperations;
  26. NSMutableOrderedSet<PINOperation *> *_defaultPriorityOperations;
  27. NSMutableOrderedSet<PINOperation *> *_highPriorityOperations;
  28. NSMapTable<id<PINOperationReference>, PINOperation *> *_referenceToOperations;
  29. NSMapTable<NSString *, PINOperation *> *_identifierToOperations;
  30. }
  31. @end
  32. @interface PINOperation : NSObject
  33. @property (nonatomic, strong) PINOperationBlock block;
  34. @property (nonatomic, strong) id <PINOperationReference> reference;
  35. @property (nonatomic, assign) PINOperationQueuePriority priority;
  36. @property (nonatomic, strong) NSMutableArray<dispatch_block_t> *completions;
  37. @property (nonatomic, strong) NSString *identifier;
  38. @property (nonatomic, strong) id data;
  39. + (instancetype)operationWithBlock:(PINOperationBlock)block reference:(id <PINOperationReference>)reference priority:(PINOperationQueuePriority)priority identifier:(nullable NSString *)identifier data:(nullable id)data completion:(nullable dispatch_block_t)completion;
  40. - (void)addCompletion:(nullable dispatch_block_t)completion;
  41. @end
  42. @implementation PINOperation
  43. + (instancetype)operationWithBlock:(PINOperationBlock)block reference:(id<PINOperationReference>)reference priority:(PINOperationQueuePriority)priority identifier:(NSString *)identifier data:(id)data completion:(dispatch_block_t)completion
  44. {
  45. PINOperation *operation = [[self alloc] init];
  46. operation.block = block;
  47. operation.reference = reference;
  48. operation.priority = priority;
  49. operation.identifier = identifier;
  50. operation.data = data;
  51. [operation addCompletion:completion];
  52. return operation;
  53. }
  54. - (void)addCompletion:(dispatch_block_t)completion
  55. {
  56. if (completion == nil) {
  57. return;
  58. }
  59. if (_completions == nil) {
  60. _completions = [NSMutableArray array];
  61. }
  62. [_completions addObject:completion];
  63. }
  64. @end
  65. @implementation PINOperationQueue
  66. - (instancetype)initWithMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations
  67. {
  68. return [self initWithMaxConcurrentOperations:maxConcurrentOperations concurrentQueue:dispatch_queue_create("PINOperationQueue Concurrent Queue", DISPATCH_QUEUE_CONCURRENT)];
  69. }
  70. - (instancetype)initWithMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations concurrentQueue:(dispatch_queue_t)concurrentQueue
  71. {
  72. if (self = [super init]) {
  73. NSAssert(maxConcurrentOperations > 1, @"Max concurrent operations must be greater than 1. If it's one, just use a serial queue!");
  74. _maxConcurrentOperations = maxConcurrentOperations;
  75. _operationReferenceCount = 0;
  76. pthread_mutexattr_t attr;
  77. pthread_mutexattr_init(&attr);
  78. //mutex must be recursive to allow scheduling of operations from within operations
  79. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  80. pthread_mutex_init(&_lock, &attr);
  81. _group = dispatch_group_create();
  82. _serialQueue = dispatch_queue_create("PINOperationQueue Serial Queue", DISPATCH_QUEUE_SERIAL);
  83. _concurrentQueue = concurrentQueue;
  84. //Create a queue with max - 1 because this plus the serial queue add up to max.
  85. _concurrentSemaphore = dispatch_semaphore_create(_maxConcurrentOperations - 1);
  86. _semaphoreQueue = dispatch_queue_create("PINOperationQueue Serial Semaphore Queue", DISPATCH_QUEUE_SERIAL);
  87. _queuedOperations = [[NSMutableOrderedSet alloc] init];
  88. _lowPriorityOperations = [[NSMutableOrderedSet alloc] init];
  89. _defaultPriorityOperations = [[NSMutableOrderedSet alloc] init];
  90. _highPriorityOperations = [[NSMutableOrderedSet alloc] init];
  91. _referenceToOperations = [NSMapTable weakToWeakObjectsMapTable];
  92. _identifierToOperations = [NSMapTable weakToWeakObjectsMapTable];
  93. }
  94. return self;
  95. }
  96. - (void)dealloc
  97. {
  98. pthread_mutex_destroy(&_lock);
  99. }
  100. + (instancetype)sharedOperationQueue
  101. {
  102. static PINOperationQueue *sharedOperationQueue = nil;
  103. static dispatch_once_t onceToken;
  104. dispatch_once(&onceToken, ^{
  105. sharedOperationQueue = [[PINOperationQueue alloc] initWithMaxConcurrentOperations:MAX([[NSProcessInfo processInfo] activeProcessorCount], 2)];
  106. });
  107. return sharedOperationQueue;
  108. }
  109. - (id <PINOperationReference>)nextOperationReference
  110. {
  111. [self lock];
  112. id <PINOperationReference> reference = [NSNumber numberWithUnsignedInteger:++_operationReferenceCount];
  113. [self unlock];
  114. return reference;
  115. }
  116. - (id <PINOperationReference>)addOperation:(dispatch_block_t)block
  117. {
  118. return [self addOperation:block withPriority:PINOperationQueuePriorityDefault];
  119. }
  120. - (id <PINOperationReference>)addOperation:(dispatch_block_t)block withPriority:(PINOperationQueuePriority)priority
  121. {
  122. PINOperation *operation = [PINOperation operationWithBlock:^(id data) { block(); }
  123. reference:[self nextOperationReference]
  124. priority:priority
  125. identifier:nil
  126. data:nil
  127. completion:nil];
  128. [self lock];
  129. [self locked_addOperation:operation];
  130. [self unlock];
  131. [self scheduleNextOperations:NO];
  132. return operation.reference;
  133. }
  134. - (id<PINOperationReference>)addOperation:(PINOperationBlock)block
  135. withPriority:(PINOperationQueuePriority)priority
  136. identifier:(NSString *)identifier
  137. coalescingData:(id)coalescingData
  138. dataCoalescingBlock:(PINOperationDataCoalescingBlock)dataCoalescingBlock
  139. completion:(dispatch_block_t)completion
  140. {
  141. id<PINOperationReference> reference = nil;
  142. BOOL isNewOperation = NO;
  143. [self lock];
  144. PINOperation *operation = nil;
  145. if (identifier != nil && (operation = [_identifierToOperations objectForKey:identifier]) != nil) {
  146. // There is an exisiting operation with the provided identifier, let's coallesce these operations
  147. if (dataCoalescingBlock != nil) {
  148. operation.data = dataCoalescingBlock(operation.data, coalescingData);
  149. }
  150. [operation addCompletion:completion];
  151. } else {
  152. isNewOperation = YES;
  153. operation = [PINOperation operationWithBlock:block
  154. reference:[self nextOperationReference]
  155. priority:priority
  156. identifier:identifier
  157. data:coalescingData
  158. completion:completion];
  159. [self locked_addOperation:operation];
  160. }
  161. reference = operation.reference;
  162. [self unlock];
  163. if (isNewOperation) {
  164. [self scheduleNextOperations:NO];
  165. }
  166. return reference;
  167. }
  168. - (void)locked_addOperation:(PINOperation *)operation
  169. {
  170. NSMutableOrderedSet *queue = [self operationQueueWithPriority:operation.priority];
  171. dispatch_group_enter(_group);
  172. [queue addObject:operation];
  173. [_queuedOperations addObject:operation];
  174. [_referenceToOperations setObject:operation forKey:operation.reference];
  175. if (operation.identifier != nil) {
  176. [_identifierToOperations setObject:operation forKey:operation.identifier];
  177. }
  178. }
  179. - (void)cancelAllOperations
  180. {
  181. [self lock];
  182. for (PINOperation *operation in [[_referenceToOperations copy] objectEnumerator]) {
  183. [self locked_cancelOperation:operation.reference];
  184. }
  185. [self unlock];
  186. }
  187. - (BOOL)cancelOperation:(id <PINOperationReference>)operationReference
  188. {
  189. [self lock];
  190. BOOL success = [self locked_cancelOperation:operationReference];
  191. [self unlock];
  192. return success;
  193. }
  194. - (NSUInteger)maxConcurrentOperations
  195. {
  196. [self lock];
  197. NSUInteger maxConcurrentOperations = _maxConcurrentOperations;
  198. [self unlock];
  199. return maxConcurrentOperations;
  200. }
  201. - (void)setMaxConcurrentOperations:(NSUInteger)maxConcurrentOperations
  202. {
  203. NSAssert(maxConcurrentOperations > 1, @"Max concurrent operations must be greater than 1. If it's one, just use a serial queue!");
  204. [self lock];
  205. __block NSInteger difference = maxConcurrentOperations - _maxConcurrentOperations;
  206. _maxConcurrentOperations = maxConcurrentOperations;
  207. [self unlock];
  208. if (difference == 0) {
  209. return;
  210. }
  211. dispatch_async(_semaphoreQueue, ^{
  212. while (difference != 0) {
  213. if (difference > 0) {
  214. dispatch_semaphore_signal(_concurrentSemaphore);
  215. difference--;
  216. } else {
  217. dispatch_semaphore_wait(_concurrentSemaphore, DISPATCH_TIME_FOREVER);
  218. difference++;
  219. }
  220. }
  221. });
  222. }
  223. #pragma mark - private methods
  224. - (BOOL)locked_cancelOperation:(id <PINOperationReference>)operationReference
  225. {
  226. BOOL success = NO;
  227. PINOperation *operation = [_referenceToOperations objectForKey:operationReference];
  228. if (operation) {
  229. NSMutableOrderedSet *queue = [self operationQueueWithPriority:operation.priority];
  230. if ([queue containsObject:operation]) {
  231. success = YES;
  232. [queue removeObject:operation];
  233. [_queuedOperations removeObject:operation];
  234. dispatch_group_leave(_group);
  235. }
  236. }
  237. return success;
  238. }
  239. - (void)setOperationPriority:(PINOperationQueuePriority)priority withReference:(id <PINOperationReference>)operationReference
  240. {
  241. [self lock];
  242. PINOperation *operation = [_referenceToOperations objectForKey:operationReference];
  243. if (operation && operation.priority != priority) {
  244. NSMutableOrderedSet *oldQueue = [self operationQueueWithPriority:operation.priority];
  245. [oldQueue removeObject:operation];
  246. operation.priority = priority;
  247. NSMutableOrderedSet *queue = [self operationQueueWithPriority:priority];
  248. [queue addObject:operation];
  249. }
  250. [self unlock];
  251. }
  252. /**
  253. Schedule next operations schedules the next operation by queue order onto the serial queue if
  254. it's available and one operation by priority order onto the concurrent queue.
  255. */
  256. - (void)scheduleNextOperations:(BOOL)onlyCheckSerial
  257. {
  258. [self lock];
  259. //get next available operation in order, ignoring priority and run it on the serial queue
  260. if (_serialQueueBusy == NO) {
  261. PINOperation *operation = [self locked_nextOperationByQueue];
  262. if (operation) {
  263. _serialQueueBusy = YES;
  264. dispatch_async(_serialQueue, ^{
  265. operation.block(operation.data);
  266. for (dispatch_block_t completion in operation.completions) {
  267. completion();
  268. }
  269. dispatch_group_leave(_group);
  270. [self lock];
  271. _serialQueueBusy = NO;
  272. [self unlock];
  273. //see if there are any other operations
  274. [self scheduleNextOperations:YES];
  275. });
  276. }
  277. }
  278. [self unlock];
  279. if (onlyCheckSerial) {
  280. return;
  281. }
  282. dispatch_async(_semaphoreQueue, ^{
  283. dispatch_semaphore_wait(_concurrentSemaphore, DISPATCH_TIME_FOREVER);
  284. [self lock];
  285. PINOperation *operation = [self locked_nextOperationByPriority];
  286. [self unlock];
  287. if (operation) {
  288. dispatch_async(_concurrentQueue, ^{
  289. operation.block(operation.data);
  290. for (dispatch_block_t completion in operation.completions) {
  291. completion();
  292. }
  293. dispatch_group_leave(_group);
  294. dispatch_semaphore_signal(_concurrentSemaphore);
  295. });
  296. } else {
  297. dispatch_semaphore_signal(_concurrentSemaphore);
  298. }
  299. });
  300. }
  301. - (NSMutableOrderedSet *)operationQueueWithPriority:(PINOperationQueuePriority)priority
  302. {
  303. switch (priority) {
  304. case PINOperationQueuePriorityLow:
  305. return _lowPriorityOperations;
  306. case PINOperationQueuePriorityDefault:
  307. return _defaultPriorityOperations;
  308. case PINOperationQueuePriorityHigh:
  309. return _highPriorityOperations;
  310. default:
  311. NSAssert(NO, @"Invalid priority set");
  312. return _defaultPriorityOperations;
  313. }
  314. }
  315. //Call with lock held
  316. - (PINOperation *)locked_nextOperationByPriority
  317. {
  318. PINOperation *operation = [_highPriorityOperations firstObject];
  319. if (operation == nil) {
  320. operation = [_defaultPriorityOperations firstObject];
  321. }
  322. if (operation == nil) {
  323. operation = [_lowPriorityOperations firstObject];
  324. }
  325. if (operation) {
  326. [self locked_removeOperation:operation];
  327. }
  328. return operation;
  329. }
  330. //Call with lock held
  331. - (PINOperation *)locked_nextOperationByQueue
  332. {
  333. PINOperation *operation = [_queuedOperations firstObject];
  334. [self locked_removeOperation:operation];
  335. return operation;
  336. }
  337. - (void)waitUntilAllOperationsAreFinished
  338. {
  339. [self scheduleNextOperations:NO];
  340. dispatch_group_wait(_group, DISPATCH_TIME_FOREVER);
  341. }
  342. //Call with lock held
  343. - (void)locked_removeOperation:(PINOperation *)operation
  344. {
  345. if (operation) {
  346. NSMutableOrderedSet *priorityQueue = [self operationQueueWithPriority:operation.priority];
  347. [priorityQueue removeObject:operation];
  348. [_queuedOperations removeObject:operation];
  349. }
  350. }
  351. - (void)lock
  352. {
  353. pthread_mutex_lock(&_lock);
  354. }
  355. - (void)unlock
  356. {
  357. pthread_mutex_unlock(&_lock);
  358. }
  359. @end