Allow for static initialisation.
[libeze] / ez-port.c
1 /* ez-port.c: A simple inter-thread IPC mechanism with poll() support.
2
3    Copyright (C) 2019 Michael Zucchi
4
5    This program is free software: you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public License
7    as published by the Free Software Foundation, either version 3 of
8    the License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this program. If not, see
17    <http://www.gnu.org/licenses/>.
18 */
19
20 #include <fcntl.h> 
21 #include <sys/stat.h>  
22 #include <mqueue.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25 #include <errno.h>
26 #include <assert.h>
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31
32 #include "ez-port.h"
33
34 /**
35  * This implementation uses anonymous posix message queues.
36  *
37  * Internally it uses the message queue to store the message pointers
38  * directly.  This should work in-thread.  It might do funny things
39  * with valgrind and the like since they pointers escape the user
40  * address space.
41  *
42  * If two readers are reading at the time time, poll() may currently
43  * block as it doesn't use non-blocking i/o.
44  */
45
46 #define EZ_PORT_NAME "ez-port"
47
48 struct ez_port {
49         mqd_t q;
50 };
51
52 ez_port *ez_port_new(int limit) {
53         ez_port *port = malloc(sizeof(*port)+ sizeof(void *) * limit);
54         struct mq_attr at;
55         char name[16 + strlen(EZ_PORT_NAME) + 4];
56
57         if (!port)
58                 return port;
59
60         // Port name includes uid to avoid permission issues if a stale port exists
61         sprintf(name, "/%s-%016lx", EZ_PORT_NAME, getuid() & 0xffffffffffffffffUL);
62
63         at.mq_flags = 0;
64         at.mq_maxmsg = limit;
65         at.mq_msgsize = sizeof(void *);
66
67         // Open a unqiue port anonymously, using O_EXCL and unlink().
68         printf("open %s\n", name);
69         do {
70                 mq_unlink(name);
71                 port->q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC, 0600, &at);
72         } while (port->q == -1 && errno == EEXIST);
73
74         if (port->q == -1) {
75                 perror("mq_open");
76                 free(port);
77                 return NULL;
78         }
79         mq_unlink(name);
80
81         return port;
82 }
83
84 void ez_port_free(ez_port *port) {
85         if (port) {
86                 mq_close(port->q);
87                 free(port);
88         }
89 }
90
91 int ez_port_fd(ez_port *port) {
92         return port->q;
93 }
94
95 int ez_port_put(struct ez_port *port, void *msg) {
96         return mq_send(port->q, (char *)&msg, sizeof(msg), 0);
97 }
98
99 void *ez_port_poll(ez_port *port) {
100         struct mq_attr at;
101         
102         if (mq_getattr(port->q, &at) == 0 && at.mq_curmsgs > 0)
103                 return ez_port_take(port);
104
105         return NULL;
106 }
107
108 void *ez_port_take(ez_port *port) {
109         void *tail;
110         ssize_t size;
111
112         size = mq_receive(port->q, (char *)&tail, sizeof(tail), NULL);
113         if (size == sizeof(tail))
114                 return tail;
115
116         return NULL;
117 }