pacemaker  1.1.24-3850484742
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netinet/tcp.h>
32 #include <netdb.h>
33 
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <glib.h>
37 
38 #include <bzlib.h>
39 
40 #include <crm/common/ipcs.h>
41 #include <crm/common/xml.h>
42 #include <crm/common/mainloop.h>
44 
45 #ifdef HAVE_GNUTLS_GNUTLS_H
46 # undef KEYFILE
47 # include <gnutls/gnutls.h>
48 
49 const int psk_tls_kx_order[] = {
50  GNUTLS_KX_DHE_PSK,
51  GNUTLS_KX_PSK,
52 };
53 
54 const int anon_tls_kx_order[] = {
55  GNUTLS_KX_ANON_DH,
56  GNUTLS_KX_DHE_RSA,
57  GNUTLS_KX_DHE_DSS,
58  GNUTLS_KX_RSA,
59  0
60 };
61 #endif
62 
63 /* Swab macros from linux/swab.h */
64 #ifdef HAVE_LINUX_SWAB_H
65 # include <linux/swab.h>
66 #else
/*
 * Fallback byte-swapping macros for platforms without <linux/swab.h>.
 *
 * The argument is cast to the exact fixed-width type before shifting,
 * because the width of U/UL/ULL constants is not portable. Each byte is
 * moved to its mirrored position and the result is masked to that position.
 * Note that the argument is evaluated more than once.
 */
#define __swab16(x) ((uint16_t)( \
    ((uint16_t)((uint16_t)(x) << 8) & (uint16_t)0xff00U) | \
    ((uint16_t)((uint16_t)(x) >> 8) & (uint16_t)0x00ffU)))

#define __swab32(x) ((uint32_t)( \
    (((uint32_t)(x) << 24) & (uint32_t)0xff000000UL) | \
    (((uint32_t)(x) <<  8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) >>  8) & (uint32_t)0x0000ff00UL) | \
    (((uint32_t)(x) >> 24) & (uint32_t)0x000000ffUL)))

#define __swab64(x) ((uint64_t)( \
    (((uint64_t)(x) << 56) & (uint64_t)0xff00000000000000ULL) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) <<  8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) >>  8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    (((uint64_t)(x) >> 56) & (uint64_t)0x00000000000000ffULL)))
90 #endif
91 
#define REMOTE_MSG_VERSION 1
#define ENDIAN_LOCAL 0xBADADBBD

/* Fixed-size header that precedes every remote message on the wire.
 *
 * The struct is packed, and the receiver byte-swaps every field when the
 * endian marker arrives reversed, so field order and widths are part of the
 * wire format.
 */
struct crm_remote_header_v0
{
    uint32_t endian;    /* Detect messages from hosts with different endian-ness */
    uint32_t version;   /* Sender's REMOTE_MSG_VERSION */
    uint64_t id;        /* Per-sender message counter */
    uint64_t flags;
    uint32_t size_total;            /* Header plus payload, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the buffer start */
    uint32_t payload_compressed;    /* Compressed payload size, 0 if not compressed */
    uint32_t payload_uncompressed;  /* Payload size once uncompressed */

    /* New fields get added here */

} __attribute__ ((packed));
109 
110 static struct crm_remote_header_v0 *
111 crm_remote_header(crm_remote_t * remote)
112 {
113  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
114  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
115  return NULL;
116 
117  } else if(header->endian != ENDIAN_LOCAL) {
118  uint32_t endian = __swab32(header->endian);
119 
120  CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
121  if(endian != ENDIAN_LOCAL) {
122  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
123  ENDIAN_LOCAL, header->endian, endian);
124  return NULL;
125  }
126 
127  header->id = __swab64(header->id);
128  header->flags = __swab64(header->flags);
129  header->endian = __swab32(header->endian);
130 
131  header->version = __swab32(header->version);
132  header->size_total = __swab32(header->size_total);
133  header->payload_offset = __swab32(header->payload_offset);
134  header->payload_compressed = __swab32(header->payload_compressed);
135  header->payload_uncompressed = __swab32(header->payload_uncompressed);
136  }
137 
138  return header;
139 }
140 
141 #ifdef HAVE_GNUTLS_GNUTLS_H
142 
143 int
144 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
145 {
146  int rc = 0;
147  int pollrc = 0;
148  time_t start = time(NULL);
149 
150  do {
151  rc = gnutls_handshake(*remote->tls_session);
152  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
153  pollrc = crm_remote_ready(remote, 1000);
154  if (pollrc < 0) {
155  /* poll returned error, there is no hope */
156  rc = -1;
157  }
158  }
160  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
161  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
162 
163  if (rc < 0) {
164  crm_trace("gnutls_handshake() failed with %d", rc);
165  }
166  return rc;
167 }
168 
175 static void
176 pcmk__set_minimum_dh_bits(gnutls_session_t *session)
177 {
178  const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
179 
180  if (dh_min_bits_s) {
181  int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
182 
183  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
184  * the priority string imply the DH requirements, but this is the only
185  * way to give the user control over compatibility with older servers.
186  */
187  if (dh_min_bits > 0) {
188  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
189  dh_min_bits);
190  gnutls_dh_set_prime_bits(*session, dh_min_bits);
191  }
192  }
193 }
194 
/* Clamp a Diffie-Hellman prime size to the range given by the environment
 * variables PCMK_dh_min_bits and PCMK_dh_max_bits.
 *
 * A bound that is unset or not positive is not applied. A maximum smaller
 * than the minimum is ignored with a warning.
 */
static unsigned int
pcmk__bound_dh_bits(unsigned int dh_bits)
{
    const char *min_env = getenv("PCMK_dh_min_bits");
    const char *max_env = getenv("PCMK_dh_max_bits");
    int lower = 0;
    int upper = 0;

    if (min_env != NULL) {
        lower = crm_parse_int(min_env, "0");
    }
    if (max_env != NULL) {
        upper = crm_parse_int(max_env, "0");
    }

    /* upper stays 0 when the maximum is unset, so this cannot trigger then */
    if ((lower > 0) && (upper > 0) && (upper < lower)) {
        crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
        upper = 0;
    }

    if ((lower > 0) && (dh_bits < lower)) {
        return lower;
    }
    if ((upper > 0) && (dh_bits > upper)) {
        return upper;
    }
    return dh_bits;
}
222 
235 pcmk__new_tls_session(int csock, unsigned int conn_type,
236  gnutls_credentials_type_t cred_type, void *credentials)
237 {
238  int rc = GNUTLS_E_SUCCESS;
239 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
240  const char *prio_base = NULL;
241  char *prio = NULL;
242 # endif
243  gnutls_session_t *session = NULL;
244 
245 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
246  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
247  * values required for its functionality.
248  *
249  * For an example of anonymous authentication, see:
250  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
251  */
252 
253  prio_base = getenv("PCMK_tls_priorities");
254  if (prio_base == NULL) {
255  prio_base = PCMK_GNUTLS_PRIORITIES;
256  }
257  prio = crm_strdup_printf("%s:%s", prio_base,
258  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
259 # endif
260 
261  session = gnutls_malloc(sizeof(gnutls_session_t));
262  if (session == NULL) {
263  rc = GNUTLS_E_MEMORY_ERROR;
264  goto error;
265  }
266 
267  rc = gnutls_init(session, conn_type);
268  if (rc != GNUTLS_E_SUCCESS) {
269  goto error;
270  }
271 
272 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
273  /* @TODO On the server side, it would be more efficient to cache the
274  * priority with gnutls_priority_init2() and set it with
275  * gnutls_priority_set() for all sessions.
276  */
277  rc = gnutls_priority_set_direct(*session, prio, NULL);
278  if (rc != GNUTLS_E_SUCCESS) {
279  goto error;
280  }
281  if (conn_type == GNUTLS_CLIENT) {
282  pcmk__set_minimum_dh_bits(session);
283  }
284 # else
285  gnutls_set_default_priority(*session);
286  gnutls_kx_set_priority(*session, (cred_type == GNUTLS_CRD_ANON)? anon_tls_kx_order : psk_tls_kx_order);
287 # endif
288 
289  gnutls_transport_set_ptr(*session,
290  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
291 
292  rc = gnutls_credentials_set(*session, cred_type, credentials);
293  if (rc != GNUTLS_E_SUCCESS) {
294  goto error;
295  }
296 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
297  free(prio);
298 # endif
299  return session;
300 
301 error:
302  {
303 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
304  const char *prio_s = prio;
305 # else
306  const char *prio_s = "default";
307 # endif
308 
309  crm_err("Could not initialize %s TLS %s session: %s "
310  CRM_XS " rc=%d priority='%s'",
311  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
312  (conn_type == GNUTLS_SERVER)? "server" : "client",
313  gnutls_strerror(rc), rc, prio_s);
314  }
315 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
316  free(prio);
317 # endif
318  if (session != NULL) {
319  gnutls_free(session);
320  }
321  return NULL;
322 }
323 
339 int
340 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
341 {
342  int rc = GNUTLS_E_SUCCESS;
343  unsigned int dh_bits = 0;
344 
345  rc = gnutls_dh_params_init(dh_params);
346  if (rc != GNUTLS_E_SUCCESS) {
347  goto error;
348  }
349 
350 #ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
351  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
352  GNUTLS_SEC_PARAM_NORMAL);
353  if (dh_bits == 0) {
354  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
355  goto error;
356  }
357 #else
358  dh_bits = 1024;
359 #endif
360  dh_bits = pcmk__bound_dh_bits(dh_bits);
361 
362  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
363  dh_bits);
364  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
365  if (rc != GNUTLS_E_SUCCESS) {
366  goto error;
367  }
368 
369  return rc;
370 
371 error:
372  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
373  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
374  CRM_ASSERT(rc == GNUTLS_E_SUCCESS);
375  return rc;
376 }
377 
390 int
391 pcmk__read_handshake_data(crm_client_t *client)
392 {
393  int rc = 0;
394 
395  CRM_ASSERT(client && client->remote && client->remote->tls_session);
396 
397  do {
398  rc = gnutls_handshake(*client->remote->tls_session);
399  } while (rc == GNUTLS_E_INTERRUPTED);
400 
401  if (rc == GNUTLS_E_AGAIN) {
402  /* No more data is available at the moment. This function should be
403  * invoked again once the client sends more.
404  */
405  return 0;
406  } else if (rc != GNUTLS_E_SUCCESS) {
407  return rc;
408  }
409  return 1;
410 }
411 
412 static int
413 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
414 {
415  const char *unsent = buf;
416  int rc = 0;
417  int total_send;
418 
419  if (buf == NULL) {
420  return -EINVAL;
421  }
422 
423  total_send = len;
424  crm_trace("Message size: %llu", (unsigned long long) len);
425 
426  while (TRUE) {
427  rc = gnutls_record_send(*session, unsent, len);
428 
429  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
430  crm_trace("Retrying to send %llu bytes",
431  (unsigned long long) len);
432 
433  } else if (rc < 0) {
434  crm_err("Connection terminated: %s " CRM_XS " rc=%d",
435  gnutls_strerror(rc), rc);
436  rc = -ECONNABORTED;
437  break;
438 
439  } else if (rc < len) {
440  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
441  len -= rc;
442  unsent += rc;
443  } else {
444  crm_trace("Sent all %d bytes", rc);
445  break;
446  }
447  }
448 
449  return rc < 0 ? rc : total_send;
450 }
451 #endif
452 
453 static int
454 crm_send_plaintext(int sock, const char *buf, size_t len)
455 {
456 
457  int rc = 0;
458  const char *unsent = buf;
459  int total_send;
460 
461  if (buf == NULL) {
462  return -EINVAL;
463  }
464  total_send = len;
465 
466  crm_trace("Message on socket %d: size=%llu",
467  sock, (unsigned long long) len);
468  retry:
469  rc = write(sock, unsent, len);
470  if (rc < 0) {
471  rc = -errno;
472  switch (errno) {
473  case EINTR:
474  case EAGAIN:
475  crm_trace("Retry");
476  goto retry;
477  default:
478  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
479  break;
480  }
481 
482  } else if (rc < len) {
483  crm_trace("Only sent %d of %llu remaining bytes",
484  rc, (unsigned long long) len);
485  len -= rc;
486  unsent += rc;
487  goto retry;
488 
489  } else {
490  crm_trace("Sent %d bytes: %.100s", rc, buf);
491  }
492 
493  return rc < 0 ? rc : total_send;
494 
495 }
496 
497 static int
498 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
499 {
500  int lpc = 0;
501  int rc = -ESOCKTNOSUPPORT;
502 
503  for(; lpc < iovs; lpc++) {
504 
505 #ifdef HAVE_GNUTLS_GNUTLS_H
506  if (remote->tls_session) {
507  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
508  } else if (remote->tcp_socket) {
509 #else
510  if (remote->tcp_socket) {
511 #endif
512  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
513 
514  } else {
515  crm_err("Unsupported connection type");
516  }
517  }
518  return rc;
519 }
520 
521 int
522 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
523 {
524  int rc = pcmk_ok;
525  static uint64_t id = 0;
526  char *xml_text = dump_xml_unformatted(msg);
527 
528  struct iovec iov[2];
529  struct crm_remote_header_v0 *header;
530 
531  if (xml_text == NULL) {
532  crm_err("Could not send remote message: no message provided");
533  return -EINVAL;
534  }
535 
536  header = calloc(1, sizeof(struct crm_remote_header_v0));
537  iov[0].iov_base = header;
538  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
539 
540  iov[1].iov_base = xml_text;
541  iov[1].iov_len = 1 + strlen(xml_text);
542 
543  id++;
544  header->id = id;
545  header->endian = ENDIAN_LOCAL;
546  header->version = REMOTE_MSG_VERSION;
547  header->payload_offset = iov[0].iov_len;
548  header->payload_uncompressed = iov[1].iov_len;
549  header->size_total = iov[0].iov_len + iov[1].iov_len;
550 
551  crm_trace("Sending len[0]=%d, start=%x",
552  (int)iov[0].iov_len, *(int*)(void*)xml_text);
553  rc = crm_remote_sendv(remote, iov, 2);
554  if (rc < 0) {
555  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
556  pcmk_strerror(rc), rc);
557  }
558 
559  free(iov[0].iov_base);
560  free(iov[1].iov_base);
561  return rc;
562 }
563 
564 
570 xmlNode *
572 {
573  xmlNode *xml = NULL;
574  struct crm_remote_header_v0 *header = crm_remote_header(remote);
575 
576  if (remote->buffer == NULL || header == NULL) {
577  return NULL;
578  }
579 
580  /* Support compression on the receiving end now, in case we ever want to add it later */
581  if (header->payload_compressed) {
582  int rc = 0;
583  unsigned int size_u = 1 + header->payload_uncompressed;
584  char *uncompressed = calloc(1, header->payload_offset + size_u);
585 
586  crm_trace("Decompressing message data %d bytes into %d bytes",
587  header->payload_compressed, size_u);
588 
589  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
590  remote->buffer + header->payload_offset,
591  header->payload_compressed, 1, 0);
592 
593  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
594  crm_warn("Couldn't decompress v%d message, we only understand v%d",
595  header->version, REMOTE_MSG_VERSION);
596  free(uncompressed);
597  return NULL;
598 
599  } else if (rc != BZ_OK) {
600  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
601  free(uncompressed);
602  return NULL;
603  }
604 
605  CRM_ASSERT(size_u == header->payload_uncompressed);
606 
607  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
608  remote->buffer_size = header->payload_offset + size_u;
609 
610  free(remote->buffer);
611  remote->buffer = uncompressed;
612  header = crm_remote_header(remote);
613  }
614 
615  /* take ownership of the buffer */
616  remote->buffer_offset = 0;
617 
618  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
619 
620  xml = string2xml(remote->buffer + header->payload_offset);
621  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
622  crm_warn("Couldn't parse v%d message, we only understand v%d",
623  header->version, REMOTE_MSG_VERSION);
624 
625  } else if (xml == NULL) {
626  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
627  }
628 
629  return xml;
630 }
631 
641 int
642 crm_remote_ready(crm_remote_t *remote, int total_timeout)
643 {
644  struct pollfd fds = { 0, };
645  int sock = 0;
646  int rc = 0;
647  time_t start;
648  int timeout = total_timeout;
649 
650 #ifdef HAVE_GNUTLS_GNUTLS_H
651  if (remote->tls_session) {
652  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
653 
654  sock = GPOINTER_TO_INT(sock_ptr);
655  } else if (remote->tcp_socket) {
656 #else
657  if (remote->tcp_socket) {
658 #endif
659  sock = remote->tcp_socket;
660  } else {
661  crm_err("Unsupported connection type");
662  }
663 
664  if (sock <= 0) {
665  crm_trace("No longer connected");
666  return -ENOTCONN;
667  }
668 
669  start = time(NULL);
670  errno = 0;
671  do {
672  fds.fd = sock;
673  fds.events = POLLIN;
674 
675  /* If we got an EINTR while polling, and we have a
676  * specific timeout we are trying to honor, attempt
677  * to adjust the timeout to the closest second. */
678  if (errno == EINTR && (timeout > 0)) {
679  timeout = total_timeout - ((time(NULL) - start) * 1000);
680  if (timeout < 1000) {
681  timeout = 1000;
682  }
683  }
684 
685  rc = poll(&fds, 1, timeout);
686  } while (rc < 0 && errno == EINTR);
687 
688  return (rc < 0)? -errno : rc;
689 }
690 
691 
702 static size_t
703 crm_remote_recv_once(crm_remote_t * remote)
704 {
705  int rc = 0;
706  size_t read_len = sizeof(struct crm_remote_header_v0);
707  struct crm_remote_header_v0 *header = crm_remote_header(remote);
708 
709  if(header) {
710  /* Stop at the end of the current message */
711  read_len = header->size_total;
712  }
713 
714  /* automatically grow the buffer when needed */
715  if(remote->buffer_size < read_len) {
716  remote->buffer_size = 2 * read_len;
717  crm_trace("Expanding buffer to %llu bytes",
718  (unsigned long long) remote->buffer_size);
719 
720  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
721  CRM_ASSERT(remote->buffer != NULL);
722  }
723 
724 #ifdef HAVE_GNUTLS_GNUTLS_H
725  if (remote->tls_session) {
726  rc = gnutls_record_recv(*(remote->tls_session),
727  remote->buffer + remote->buffer_offset,
728  remote->buffer_size - remote->buffer_offset);
729  if (rc == GNUTLS_E_INTERRUPTED) {
730  rc = -EINTR;
731  } else if (rc == GNUTLS_E_AGAIN) {
732  rc = -EAGAIN;
733  } else if (rc < 0) {
734  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
735  rc = -pcmk_err_generic;
736  }
737  } else if (remote->tcp_socket) {
738 #else
739  if (remote->tcp_socket) {
740 #endif
741  errno = 0;
742  rc = read(remote->tcp_socket,
743  remote->buffer + remote->buffer_offset,
744  remote->buffer_size - remote->buffer_offset);
745  if(rc < 0) {
746  rc = -errno;
747  }
748 
749  } else {
750  crm_err("Unsupported connection type");
751  return -ESOCKTNOSUPPORT;
752  }
753 
754  /* process any errors. */
755  if (rc > 0) {
756  remote->buffer_offset += rc;
757  /* always null terminate buffer, the +1 to alloc always allows for this. */
758  remote->buffer[remote->buffer_offset] = '\0';
759  crm_trace("Received %u more bytes, %llu total",
760  rc, (unsigned long long) remote->buffer_offset);
761 
762  } else if (rc == -EINTR || rc == -EAGAIN) {
763  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
764 
765  } else if (rc == 0) {
766  crm_debug("EOF encoutered after %llu bytes",
767  (unsigned long long) remote->buffer_offset);
768  return -ENOTCONN;
769 
770  } else {
771  crm_debug("Error receiving message after %llu bytes: %s (%d)",
772  (unsigned long long) remote->buffer_offset,
773  pcmk_strerror(rc), rc);
774  return -ENOTCONN;
775  }
776 
777  header = crm_remote_header(remote);
778  if(header) {
779  if(remote->buffer_offset < header->size_total) {
780  crm_trace("Read less than the advertised length: %llu < %u bytes",
781  (unsigned long long) remote->buffer_offset,
782  header->size_total);
783  } else {
784  crm_trace("Read full message of %llu bytes",
785  (unsigned long long) remote->buffer_offset);
786  return remote->buffer_offset;
787  }
788  }
789 
790  return -EAGAIN;
791 }
792 
803 gboolean
804 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
805 {
806  int rc;
807  time_t start = time(NULL);
808  int remaining_timeout = 0;
809 
810  if (total_timeout == 0) {
811  total_timeout = 10000;
812  } else if (total_timeout < 0) {
813  total_timeout = 60000;
814  }
815  *disconnected = 0;
816 
817  remaining_timeout = total_timeout;
818  while ((remaining_timeout > 0) && !(*disconnected)) {
819 
820  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
821  remaining_timeout, total_timeout);
822  rc = crm_remote_ready(remote, remaining_timeout);
823 
824  if (rc == 0) {
825  crm_err("Timed out (%d ms) while waiting for remote data",
826  remaining_timeout);
827  return FALSE;
828 
829  } else if (rc < 0) {
830  crm_debug("Wait for remote data aborted, will try again: %s "
831  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
832 
833  } else {
834  rc = crm_remote_recv_once(remote);
835  if (rc > 0) {
836  return TRUE;
837  } else if (rc == -EAGAIN) {
838  crm_trace("Still waiting for remote data");
839  } else if (rc < 0) {
840  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
841  pcmk_strerror(rc), rc);
842  }
843  }
844 
845  if (rc == -ENOTCONN) {
846  *disconnected = 1;
847  return FALSE;
848  }
849 
850  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
851  }
852 
853  return FALSE;
854 }
855 
856 struct tcp_async_cb_data {
857  gboolean success;
858  int sock;
859  void *userdata;
860  void (*callback) (void *userdata, int sock);
861  int timeout; /*ms */
862  time_t start;
863 };
864 
865 static gboolean
866 check_connect_finished(gpointer userdata)
867 {
868  struct tcp_async_cb_data *cb_data = userdata;
869  int cb_arg = 0; // socket fd on success, -errno on error
870  int sock = cb_data->sock;
871  int error = 0;
872 
873  fd_set rset, wset;
874  socklen_t len = sizeof(error);
875  struct timeval ts = { 0, };
876 
877  if (cb_data->success == TRUE) {
878  goto dispatch_done;
879  }
880 
881  FD_ZERO(&rset);
882  FD_SET(sock, &rset);
883  wset = rset;
884 
885  crm_trace("fd %d: checking to see if connect finished", sock);
886  cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
887 
888  if (cb_arg < 0) {
889  cb_arg = -errno;
890  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
891  /* reschedule if there is still time left */
892  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
893  goto reschedule;
894  } else {
895  cb_arg = -ETIMEDOUT;
896  }
897  }
898  crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
899  goto dispatch_done;
900  } else if (cb_arg == 0) {
901  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
902  goto reschedule;
903  }
904  crm_debug("fd %d: timeout during select", sock);
905  cb_arg = -ETIMEDOUT;
906  goto dispatch_done;
907  } else {
908  crm_trace("fd %d: select returned success", sock);
909  cb_arg = 0;
910  }
911 
912  /* can we read or write to the socket now? */
913  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
914  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
915  cb_arg = -errno;
916  crm_trace("fd %d: call to getsockopt failed", sock);
917  goto dispatch_done;
918  }
919  if (error) {
920  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
921  cb_arg = -error;
922  goto dispatch_done;
923  }
924  } else {
925  crm_trace("neither read nor write set after select");
926  cb_arg = -EAGAIN;
927  goto dispatch_done;
928  }
929 
930  dispatch_done:
931  if (!cb_arg) {
932  crm_trace("fd %d: connected", sock);
933  /* Success, set the return code to the sock to report to the callback */
934  cb_arg = cb_data->sock;
935  cb_data->sock = 0;
936  } else {
937  close(sock);
938  }
939 
940  if (cb_data->callback) {
941  cb_data->callback(cb_data->userdata, cb_arg);
942  }
943  free(cb_data);
944  return FALSE;
945 
946  reschedule:
947 
948  /* will check again next interval */
949  return TRUE;
950 }
951 
952 static int
953 internal_tcp_connect_async(int sock,
954  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
955  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
956 {
957  int rc = 0;
958  int interval = 500;
959  int timer;
960  struct tcp_async_cb_data *cb_data = NULL;
961 
962  rc = crm_set_nonblocking(sock);
963  if (rc < 0) {
964  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
965  pcmk_strerror(rc), rc);
966  close(sock);
967  return -1;
968  }
969 
970  rc = connect(sock, addr, addrlen);
971  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
972  crm_perror(LOG_WARNING, "connect");
973  return -1;
974  }
975 
976  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
977  cb_data->userdata = userdata;
978  cb_data->callback = callback;
979  cb_data->sock = sock;
980  cb_data->timeout = timeout;
981  cb_data->start = time(NULL);
982 
983  if (rc == 0) {
984  /* The connect was successful immediately, we still return to mainloop
985  * and let this callback get called later. This avoids the user of this api
986  * to have to account for the fact the callback could be invoked within this
987  * function before returning. */
988  cb_data->success = TRUE;
989  interval = 1;
990  }
991 
992  /* Check connect finished is mostly doing a non-block poll on the socket
993  * to see if we can read/write to it. Once we can, the connect has completed.
994  * This method allows us to connect to the server without blocking mainloop.
995  *
996  * This is a poor man's way of polling to see when the connection finished.
997  * At some point we should figure out a way to use a mainloop fd callback for this.
998  * Something about the way mainloop is currently polling prevents this from working at the
999  * moment though. */
1000  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1001  interval, sock);
1002  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1003  if (timer_id) {
1004  *timer_id = timer;
1005  }
1006 
1007  return 0;
1008 }
1009 
1010 static int
1011 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
1012 {
1013  int rc = connect(sock, addr, addrlen);
1014 
1015  if (rc < 0) {
1016  rc = -errno;
1017  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1018  pcmk_strerror(rc), rc);
1019  return rc;
1020  }
1021 
1022  rc = crm_set_nonblocking(sock);
1023  if (rc < 0) {
1024  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1025  pcmk_strerror(rc), rc);
1026  return rc;
1027  }
1028 
1029  return pcmk_ok;
1030 }
1031 
/* Resolve a host name and connect to it over TCP, trying each resolved
 * address in order until one works.
 *
 * With a callback, the connect is asynchronous: the function returns once a
 * connection attempt has been scheduled, and the result is delivered to the
 * callback later. Without a callback, the connect is performed before
 * returning.
 *
 * Returns the socket on success (for the asynchronous case, the socket whose
 * connection is in progress), or -ENOTCONN if no address could be used.
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout,
                             int *timer_id, void *userdata,
                             void (*callback) (void *userdata, int sock))
{
    char addr_text[INET6_ADDRSTRLEN];
    struct addrinfo *results = NULL;
    struct addrinfo *candidate = NULL;
    struct addrinfo hints;
    const char *server = host;
    int gai_rc;
    int sock = -ENOTCONN;

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;
    gai_rc = getaddrinfo(server, NULL, &hints, &results);
    if (gai_rc != 0) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(gai_rc));
        goto done;
    }
    if ((results == NULL) || (results->ai_addr == NULL)) {
        crm_err("Unable to get IP address info for %s: no result", server);
        goto done;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (candidate = results; candidate != NULL; candidate = candidate->ai_next) {
        struct sockaddr *addr = candidate->ai_addr;
        int connect_rc;

        if (addr == NULL) {
            continue;
        }

        if (candidate->ai_canonname != NULL) {
            server = results->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            crm_perror(LOG_WARNING, "creating socket for connection to %s",
                       server);
            sock = -ENOTCONN;
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(addr_text, 0, DIMOF(addr_text));
        crm_sockaddr2str(addr, addr_text);
        crm_info("Attempting TCP connection to %s:%d", addr_text, port);

        if (callback != NULL) {
            /* Success here only means "scheduled"; the callback hears later */
            connect_rc = internal_tcp_connect_async(sock, candidate->ai_addr,
                                                    candidate->ai_addrlen,
                                                    timeout, timer_id,
                                                    userdata, callback);
        } else {
            connect_rc = internal_tcp_connect(sock, candidate->ai_addr,
                                              candidate->ai_addrlen);
        }
        if (connect_rc == 0) {
            break;
        }

        close(sock);
        sock = -ENOTCONN;
    }

done:
    if (results != NULL) {
        freeaddrinfo(results);
    }
    return sock;
}
1128 
/* Synchronous TCP connect: the callback-free form of
 * crm_remote_tcp_connect_async(), called with a timeout of -1 and no timer,
 * user data, or callback.
 *
 * Returns whatever crm_remote_tcp_connect_async() returns.
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    int sock = crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);

    return sock;
}
1134 
/* Convert an IP address (IPv4 or IPv6) to a string for logging.
 *
 * sa  Pointer to a struct sockaddr_in or struct sockaddr_in6 (told apart by
 *     its sa_family field)
 * s   Output buffer of at least INET6_ADDRSTRLEN bytes
 *
 * The buffer always receives a NUL-terminated string: "<invalid>" is written
 * for an unknown address family or if the conversion itself fails.
 */
void
crm_sockaddr2str(void *sa, char *s)
{
    const char *converted = NULL;

    switch (((struct sockaddr*)sa)->sa_family) {
        case AF_INET:
            converted = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
                                  s, INET6_ADDRSTRLEN);
            break;

        case AF_INET6:
            converted = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
                                  s, INET6_ADDRSTRLEN);
            break;

        default:
            break;
    }

    if (converted == NULL) {
        /* inet_ntop() leaves the buffer unspecified on failure */
        strcpy(s, "<invalid>");
    }
}
1163 
1164 int
1166 {
1167  int csock = 0;
1168  int rc = 0;
1169  unsigned laddr = 0;
1170  struct sockaddr_storage addr;
1171  char addr_str[INET6_ADDRSTRLEN];
1172 #ifdef TCP_USER_TIMEOUT
1173  int optval;
1174  long sbd_timeout = crm_get_sbd_timeout();
1175 #endif
1176 
1177  /* accept the connection */
1178  laddr = sizeof(addr);
1179  memset(&addr, 0, sizeof(addr));
1180  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1181  crm_sockaddr2str(&addr, addr_str);
1182  crm_info("New remote connection from %s", addr_str);
1183 
1184  if (csock == -1) {
1185  crm_err("accept socket failed");
1186  return -1;
1187  }
1188 
1189  rc = crm_set_nonblocking(csock);
1190  if (rc < 0) {
1191  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1192  pcmk_strerror(rc), rc);
1193  close(csock);
1194  return rc;
1195  }
1196 
1197 #ifdef TCP_USER_TIMEOUT
1198  if (sbd_timeout > 0) {
1199  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1200  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1201  &optval, sizeof(optval));
1202  if (rc < 0) {
1203  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1204  optval);
1205  close(csock);
1206  return rc;
1207  }
1208  }
1209 #endif
1210 
1211  return csock;
1212 }
1213 
1219 int
1221 {
1222  static int port = 0;
1223 
1224  if (port == 0) {
1225  const char *env = getenv("PCMK_remote_port");
1226 
1227  if (env) {
1228  errno = 0;
1229  port = strtol(env, NULL, 10);
1230  if (errno || (port < 1) || (port > 65535)) {
1231  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1232  env, DEFAULT_REMOTE_PORT);
1233  port = DEFAULT_REMOTE_PORT;
1234  }
1235  } else {
1236  port = DEFAULT_REMOTE_PORT;
1237  }
1238  }
1239  return port;
1240 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:46
long crm_get_sbd_timeout(void)
Definition: watchdog.c:257
uint32_t payload_compressed
Definition: remote.c:159
const char * pcmk_strerror(int rc)
Definition: logging.c:1017
#define pcmk_ok
Definition: error.h:45
char * buffer
Definition: ipcs.h:44
int crm_parse_int(const char *text, const char *default_text)
Parse an integer value from a string.
Definition: strings.c:157
AIS_Host host
Definition: internal.h:80
struct crm_remote_s * remote
Definition: ipcs.h:96
uint32_t payload_uncompressed
Definition: remote.c:160
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:176
#define ENDIAN_LOCAL
Definition: remote.c:93
int crm_default_remote_port()
Get the default remote connection TCP port on this host.
Definition: remote.c:1220
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:153
xmlNode * string2xml(const char *input)
Definition: xml.c:2152
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:522
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:54
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:675
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:1130
#define crm_warn(fmt, args...)
Definition: logging.h:275
#define crm_debug(fmt, args...)
Definition: logging.h:279
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:1146
void gnutls_session_t
Definition: cib_remote.c:53
int crm_remote_accept(int ssock)
Definition: remote.c:1165
#define crm_trace(fmt, args...)
Definition: logging.h:280
uint64_t id
Definition: remote.c:155
int crm_set_nonblocking(int fd)
Definition: io.c:510
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:642
#define __swab64(x)
Definition: remote.c:81
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:48
uint32_t payload_offset
Definition: remote.c:158
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:157
#define CRM_XS
Definition: logging.h:42
#define __swab32(x)
Definition: remote.c:75
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:252
size_t buffer_size
Definition: ipcs.h:45
#define REMOTE_MSG_VERSION
Definition: remote.c:92
#define crm_err(fmt, args...)
Definition: logging.h:274
const char * bz2_strerror(int rc)
Definition: logging.c:1390
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3220
#define DIMOF(a)
Definition: crm.h:29
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:571
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:20
int tcp_socket
Definition: ipcs.h:48
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:1046
#define crm_info(fmt, args...)
Definition: logging.h:277
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:804
uint32_t version
Definition: remote.c:154
uint64_t flags
Definition: remote.c:156