C语言实现简易Linux线程池代码,应对TCP高并发
一、创建线程池时要解决的问题1.向线程传递任务信息2.任务在子线程之间的分配3.线程的有序退出
二、 线程池的创建步骤二、线程池的退出三、改进方向
一、创建线程池时要解决的问题
线程池是为了处理一系列重复且高并发的任务而定义出来的对任务进行分配的数据类型。 线程池的核心是对任务进行分配,所以其必须包括任务以及工作线程以及分配的逻辑函数
应对高并发任务时,1.主线程需要提前注册好一定数量的线程,并让其处于休眠状态。2.当有新的任务到来时,唤醒一个线程,分配给他这个任务,由线程执行此任务3.主线程继续等待下一个任务。4.子线程完成任务后,再次进入休眠状态,等待下一个任务。需要解决的问题:
如何向线程传递任务信息任务如何在线程之间分配,线程之间的竞争机制如何执行线程的有序退出
1.向线程传递任务信息
由于线程函数可以接收一个void*类型的参数,因此可以将任务信息打包成一个结构体,传递给子线程。
typedef struct task
{
int clientFD
;
int cmd
;
char name
[256];
}task_t
,*ptask_t
;
typedef struct node
{
task_t data
;
struct node
* pNext
;
}node_t
,*pnode_t
;
typedef struct workQueue
{
pnode_t pfront
;
pnode_t prear
;
int num
;
}workQueue_t
,*pworkQueue_t
;
2.任务在子线程之间的分配
由于线程之间共享一块虚拟内存区域,因此需要考虑资源的竞争。 所有线程均从任务队列中拿任务,因此,要保证单个线程拿任务信息时不受其他线程的影响,因此需要在传入参数中加入pthread_mutex_t变量当队列中无任务时,线程需要阻塞等待,因此考虑加入一个pthread_cond_t变量因为对于所有子进程来说,以上两个变量都应该相同,子线程的传入参数时任务队列的指针,所以,修改上面的任务队列结构体
typedef struct workQueue
{
pnode_t pfront
;
pnode_t prear
;
int num
;
int exitFlag
;
pthread_mutex_t mutex
;
pthread_cond_t cond
;
pthread_cond_t maincond
;
}workQueue_t
,*pworkQueue_t
;
void initWQ(pworkQueue_t pqueue
){
memset(pqueue
,0,sizeof(workQueue_t
))
;
pqueue
->pfront
= NULL;
pqueue
->prear
= NULL;
pqueue
->num
= 0;
pqueue
->exitFlag
= 0;
pthread_mutex_init(&queue
->mutex
,NULL);
pthread_cond_init(&queue
->cond
,NULL);
pthread_cond_init(&queue
->maincond
,NULL);
}
int push(pworkQueue_t pqueue
,pnode_t pnode
){
pnode
->pNext
= NULL;
if(pqueue
->pfront
== NULL){
pqueue
->pfront
= pnode
;
pqueue
->prear
= pnode
;
}else{
pqueue
->prear
->pNext
= pnode
;
pqueue
->prear
= pnode
;
}
pqueue
->num
++;
return 0;
}
int pop(pworkQueue_t pqueue
,pnode_t pnode
){
if(NULL == pqueue
|NULL ==pnode
NULL = pqueue
->pfront
) return -1;
*pnode
= *pqueue
->pfront
;
pnode_t tmp
= pqueue
->pfront
;
if(NULL = pqueue
->pfront
->pNext
){
pqueue
->pfront
= NULL;
pqueue
->prear
= NULL;
pqueue
->num
--;
free(tmp
);
return 0;
}
pqueue
->pfront
= queue
->pfront
->pNext
;
pqueue
->num
--;
free(tmp
);
return 0;
}
int freeWQ(pworkQueue_t pqueue
){
if(NULL == pqueue
) return -1;
if(pqueue
->num
== 0){
free(pqueue
);
return 0;
}
pworkQueue_t pA
= pqueue
->pfront
;
pworkQueue_t pB
= pqueue
->pfront
;
while(pA
!= NULL && pB
!= NULL){
free(pA
);
pA
= pB
->pNext
;
pB
= pA
;
}
return 0;
}
3.线程的有序退出
1.主要考虑当主线程接收到SIGINT信号后,如何保证子线程释放掉占用的资源、完成子线程相应的原子任务后再退出. 2.在子线程完成任务前,主线程不能退出 3.由于采用了锁的机智,需要保证子进程在退出时,将相应的锁释放掉,由主线程统一摧毁。
二、 线程池的创建步骤
创建线程池的相关函数
typedef struct Pool
{
pthread_t
* pthid
;
int num
;
short startFlag
;
workQueue queue
;
}Pool_t
,*pPool_t
;
int initPool(pPool_t p
,int num
){
memset(p
,0,sizeof(Pool_t
);
pthid
= (pPool_t
)callloc(num
,sizeof(Pool_t
));
p
->num
= num
;
p
->startFlag
= 0;
initWQ(&queue
);
return 0;
}
void* threadFunc(void* arg
);
int startPool(pPool_t p
){
if(pool
->startFalg
!= 0) return -1;
for(int i
= 0;i
<p
->num
;++i
){
pthread_create(&p
->pthid
[i
],NULL,threadFunc
,&p
->queue
);
}
p
->startFlag
= 1;
return 0;
}
void cleanFunc(void* arg
);
void dealClient(const pnode_t pnode
);
void* threadFunc(void* arg
){
pworkQueue_t pq
= (pworkQueue
)arg
;
pthread_mutex_t
* pmutex
= &pq
->mutex
;
pthread_cond_t
* pcond
= &pq
->cond
;
pthread_cond_t
* pmaincond
= &pq
->maincond
;
pthread_cleanup_push(cleanFunc
,pmutex
);
int ret
= -1;
while(1){
if(1 == pq
->exitFlag
){
printf("child thread exit");
break;
}
pthread_mutex_lock(&pq
->mutex
);
node_t node
;
ret
= pop(pq
,&node
);
if(-1 == ret
) {
pthread_cond_signal(pmaincond
);
pthread_cond_wait(pcond
,pmutex
);
}else{
pthread_mutex_unlock(pmutex
);
dealClient(&node
);
continue;
}
ret
= pop(pq
,&node
);
pthread_mutex_unlock(pmutex
);
if(-1 == ret
) continue;
dealClient(&node
);
}
pthread_cleanup_pop(1);
pthread_exit(NULL);
}
void cleanFunc(void* arg
){
pthread_mutex_t
* pmutex
= (pthread_mutex_t
*)arg
;
pthread_mutex_unlock(pmutex
);
pthread_exit(NULL);
}
void dealClient(pnode_t pnode
){
printf("clientFD = %d\n",pnode
->task
.ClientFD
);
}
二、线程池的退出
线程有两种方式进行退出:
主线程直接使用cancle函数通过发送标志位,通知子线程退出。 由于直接cancle的方式需要考虑子线程函数中cancle点的位置,还有可能导致单个子线程的任务中断。因此考虑使用退出标志位的方式进行。
另外,如果主线程收到了SIGINT的信号退出,主线程的当前执行的任务会被打断、在信号处理函数中更改子线程的退出标志也不是很方便,因此,考虑使用两个进程、异地拉起同步的方式执行有序退出。 主线程测试代码:
#define TESTNUM 20
int exitPipe
[2] = -1;
void sigFunc(int num
){
printf("sig is coming\n");
int flag
= 1;
write(exitPipe
[1],&flag
,sizeof(falg
);
}
int main()
{
pool_t pool
;
poolInit(&pool
,TESTNUM
);
pipe2(exitPipe
,O_NONBLOCK
);
if(fork()){
close(exitPipe
[0]);
signal(SIGINT
,sigFunc
);
wait(NULL);
printf("parent exit\n");
close(exitPipe
[1]);
exit(0);
}
close(exitPipe
[1]);
for(int i
= 0 ;i
<TESTNUM
-10;++i
){
pnode_t pnode
= (pnode_t
)calloc(1,sizeof(node_t
));
pnode
->task
.clientFD
= i
;
push(&pool
.queue
,pnode
);
}
printf("done allocating\n");
pnode_t cur
= pool
.queue
.pfront
;
while(cur
!= NULL){
printf("cur value = %d\n",cur
->task
.clientFD
);
cur
= cur
->pNext
;
printf("cur addr = %p\n",cur
);
}
poolStart(&pool
);
pthread_mutex_t
* pmutex
= &pool
.queue
.mutex
;
pthread_cond_t
* pmaincond
= &pool
.queue
.maincond
;
pthread_cond_t
* pcond
= &pool
.queue
.cond
;
int x
= 20;
int flag
= 0;
while(1){
flag
= read(exitPipe
[0],&flag
,sizeof(flag
));
if(1 == flag
){
pool
.queue
.exitFlag
= 1;
pthread_cond_broadcast(pcond
);
break;
}
pthread_mutex_lock(pmutex
);
if(0 < pool
.queue
.num
){
pthread_cond_wait(pmaincond
,pmutex
);
}
pnode_t pnode
= (pnode_t
)calloc(1,sizeof(node_t
));
pnode
->task
.clientFD
= x
++;
push(&pool
.queue
,pnode
);
pthread_mutex_unlock(pmutex
);
pthread_cond_signal(pcond
);
if(x
>100){
pool
.queue
.exitFlag
= 1;
pthread_cond_broadcast(pcond
);
break;
}
}
for(int i
= 0;i
<10;++i
){
printf("wait %d\n",i
);
pthread_join(pool
.thid
[i
],NULL);
}
close(exitPipe
[0]);
return 0;
}
三、改进方向
1.接入客户端时,使用epoll来监控管道的文件描述符,以及服务器的文件描述符(是否有客户端接入)。2.高并发情况下,考虑使用非阻塞模型+epoll 边缘触发提高服务器效率3.管道read/write未做异常处理其他异常处理机制4.服务器程序转化为守护进程