消息队列的简单实现


消息队列:一个进程向另一个进程发送数据块
消息队列基于消息,管道基于字节流
 
消息队列是用链表实现
 
1.创建:int megget(key_t key, int msgflag)
    key:函数ftok()的返回值
    msgflag:IPC_CREAT是创建新的消息队列;IPC_EXCL与IPC_CREAT一起使用,即如果要创建的消息队列已存在,则返回错误
    成功:返回队列标识符
    失败:-1
2.获取:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
    msqid:消息队列的识别码
    msgp:指向消息缓冲区的指针,用来暂时存储发送和接收的消息,是用户自己定义的一个通用结构
        struct msgbuf
        {
            long mtype;  //消息类型,必须> 0
            char mtext[__SIZE__];  //消息文本
        }
    msgsz:消息大小
    msgtyp:消息类型(>0,返回其类型为mtype的第一个消息)
    msgflg:控制函数行为(0表示忽略)
 
    成功:返回拷贝到mtext数组的实际字节数
    失败:-1
3.发送:int msgsnd(int msqid, void *msgp, size_t msgsz, int msgflg)
    成功:返回0
    失败:-1
4.设置属性:int msgctl(int msqid, int cmd, struct msqid_ds *buf)
    对msqid标识的消息队列执行cmd操作
    cmd操作:IPC_STAT(获取对应的msqid_ds的数据结构,保存在buf指定的地址空间)
        IPC_SET(设置消息队列的属性,保存在buf中)
        IPC_RMID(从内核删除msqid标识的消息队列)
    msqid_ds:描述队列当前状态
    成功:0
    失败:-1
 
 
comm.h
#ifndef __COMM__
#define __COMM__
 
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <string.h>
 
#define __SIZE__ 1024
#define FILEPATH "/tmp/.msg1"
#define ID 0
 
 
const int ser_send_type=1;
const int cli_send_type=2;
 
typedef struct msg_info
{
    char mtext[__SIZE__];
    long mtype;
}msginfo;
 
 
#endif
 
client.h
#ifndef __CLIENT__
#define __CLIENT__
 
#include "comm.h"
 
int cli_start();
int cli_end(int);
 
#endif
client.c
#include "client.h"
 
int msg_id= -1;
 
int cli_start()
{
 
    //create the client's buf which storage the msg
    msginfo cli_info;
 
    //create the key
    key_t key= ftok(FILEPATH, ID);
    if(key< 0)
    {
        perror("ftok");
        exit(1);
    }
    //get the ID of msg_queue
    msg_id= msgget(key, 0);
    if(msg_id< 0)
    {
        perror("cli get key id failed");
        exit(2);
    }
 
    while(1)
    {
        //client said
        printf("client:>");
        fgets(cli_info.mtext, __SIZE__, stdin);  //write in the cli_info
        //when it will exit?
        if(strncasecmp(cli_info.mtext, "quit", 4)==0)
        {
            printf("client bye!\n");
            break;
        }
        cli_info.mtype= cli_send_type;
        //send the msg(send judge)
        if(msgsnd(msg_id, &cli_info, sizeof(cli_info.mtext),0)== -1) //send fail
        {
            perror("client send msg fail");
            exit(3);
        }
 
        //receive the msg
        memset(cli_info.mtext, '\0', sizeof(cli_info.mtext));  //reset first
        if(msgrcv(msg_id, &cli_info, sizeof(cli_info.mtext), ser_send_type, 0)==-1)  //receive fail
        {
            perror("client msgrcv fail");
            exit(4);
        }
        printf("server:>%s\n", cli_info.mtext);  //the msg receive from server have storaged in the cli_info.mtext
        fflush(stdout);
    }
    return 0;
}
int cli_end(int id)
{
    if(msgctl(id, IPC_RMID, NULL)==-1)  //fail
    {
        perror("delete th msg_queue from kernel fail");
        exit(5);
    }
    else
    {
        printf("delete the msg_queue from kernel success\n");
        return 0;
    }
}
static void delete_msg(void)
{
    if(msg_id!= -1)  //the msg_queue exist
    {
        cli_end(msg_id);
    }
    printf("delete the msg queue end\n");
}
int main(int argc,char *argv[])
{
    atexit(delete_msg);
    if(cli_start()==0)
    {
        printf("cli start success\n");
    }
    else
    {
        printf("cli start failed\n");
    }
    return 0;
}
 
server.h
#ifndef __SERVER__
#define __SERVER__
 
#include "comm.h"
 
int ser_start();
int ser_end(int);
 
#endif
server.c
#include "server.h"
 
int msg_id= -1;
int ser_start()
{
 
    //create the server's buf which storage the msg
    msginfo ser_info;
 
    //create the key
    key_t key= ftok(FILEPATH, ID);
    if(key< 0)
    {
        perror("ftok");
        exit(1);
    }
    //get the ID of msg_queue
    msg_id= msgget(key, IPC_CREAT|IPC_CREAT|0666);
    if(msg_id< 0)
    {
        perror("ser get key id failed");
        exit(2);
    }
 
    while(1)
    {
        //reveive the msg
        if(msgrcv(msg_id, &ser_info, sizeof(ser_info.mtext), cli_send_type, 0)==-1) //fail
        {
            perror("server msgrcv fail");
            exit(3);
        }
        printf("client:>%s\n",ser_info.mtext);  //first,output the client's msg
 
        printf("server:>");  //then,server will said
        memset(ser_info.mtext, '\0', sizeof(ser_info.mtext));  //reset first
        fgets(ser_info.mtext, __SIZE__, stdin);  //write in the ser_info
        //when it will exit?
        if(strncasecmp(ser_info.mtext, "quit", 4)==0)
        {
            printf("server bye!\n");
            break;
        }
        ser_info.mtype= ser_send_type;
        //send the msg
        if(msgsnd(msg_id, &ser_info, __SIZE__, 0)==-1)  //fail
        {
            perror("server msgsnd fail");
            exit(4);
        }
        fflush(stdout);
    }
    return 0;
}
int ser_end(int id)
{
    if(msgctl(id, IPC_RMID, NULL)== -1)  //fail
    {
        perror("delete the msg_queue from kernel fail");
        exit(5);
    }
    else
    {
        printf("delete the msg_queue from kernel success\n ");
        return 0;
    }
}
static void delete_msg(void)
{
    if(msg_id!= -1)  //the msg_queue exist
    {
        ser_end(msg_id);
    }
    printf("delete the msg_queue end\n");
}
int main(int argc, char *argv[])
{
    atexit(delete_msg);
    if(ser_start()==0)  //success
    {
        printf("ser start success\n");
    }
    else
    {
        printf("ser start failed\n");
    }
    return 0;
}
 
Makefile
.PHONY:all
all:client server
client:client.c
    gcc -o $@ $^
server:server.c
    gcc -o $@ $^
.PHONY:clean
clean:
    rm -f client server

本文永久更新链接地址

相关内容