/*
 * Forward declarations for the two helpers defined further down:
 * munge_endpoint() parses an endpoint string, set_options() applies
 * per-socket-type options.
 * NOTE(review): the return-type lines are not visible in this excerpt.
 */
40 munge_endpoint(
const char *,
char **, sockdir_t *, socktype_t *);
43 set_options(
void *,
int);
/*
 * Split a comma-separated endpoint specification into its parts.
 *
 * The first comma-delimited token is stored in *s_ep. Every following
 * token is matched case-insensitively against the keywords "accept",
 * "connect", "pubsub" and "pushpull":
 *   - "accept" / "connect"   set *s_dir
 *   - "pubsub" / "pushpull"  set *s_type
 * Outputs that are not named by any keyword are left untouched, so the
 * caller's pre-set defaults survive.
 *
 * Returns false as soon as a keyword category is seen a second time
 * (according to the found_* flags).
 *
 * NOTE(review): this excerpt is missing several original lines -- the
 * statement that initialises `s` (presumably a writable copy of `ep`,
 * since strtok_r modifies its input), any `found_sockdir = true`
 * assignment, and the tail of the function. Confirm against the full file.
 */
48 munge_endpoint(
const char *ep,
char **s_ep, sockdir_t *s_dir, socktype_t *s_type) {
49 char *s, *saveptr, *tok;
50 bool found_sockdir =
false;
51 bool found_socktype =
false;
/* First token is the endpoint itself; *s_ep points into the buffer `s`. */
56 *s_ep = strtok_r(s,
",", &saveptr);
/* Remaining tokens are option keywords. */
61 while ((tok = strtok_r(NULL,
",", &saveptr)) != NULL) {
62 if (strcasecmp(tok,
"accept") == 0) {
/* Reject a second direction keyword. */
63 if (found_sockdir)
return (
false);
65 *s_dir = sockdir_accept;
66 }
else if (strcasecmp(tok,
"connect") == 0) {
67 if (found_sockdir)
return (
false);
69 *s_dir = sockdir_connect;
70 }
else if (strcasecmp(tok,
"pubsub") == 0) {
/* Reject a second socket-type keyword. */
71 if (found_socktype)
return (
false);
72 found_socktype =
true;
73 *s_type = socktype_pubsub;
74 }
else if (strcasecmp(tok,
"pushpull") == 0) {
75 if (found_socktype)
return (
false);
76 found_socktype =
true;
77 *s_type = socktype_pushpull;
/*
 * Apply socket options to the XS socket `s` according to `socket_type`.
 *
 *   XS_SUB            : sets XS_SUBSCRIBE to the 4-byte string "NMSG".
 *   XS_PUB / XS_PUSH  : sets XS_SNDHWM and XS_LINGER, both to 1000.
 *
 * Other socket types fall through both branches with no option set.
 *
 * NOTE(review): the statements executed when xs_setsockopt() returns
 * non-zero, and the function's return statements, are not visible in this
 * excerpt. Callers test the result as `!set_options(...)`, so a
 * true-on-success result is presumed -- confirm against the full file.
 */
85 set_options(
void *s,
int socket_type) {
/* xs_setsockopt() takes a pointer to the option value, hence a named constant. */
86 static const int i_thousand = 1000;
88 if (socket_type == XS_SUB) {
/* Option value is the literal "NMSG" with an explicit length of 4 (no NUL). */
89 if (xs_setsockopt(s, XS_SUBSCRIBE,
"NMSG", 4))
91 }
else if (socket_type == XS_PUB || socket_type == XS_PUSH) {
/* Send high-water mark = 1000. */
92 if (xs_setsockopt(s, XS_SNDHWM, &i_thousand,
sizeof(i_thousand)))
/* Linger = 1000 (presumably milliseconds -- verify against libxs docs). */
94 if (xs_setsockopt(s, XS_LINGER, &i_thousand,
sizeof(i_thousand)))
/*
 * Input-side endpoint open (body fragment).
 * NOTE(review): the function header is not visible in this excerpt; this is
 * presumably the input counterpart of nmsg_output_open_xs_endpoint().
 *
 * Visible flow: parse `ep`, map the socket type to an XS socket type,
 * create the socket, apply options, then bind or connect.
 */
105 nmsg_input_t input = NULL;
/* Defaults used when the endpoint string names no direction / type. */
107 sockdir_t s_dir = sockdir_accept;
108 socktype_t s_type = socktype_pubsub;
/* Fails on a parse error or when no endpoint token was produced. */
112 if (!munge_endpoint(ep, &s_ep, &s_dir, &s_type) || !s_ep)
115 assert(s_dir == sockdir_accept || s_dir == sockdir_connect);
116 assert(s_type == socktype_pubsub || s_type == socktype_pushpull);
/* Receiving ends: pubsub -> XS_SUB, pushpull -> XS_PULL. */
118 if (s_type == socktype_pubsub)
119 socket_type = XS_SUB;
120 else if (s_type == socktype_pushpull)
121 socket_type = XS_PULL;
123 s = xs_socket(xs_ctx, socket_type);
125 if (!set_options(s, socket_type)) {
/* accept -> xs_bind() on the endpoint, connect -> xs_connect() to it. */
130 if (s_dir == sockdir_accept) {
131 if (xs_bind(s, s_ep) == -1) {
135 }
else if (s_dir == sockdir_connect) {
136 if (xs_connect(s, s_ep) == -1) {
/*
 * Output-side endpoint open (body fragment).
 * NOTE(review): the function header is not visible in this excerpt;
 * presumably this is the body of nmsg_output_open_xs_endpoint().
 *
 * Visible flow: parse `ep`, map the socket type to an XS socket type,
 * create the socket, apply options, then bind or connect.
 */
150 nmsg_output_t output = NULL;
/* Defaults used when the endpoint string names no direction / type. */
152 sockdir_t s_dir = sockdir_connect;
153 socktype_t s_type = socktype_pubsub;
/* Fails on a parse error or when no endpoint token was produced. */
157 if (!munge_endpoint(ep, &s_ep, &s_dir, &s_type) || !s_ep)
160 assert(s_dir == sockdir_accept || s_dir == sockdir_connect);
161 assert(s_type == socktype_pubsub || s_type == socktype_pushpull);
/* Sending ends: pubsub -> XS_PUB, pushpull -> XS_PUSH. */
163 if (s_type == socktype_pubsub)
164 socket_type = XS_PUB;
165 else if (s_type == socktype_pushpull)
166 socket_type = XS_PUSH;
168 s = xs_socket(xs_ctx, socket_type);
170 if (!set_options(s, socket_type)) {
/* accept -> xs_bind() on the endpoint, connect -> xs_connect() to it. */
175 if (s_dir == sockdir_accept) {
176 if (xs_bind(s, s_ep) == -1) {
180 }
else if (s_dir == sockdir_connect) {
181 if (xs_connect(s, s_ep) == -1) {
199 const char *ep __attribute__((unused)))
206 const char *ep __attribute__((unused)),
207 size_t bufsz __attribute__((unused)))
nmsg_output_t nmsg_output_open_xs(void *s, size_t bufsz)
Initialize a new XS socket NMSG output.
nmsg_output_t nmsg_output_open_xs_endpoint(void *xs_ctx, const char *ep, size_t bufsz)
Create an XS socket and initialize a new NMSG stream output from it.