pacemaker  1.1.15-e174ec8
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netdb.h>
32 
33 #include <stdlib.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <glib.h>
37 
38 #include <bzlib.h>
39 
40 #include <crm/common/ipcs.h>
41 #include <crm/common/xml.h>
42 #include <crm/common/mainloop.h>
43 
44 #ifdef HAVE_GNUTLS_GNUTLS_H
45 # undef KEYFILE
46 # include <gnutls/gnutls.h>
47 
48 const int psk_tls_kx_order[] = {
49  GNUTLS_KX_DHE_PSK,
50  GNUTLS_KX_PSK,
51 };
52 
53 const int anon_tls_kx_order[] = {
54  GNUTLS_KX_ANON_DH,
55  GNUTLS_KX_DHE_RSA,
56  GNUTLS_KX_DHE_DSS,
57  GNUTLS_KX_RSA,
58  0
59 };
60 #endif
61 
62 /* Swab macros from linux/swab.h */
63 #ifdef HAVE_LINUX_SWAB_H
64 # include <linux/swab.h>
65 #else
66 /*
67  * casts are necessary for constants, because we never know for sure
68  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
69  */
/* Reverse the two bytes of a 16-bit value (the argument is evaluated twice) */
#define __swab16(x) ((uint16_t)( \
    ((uint16_t)(x) << 8) | \
    ((uint16_t)(x) >> 8)))

/* Reverse the four bytes of a 32-bit value: shift each byte into place,
 * then mask off whatever else the shift dragged along */
#define __swab32(x) ((uint32_t)( \
    ((uint32_t)(x) << 24) | \
    (((uint32_t)(x) << 8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) >> 8) & (uint32_t)0x0000ff00UL) | \
    ((uint32_t)(x) >> 24)))

/* Reverse the eight bytes of a 64-bit value, same shift-then-mask scheme */
#define __swab64(x) ((uint64_t)( \
    ((uint64_t)(x) << 56) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) << 8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) >> 8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    ((uint64_t)(x) >> 56)))
89 #endif
90 
91 #define REMOTE_MSG_VERSION 1
92 #define ENDIAN_LOCAL 0xBADADBBD
93 
/*
 * Fixed-size header that precedes the payload of every remote message.
 *
 * The structure is packed and is sent as-is, so the receiver may see it in
 * the sender's byte order; crm_remote_header() uses the endian marker to
 * detect that and swaps every field in place.
 */
struct crm_remote_header_v0
{
    uint32_t endian;    /* Detect messages from hosts with different endian-ness */
    uint32_t version;   /* Sender's REMOTE_MSG_VERSION */
    uint64_t id;        /* Per-sender message counter */
    uint64_t flags;
    uint32_t size_total;            /* Header size plus payload size, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the header */
    uint32_t payload_compressed;    /* Non-zero: size of the bzip2-compressed payload */
    uint32_t payload_uncompressed;  /* Payload size once uncompressed (includes the trailing NUL) */

    /* New fields get added here */

} __attribute__ ((packed));
108 
109 static struct crm_remote_header_v0 *
110 crm_remote_header(crm_remote_t * remote)
111 {
112  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
113  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
114  return NULL;
115 
116  } else if(header->endian != ENDIAN_LOCAL) {
117  uint32_t endian = __swab32(header->endian);
120  if(endian != ENDIAN_LOCAL) {
121  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
122  ENDIAN_LOCAL, header->endian, endian);
123  return NULL;
124  }
126  header->id = __swab64(header->id);
127  header->flags = __swab64(header->flags);
128  header->endian = __swab32(header->endian);
129 
130  header->version = __swab32(header->version);
131  header->size_total = __swab32(header->size_total);
132  header->payload_offset = __swab32(header->payload_offset);
133  header->payload_compressed = __swab32(header->payload_compressed);
134  header->payload_uncompressed = __swab32(header->payload_uncompressed);
135  }
136 
137  return header;
138 }
139 
140 #ifdef HAVE_GNUTLS_GNUTLS_H
141 
142 int
143 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
144 {
145  int rc = 0;
146  int pollrc = 0;
147  time_t start = time(NULL);
148 
149  do {
150  rc = gnutls_handshake(*remote->tls_session);
151  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
152  pollrc = crm_remote_ready(remote, 1000);
153  if (pollrc < 0) {
154  /* poll returned error, there is no hope */
155  rc = -1;
156  }
157  }
158 
159  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
160  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
161 
162  if (rc < 0) {
163  crm_trace("gnutls_handshake() failed with %d", rc);
164  }
165  return rc;
166 }
167 
168 void *
169 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
170  void *credentials)
171 {
172  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
173 
174  gnutls_init(session, type);
175 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
176 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
177  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
178 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
179 # else
180  gnutls_set_default_priority(*session);
181  gnutls_kx_set_priority(*session, anon_tls_kx_order);
182 # endif
183  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
184  switch (type) {
185  case GNUTLS_SERVER:
186  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
187  (gnutls_anon_server_credentials_t) credentials);
188  break;
189  case GNUTLS_CLIENT:
190  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
191  (gnutls_anon_client_credentials_t) credentials);
192  break;
193  }
194 
195  return session;
196 }
197 
198 void *
199 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
200 {
201  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
202 
203  gnutls_init(session, type);
204 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
205  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
206 # else
207  gnutls_set_default_priority(*session);
208  gnutls_kx_set_priority(*session, psk_tls_kx_order);
209 # endif
210  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
211  switch (type) {
212  case GNUTLS_SERVER:
213  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
214  (gnutls_psk_server_credentials_t) credentials);
215  break;
216  case GNUTLS_CLIENT:
217  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
218  (gnutls_psk_client_credentials_t) credentials);
219  break;
220  }
221 
222  return session;
223 }
224 
225 static int
226 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
227 {
228  const char *unsent = buf;
229  int rc = 0;
230  int total_send;
231 
232  if (buf == NULL) {
233  return -1;
234  }
235 
236  total_send = len;
237  crm_trace("Message size: %llu", (unsigned long long) len);
238 
239  while (TRUE) {
240  rc = gnutls_record_send(*session, unsent, len);
241 
242  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
243  crm_debug("Retry");
244 
245  } else if (rc < 0) {
246  crm_err("Connection terminated rc = %d", rc);
247  break;
248 
249  } else if (rc < len) {
250  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
251  len -= rc;
252  unsent += rc;
253  } else {
254  crm_trace("Sent all %d bytes", rc);
255  break;
256  }
257  }
258 
259  return rc < 0 ? rc : total_send;
260 }
261 #endif
262 
263 static int
264 crm_send_plaintext(int sock, const char *buf, size_t len)
265 {
266 
267  int rc = 0;
268  const char *unsent = buf;
269  int total_send;
270 
271  if (buf == NULL) {
272  return -1;
273  }
274  total_send = len;
275 
276  crm_trace("Message on socket %d: size=%llu",
277  sock, (unsigned long long) len);
278  retry:
279  rc = write(sock, unsent, len);
280  if (rc < 0) {
281  switch (errno) {
282  case EINTR:
283  case EAGAIN:
284  crm_trace("Retry");
285  goto retry;
286  default:
287  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
288  break;
289  }
290 
291  } else if (rc < len) {
292  crm_trace("Only sent %d of %llu remaining bytes",
293  rc, (unsigned long long) len);
294  len -= rc;
295  unsent += rc;
296  goto retry;
297 
298  } else {
299  crm_trace("Sent %d bytes: %.100s", rc, buf);
300  }
301 
302  return rc < 0 ? rc : total_send;
303 
304 }
305 
306 static int
307 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
308 {
309  int lpc = 0;
310  int rc = -ESOCKTNOSUPPORT;
311 
312  for(; lpc < iovs; lpc++) {
313 
314 #ifdef HAVE_GNUTLS_GNUTLS_H
315  if (remote->tls_session) {
316  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
317  } else if (remote->tcp_socket) {
318 #else
319  if (remote->tcp_socket) {
320 #endif
321  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
322 
323  } else {
324  crm_err("Unsupported connection type");
325  }
326  }
327  return rc;
328 }
329 
330 int
331 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
332 {
333  int rc = -1;
334  static uint64_t id = 0;
335  char *xml_text = dump_xml_unformatted(msg);
336 
337  struct iovec iov[2];
338  struct crm_remote_header_v0 *header;
339 
340  if (xml_text == NULL) {
341  crm_err("Invalid XML, can not send msg");
342  return -1;
343  }
344 
345  header = calloc(1, sizeof(struct crm_remote_header_v0));
346  iov[0].iov_base = header;
347  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
348 
349  iov[1].iov_base = xml_text;
350  iov[1].iov_len = 1 + strlen(xml_text);
351 
352  id++;
353  header->id = id;
354  header->endian = ENDIAN_LOCAL;
355  header->version = REMOTE_MSG_VERSION;
356  header->payload_offset = iov[0].iov_len;
357  header->payload_uncompressed = iov[1].iov_len;
358  header->size_total = iov[0].iov_len + iov[1].iov_len;
359 
360  crm_trace("Sending len[0]=%d, start=%x\n",
361  (int)iov[0].iov_len, *(int*)(void*)xml_text);
362  rc = crm_remote_sendv(remote, iov, 2);
363  if (rc < 0) {
364  crm_err("Failed to send remote msg, rc = %d", rc);
365  }
366 
367  free(iov[0].iov_base);
368  free(iov[1].iov_base);
369  return rc;
370 }
371 
372 
378 xmlNode *
380 {
381  xmlNode *xml = NULL;
382  struct crm_remote_header_v0 *header = crm_remote_header(remote);
383 
384  if (remote->buffer == NULL || header == NULL) {
385  return NULL;
386  }
387 
388  /* Support compression on the receiving end now, in case we ever want to add it later */
389  if (header->payload_compressed) {
390  int rc = 0;
391  unsigned int size_u = 1 + header->payload_uncompressed;
392  char *uncompressed = calloc(1, header->payload_offset + size_u);
393 
394  crm_trace("Decompressing message data %d bytes into %d bytes",
395  header->payload_compressed, size_u);
396 
397  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
398  remote->buffer + header->payload_offset,
399  header->payload_compressed, 1, 0);
400 
401  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
402  crm_warn("Couldn't decompress v%d message, we only understand v%d",
403  header->version, REMOTE_MSG_VERSION);
404  free(uncompressed);
405  return NULL;
406 
407  } else if (rc != BZ_OK) {
408  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
409  free(uncompressed);
410  return NULL;
411  }
412 
413  CRM_ASSERT(size_u == header->payload_uncompressed);
414 
415  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
416  remote->buffer_size = header->payload_offset + size_u;
417 
418  free(remote->buffer);
419  remote->buffer = uncompressed;
420  header = crm_remote_header(remote);
421  }
422 
423  /* take ownership of the buffer */
424  remote->buffer_offset = 0;
425 
426  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
427 
428  xml = string2xml(remote->buffer + header->payload_offset);
429  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
430  crm_warn("Couldn't parse v%d message, we only understand v%d",
431  header->version, REMOTE_MSG_VERSION);
432 
433  } else if (xml == NULL) {
434  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
435  }
436 
437  return xml;
438 }
439 
448 int
449 crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
450 {
451  struct pollfd fds = { 0, };
452  int sock = 0;
453  int rc = 0;
454  time_t start;
455 
456 #ifdef HAVE_GNUTLS_GNUTLS_H
457  if (remote->tls_session) {
458  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
459 
460  sock = GPOINTER_TO_INT(sock_ptr);
461  } else if (remote->tcp_socket) {
462 #else
463  if (remote->tcp_socket) {
464 #endif
465  sock = remote->tcp_socket;
466  } else {
467  crm_err("Unsupported connection type");
468  }
469 
470  if (sock <= 0) {
471  crm_trace("No longer connected");
472  return -ENOTCONN;
473  }
474 
475  start = time(NULL);
476  errno = 0;
477  do {
478  fds.fd = sock;
479  fds.events = POLLIN;
480 
481  /* If we got an EINTR while polling, and we have a
482  * specific timeout we are trying to honor, attempt
483  * to adjust the timeout to the closest second. */
484  if (errno == EINTR && (timeout > 0)) {
485  timeout = timeout - ((time(NULL) - start) * 1000);
486  if (timeout < 1000) {
487  timeout = 1000;
488  }
489  }
490 
491  rc = poll(&fds, 1, timeout);
492  } while (rc < 0 && errno == EINTR);
493 
494  return rc;
495 }
496 
497 
508 static size_t
509 crm_remote_recv_once(crm_remote_t * remote)
510 {
511  int rc = 0;
512  size_t read_len = sizeof(struct crm_remote_header_v0);
513  struct crm_remote_header_v0 *header = crm_remote_header(remote);
514 
515  if(header) {
516  /* Stop at the end of the current message */
517  read_len = header->size_total;
518  }
519 
520  /* automatically grow the buffer when needed */
521  if(remote->buffer_size < read_len) {
522  remote->buffer_size = 2 * read_len;
523  crm_trace("Expanding buffer to %llu bytes",
524  (unsigned long long) remote->buffer_size);
525 
526  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
527  CRM_ASSERT(remote->buffer != NULL);
528  }
529 
530 #ifdef HAVE_GNUTLS_GNUTLS_H
531  if (remote->tls_session) {
532  rc = gnutls_record_recv(*(remote->tls_session),
533  remote->buffer + remote->buffer_offset,
534  remote->buffer_size - remote->buffer_offset);
535  if (rc == GNUTLS_E_INTERRUPTED) {
536  rc = -EINTR;
537  } else if (rc == GNUTLS_E_AGAIN) {
538  rc = -EAGAIN;
539  } else if (rc < 0) {
540  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
541  rc = -pcmk_err_generic;
542  }
543  } else if (remote->tcp_socket) {
544 #else
545  if (remote->tcp_socket) {
546 #endif
547  errno = 0;
548  rc = read(remote->tcp_socket,
549  remote->buffer + remote->buffer_offset,
550  remote->buffer_size - remote->buffer_offset);
551  if(rc < 0) {
552  rc = -errno;
553  }
554 
555  } else {
556  crm_err("Unsupported connection type");
557  return -ESOCKTNOSUPPORT;
558  }
559 
560  /* process any errors. */
561  if (rc > 0) {
562  remote->buffer_offset += rc;
563  /* always null terminate buffer, the +1 to alloc always allows for this. */
564  remote->buffer[remote->buffer_offset] = '\0';
565  crm_trace("Received %u more bytes, %llu total",
566  rc, (unsigned long long) remote->buffer_offset);
567 
568  } else if (rc == -EINTR || rc == -EAGAIN) {
569  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
570 
571  } else if (rc == 0) {
572  crm_debug("EOF encoutered after %llu bytes",
573  (unsigned long long) remote->buffer_offset);
574  return -ENOTCONN;
575 
576  } else {
577  crm_debug("Error receiving message after %llu bytes: %s (%d)",
578  (unsigned long long) remote->buffer_offset,
579  pcmk_strerror(rc), rc);
580  return -ENOTCONN;
581  }
582 
583  header = crm_remote_header(remote);
584  if(header) {
585  if(remote->buffer_offset < header->size_total) {
586  crm_trace("Read less than the advertised length: %llu < %u bytes",
587  (unsigned long long) remote->buffer_offset,
588  header->size_total);
589  } else {
590  crm_trace("Read full message of %llu bytes",
591  (unsigned long long) remote->buffer_offset);
592  return remote->buffer_offset;
593  }
594  }
595 
596  return -EAGAIN;
597 }
598 
606 gboolean
607 crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
608 {
609  int rc;
610  time_t start = time(NULL);
611  int remaining_timeout = 0;
612 
613  if (total_timeout == 0) {
614  total_timeout = 10000;
615  } else if (total_timeout < 0) {
616  total_timeout = 60000;
617  }
618  *disconnected = 0;
619 
620  remaining_timeout = total_timeout;
621  while ((remaining_timeout > 0) && !(*disconnected)) {
622 
623  /* read some more off the tls buffer if we still have time left. */
624  crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
625  total_timeout, remaining_timeout);
626  rc = crm_remote_ready(remote, remaining_timeout);
627 
628  if (rc == 0) {
629  crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
630  return FALSE;
631 
632  } else if (rc == -EAGAIN) {
633  crm_trace("waiting for remote connection data (up to %dms)",
634  remaining_timeout);
635 
636  } else if(rc < 0) {
637  crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
638 
639  } else {
640  rc = crm_remote_recv_once(remote);
641  if(rc > 0) {
642  return TRUE;
643  } else if (rc < 0) {
644  crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
645  }
646  }
647 
648  if(rc == -ENOTCONN) {
649  *disconnected = 1;
650  return FALSE;
651  }
652 
653  remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
654  }
655 
656  return FALSE;
657 }
658 
659 struct tcp_async_cb_data {
660  gboolean success;
661  int sock;
662  void *userdata;
663  void (*callback) (void *userdata, int sock);
664  int timeout; /*ms */
665  time_t start;
666 };
667 
668 static gboolean
669 check_connect_finished(gpointer userdata)
670 {
671  struct tcp_async_cb_data *cb_data = userdata;
672  int rc = 0;
673  int sock = cb_data->sock;
674  int error = 0;
675 
676  fd_set rset, wset;
677  socklen_t len = sizeof(error);
678  struct timeval ts = { 0, };
679 
680  if (cb_data->success == TRUE) {
681  goto dispatch_done;
682  }
683 
684  FD_ZERO(&rset);
685  FD_SET(sock, &rset);
686  wset = rset;
687 
688  crm_trace("fd %d: checking to see if connect finished", sock);
689  rc = select(sock + 1, &rset, &wset, NULL, &ts);
690 
691  if (rc < 0) {
692  rc = errno;
693  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
694  /* reschedule if there is still time left */
695  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
696  goto reschedule;
697  } else {
698  rc = -ETIMEDOUT;
699  }
700  }
701  crm_trace("fd %d: select failed %d connect dispatch ", sock, rc);
702  goto dispatch_done;
703  } else if (rc == 0) {
704  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
705  goto reschedule;
706  }
707  crm_debug("fd %d: timeout during select", sock);
708  rc = -ETIMEDOUT;
709  goto dispatch_done;
710  } else {
711  crm_trace("fd %d: select returned success", sock);
712  rc = 0;
713  }
714 
715  /* can we read or write to the socket now? */
716  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
717  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
718  crm_trace("fd %d: call to getsockopt failed", sock);
719  rc = -1;
720  goto dispatch_done;
721  }
722 
723  if (error) {
724  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
725  rc = -1;
726  goto dispatch_done;
727  }
728  } else {
729  crm_trace("neither read nor write set after select");
730  rc = -1;
731  goto dispatch_done;
732  }
733 
734  dispatch_done:
735  if (!rc) {
736  crm_trace("fd %d: connected", sock);
737  /* Success, set the return code to the sock to report to the callback */
738  rc = cb_data->sock;
739  cb_data->sock = 0;
740  } else {
741  close(sock);
742  }
743 
744  if (cb_data->callback) {
745  cb_data->callback(cb_data->userdata, rc);
746  }
747  free(cb_data);
748  return FALSE;
749 
750  reschedule:
751 
752  /* will check again next interval */
753  return TRUE;
754 }
755 
756 static int
757 internal_tcp_connect_async(int sock,
758  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
759  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
760 {
761  int rc = 0;
762  int flag = 0;
763  int interval = 500;
764  int timer;
765  struct tcp_async_cb_data *cb_data = NULL;
766 
767  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
768  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
769  crm_err("fcntl() write failed");
770  return -1;
771  }
772  }
773 
774  rc = connect(sock, addr, addrlen);
775 
776  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
777  return -1;
778  }
779 
780  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
781  cb_data->userdata = userdata;
782  cb_data->callback = callback;
783  cb_data->sock = sock;
784  cb_data->timeout = timeout;
785  cb_data->start = time(NULL);
786 
787  if (rc == 0) {
788  /* The connect was successful immediately, we still return to mainloop
789  * and let this callback get called later. This avoids the user of this api
790  * to have to account for the fact the callback could be invoked within this
791  * function before returning. */
792  cb_data->success = TRUE;
793  interval = 1;
794  }
795 
796  /* Check connect finished is mostly doing a non-block poll on the socket
797  * to see if we can read/write to it. Once we can, the connect has completed.
798  * This method allows us to connect to the server without blocking mainloop.
799  *
800  * This is a poor man's way of polling to see when the connection finished.
801  * At some point we should figure out a way to use a mainloop fd callback for this.
802  * Something about the way mainloop is currently polling prevents this from working at the
803  * moment though. */
804  crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
805  timer = g_timeout_add(interval, check_connect_finished, cb_data);
806  if (timer_id) {
807  *timer_id = timer;
808  }
809 
810  return 0;
811 }
812 
/*!
 * \internal
 * \brief Connect a socket synchronously, then make it non-blocking
 *
 * \param[in] sock     Socket to connect
 * \param[in] addr     Address to connect to
 * \param[in] addrlen  Size of addr
 *
 * \return 0 on success, the result of connect() if that failed, or -1 if
 *         the socket could not be switched to non-blocking mode
 */
static int
internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
    int status = connect(sock, addr, addrlen);
    int mode = 0;

    if (status != 0) {
        return status;
    }

    mode = fcntl(sock, F_GETFL);
    if (mode < 0) {
        /* Flags unavailable: the socket simply stays in blocking mode */
        return status;
    }

    if (fcntl(sock, F_SETFL, mode | O_NONBLOCK) < 0) {
        crm_err("fcntl() write failed");
        return -1;
    }

    return status;
}
830 
/*!
 * \brief Resolve a host name and connect to it over TCP
 *
 * Each address returned by getaddrinfo() is tried in turn. With a callback
 * the connection is started asynchronously and its outcome is reported to
 * the callback later; without one the connect is done synchronously.
 *
 * \param[in]  host      Name or address of the server
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Time allowed for an asynchronous attempt (ms)
 * \param[out] timer_id  If not NULL, receives the id of the asynchronous timer
 * \param[in]  userdata  Passed to callback
 * \param[in]  callback  If not NULL, selects asynchronous mode
 *
 * \return Socket descriptor (connected, or connecting in asynchronous mode),
 *         or -1 if no address could be used
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout, /*ms */
                             int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
    char printable[256];
    struct addrinfo query;
    struct addrinfo *results = NULL;
    struct addrinfo *candidate = NULL;
    const char *server = host;
    int lookup_rc;
    int sock = -1;

    /* getaddrinfo */
    memset(&query, 0, sizeof(struct addrinfo));
    query.ai_flags = AI_CANONNAME;
    query.ai_socktype = SOCK_STREAM;
    query.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */

    crm_debug("Looking up %s", server);
    lookup_rc = getaddrinfo(server, NULL, &query, &results);
    if (lookup_rc != 0) {
        crm_err("getaddrinfo: %s", gai_strerror(lookup_rc));
        return -1;
    }

    if ((results == NULL) || (results->ai_addr == NULL)) {
        crm_err("getaddrinfo failed");
        goto done;
    }

    for (candidate = results; candidate != NULL; candidate = candidate->ai_next) {
        struct sockaddr *addr = candidate->ai_addr;
        int connect_rc;

        if (addr == NULL) {
            continue;
        }

        if (candidate->ai_canonname) {
            server = results->ai_canonname;
        }
        crm_debug("Got address %s for %s", server, host);

        /* create socket */
        sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            crm_err("Socket creation failed for remote client connection.");
            continue;
        }

        /* Fill in the port, and render the address for the log message */
        memset(printable, 0, DIMOF(printable));
        if (addr->sa_family != AF_INET6) {
            struct sockaddr_in *ipv4 = (struct sockaddr_in *)(void*)addr;

            ipv4->sin_port = htons(port);
            inet_ntop(addr->sa_family, &ipv4->sin_addr, printable, DIMOF(printable));

        } else {
            struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)(void*)addr;

            ipv6->sin6_port = htons(port);
            inet_ntop(addr->sa_family, &ipv6->sin6_addr, printable, DIMOF(printable));
        }

        crm_info("Attempting to connect to remote server at %s:%d", printable, port);

        if (callback) {
            /* Success here only means "started", we'll hear back later in the callback */
            connect_rc = internal_tcp_connect_async(sock, candidate->ai_addr, candidate->ai_addrlen,
                                                    timeout, timer_id, userdata, callback);
        } else {
            connect_rc = internal_tcp_connect(sock, candidate->ai_addr, candidate->ai_addrlen);
        }

        if (connect_rc == 0) {
            break;              /* keep this socket */
        }

        close(sock);
        sock = -1;
    }

done:

    if (results) {
        freeaddrinfo(results);
    }
    return sock;
}
925 
/*!
 * \brief Resolve a host name and connect to it over TCP, synchronously
 *
 * \param[in] host  Name or address of the server
 * \param[in] port  TCP port to connect to
 *
 * \return Result of crm_remote_tcp_connect_async() called without a callback
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    /* No callback selects the synchronous path; the timeout goes unused there */
    const int unused_timeout = -1;

    return crm_remote_tcp_connect_async(host, port, unused_timeout, NULL, NULL, NULL);
}
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:45
uint32_t payload_compressed
Definition: remote.c:124
const char * pcmk_strerror(int rc)
Definition: logging.c:1113
int crm_remote_ready(crm_remote_t *remote, int timeout)
Definition: remote.c:449
char * buffer
Definition: ipcs.h:43
AIS_Host host
Definition: internal.h:52
uint32_t payload_uncompressed
Definition: remote.c:125
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
#define ENDIAN_LOCAL
Definition: remote.c:92
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:118
xmlNode * string2xml(const char *input)
Definition: xml.c:2960
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:331
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:927
#define crm_warn(fmt, args...)
Definition: logging.h:249
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void gnutls_session_t
Definition: cib_remote.c:52
#define crm_trace(fmt, args...)
Definition: logging.h:254
uint64_t id
Definition: remote.c:120
#define __swab64(x)
Definition: remote.c:80
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:45
uint32_t payload_offset
Definition: remote.c:123
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:122
#define __swab32(x)
Definition: remote.c:74
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:44
#define REMOTE_MSG_VERSION
Definition: remote.c:91
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1176
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3990
#define DIMOF(a)
Definition: crm.h:41
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:379
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:47
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:838
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:607
uint32_t version
Definition: remote.c:119
uint64_t flags
Definition: remote.c:121
enum crm_ais_msg_types type
Definition: internal.h:51