前言

Linux POSIX IPC的可移植性是不如System V IPC的,但是我们只用Linux,并且内核版本高于2.6.6的话就不存在该问题了。也因为POSIX IPC出现的比较晚,借鉴了systemV IPC的长处,规避其短处,使得POSIX IPC的接口更易用。进程间通信的手段很多,除了消息队列、信号量、共享内存,还有信号、socket、管道,普通的管道需要祖先进程有联系,具名管道可以应用于无关联的进程。

后文记录的内容都是POSIX IPC的使用。

访问标识

IPC标识符的操作行为都模范了文件描述符,可以像操作文件一样打开标识符。内核会维护该标识的引用计数,删除标识符也就是删除了名字,等引用计数为0时才会真正的销毁。这些标识符会被放在/dev/shm目录下。

  • 默认创建消息队列在该目录下看不到,需要我们将消息队列的目录挂载到文件系统中,然后再使用创建函数来创建mq
mkdir /dev/mq
mount -t mqueue none /dev/mq
  • 为了可移植性,给标识符起名以斜线开头后跟非斜线字符的形式,如/mysem

消息队列

创建

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag); // 打开

// 创建
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

// 成功返回fd,失败返回(mqd_t)-1并设置errno

oflag:

  • O_RDONLY:只接收、O_WRONLY:只发送、O_RDWR:接收和发送
  • O_CLOEXEC:给fd设置close-on-exec
  • O_CREAT:若不存在则创建,存在则直接使用。同时使用O_CREATE | O_EXCL,如果已经存在该文件返回errno EEXIST。设置O_CREAT则必须设置fd的权限,即mode
    • S_IRUSER、S_IWUSR、S_IRGRP、S_IWGRP、S_IROTH、SIWOTH
  • O_NONBLOCK:mq_receive和mq_send使用fd默认是阻塞的,该标志设置fd为非阻塞,无数据可接收或可发送时返回 errno EAGAIN

attr:

struct mq_attr {
   long mq_flags;       /* Flags: 0 or O_NONBLOCK */
   long mq_maxmsg;      /* Max. # of messages on queue */
   long mq_msgsize;     /* Max. message size (bytes) */
   long mq_curmsgs;     /* # of messages currently in queue */
};

mq_maxmsg和mq_msgsize在创建时就确定好,创建好后无法再进行调整。只能调制mq_flags设置是否为阻塞

关闭

  1. 接口关闭
#include <mqueue.h>

// 关闭mq,引用计数-1,即使全部使用mq_close关闭,消息队列fd仍然存在,需要使用unlink销毁
int mq_close(mqd_t mqdes);

// 删除,直到引用计数为0才真正删除
int mq_unlink(const char *name);

// 成功返回0,失败返回-1并设置errno
  1. 与普通文件描述符一样,也可以到目录下rm删除
  2. fork会继承fd,内核实现中消息队列的fd带有O_CLOEXEC,所以当子进程调用exec函数时会自动关闭消息队列

收发消息

#include <mqueue.h>
// 发送消息
int mq_send(mqd_t mqdes, const char msg_ptr[.msg_len],
            size_t msg_len, unsigned int msg_prio);
// msg_len:长度为0~mq_msgsize, 长度超过mq_msgsize返回EMSGSIZE
// msg_prio:消息优先级,最大为MQ_PRIO_MAX,不需要优先级设置为0


// 接收消息,接收优先级最高的消息中最先到达的
ssize_t mq_receive(mqd_t mqdes, char msg_ptr[.msg_len],
                   size_t msg_len, unsigned int *msg_prio);
// msg_len:>=mq_msgsize,可以通过mq_getattr()获取
// msg_prio:NULL表示不关心优先级,非NULL系统将取到的消息体的优先级复制到msg_prio
  • 如果mq已满,mq_send阻塞。如果设置了O_NONBLOCK标志,立即返回EAGIN。同样,如果mq为空,mq_receive阻塞,如果设置了O_NONBLOCK标志,立即返回EAGIN

mq_notify:

// 异步消息通知,消息到来时可以通知进程。该函数用于进程注册或注销消息通知,给sevp传递NULL
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
  • 同一时间只能有一个进程注册,多个进程注册后面的进程会收到EBUSY错误。
    • 只有注册到空消息队列时,消息到来才会通知进程。如果队列不为空,则注册后要等下次消息队列为空再接收到的消息会给进程发送通知。
    • 通知完成后就会删除进程的注册。
    • 如果先有进程阻塞在mq_receive,那么消息到来不会通知注册的进程,进程状态依然是注册。

const struct sigevent *sevp的结构如下:

#include <signal.h>

union sigval {            /* Data passed with notification */
   int     sival_int;    /* Integer value */
   void   *sival_ptr;    /* Pointer value */
};

struct sigevent {
   int    sigev_notify;  /* Notification method */
   int    sigev_signo;   /* Notification signal */
   union sigval sigev_value;
                         /* Data passed with notification */
   void (*sigev_notify_function) (union sigval);
                         /* Function used for thread
                            notification (SIGEV_THREAD) */
   void  *sigev_notify_attributes;
                         /* Attributes for notification thread
                            (SIGEV_THREAD) */
   pid_t  sigev_notify_thread_id; // 用在POSIX timers,man timer_create(2)
                         /* ID of thread to signal
                            (SIGEV_THREAD_ID); Linux-specific */
};

sigev_notify可以设置为:

  • SIGEV_NONE:消息到达时不做任何事
  • SIGEV_SIGNAL:采用发送信号的方式通知进程
  • SIGEV_THREAD:创建一个线程,执行segev_notify_function函数

同时因为posix 消息队列标识符有文件描述符的属性,那么在linux下I/O多路复用是更好的选择,下面demo使用epoll监听队列消息

demo

客户端给mq发送消息。server端分别使用SIGEV_SIGNALSIGEV_THREAD、epoll模式来监听消息队列到来的消息。
先mount消息队列的目录,方便使用文件接口查看

 mount -t mqueue none /dev/mq

客户端:

#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>
#define OFLAG (O_CREAT | O_EXCL | O_WRONLY)
#define PERM (S_IRUSR | S_IWUSR)

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /mqname\n", argv[0]);
    return 1;
  }
  const char *mqname = argv[1];
  mqd_t mq = mq_open(mqname, OFLAG, PERM, NULL);
  struct mq_attr attr;
  mq_getattr(mq, &attr);
  char *buf = (char *)malloc(attr.mq_msgsize);

  while ((fgets(buf, attr.mq_msgsize, stdin) != NULL) && (buf[0] != '\n')) {
    mq_send(mq, buf, attr.mq_msgsize, 0);
  };
  close(mq);
  return 0;
}

信号处理server:

#define _DEFAULT_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /mqname", argv[0]);
    return 1;
  }
  mqd_t mq;             // 声明mq标识符
  struct mq_attr attr;  // 声明消息属性
  if ((mq = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1) {
    printf("open mq failure\n");
    return 1;
  }
      // 信号处理
  sigset_t mask;
  struct sigevent sigev;
  int sig;
  int num;
  mq_getattr(mq, &attr);
  char *buf = (char *)malloc(attr.mq_msgsize); // 分配消息的缓存空间
  // 设置信号集
  sigemptyset(&mask);
  sigaddset(&mask, SIGUSR1);
  sigprocmask(SIG_BLOCK, &mask, NULL);

  sigev.sigev_notify = SIGEV_SIGNAL;  // 使用信号notify
  sigev.sigev_signo = SIGUSR1;        // 使用信号SIGUSR1
  mq_notify(mq, &sigev);              // 注册notify

  for (;;) {
    sigwait(&mask, &sig);  // 等待信号
    if (sig == SIGUSR1) {
      mq_notify(mq, &sigev); // 再次注册notify
      while ((num = mq_receive(mq, buf, attr.mq_msgsize, NULL)) >= 0) {
        fprintf(stderr, "receive %d bytes, content: %s", num, buf);
      }
    }
  }
  close(mq);
  return 0;
}
// ------------------
root@yielde:~/workspace/code-container/cpp/blog_demo# ./client /mq_signal
hello signal
hello signal 1

root@yielde:~/workspace/code-container/cpp/blog_demo# ./server /mq_signal
receive 8192 bytes, content: hello signal
receive 8192 bytes, content: hello signal 1

线程处理server:

static void notify_function(union sigval sv);

// 线程处理
static void setup_notify(mqd_t *mqp) {
  struct sigevent sig_ev;              // 定义sigevent
  sig_ev.sigev_notify = SIGEV_THREAD;  // 通知到达,启用线程处理
  sig_ev.sigev_notify_function = notify_function;  // 处理函数
  sig_ev.sigev_notify_attributes = NULL;           // 线程属性设置为NULL
  sig_ev.sigev_value.sival_ptr = mqp;
  mq_notify(*mqp, &sig_ev);
}

static void notify_function(union sigval sv) {
  mqd_t *mqp = (mqd_t *)sv.sival_ptr;
  struct mq_attr attr;
  mq_getattr(*mqp, &attr);
  int num = 0;
  char *buf = (char *)malloc(attr.mq_msgsize);  // 保证buf足够存放消息
  setup_notify(mqp);
  while ((num = mq_receive(*mqp, buf, attr.mq_msgsize, NULL)) >= 0) {
    fprintf(stderr, "receive %d bytes, content: %s", num, buf);
  }
}

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /mqname", argv[0]);
    return 1;
  }
  mqd_t mq;             // 声明mq标识符
  struct mq_attr attr;  // 声明消息属性
  if ((mq = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1) {
    printf("open mq failure\n");
    return 1;
  }

  // 通过线程处理
  setup_notify(&mq);
  for (;;) {
    pause();
  }
  close(mq);
  return 0;
}

// ----------------------
root@yielde:~/workspace/code-container/cpp/blog_demo# ./server /mq_thread
receive 8192 bytes, content: hello thread
receive 8192 bytes, content: hello thread 1

root@yielde:~/workspace/code-container/cpp/blog_demo# ./client /mq_thread
hello thread
hello thread 1

epoll处理server:epoll的使用请看

void add_epoll(int epollfd, int fd) {
  struct epoll_event events;
  events.data.fd = fd;
  events.events = EPOLLIN | EPOLLET;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &events);
}

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /mqname", argv[0]);
    return 1;
  }
  mqd_t mq;             // 声明mq标识符
  struct mq_attr attr;  // 声明消息属性
  if ((mq = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1) {
    printf("open mq failure\n");
    return 1;
  }
  // epoll 处理
  struct epoll_event events[10];
  int epollfd = epoll_create(2);
  add_epoll(epollfd, mq);
  mq_getattr(mq, &attr);
  char *buf = (char *)malloc(attr.mq_msgsize);
  while (1) {
    printf("epoll waiting message\n");
    int ret = epoll_wait(epollfd, events, 10, -1);
    if (ret > 0) {
      int num;
      for (int i = 0; i < ret; ++i) {
        int fd = events[i].data.fd;
        if ((fd == mq) && (events[i].events & EPOLLIN)) {
          while ((num = mq_receive(fd, buf, attr.mq_msgsize, 0)) >= 0) {
            printf("receive %d bytes, content: %s", num, buf);
          }
        }
      }
    } else if (ret < 0) {
      printf("events error: %d\n", errno);
      break;
    }
  }

  close(epollfd);
  close(mq);
  return 0;
}
// --------------------
root@yielde:~/workspace/code-container/cpp/blog_demo# ./client /mq_epoll
hello epoll
hello epoll 1

root@yielde:~/workspace/code-container/cpp/blog_demo# ./server /mq_epoll
receive 8192 bytes, content: hello epoll
receive 8192 bytes, content: hello epoll 1

信号量

信号量可以同步进程或线程,协助多个进程或线程之间访问共享资源。信号量分为有名信号量和无名信号量。

  • 有名信号量:有文件标识符,无关进程可以直接打开使用。
  • 无名信号量:没有文件标识符,无法通过open操作打开使用,多用于线程同步

有名信号量API

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>

// 打开sem
sem_t *sem_open(const char *name, int oflag);
// 创建sem
sem_t *sem_open(const char *name, int oflag,
               mode_t mode, unsigned int value);
// oflag:与消息队列一样
// mode:与消息队列一样
// value:信号量的初始值,0~SEM_VALUE_MAX,表示资源的个数,使用资源用sem_wait,释放资源用sem_post

// 关闭sem
int sem_close(sem_t *sem);
// 进程终止或指向exec时,打开的有名sem会自动关闭,进程引用计数-1

// 删除sem
int sem_unlink(const char *name);

// 使用sem
int sem_wait(sem_t *sem); // 阻塞
int sem_trywait(sem_t *sem); // 非阻塞
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 指定时间之前阻塞
// 等待sem可用,将value减1,如果value > 0立即返回,否则阻塞。如果阻塞被信号中断,
// 返回EINTR,且无法通过SA_RESTART重启系统调用

// 释放sem
int sem_post(sem_t *sem);
// 将sem的值+1,如果多个进程处于sem_wait,唤醒一个

// 获取sem的值
int sem_getvalue(sem_t *sem, int *sval);
// 返回value的个数,如果有多个进程正在wait,返回0。但是该值返回的时候可能value的值已经改变。

Link with -pthread.

无名信号量API

上面说过无名信号量就是没有具名标识符,无法通过open打开使用。所以共享的条件是多个进程或线程可以看到同一块内存区域才能使用。线程最为合适,如果硬要给进程用,可以创建共享内存,然后将无名sem放到共享内存上。无名sem不使用 sem_open和sem_close、sem_unlink、sem_close,其余用法与有名sem相同。

// 初始化无名sem
int sem_init(sem_t *sem, int pshared, unsigned int value);
// value:0表示在线程间共享,大于0表示在进程间共享

// 销毁
int sem_destroy(sem_t *sem);
// 没有进程处于sem_wait状态时才可以被安全销毁

共享内存

共享内存可以在无关进程直接创建一块内存区域,让多个进程共同操作这块内存。POSIX共享内存同样采用文件类似的接口,也提供了标识符。可以动态的调整内存空间的大小。

mmap

我们经常用strace去看一个程序运行的系统调用,会看到大量的mmap和munmap的操作。例如在里可以看到,线程栈的内容就是mmap来准备的。运行程序的时候,mmap会参与加载动态链接库等待。
mmap就是在调用进程的虚拟内存空间里创建一个内存映射,mmap分为:

  • 基于文件映射:将文件的一部分内容直接映射到进程的虚拟内存空间中,可以通过直接操作内存区域中的字节来操作文件
  • 匿名映射:没有实体文件与之关联,临时使用,匿名映射的内存区域会被初始化为0

进程有独立的内存空间,栈或者通过malloc分配的堆内存是彼此独立的。但是mmap创建的内存映射时,可以选择私有(MAP_PRIVATE)还是共享(MAP_SHARED):

  • MAP_PRIVATE:内存映射进程间独立,对于文件映射,内存字节的变更不会同步到磁盘上。
  • MAP_SHARED:发生改变时对拥有该共享内存的其他进程可见,对于文件映射,内存字节的改变会同步到磁盘上。

所以mmap可以分为4类:

  1. MAP_SHARED映射文件,内存对所有进程可见,且内存字节更改会同步到磁盘
  2. MAP_SHARED匿名映射,内存对所有进程可见
  3. MAP_PRIVATE映射文件,进程间不可见,内存字节更改不会同步到磁盘
  4. MA_PRIVATE匿名映射,进程间不可见(也是用了copy-on-write,发生了修改才复制新的页)

mmap API

#include <sys/mman.h>

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
// 解除映射
int munmap(void *addr, size_t length);

mmap参数:
addr:映射到内存的起始地址,设置NULL表示由系统决定
length、fd、offset:将文件fd作为映射源,从offset位置起,将长度为length的内容映射到内存
prot:表示对内存区域的操作保护,有以下几种

  • PROT_EXEC:映射的内容可执行
  • PROT_READ:映射的内容可读
  • PROT_WRITE:映射的内容可修改
  • PROT_NONE:映射的内容不可访问

flags:指定映射的类型

  • MAP_SHARED:创建共享映射
  • MAP_PRIVATE:创建私有映射
  • MAP_ANONYMOUS:创建匿名映射,fd必须设置为-1。
  • MAP_FIXED:表示必须把内容映射到对应的地址上,mmap操作的是页,addr和offset参数需要按页对齐

对于这些不同的映射形式,有如下几种使用场景:

  • 共享文件映射:在访问文件的时候,将磁盘的内容映射到内存空间中,Linux通过Page cache来缓存一部分映射,如果修改的这部分内存空间在Page cache上存在,则直接修改Page cache,否则再去读取磁盘文件,内核将修改过的页标记为脏页,在合适的时间写回到磁盘上。使用read和write时,除了磁盘->page cache,我们需要用户空间的buffer->pagecache或者pagecache->buffer,存在两次复制。使用mmap可以直接操作page cache,节省了一次数据复制,提升了性能
  • 私有文件映射:常用于动态链接库,多个进程共享库的文本信息,运行程序时可以看到有很多mmap的MAP_PRIVATE操作来加载动态链接库
  • 共享匿名映射:子进程可以继承这块区域,所以父子进程可以通过共享匿名映射来通信。共享匿名映射中的字节会被初始化为0,创建方式有两种:
    • flags指定MAP_ANONYMOUS,fd指定-1
    • open /dev/zero,然后将该fd传给mmap
  • 私有匿名映射:给进程分配一段私有的内存,无文件关联,独立访问。例如glibc中的malloc就是用mmap来实现的

共享内存API

  1. 创建共享内存
#include <sys/mman.h>
#include <sys/stat.h>        /* For mode constants */
#include <fcntl.h>           /* For O_* constants */

// 打开共享内存的文件描述符
int shm_open(const char *name, int oflag, mode_t mode);
// oflag:O_RDONLY、O_RDWR、O_CREAT、O_EXCL、O_TRUNC(将内存的size截断为0)
// mode:共享内存的使用权限,0表示只是打开

Link with -lrt.
  1. 创建好共享内存后,调整其大小
int ftruncate(int fd, off_t length);
  1. 调用mmap映射共享内存
// 查看共享内存的大小
int fstat(int fd, struct stat *statbuf);

// 调用mmap来做映射
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  1. 用完删除
// 删除共享内存,不会影响当前正在使用的映射,当所有的进程munmap解除映射,引用计数归0才删除
// 共享内存的数据具有内核持久性,即使所有进程都调用了munmap,没有unlink,那么这块区域就一直
// 存在,直到重启系统后消失
int shm_unlink(const char *name);

demo

通过client创建共享内存并打印字符串,通过server读取共享内存中的内容
client:

#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /shmname\n", argv[1]);
    return 1;
  }
  const char *shmname = argv[1];
  int shmfd = shm_open(shmname, O_CREAT | O_EXCL | O_RDWR | O_TRUNC,
                       0666);  // 创建共享内存
  assert(shmfd != -1);
  if (ftruncate(shmfd, 1025) == -1) {  // 设置共享内存大小
    printf("resize shm failure\n");
    shm_unlink(shmname);
    return 1;
  }
  int ret;
  struct stat statbuf;
  ret = fstat(shmfd, &statbuf);  // 获取空闲内存大小
  assert(ret != -1);
  printf("shm length is %ld bytes\n", statbuf.st_size);
  char *shmptr;
  shmptr = (char *)mmap(NULL, statbuf.st_size, PROT_WRITE, MAP_SHARED, shmfd,
                        0);  // 通过mmap映射共享内存
  if (shmptr == MAP_FAILED) {
    printf("map shm failure\n");
    shm_unlink(shmname);
    return 1;
  }
  sprintf(shmptr, "%s", "hello world\n");
  sprintf(shmptr + 12, "%s", "hi\n");
  munmap(shmptr, statbuf.st_size);
  return 0;
}

server:

#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("usage: %s /shmname\n", argv[1]);
    return 1;
  }
  const char *shmname = argv[1];

  int shmfd = shm_open(shmname, O_RDONLY, 0666);
  assert(shmfd != -1);
  char *shmptr;
  struct stat statbuf;
  int ret = fstat(shmfd, &statbuf);
  assert(ret != -1);
  printf("shm length is %ld bytes\n", statbuf.st_size);
  shmptr = (char *)mmap(NULL, statbuf.st_size, PROT_READ, MAP_SHARED, shmfd,0);
  if (shmptr == MAP_FAILED) {
    printf("map shm failure\n");
    return 1;
  }
  printf("%s", shmptr);
  munmap(shmptr, statbuf.st_size);
  return 0;
}

// ------------------
root@yielde:~/workspace/code-container/cpp/blog_demo# ./client /myshm
shm length is 1025 bytes

root@yielde:~/workspace/code-container/cpp/blog_demo# ./server /myshm
shm length is 1025 bytes
hello world
hi

学习自:
《UNIX环境高级编程》
《Linux环境编程从应用到内核》高峰 李彬 著