iOS开发-GCDAsyncSocket源码分析

tech2025-11-14  5

GCDAsyncSocket源码分析

目录:GCDAsyncSocket.m的成员变量 · GCDAsyncSocket的初始化 · 创建 · GCDAsyncSocket Connect

GCDAsyncSocket.m的成员变量

@implementation GCDAsyncSocket
{
    // Status flags
    uint32_t flags;
    // Configuration bits (IPv4 / IPv6 options)
    uint16_t config;

    // Delegate
    __weak id<GCDAsyncSocketDelegate> delegate;
    // Dispatch queue for the delegate's callbacks
    dispatch_queue_t delegateQueue;

    int socket4FD;
    int socket6FD;
    // Unix domain socket, used for inter-process communication
    int socketUN;
    // Unix domain server URL
    NSURL *socketUrl;
    // State index
    int stateIndex;
    // Local IPv4 interface address
    NSData * connectInterface4;
    // Local IPv6 interface address
    NSData * connectInterface6;
    // Local Unix domain address
    NSData * connectInterfaceUN;

    // Socket operations of this class run on this queue, serially
    dispatch_queue_t socketQueue;

    // Dispatch sources
    dispatch_source_t accept4Source;
    dispatch_source_t accept6Source;
    dispatch_source_t acceptUNSource;
    // Connect timer, a GCD timer source (original note mentions reconnecting -- TODO confirm)
    dispatch_source_t connectTimer;
    dispatch_source_t readSource;
    dispatch_source_t writeSource;
    dispatch_source_t readTimer;
    dispatch_source_t writeTimer;

    // Arrays of read / write packets, used like FIFO queues
    NSMutableArray *readQueue;
    NSMutableArray *writeQueue;

    // Packets currently being read / written
    GCDAsyncReadPacket *currentRead;
    GCDAsyncWritePacket *currentWrite;

    // Number of bytes on the current socket that have not been fetched yet
    unsigned long socketFDBytesAvailable;

    // Shared pre-buffer
    GCDAsyncSocketPreBuffer *preBuffer;

#if TARGET_OS_IPHONE
    CFStreamClientContext streamContext;
    // Read stream
    CFReadStreamRef readStream;
    // Write stream
    CFWriteStreamRef writeStream;
#endif
    // SSL context, used for SSL verification
    SSLContextRef sslContext;
    // Shared pre-buffer for SSL
    GCDAsyncSocketPreBuffer *sslPreBuffer;
    size_t sslWriteCachedLength;
    // Records SSL error information
    OSStatus sslErrCode;
    OSStatus lastSSLHandshakeError;

    // Key that marks the socket queue
    void *IsOnSocketQueueOrTargetQueueKey;

    id userData;
    // Delay before connecting to the alternate server address
    NSTimeInterval alternateAddressDelay;
}

GCDAsyncSocket的初始化

创建

// Example call site: create the socket, passing the main queue as delegateQueue.
_Socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_main_queue()];

/**
 * Convenience initializer.
 *
 * Forwards to initWithDelegate:delegateQueue:socketQueue: with a NULL socket queue.
 *
 * @param aDelegate The delegate, forwarded unchanged.
 * @param dq        The delegate queue, forwarded unchanged.
 * @return Whatever initWithDelegate:delegateQueue:socketQueue: returns.
 **/
- (instancetype)initWithDelegate:(id<GCDAsyncSocketDelegate>)aDelegate
                   delegateQueue:(dispatch_queue_t)dq
{
    // No socket queue is supplied by this initializer.
    dispatch_queue_t noSocketQueue = NULL;

    return [self initWithDelegate:aDelegate delegateQueue:dq socketQueue:noSocketQueue];
}

上面的便捷初始化方法最终会调用下面这个初始化方法,并且此时 socketQueue 传入的是 NULL,因此方法内部会通过 dispatch_queue_create 自行创建一个串行队列作为 socketQueue。

/**
 * Initializes the socket with a delegate, a delegate queue and an optional socket queue.
 *
 * @param aDelegate Stored in the (weak) delegate ivar.
 * @param dq        Stored as delegateQueue; retained manually when dispatch objects
 *                  are not Objective-C objects.
 * @param sq        Queue to use as socketQueue. Must not be one of the global concurrent
 *                  queues (asserted). When NULL, a new queue is created with
 *                  dispatch_queue_create(name, NULL).
 * @return The initialized instance, or nil if [super init] fails.
 **/
- (instancetype)initWithDelegate:(id<GCDAsyncSocketDelegate>)aDelegate
                   delegateQueue:(dispatch_queue_t)dq
                     socketQueue:(dispatch_queue_t)sq
{
    self = [super init];
    if (self == nil)
    {
        return self;
    }

    delegate = aDelegate;
    delegateQueue = dq;

    #if !OS_OBJECT_USE_OBJC
    if (dq) dispatch_retain(dq);
    #endif

    // No file descriptors exist yet.
    socket4FD = SOCKET_NULL;
    socket6FD = SOCKET_NULL;
    socketUN = SOCKET_NULL;
    socketUrl = nil;
    stateIndex = 0;

    if (sq == NULL)
    {
        // No queue supplied: create our own.
        // NSString *const GCDAsyncSocketQueueName = @"GCDAsyncSocket";
        socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL);
    }
    else
    {
        // A caller-supplied queue must not be any of the global concurrent queues.
        NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
                 @"The given socketQueue parameter must not be a concurrent queue.");
        NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0),
                 @"The given socketQueue parameter must not be a concurrent queue.");
        NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
                 @"The given socketQueue parameter must not be a concurrent queue.");

        socketQueue = sq;
        #if !OS_OBJECT_USE_OBJC
        dispatch_retain(sq);
        #endif
    }

    // dispatch_queue_set_specific() / dispatch_get_specific() take a "void *key".
    // Per the documentation, keys are only compared as pointers and never dereferenced,
    // so any unique pointer works. The address of a purposely named ivar is used here.
    //
    // To avoid having to write the "&" at every call site, the ivar is made to hold
    // its own address, so that:
    //   IsOnSocketQueueOrTargetQueueKey == &IsOnSocketQueueOrTargetQueueKey
    IsOnSocketQueueOrTargetQueueKey = &IsOnSocketQueueOrTargetQueueKey;

    // Any non-NULL value will do; only its presence on the queue is checked.
    void *queueMarker = (__bridge void *)self;
    dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, queueMarker, NULL);

    // Packet queues start empty with room for 5 entries each.
    readQueue = [[NSMutableArray alloc] initWithCapacity:5];
    currentRead = nil;

    writeQueue = [[NSMutableArray alloc] initWithCapacity:5];
    currentWrite = nil;

    // 4 KB pre-buffer
    preBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];

    // Delay before trying the alternate address: 0.3 s
    alternateAddressDelay = 0.3;

    return self;
}

GCDAsyncSocket Connect

// Call site in the app:
[self.cmdSocket connectToHost:hostIP onPort:[hostPort integerValue] error:&cmdError];

/**
 * Connects to the given host and port without a timeout.
 * Forwards to connectToHost:onPort:withTimeout:error: with a timeout of -1.
 **/
- (BOOL)connectToHost:(NSString*)host onPort:(uint16_t)port error:(NSError **)errPtr
{
    return [self connectToHost:host onPort:port withTimeout:-1 error:errPtr];
}

/**
 * Connects to the given host and port with a timeout.
 * Forwards to connectToHost:onPort:viaInterface:withTimeout:error: with a nil interface.
 **/
- (BOOL)connectToHost:(NSString *)host
               onPort:(uint16_t)port
          withTimeout:(NSTimeInterval)timeout
                error:(NSError **)errPtr
{
    return [self connectToHost:host onPort:port viaInterface:nil withTimeout:timeout error:errPtr];
}

/**
 * Validates the parameters on socketQueue, then starts an asynchronous DNS lookup
 * on a global concurrent queue. The lookup result is handed back to socketQueue via
 * lookup:didFail: or lookup:didSucceedWithAddress4:address6:.
 *
 * @param inHost      Host name or IP address string. nil / empty is rejected.
 * @param port        Port to connect to.
 * @param inInterface Optional local interface, passed to preConnectWithInterface:error:.
 * @param timeout     Passed to startConnectTimeout:.
 * @param errPtr      Optional. Receives the pre-connect error (nil when the checks pass).
 * @return YES if the pre-connect checks passed and the lookup was dispatched, NO otherwise.
 **/
- (BOOL)connectToHost:(NSString *)inHost
               onPort:(uint16_t)port
         viaInterface:(NSString *)inInterface
          withTimeout:(NSTimeInterval)timeout
                error:(NSError **)errPtr
{
    //#define LogTrace() {}
    //#define LogTrace() LogObjc(LOG_FLAG_VERBOSE, @"%@: %@", THIS_FILE, THIS_METHOD)
    LogTrace();

    // Just in case mutable objects were passed
    NSString *host = [inHost copy];
    NSString *interface = [inInterface copy];

    __block BOOL result = NO;
    __block NSError *preConnectErr = nil;

    dispatch_block_t block = ^{ @autoreleasepool {

        // Check for problems with host parameter

        if ([host length] == 0)
        {
            NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string.";
            preConnectErr = [self badParamError:msg];

            /**
             * Seeing a return statements within an inner block
             * can sometimes be mistaken for a return point of the enclosing method.
             * This makes inline blocks a bit easier to read.
             **/
            // #define return_from_block  return
            // (a macro, expanded by the preprocessor)
            return_from_block;
        }

        // Run through standard pre-connect checks

        if (![self preConnectWithInterface:interface error:&preConnectErr])
        {
            return_from_block;
        }

        // We've made it past all the checks.
        // It's time to start the connection process.

        self->flags |= kSocketStarted;

        LogVerbose(@"Dispatching DNS lookup...");

        // It's possible that the given host parameter is actually a NSMutableString.
        // So we want to copy it now, within this block that will be executed synchronously.
        // This way the asynchronous lookup block below doesn't have to worry about it changing.

        // Guard against later mutation
        NSString *hostCpy = [host copy];

        // Snapshot the current state index
        int aStateIndex = self->stateIndex;
        __weak GCDAsyncSocket *weakSelf = self;

        // Resolve the class now, so the asynchronous lookup block below does not
        // have to capture self strongly just to reach the class methods.
        Class socketClass = [self class];

        // Global concurrent queue on which the DNS lookup runs
        dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

        // Run asynchronously
        dispatch_async(globalConcurrentQueue, ^{ @autoreleasepool {
            // Report implicit retains of self inside this block as warnings
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"

            // Lookup error
            NSError *lookupErr = nil;
            // Array of server addresses
            NSMutableArray *addresses = [socketClass lookupHost:hostCpy port:port error:&lookupErr];

            __strong GCDAsyncSocket *strongSelf = weakSelf;
            if (strongSelf == nil) return_from_block;

            // Lookup failed
            if (lookupErr)
            {
                dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {

                    [strongSelf lookup:aStateIndex didFail:lookupErr];
                }});
            }
            else
            {
                NSData *address4 = nil;
                NSData *address6 = nil;

                // Keep the first IPv4 and the first IPv6 address found
                for (NSData *address in addresses)
                {
                    if (!address4 && [socketClass isIPv4Address:address])
                    {
                        address4 = address;
                    }
                    else if (!address6 && [socketClass isIPv6Address:address])
                    {
                        address6 = address;
                    }
                }

                // Start connecting asynchronously on socketQueue
                dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {

                    [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
                }});
            }

        #pragma clang diagnostic pop
        }});

        [self startConnectTimeout:timeout];

        result = YES;
    }};

    // Already on socketQueue (or its target queue): run the block directly
    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
        block();
    else
        // Otherwise run it synchronously on socketQueue
        dispatch_sync(socketQueue, block);

    if (errPtr) *errPtr = preConnectErr;

    return result;
}

// Call site (from the lookup block above):
[strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];

/**
 * Called on socketQueue once the DNS lookup has produced at least one address.
 * Ignores the result if the state index changed in the meantime, validates the
 * addresses against the IPv4 / IPv6 configuration, then starts the connection.
 *
 * @param aStateIndex State index captured when the connect was requested.
 * @param address4    IPv4 address, or nil.
 * @param address6    IPv6 address, or nil.
 **/
- (void)lookup:(int)aStateIndex didSucceedWithAddress4:(NSData *)address4 address6:(NSData *)address6
{
    LogTrace();

    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    NSAssert(address4 || address6, @"Expected at least one valid address");

    if (aStateIndex != stateIndex)
    {
        LogInfo(@"Ignoring lookupDidSucceed, already disconnected");

        // The connect operation has been cancelled.
        // That is, socket was disconnected, or connection has already timed out.
        return;
    }

    // Check for problems

    BOOL isIPv4Disabled = (config & kIPv4Disabled) ? YES : NO;
    BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO;

    if (isIPv4Disabled && (address6 == nil))
    {
        NSString *msg = @"IPv4 has been disabled and DNS lookup found no IPv6 address.";

        [self closeWithError:[self otherError:msg]];
        return;
    }

    if (isIPv6Disabled && (address4 == nil))
    {
        NSString *msg = @"IPv6 has been disabled and DNS lookup found no IPv4 address.";

        [self closeWithError:[self otherError:msg]];
        return;
    }

    // Start the normal connection process

    NSError *err = nil;
    // Call the connect method; on failure, close with the returned error
    if (![self connectWithAddress4:address4 address6:address6 error:&err])
    {
        [self closeWithError:err];
    }
}

// Call site (from the method above):
[self connectWithAddress4:address4 address6:address6 error:&err]

/**
 * Creates a socket for each available address, picks the primary one according to
 * the kPreferIPv6 configuration bit, starts connecting to it immediately, and
 * schedules a connect to the alternate address after alternateAddressDelay seconds.
 *
 * @param address4 IPv4 address, or nil.
 * @param address6 IPv6 address, or nil.
 * @param errPtr   Passed to createSocket:connectInterface:errPtr:.
 * @return NO if no socket could be created, YES otherwise.
 **/
- (BOOL)connectWithAddress4:(NSData *)address4 address6:(NSData *)address6 error:(NSError **)errPtr
{
    LogTrace();

    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");

    LogVerbose(@"IPv4: %@:%hu", [[self class] hostFromAddress:address4], [[self class] portFromAddress:address4]);
    LogVerbose(@"IPv6: %@:%hu", [[self class] hostFromAddress:address6], [[self class] portFromAddress:address6]);

    // Determine socket type

    BOOL preferIPv6 = (config & kPreferIPv6) ? YES : NO;

    // Create and bind the sockets

    if (address4)
    {
        LogVerbose(@"Creating IPv4 socket");

        socket4FD = [self createSocket:AF_INET connectInterface:connectInterface4 errPtr:errPtr];
    }

    if (address6)
    {
        LogVerbose(@"Creating IPv6 socket");

        socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr];
    }

    if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL)
    {
        return NO;
    }

    int socketFD, alternateSocketFD;
    NSData *address, *alternateAddress;

    if ((preferIPv6 && socket6FD != SOCKET_NULL) || socket4FD == SOCKET_NULL)
    {
        socketFD = socket6FD;
        alternateSocketFD = socket4FD;
        address = address6;
        alternateAddress = address4;
    }
    else
    {
        socketFD = socket4FD;
        alternateSocketFD = socket6FD;
        address = address4;
        alternateAddress = address6;
    }

    int aStateIndex = stateIndex;

    [self connectSocket:socketFD address:address stateIndex:aStateIndex];

    // If there is an alternate address, try it after a short delay
    if (alternateAddress)
    {
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
            [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
        });
    }

    return YES;
}

// Call site (from the method above):
socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr];

/**
 * Creates a stream socket of the given family, binds it to the given interface
 * and sets SO_NOSIGPIPE on it.
 *
 * @param family           Address family (AF_INET or AF_INET6 at the visible call sites).
 * @param connectInterface Interface address passed to bindSocket:toInterface:error:.
 * @param errPtr           Optional. Set when socket() fails; also passed to the bind call.
 * @return The socket file descriptor, or SOCKET_NULL on failure.
 **/
- (int)createSocket:(int)family connectInterface:(NSData *)connectInterface errPtr:(NSError **)errPtr
{
    // Create the socket
    int socketFD = socket(family, SOCK_STREAM, 0);

    // Creation failed
    if (socketFD == SOCKET_NULL)
    {
        if (errPtr)
            *errPtr = [self errorWithErrno:errno reason:@"Error in socket() function"];

        return socketFD;
    }

    if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
    {
        [self closeSocket:socketFD];

        return SOCKET_NULL;
    }

    // Prevent SIGPIPE signals

    int nosigpipe = 1;
    setsockopt(socketFD, SOL_SOCKET, SO_NOSIGPIPE, &nosigpipe, sizeof(nosigpipe));

    return socketFD;
}

// Call site (from connectWithAddress4:address6:error:):
[self connectSocket:socketFD address:address stateIndex:aStateIndex];

/**
 * Runs connect() for the given socket on a global concurrent queue and reports the
 * outcome back on socketQueue: didConnect: on success, or didNotConnect:error: once
 * no socket is left trying to connect.
 *
 * @param socketFD    File descriptor to connect.
 * @param address     Remote address (sockaddr bytes).
 * @param aStateIndex State index captured when the connect was requested.
 **/
- (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
{
    // If there already is a socket connected, we close socketFD and return
    if (self.isConnected)
    {
        [self closeSocket:socketFD];
        return;
    }

    // Start the connection process in a background queue

    __weak GCDAsyncSocket *weakSelf = self;

    dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_async(globalConcurrentQueue, ^{
    #pragma clang diagnostic push
    #pragma clang diagnostic warning "-Wimplicit-retain-self"

        int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
        // Save errno right away, before any other call can overwrite it
        int err = errno;

        __strong GCDAsyncSocket *strongSelf = weakSelf;
        if (strongSelf == nil) return_from_block;

        dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {

            if (strongSelf.isConnected)
            {
                [strongSelf closeSocket:socketFD];
                return_from_block;
            }

            if (result == 0)
            {
                // Use strongSelf (not self) so that the background block
                // only holds the socket through the weak reference.
                [strongSelf closeUnusedSocket:socketFD];

                [strongSelf didConnect:aStateIndex];
            }
            else
            {
                [strongSelf closeSocket:socketFD];

                // If there are no more sockets trying to connect, we inform the error to the delegate
                if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
                {
                    NSError *error = [strongSelf errorWithErrno:err reason:@"Error in connect() function"];
                    [strongSelf didNotConnect:aStateIndex error:error];
                }
            }
        }});

    #pragma clang diagnostic pop
    });

    LogVerbose(@"Connecting...");
}
最新回复(0)