nmsg  0.9.0
xsio.c
1 /*
2  * Copyright (c) 2012-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 #ifdef HAVE_LIBXS
22 
23 /* Private declarations. */
24 
25 typedef enum { /* How the XS socket is attached to the endpoint address. */
26  sockdir_invalid, /* First enumerator (value 0); not assigned anywhere in the visible code. */
27  sockdir_accept, /* Endpoint is passed to xs_bind(). */
28  sockdir_connect, /* Endpoint is passed to xs_connect(). */
29 } sockdir_t;
30 
31 typedef enum { /* Messaging pattern selected by the endpoint options. */
32  socktype_invalid, /* First enumerator (value 0); not assigned anywhere in the visible code. */
33  socktype_pubsub, /* Mapped to XS_SUB for inputs and XS_PUB for outputs. */
34  socktype_pushpull, /* Mapped to XS_PULL for inputs and XS_PUSH for outputs. */
35 } socktype_t;
36 
37 /* Forward. */
38 
39 static bool
40 munge_endpoint(const char *, char **, sockdir_t *, socktype_t *); /* Splits "<endpoint>[,option]..." into address, direction and socket type. */
41 
42 static bool
43 set_options(void *, int); /* Applies the XS socket options chosen for the given socket type. */
44 
45 /* Private. */
46 
47 static bool
48 munge_endpoint(const char *ep, char **s_ep, sockdir_t *s_dir, socktype_t *s_type) {
49  char *s, *saveptr, *tok;
50  bool found_sockdir = false;
51  bool found_socktype = false;
52 
53  s = strdup(ep);
54  assert(s != NULL);
55 
56  *s_ep = strtok_r(s, ",", &saveptr);
57  if (*s_ep == NULL) {
58  free(s);
59  return (false);
60  }
61  while ((tok = strtok_r(NULL, ",", &saveptr)) != NULL) {
62  if (strcasecmp(tok, "accept") == 0) {
63  if (found_sockdir) return (false);
64  found_sockdir = true;
65  *s_dir = sockdir_accept;
66  } else if (strcasecmp(tok, "connect") == 0) {
67  if (found_sockdir) return (false);
68  found_sockdir = true;
69  *s_dir = sockdir_connect;
70  } else if (strcasecmp(tok, "pubsub") == 0) {
71  if (found_socktype) return (false);
72  found_socktype = true;
73  *s_type = socktype_pubsub;
74  } else if (strcasecmp(tok, "pushpull") == 0) {
75  if (found_socktype) return (false);
76  found_socktype = true;
77  *s_type = socktype_pushpull;
78  }
79  }
80 
81  return (true);
82 }
83 
84 static bool
85 set_options(void *s, int socket_type) {
86  static const int i_thousand = 1000;
87 
88  if (socket_type == XS_SUB) {
89  if (xs_setsockopt(s, XS_SUBSCRIBE, "NMSG", 4))
90  return (false);
91  } else if (socket_type == XS_PUB || socket_type == XS_PUSH) {
92  if (xs_setsockopt(s, XS_SNDHWM, &i_thousand, sizeof(i_thousand)))
93  return (false);
94  if (xs_setsockopt(s, XS_LINGER, &i_thousand, sizeof(i_thousand)))
95  return (false);
96  }
97 
98  return (true);
99 }
100 
101 /* Export. */
102 
103 nmsg_input_t
104 nmsg_input_open_xs_endpoint(void *xs_ctx, const char *ep) {
105  nmsg_input_t input = NULL;
106  int socket_type = 0;
107  sockdir_t s_dir = sockdir_accept;
108  socktype_t s_type = socktype_pubsub;
109  char *s_ep = NULL;
110  void *s;
111 
112  if (!munge_endpoint(ep, &s_ep, &s_dir, &s_type) || !s_ep)
113  goto out;
114 
115  assert(s_dir == sockdir_accept || s_dir == sockdir_connect);
116  assert(s_type == socktype_pubsub || s_type == socktype_pushpull);
117 
118  if (s_type == socktype_pubsub)
119  socket_type = XS_SUB;
120  else if (s_type == socktype_pushpull)
121  socket_type = XS_PULL;
122 
123  s = xs_socket(xs_ctx, socket_type);
124  if (!s) goto out;
125  if (!set_options(s, socket_type)) {
126  xs_close(s);
127  goto out;
128  }
129 
130  if (s_dir == sockdir_accept) {
131  if (xs_bind(s, s_ep) == -1) {
132  xs_close(s);
133  goto out;
134  }
135  } else if (s_dir == sockdir_connect) {
136  if (xs_connect(s, s_ep) == -1) {
137  xs_close(s);
138  goto out;
139  }
140  }
141 
142  input = nmsg_input_open_xs(s);
143 out:
144  free(s_ep);
145  return (input);
146 }
147 
148 nmsg_output_t
149 nmsg_output_open_xs_endpoint(void *xs_ctx, const char *ep, size_t bufsz) {
150  nmsg_output_t output = NULL;
151  int socket_type = 0;
152  sockdir_t s_dir = sockdir_connect;
153  socktype_t s_type = socktype_pubsub;
154  char *s_ep = NULL;
155  void *s;
156 
157  if (!munge_endpoint(ep, &s_ep, &s_dir, &s_type) || !s_ep)
158  goto out;
159 
160  assert(s_dir == sockdir_accept || s_dir == sockdir_connect);
161  assert(s_type == socktype_pubsub || s_type == socktype_pushpull);
162 
163  if (s_type == socktype_pubsub)
164  socket_type = XS_PUB;
165  else if (s_type == socktype_pushpull)
166  socket_type = XS_PUSH;
167 
168  s = xs_socket(xs_ctx, socket_type);
169  if (!s) goto out;
170  if (!set_options(s, socket_type)) {
171  xs_close(s);
172  goto out;
173  }
174 
175  if (s_dir == sockdir_accept) {
176  if (xs_bind(s, s_ep) == -1) {
177  xs_close(s);
178  goto out;
179  }
180  } else if (s_dir == sockdir_connect) {
181  if (xs_connect(s, s_ep) == -1) {
182  xs_close(s);
183  goto out;
184  }
185  }
186 
187  output = nmsg_output_open_xs(s, bufsz);
188 out:
189  free(s_ep);
190  return (output);
191 }
192 
193 #else /* HAVE_LIBXS */
194 
195 /* Export. */
196 
197 nmsg_input_t
198 nmsg_input_open_xs_endpoint(void *xs_ctx __attribute__((unused)),
199  const char *ep __attribute__((unused)))
200 {
201  return (NULL);
202 }
203 
204 nmsg_output_t
205 nmsg_output_open_xs_endpoint(void *xs_ctx __attribute__((unused)),
206  const char *ep __attribute__((unused)),
207  size_t bufsz __attribute__((unused)))
208 {
209  return (NULL);
210 }
211 
212 #endif /* HAVE_LIBXS */
nmsg_input_t nmsg_input_open_xs_endpoint(void *xs_ctx, const char *ep)
Create an XS socket and initialize a new NMSG stream input from it.
nmsg_output_t nmsg_output_open_xs(void *s, size_t bufsz)
Initialize a new XS socket NMSG output.
nmsg_output_t nmsg_output_open_xs_endpoint(void *xs_ctx, const char *ep, size_t bufsz)
Create an XS socket and initialize a new NMSG stream output from it.
nmsg_input_t nmsg_input_open_xs(void *s)
Initialize a new NMSG stream input from an XS socket source.