消息队列的每个条目包含:
优先级;消息长度(可以为0);数据(如果长度大于0);创建或者打开一个已经存在的消息队列:
name:消息队列的名称需要符合Posix标准,一般为/***;oflag:打开文件的模式,比如ORDONLY等;mode:打开文件的权限;attr:消息队列的属性,具体见下。返回值-1出错,成功则返回消息队列的描述符。 struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ };属性参数的四个元素分别表示:
mq_flags:消息队列的属性,0或者O_NONBLOCK;mq_maxmsg:支持的最大消息数;mq_msgsize:消息数据的最大size;mq_curmsgs:当前消息队列的消息数。 int mq_close(mqd_t mqdes);关闭一个消息队列,表示该描述符失效,但并不会从系统中删除消息队列。
mqdes:消息队列的描述符,通过mq_open获得。 int mq_unlink(const char *name);删除消息队列:
name:消息队列的名字。消息队列本身维护了一个引用计数,当一个进程打开消息队列时引用计数加一,关闭时减一,只有当引用计数为0时才会真正删除。
int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,struct mq_attr *oldattr);两个api的含义顾名思义,分别是获取和设置消息队列的属性:
mqdes:消息队列的描述符;attr:属性变量地址;newattr:要设置的参数值;oldattr:设置之后之前的属性值;返回-1表示出错,0表示成功。 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);向消息队列中添加消息:
mqdes:消息队列的描述符;msg_ptr:数据的指针;msg_len:数据尺寸;msg_prio:当前消息的优先级,要求必须小于MQ_PRIO_MAX;返回值:0成功,-1失败。 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);从消息队列中优先级最高最早的消息:
mqdes:消息队列的描述符;msg_ptr:数据的指针;msg_len:数据尺寸;msg_prio:获取的消息优先级;返回值:非-1表示获取的数据的字节数;-1失败。 int mq_notify(mqd_t mqdes, const struct sigevent *sevp);允许进程注册或者注销异步事件通知。
mqdes:消息队列的描述符;sevq:要注册的信号事件: 如果sevp==NULL,则当前进程希望在有一个消息到达指定先前为空的消息队列时得到通知;如果sevq!=NULL,则当前进程被注册为接受所只是队列的通知,已经存在的注册会被注销;任意时刻只能有一个进程被注册为接收某个队列的通知;当有一个消息到达某个先前为空的队列,并且已经有一个进程被注册为接收该队列的通知时,只有没有任何线程只在阻塞该队列的mq_receive调用中的前提下,通知才会发出。即mq_receive调用中的阻塞优先级比任何通知都高;如果通知被发送给他的注册程序时,其注册即被注销。该进程必须再次调用mq_notify注册。 union sigval { /* Data passed with notification */ int sival_int; /* Integer value */ void *sival_ptr; /* Pointer value */ }; struct sigevent { int sigev_notify; /* Notification method */ int sigev_signo; /* Notification signal */ union sigval sigev_value; /* Data passed with notification */ void (*sigev_notify_function) (union sigval); /* Function used for thread notification (SIGEV_THREAD) */ void *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */ pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */ };消息队列的限制:
mq_mqxmsg:消息队列中的最大消息数;mq_msgsize:消息队列中给定消息的最大字节数。MQ_OPEN_MAX:一个进程能够同时打开的消息队列最大数目(Posix=8);MQ_PRIO_MAX:任意消息的最大优先级+1(Posix>=32).信号(英语:Signals)是Unix、类Unix以及其他POSIX兼容的操作系统中进程间通讯的一种有限制的方式。它是一种异步的通知机制,用来提醒进程一个事件已经发生。当一个信号发送给一个进程,操作系统中断了进程正常的控制流程,此时,任何非原子操作都将被中断。如果进程定义了信号的处理函数,那么它将被执行,否则就执行默认的处理函数。 信号类似于中断,不同之处在于中断由处理器调解并由内核处理,而信号由内核调解(可能通过系统调用)并由进程处理。内核可以将中断作为信号传递给导致中断的进程(典型的例子有SIGSEGV、SIGBUS、SIGILL和SIGFPE)。 Unix的信号分为:
实时信号:取值范围为SIGRTMIN到SIGRTMAX之间,Posix要求至少提供RTSIG_MAX种信号;其他信号:无法实时性行为的信号。对于实时信号在安装信号处理程序时必须给sigaction指定SA_SIGINFO标志。 Unix系统中的实时性含义为:
信号是排队的;当有多个SIGRTMIN-SIGRTMAX之间的信号解阻塞排队时,值较小的优先进行信号递交;当某个非实时信号递交时,传递给它的信号处理程序唯一的参数是该信号的值,实时信号比其他信号携带更多的信息,通过设置SA_SIGINFO标志的实时信号处理程序格式如下;一些新函数定义为使用实时信号工作。 void func(int signo, siginfo_t *info, void *context); siginfo_t { int si_signo; /* Signal number */ int si_errno; /* An errno value */ int si_code; /* Signal code */ int si_trapno; /* Trap number that caused hardware-generated signal (unused on most architectures) */ pid_t si_pid; /* Sending process ID */ uid_t si_uid; /* Real user ID of sending process */ int si_status; /* Exit value or signal */ clock_t si_utime; /* User time consumed */ clock_t si_stime; /* System time consumed */ sigval_t si_value; /* Signal value */ int si_int; /* POSIX.1b signal */ void *si_ptr; /* POSIX.1b signal */ int si_overrun; /* Timer overrun count; POSIX.1b timers */ int si_timerid; /* Timer ID; POSIX.1b timers */ void *si_addr; /* Memory location which caused fault */ long si_band; /* Band event (was int in glibc 2.3.2 and earlier) */ int si_fd; /* File descriptor */ short si_addr_lsb; /* Least significant bit of address (since Linux 2.6.32) */ void *si_call_addr; /* Address of system call instruction (since Linux 3.5) */ int si_syscall; /* Number of attempted system call (since Linux 3.5) */ unsigned int si_arch; /* Architecture of attempted system call (since Linux 3.5) */ }通过参数控制消息队列的创建和删除,基本格式为cmd c name,其中c可以为c或者d分别表示创建和删除,name为消息队列,比如/message,该文件会存放在/dev/mqueue。
//获取2个参数 //c 表示创建消息队列 d表示删除消息队列 //第二个参数为消息队列的路径 void handle_msg(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); mqd_t fd = 0; int flag = O_RDWR | O_CREAT; char ch = argv[1][0]; switch(ch) { case 'c': fd = lmq_open(argv[2], flag, 666,NULL); ///dev/mqueue/ lmq_close(fd); break; case 'd': lmq_unlink(argv[2]); break; default: err_exit("unknown parameters!", -1); } }执行结果如下:
➜ build git:(master) ✗ ./main c /oppo ➜ build git:(master) ✗ ll /dev/mqueue/ total 0 --w--wx--T 1 grayondream grayondream 80 9月 3 14:59 oppo ➜ build git:(master) ✗ ./main d /oppo ➜ build git:(master) ✗ ll /dev/mqueue/ total 0 ➜ build git:(master) ✗下面的程序通过参数控制当前进程是服务器还是客户端,基本命令格式为cmd [c/s] name,第二个参数c,s分别指代客户端还是服务器,name为消息队列的名称。基本功能为客户端启动之后读取标准输入,标准输入的格式为优先级+空格+消息,优先级占两位(纯粹为了编码方便,懒得再处理),将消息写入到消息队列中,之后启动服务端读取消息。
//程序分为客户端和服务端,客户端发送数据,服务端接受数据 //接受2个参数,第一个c或者s表示客户端和服务端,第二个参数指定消息队列的文件名 void ipc_mq(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); mqd_t fd = 0; int flag = 0; int mode = FILE_MODE; char ch = argv[1][0]; char buff[MAX_LEN] = {0}; int len = 0; int prior = 20; switch(ch) { case 'c': flag = O_CREAT | O_WRONLY; fd = lmq_open(argv[2], flag, mode, NULL); //输入的前两位为消息的优先级 //输入的格式为优先级+空格+消息 while(lfgets(buff, MAX_LEN, stdin) != NULL) { buff[2] = '\0'; prior = atoi(buff); char *msg = buff + 3; len = strlen(msg); if(msg[len - 1] == '\n') len--; msg[len] = '\0'; lmq_send_msg(fd, msg, len, prior); } break; case 's': flag = O_RDONLY; fd = lmq_open(argv[2], flag, mode, NULL); struct mq_attr attr; lmq_getattr(fd, &attr); printf("message size %d, max message %d\n", attr.mq_msgsize, attr.mq_maxmsg); while((len = lmq_receive_msg(fd, buff, attr.mq_msgsize, &prior)) > 0) { printf(buff); printf(" ,prior is %d!\n", prior); } break; } lmq_close(fd); }先启动客户端写入消息:
➜ build git:(master) ✗ ./main c /message ➜ build git:(master) ✗ ./main c /message 15 1 15 2 14 3 18 4随后启动服务端可以看到:
➜ build git:(master) ✗ ./main s /message message size 8192, max message 10 4 ,prior is 18! 1 ,prior is 15! 2 ,prior is 15! 3 ,prior is 14!从上面可以看到优先返回优先级高的消息,优先级不同则优先返回最早的消息。同时消息队列获取消息默认是阻塞的。
下面的程序通过参数控制当前进程是服务器还是客户端,基本命令格式为cmd [c/s] name,第二个参数c,s分别指代客户端还是服务器,name为消息队列的名称。基本功能为客户端启动之后读取标准输入,标准输入的格式为优先级+空格+消息,优先级占两位(纯粹为了编码方便,懒得再处理),将消息写入到消息队列中,之后启动服务端读取消息。
mqd_t sg_mq; struct sigevent sg_ev; struct mq_attr sg_attr; //信号处理函数 static void single_mq_handle(int sig_no) { printf("the program come into the handler!\n"); //这个函数并不是异步信号安全的函数 char buff[MAX_LEN]; int prior; lmq_receive_msg(sg_mq, buff, sg_attr.mq_msgsize, &prior); printf("receive singale and the buffer is %s, and the prior is %d!\n", buff, prior); lmq_notify(sg_mq, &sg_ev); //再次注册 } //程序分为客户端和服务端,客户端发送数据,服务端接受数据 //接受三个参数,第一个c或者s表示客户端和服务端,第二个参数指定消息队列的文件名 void single_mq_test(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); int flag = 0; int mode = FILE_MODE; char ch = argv[1][0]; char buff[MAX_LEN] = {0}; int len = 0; int prior = 20; switch(ch) { case 'c': flag = O_CREAT | O_WRONLY; sg_mq = lmq_open(argv[2], flag, mode, NULL); //输入的前两位为消息的优先级 //输入的格式为优先级+空格+消息 lfgets(buff, MAX_LEN, stdin); buff[2] = '\0'; prior = atoi(buff); char *msg = buff + 3; len = strlen(msg); if(msg[len - 1] == '\n') len--; msg[len] = '\0'; lmq_send_msg(sg_mq, msg, len, prior); break; case 's': flag = O_RDONLY; sg_mq = lmq_open(argv[2], flag, mode, NULL); lmq_getattr(sg_mq, &sg_attr); lsignal(SIGUSR1, single_mq_handle); sg_ev.sigev_signo = SIGUSR1; sg_ev.sigev_notify = SIGEV_SIGNAL; lmq_notify(sg_mq, &sg_ev); for(;;) pause(); break; } lmq_close(sg_mq); }运行结果如下:
➜ build git:(master) ✗ ./main c /rrrr 02 1111111111111111111111111 ➜ build git:(master) ✗ ./main c /rrrr 03 22222222222222222222222222 ➜ build git:(master) ✗ ./main s /rrrr the program come into the handler! receive singale and the buffer is 1111111111111111111111111, and the prior is 2! the program come into the handler! receive singale and the buffer is 22222222222222222222222222, and the prior is 3!如果另外开一个进程:
➜ build git:(master) ✗ ./main s /rrrr errno is 16,register the singal event failed!#define EBUSY 16 /* Device or resource busy *可以看到一个消息队列同时只能被一个进程注册。
这个例子是上个例子的改版,区别是保证信号异常安全。因为mq_receive等函数并不是信号异常安全的,如果正在执行操作被其他信号中断则会出现不可预测的现象,而利用原子性的标志位可以保证这一点。
volatile sig_atomic_t sig_mask = 0; //信号处理函数 static void safe_single_mq_handle(int sig_no) { sig_mask = 1; } //程序分为客户端和服务端,客户端发送数据,服务端接受数据 //接受三个参数,第一个c或者s表示客户端和服务端,第二个参数指定消息队列的文件名 void safe_single_mq_test(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); mqd_t mq; int flag = 0; int mode = FILE_MODE; char ch = argv[1][0]; char buff[MAX_LEN] = {0}; int len = 0; int prior = 20; switch(ch) { case 'c': flag = O_CREAT | O_WRONLY; mq = lmq_open(argv[2], flag, mode, NULL); //输入的前两位为消息的优先级 //输入的格式为优先级+空格+消息 lfgets(buff, MAX_LEN, stdin); buff[2] = '\0'; prior = atoi(buff); char *msg = buff + 3; len = strlen(msg); if(msg[len - 1] == '\n') len--; msg[len] = '\0'; lmq_send_msg(mq, msg, len, prior); break; case 's': flag = O_RDONLY; struct sigevent ev; struct mq_attr attr; sigset_t new_set, old_set, zero_set; __sigemptyset(&new_set); __sigemptyset(&old_set); __sigemptyset(&zero_set); __sigaddset(&new_set, SIGUSR1); mq = lmq_open(argv[2], flag, mode, NULL); lmq_getattr(mq, &attr); lsignal(SIGUSR1, safe_single_mq_handle); ev.sigev_signo = SIGUSR1; ev.sigev_notify = SIGEV_SIGNAL; lmq_notify(mq, &ev); for(;;) { lsigprocmask(SIG_BLOCK, &new_set, &old_set); while(sig_mask == 0) lsigsuspend(&zero_set); sig_mask = 0; lmq_notify(mq, &ev); //再次注册 char buff[MAX_LEN]; int prior; int len = 0; while((len = lmq_receive_msg(mq, buff, attr.mq_msgsize, &prior)) > 0) //保证即便读取当前消息时,其他到来的消息也能读取到 { printf("receive singale and the buffer is %s, and the prior is %d!\n", buff, prior); } lsigprocmask(SIG_UNBLOCK, &new_set, NULL); } break; } lmq_close(sg_mq); }这个依然是上个程序的改版,只不过是使用管道来实现异步安全,但是出现个问题,select``本身是阻塞的,即便信号触发了好像也无法出发信号处理函数,即写管道就不成立,就无法触发select```进行读,这个我自己测试有问题,不知道是不是作者使用的系统版本问题。
int pipe_fd[2] = {0}; static void safe_pipe_mq_handle(int sig) { lwrite(pipe_fd[1], "", 1); } void safe_pipe_mq_test(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); mqd_t mq; int flag = 0; int mode = FILE_MODE; char ch = argv[1][0]; char buff[MAX_LEN] = {0}; int len = 0; int prior = 20; struct sigevent ev; struct mq_attr attr; fd_set rset; switch(ch) { case 'c': flag = O_CREAT | O_WRONLY; mq = lmq_open(argv[2], flag, mode, NULL); //输入的前两位为消息的优先级 //输入的格式为优先级+空格+消息 lfgets(buff, MAX_LEN, stdin); buff[2] = '\0'; prior = atoi(buff); char *msg = buff + 3; len = strlen(msg); if(msg[len - 1] == '\n') len--; msg[len] = '\0'; lmq_send_msg(mq, msg, len, prior); break; case 's': flag = O_RDONLY; mq = lmq_open(argv[2], flag, mode, NULL); lmq_getattr(mq, &attr); lpipe(pipe_fd); lsignal(SIGUSR1, safe_pipe_mq_handle); ev.sigev_signo = SIGUSR1; ev.sigev_notify = SIGEV_SIGNAL; lmq_notify(mq, &ev); FD_ZERO(&rset); for(;;) { FD_SET(pipe_fd[0], &rset); int fds = lselect(pipe_fd[0] + 1, &rset, NULL, NULL, NULL); if(FD_ISSET(pipe_fd[0], &rset)) { char ch; lread(pipe_fd[0], &ch, 1); char buff[MAX_LEN]; int prior; int len = 0; while((len = lmq_receive_msg(mq, buff, attr.mq_msgsize, &prior)) > 0) //保证即便读取当前消息时,其他到来的消息也能读取到 { printf("receive singale and the buffer is %s, and the prior is %d!\n", buff, prior); } lmq_notify(mq, &ev); //再次注册 } } break; } lmq_close(sg_mq); }这个很好理解就是信号触发时新开一个线程处理相应的工作。
mqd_t thread_mq; struct mq_attr thread_mq_attr; struct sigevent thread_mq_sig; void safe_thread_mq_handle(int val) { char buff[MAX_LEN]; int prior; int len = 0; while((len = lmq_receive_msg(thread_mq, buff, thread_mq_attr.mq_msgsize, &prior)) > 0) //保证即便读取当前消息时,其他到来的消息也能读取到 { printf("receive singale and the buffer is %s, and the prior is %d!\n", buff, prior); } lmq_notify(thread_mq, &thread_mq_sig); //再次注册 } //启动一个线程来处理事件,异步读写 void safe_thread_mq_test(int argc, char **argv) { if(argc != 3) err_exit(NULL, -1); int flag = 0; int mode = FILE_MODE; char ch = argv[1][0]; char buff[MAX_LEN] = {0}; int len = 0; int prior = 20; switch(ch) { case 'c': flag = O_CREAT | O_WRONLY; thread_mq = lmq_open(argv[2], flag, mode, NULL); //输入的前两位为消息的优先级 //输入的格式为优先级+空格+消息 lfgets(buff, MAX_LEN, stdin); buff[2] = '\0'; prior = atoi(buff); char *msg = buff + 3; len = strlen(msg); if(msg[len - 1] == '\n') len--; msg[len] = '\0'; lmq_send_msg(thread_mq, msg, len, prior); break; case 's': flag = O_RDONLY; thread_mq = lmq_open(argv[2], flag, mode, NULL); lpipe(pipe_fd); lmq_getattr(thread_mq, &thread_mq_attr); lsignal(SIGUSR1, safe_thread_mq_handle); thread_mq_sig.sigev_notify = SIGEV_THREAD; thread_mq_sig.sigev_value.sival_ptr = NULL; thread_mq_sig._sigev_un._sigev_thread._attribute = NULL; thread_mq_sig._sigev_un._sigev_thread._function = safe_thread_mq_handle; lmq_notify(thread_mq, &thread_mq_sig); for(;;) { pause(); } break; } lmq_close(sg_mq); }父进程针对每个信号发送两组数据然后计数,等待一会儿只有紫禁城获取信号调用信号处理函数接收数据。
void sig_test(int argc, char **argv) { printf("SIGRTMIN=%d, SIGRTMAX=%d\n", (int)(SIGRTMIN), (int)(SIGRTMAX)); pid_t pid; pid = fork(); if(pid == 0) { sigset_t newset; sigemptyset(&newset); sigaddset(&newset, SIGRTMAX); sigaddset(&newset, SIGRTMAX - 1); sigaddset(&newset, SIGRTMAX - 2); sigprocmask(SIG_BLOCK, &newset, NULL); lsig_rt(SIGRTMAX, sig_handle, &newset); lsig_rt(SIGRTMAX - 1, sig_handle, &newset); lsig_rt(SIGRTMAX - 2, sig_handle, &newset); sleep(6); sigprocmask(SIG_UNBLOCK, &newset, NULL); sleep(3); return ; } else { /* code */ sleep(3); union sigval val; for(int i = SIGRTMAX;i >= SIGRTMAX - 2;i--) { for(int j = 0;j <= 2;j ++) { val.sival_int = j; lsigqueue(pid, i, val); printf("send signal = %d, val = %d\n", i, j); } } } return 0; } lsig_handle_t* lsig_rt(int signo, lsig_handle_t *func, sigset_t *mask) { struct sigaction act, oact; act.sa_mask = *mask; act.sa_flags = SA_SIGINFO; act.sa_sigaction = func; if(signo == SIGALRM) { #ifdef SA_RESTART act.sa_flags |= SA_RESTART; #endif } int ret = sigaction(signo, &act, &oact); ERROR_CHECK(ret, <, 0, signo, "set the signal %d handle function failed!"); return oact.sa_sigaction; } ➜ build git:(master) ✗ ./main SIGRTMIN=34, SIGRTMAX=64 send signal = 64, val = 0 send signal = 64, val = 1 send signal = 64, val = 2 send signal = 63, val = 0 send signal = 63, val = 1 send signal = 63, val = 2 send signal = 62, val = 0 send signal = 62, val = 1 send signal = 62, val = 2 ➜ build git:(master) ✗ received signal 62, code = -1, ival = 0 received signal 62, code = -1, ival = 1 received signal 62, code = -1, ival = 2 received signal 63, code = -1, ival = 0 received signal 63, code = -1, ival = 1 received signal 63, code = -1, ival = 2 received signal 64, code = -1, ival = 0 received signal 64, code = -1, ival = 1 received signal 64, code = -1, ival = 2System V消息队列和Posix消息队列类似,会在内核中维护一个消息队列的链表,该链表的结构如下:
/* Structure of record for one message inside the kernel. The type `struct msg' is opaque. */ struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes in queue (nonstandard) */ msgqnum_t msg_qnum; /* Current number of messages in queue */ msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */ };能够看到我电脑上的版本和作者使用的版本不同已经看不到具体的消息指针了。
创建或者打开一个消息队列:
key:一个键值,可以使IPC_PRIVATE也可以是ftok获取的键值;msgflg:读写权限,它们的用法和创建文件时使用的mode模式标志是一样的;返回值:-1表示失败,否则为队列的标识符。 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);向消息队列中发送信息:
msqid:为消息队列的标识符;msgp:一般为结构: struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ };但是不强制,也可以定义自己的消息数据结构;
msgsz:消息的尺寸;msgflg:消息的标志位,可以是0也可以是,IPC_WAIT,IPC_WAIT使得该调用非阻塞,如果没有可用空间则函数返回,可能发生的条件为: 在队列中已经存在太多字节;系统范围内存在太多消息:如果上面两个条件一个发生且IPC_NOWAIT置位,则返回一个EAGAIN错误;如果上面的两个条件发生一个且IPC_NOWAIT并未置位,则调用进入睡眠,直到: 有新的消息存放空间;msgid消息被系统删除,返回一个EIDRM错误;调用线程被某个捕获的信号中断,返回一个EINTER错误。 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);从System V消息队列中读取消息:
msqid:消息队列标识符;msgp:消息存放地址;msgsz:消息的长度;msgtyp:希望读取的消息类型: msgtyp == 0:放回队列的第一个消息;msgtyp > 0:返回类型为msgtyp的第一个消息;msgtyp < 0:放回类型值小于或者等于msgtyp参数的绝对值的消息中类型值最小的第一个消息; msgflg:指定如果希望获取的消息不存时的错误处理方式: 设置IPC_NOWAIT,则立即返回一个ENOMSG;没有设置IPC_NOWAIT,则阻塞至下面的事件中一个发生为止: 有一个所请求的消息类型可获取;该消息队列被从系统中删除;调用线程被某个捕获的信号所中断。 如果接受的数据长度大于指定的长度; 设定了MSG_NOERROR则对数据进行截断;没有设定MSG_NOERROR,则返回错误E2BIG。 int msgctl(int msqid, int cmd, struct msqid_ds *buf);对System V的消息队列进行操作:
msqid:消息队列的标识符;cmd:支持的队列操作方式: IPC_RMID:删除消息队列,所有消息将被抛弃;IPC_STAT:获取消息队列的参数存放到buf中;IPC_SET:给所指定的消息队列设置buf中设置的四个参数msg_perm.uid,msg_perm.gid,mode,msg_qbytes。 返回值-1失败,其他成功。限制,可以通过下面的命令分别查看系统允许的每个消息的最大字节数,任何一个消息队列上的最大字节数,系统范围内的最大消息队列数。
➜ ipc git:(master) ✗ cat /proc/sys/kernel/msgmax 8192 ➜ ipc git:(master) ✗ cat /proc/sys/kernel/msgmnb 16384 ➜ ipc git:(master) ✗ cat /proc/sys/kernel/msgmni 32000程序的功能很简单就是客户端从标准输入读取消息类型和消息然后发送给服务端,服务端再向客户端返回ACK,不断循环。
//客户端发送的消息中格式为:消息类型+空格+消息,消息类型占2位 void vmsg_base_client(int readfd, int writefd) { char buff[MAX_LEN] = {0}; while(lfgets(buff, MAX_LEN, stdin)) { int len = strlen(buff); if(buff[len - 1] == '\n') len--; buff[len] = '\0'; buff[2] = '\0'; int type = atoi(buff); buff[2] = ' '; mymsg_buf mymsg; mymsg.len = strlen(buff); memset(mymsg.data, 0, MAX_LEN); memmove(mymsg.data, buff, mymsg.len); mymsg.type = 20; lmsgsnd(writefd, &(mymsg.type), sizeof(mymsg.len) + mymsg.len, 0); //接受服务端的ack memset(mymsg.data, 0, MAX_LEN); len = lmsgrcv(readfd, &(mymsg.type), MAX_LEN, type, 0); printf("ack from server:type=%d, msg=%s", mymsg.type, mymsg.data); printf("\n"); } } void vmsg_base_server(int readfd, int writefd) { mymsg_buf buf; while(lmsgrcv(readfd, &buf.type, MAX_LEN, 20, 0)) { if(buf.data[buf.len] == '\n') buf.len--; buf.data[buf.len] = '\0'; printf("read data from client: type=%d, msg=%s\n", buf.type, buf.data); buf.data[2] = '\0'; int type = atoi(buf.data); memset(buf.data, 0, MAX_LEN); memmove(buf.data, "ACK ", 4); buf.type = type; buf.len = strlen(buf.data); lmsgsnd(writefd, &(buf.type), sizeof(buf.len) + buf.len, 0); } } //客户端从标准输入读取消息发送给服务端,服务端读取到消息之后回显并给客户端发送确认信息,客户端收到后回显 //很明显的是当前的流程是同步的,如果客户端收不到服务端的ack或者服务端收不到客户端的消息都会阻塞 void vmsg_base_test(int argc, char **argv) { if(argc != 4) { err_exit(NULL, -1); } char ch = argv[1][0]; char *read_name = argv[2]; char *write_name = argv[3]; unsigned long flag = IPC_CREAT | SVMSG_MODE; //SVMSG_MODE=0666 key_t readkey = lftok(read_name, 0); key_t writekey = lftok(write_name, 0); int readfd = lmsgget(readkey, flag); int writefd = lmsgget(writekey, flag); printf("message queue readkey=%d, writekey=%d, readfd=%d, writefd=%d\n", readkey, writekey, readfd, writefd); switch(ch) { case 'c': vmsg_base_client(readfd, writefd); break; case 's': vmsg_base_server(readfd, writefd); break; } }执行结果如下,需要注意的是System V消息队列需要提前创建文件,并且如果之前创建成功但是读取失败的消息队列,如果再次进行打开读取可能会失败,显示权限不足:
➜ build git:(master) ✗ touch tmp/c ➜ build git:(master) ✗ touch tmp/s ➜ build git:(master) ✗ ./main c tmp/s tmp/c message queue readkey=402440, writekey=436367, readfd=163845, writefd=131076 02 1111111111111111111 ack from server:type=2, msg=ACK 33 i want to send a message ack from server:type=33, msg=ACK ➜ build git:(master) ✗ ./main s tmp/c tmp/s message queue readkey=436367, writekey=402440, readfd=131076, writefd=163845 read data from client: type=20, msg=02 1111111111111111111 read data from client: type=20, msg=33 i want to send a message可以通过命令ipcs -q查看消息队列,ipcrm删除消息队列:
➜ ~ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xffffffff 0 grayondrea 0 0 0 0x0006a84f 32769 grayondrea 0 0 0 0x0006a856 65538 grayondrea 0 0 0 0x0006a68a 98307 grayondrea 644 16 2 0x0006a88f 131076 grayondrea 644 0 0 0x00062408 163845 grayondrea 644 0 0每个客户端拥有一个自身的独一无二的消息队列,通过公开的服务端的消息队列向服务端发送消息。服务端接收到消息之后开启一个进程并向客户端发送ACK。
#define SERVER_TYPE 20 //通过信号处理程序实现客户端和服务端异步的响应 //每个客户端有一个私人的消息队列,客户端创建队列之后按照格式:消息队列编号+空格+消息类型+空格+消息的格式向服务端发送消息 void vmsg_sig_client(int readfd, int writefd) { char buff[MAX_LEN] = {0}; lfgets(buff, MAX_LEN, stdin); int len = strlen(buff); if(buff[len - 1] == '\n') len--; //解析消息类型 buff[len] = '\0'; buff[2] = '\0'; int type = atoi(buff); buff[2] = ' '; //格式为 from [readfd] to [writefd] [msg] char send_msg[MAX_LEN] = {0}; sprintf(send_msg, "from %d to %d %s", readfd, writefd, buff); mymsg_buf mymsg; mymsg.len = strlen(send_msg); memset(mymsg.data, 0, MAX_LEN); memmove(mymsg.data, send_msg, mymsg.len); mymsg.type = SERVER_TYPE; lmsgsnd(writefd, &(mymsg.type), sizeof(mymsg.len) + mymsg.len, 0); //接受服务端的ack memset(mymsg.data, 0, MAX_LEN); mymsg.type = type; len = lmsgrcv(readfd, &(mymsg.type), MAX_LEN, mymsg.type, 0); printf("ack from server:type=%d, msg=%s\n", mymsg.type, mymsg.data); } void sig_child(int signo) { pid_t pid = 0; int stat = 0; while((pid = waitpid(-1, &stat, WNOHANG)) > 0) ; return; } //每当有一个客户端发送了消息则服务端开启一个进程来处理该客户端的请求 void vmsg_sig_server(int readfd, int writefd) { for(;;) { lsignal(SIGCHLD, sig_child); mymsg_buf read_buf; memset(read_buf.data, 0, MAX_LEN); read_buf.type = SERVER_TYPE; int len = lmsgrcv(readfd, &(read_buf.type), MAX_LEN, read_buf.type, 0); printf(read_buf.data); printf("\n"); //解析客户端的id int type = 0; sscanf(read_buf.data, "from %d to %d %d %s", &writefd, &readfd, &type, NULL); pid_t id = lfork(); if(id == 0) //子进程 { memset(read_buf.data, 0, MAX_LEN); memmove(read_buf.data, "ACK ", 4); read_buf.type = type; read_buf.len = strlen(read_buf.data); //printf("send message!type=%d,len=%d writefd=%d\n", read_buf.type, read_buf.len, writefd); lmsgsnd(writefd, &(read_buf.type), sizeof(read_buf.len) + read_buf.len, 0); } else { } } } //第二个参数表示当前进程是服务端还是客户端,c/s //第三个参数为服务端的消息队列描述 void vmsg_sig_test(int argc, char **argv) { if(argc != 3) { err_exit(NULL, -1); } char ch = argv[1][0]; char *server_name = argv[2]; unsigned long flag = IPC_CREAT | SVMSG_MODE; //SVMSG_MODE=0666 key_t serverkey = lftok(server_name, 0); int clientfd = 0; int serverfd = lmsgget(serverkey, flag); printf("message queue serverkey=%d, serverfd=%d\n", serverkey, serverfd); switch(ch) { case 'c': clientfd = lmsgget(IPC_PRIVATE, flag); vmsg_sig_client(clientfd, serverfd); break; case 's': vmsg_sig_server(serverfd, 0); break; } }效果如下:
➜ build git:(master) ✗ touch server ➜ build git:(master) ✗ ./main s ./server message queue serverkey=436296, serverfd=917527 from 950296 to 917527 1 23456789 from 983065 to 917527 2 haaaaaaaaaaaaaa ➜ build git:(master) ✗ ./main c ./server message queue serverkey=436296, serverfd=917527 1 123456789 ack from server:type=1, msg=ACK ➜ build git:(master) ✗ ./main c ./server message queue serverkey=436296, serverfd=917527 2 whaaaaaaaaaaaaaa ack from server:type=2, msg=ACK以上的场景也可以使用select或者poll来实现。