pacemaker 2.1.8-2.1.8
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
cpg.c
Go to the documentation of this file.
1/*
2 * Copyright 2004-2024 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
11
12#include <arpa/inet.h>
13#include <inttypes.h> // PRIu32
14#include <netdb.h>
15#include <netinet/in.h>
16#include <stdbool.h>
17#include <stdint.h> // uint32_t
18#include <sys/socket.h>
19#include <sys/types.h> // size_t
20#include <sys/utsname.h>
21
22#include <bzlib.h>
23#include <corosync/corodefs.h>
24#include <corosync/corotypes.h>
25#include <corosync/hdb.h>
26#include <corosync/cpg.h>
27#include <qb/qbipc_common.h>
28#include <qb/qbipcc.h>
29#include <qb/qbutil.h>
30
32#include <crm/common/ipc.h>
33#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
34#include <crm/common/mainloop.h>
35#include <crm/common/xml.h>
36
37#include "crmcluster_private.h"
38
/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
 * functions, we can ditch this in favor of cluster->cpg_handle.
 */
// CPG handle that queued messages are flushed with (0 when not connected)
static cpg_handle_t pcmk_cpg_handle = 0;

// @TODO These could be moved to pcmk_cluster_t* at that time as well

// Set when a membership change shows the local node is no longer a member
static bool cpg_evicted = false;

/* Outgoing messages not yet sent. Each element is a struct iovec * whose
 * iov_base is the message buffer; both are freed once the message is sent.
 */
static GList *cs_message_queue = NULL;

// GLib source ID of the scheduled flush retry timer (0 if none is scheduled)
static int cs_message_timer = 0;
48
49struct pcmk__cpg_host_s {
50 uint32_t id;
51 uint32_t pid;
52 gboolean local;
54 uint32_t size;
55 char uname[MAX_NAME];
56} __attribute__ ((packed));
57
58typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
59
60struct pcmk__cpg_msg_s {
61 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
62 uint32_t id;
63 gboolean is_compressed;
64
67
68 uint32_t size;
69 uint32_t compressed_size;
70 /* 584 bytes */
71 char data[0];
72
73} __attribute__ ((packed));
74
75typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
76
/* Forward declaration: crm_cs_flush() schedules crm_cs_flush_cb() as a retry
 * timer, and crm_cs_flush_cb() calls crm_cs_flush() again.
 */
static void crm_cs_flush(gpointer data);
78
/* Number of payload bytes actually carried by a CPG message: the compressed
 * size if the payload is compressed, otherwise the uncompressed size.
 *
 * The argument is fully parenthesized so that any pointer expression works,
 * but it is evaluated more than once, so it must not have side effects.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
80
/* Evaluate a Corosync API call, retrying while it reports a transient failure.
 *
 * rc       Variable that receives the result of code
 * counter  Retry counter, which the caller initializes (normally to 0); it is
 *          incremented once per transient failure
 * max      Give up once counter reaches this value
 * code     Corosync API call to evaluate (re-evaluated on every attempt)
 *
 * CS_ERR_TRY_AGAIN and CS_ERR_QUEUE_FULL are treated as transient. After such
 * a failure, if another attempt will be made, the macro sleeps for counter
 * seconds first, so the back-off grows linearly. Any other result ends the loop
 * immediately.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) != CS_ERR_TRY_AGAIN) && ((rc) != CS_ERR_QUEUE_FULL)) {    \
            break;                                                          \
        }                                                                   \
        (counter)++;                                                        \
        if ((counter) < (max)) {                                            \
            /* Only back off when a retry will actually follow */           \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        }                                                                   \
    } while ((counter) < (max))
91
100uint32_t
101pcmk__cpg_local_nodeid(cpg_handle_t handle)
102{
103 cs_error_t rc = CS_OK;
104 int retries = 0;
105 static uint32_t local_nodeid = 0;
106 cpg_handle_t local_handle = handle;
107 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
108 int fd = -1;
109 uid_t found_uid = 0;
110 gid_t found_gid = 0;
111 pid_t found_pid = 0;
112 int rv = 0;
113
114 if (local_nodeid != 0) {
115 return local_nodeid;
116 }
117
118 if (handle == 0) {
119 crm_trace("Creating connection");
120 cs_repeat(rc, retries, 5,
121 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
122 (cpg_model_data_t *) &cpg_model_info,
123 NULL));
124 if (rc != CS_OK) {
125 crm_err("Could not connect to the CPG API: %s (%d)",
126 cs_strerror(rc), rc);
127 return 0;
128 }
129
130 rc = cpg_fd_get(local_handle, &fd);
131 if (rc != CS_OK) {
132 crm_err("Could not obtain the CPG API connection: %s (%d)",
133 cs_strerror(rc), rc);
134 goto bail;
135 }
136
137 // CPG provider run as root (at least in given user namespace)?
138 rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
139 &found_uid, &found_gid);
140 if (rv == 0) {
141 crm_err("CPG provider is not authentic:"
142 " process %lld (uid: %lld, gid: %lld)",
143 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
144 (long long) found_uid, (long long) found_gid);
145 goto bail;
146
147 } else if (rv < 0) {
148 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
149 strerror(-rv), -rv);
150 goto bail;
151 }
152 }
153
154 if (rc == CS_OK) {
155 retries = 0;
156 crm_trace("Performing lookup");
157 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
158 }
159
160 if (rc != CS_OK) {
161 crm_err("Could not get local node id from the CPG API: %s (%d)",
162 pcmk__cs_err_str(rc), rc);
163 }
164
165bail:
166 if (handle == 0) {
167 crm_trace("Closing connection");
168 cpg_finalize(local_handle);
169 }
170 crm_debug("Local nodeid is %u", local_nodeid);
171 return local_nodeid;
172}
173
182static gboolean
183crm_cs_flush_cb(gpointer data)
184{
185 cs_message_timer = 0;
186 crm_cs_flush(data);
187 return FALSE;
188}
189
// Upper bound on how many queued CPG messages are sent in a single flush
enum { CS_SEND_MAX = 200 };
192
199static void
200crm_cs_flush(gpointer data)
201{
202 unsigned int sent = 0;
203 guint queue_len = 0;
204 cs_error_t rc = 0;
205 cpg_handle_t *handle = (cpg_handle_t *) data;
206
207 if (*handle == 0) {
208 crm_trace("Connection is dead");
209 return;
210 }
211
212 queue_len = g_list_length(cs_message_queue);
213 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
214 crm_err("CPG queue has grown to %d", queue_len);
215
216 } else if (queue_len == CS_SEND_MAX) {
217 crm_warn("CPG queue has grown to %d", queue_len);
218 }
219
220 if (cs_message_timer != 0) {
221 /* There is already a timer, wait until it goes off */
222 crm_trace("Timer active %d", cs_message_timer);
223 return;
224 }
225
226 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
227 struct iovec *iov = cs_message_queue->data;
228
229 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
230 if (rc != CS_OK) {
231 break;
232 }
233
234 sent++;
235 crm_trace("CPG message sent, size=%llu",
236 (unsigned long long) iov->iov_len);
237
238 cs_message_queue = g_list_remove(cs_message_queue, iov);
239 free(iov->iov_base);
240 free(iov);
241 }
242
243 queue_len -= sent;
244 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
245 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
246 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
247 (int) rc);
248
249 if (cs_message_queue) {
250 uint32_t delay_ms = 100;
251 if (rc != CS_OK) {
252 /* Proportionally more if sending failed but cap at 1s */
253 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
254 }
255 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
256 }
257}
258
267static int
268pcmk_cpg_dispatch(gpointer user_data)
269{
270 cs_error_t rc = CS_OK;
271 pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
272
273 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
274 if (rc != CS_OK) {
275 crm_err("Connection to the CPG API failed: %s (%d)",
276 pcmk__cs_err_str(rc), rc);
277 cpg_finalize(cluster->cpg_handle);
278 cluster->cpg_handle = 0;
279 return -1;
280
281 } else if (cpg_evicted) {
282 crm_err("Evicted from CPG membership");
283 return -1;
284 }
285 return 0;
286}
287
288static inline const char *
289ais_dest(const pcmk__cpg_host_t *host)
290{
291 if (host->local) {
292 return "local";
293 } else if (host->size > 0) {
294 return host->uname;
295 } else {
296 return "<all>";
297 }
298}
299
300static inline const char *
301msg_type2text(enum crm_ais_msg_types type)
302{
303 const char *text = "unknown";
304
305 switch (type) {
306 case crm_msg_none:
307 text = "unknown";
308 break;
309 case crm_msg_ais:
310 text = "ais";
311 break;
312 case crm_msg_cib:
313 text = "cib";
314 break;
315 case crm_msg_crmd:
316 text = "crmd";
317 break;
318 case crm_msg_pe:
319 text = "pengine";
320 break;
321 case crm_msg_te:
322 text = "tengine";
323 break;
324 case crm_msg_lrmd:
325 text = "lrmd";
326 break;
327 case crm_msg_attrd:
328 text = "attrd";
329 break;
330 case crm_msg_stonithd:
331 text = "stonithd";
332 break;
334 text = "stonith-ng";
335 break;
336 }
337 return text;
338}
339
348static bool
349check_message_sanity(const pcmk__cpg_msg_t *msg)
350{
351 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
352
353 if (payload_size < 1) {
354 crm_err("%sCPG message %d from %s invalid: "
355 "Claimed size of %d bytes is too small "
356 CRM_XS " from %s[%u] to %s@%s",
357 (msg->is_compressed? "Compressed " : ""),
358 msg->id, ais_dest(&(msg->sender)),
359 (int) msg->header.size,
360 msg_type2text(msg->sender.type), msg->sender.pid,
361 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
362 return false;
363 }
364
365 if (msg->header.error != CS_OK) {
366 crm_err("%sCPG message %d from %s invalid: "
367 "Sender indicated error %d "
368 CRM_XS " from %s[%u] to %s@%s",
369 (msg->is_compressed? "Compressed " : ""),
370 msg->id, ais_dest(&(msg->sender)),
371 msg->header.error,
372 msg_type2text(msg->sender.type), msg->sender.pid,
373 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
374 return false;
375 }
376
377 if (msg_data_len(msg) != payload_size) {
378 crm_err("%sCPG message %d from %s invalid: "
379 "Total size %d inconsistent with payload size %d "
380 CRM_XS " from %s[%u] to %s@%s",
381 (msg->is_compressed? "Compressed " : ""),
382 msg->id, ais_dest(&(msg->sender)),
383 (int) msg->header.size, (int) msg_data_len(msg),
384 msg_type2text(msg->sender.type), msg->sender.pid,
385 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
386 return false;
387 }
388
389 if (!msg->is_compressed &&
390 /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
391 * but checking the last byte or two should be quick
392 */
393 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
394 || (msg->data[msg->size - 1] != '\0'))) {
395 crm_err("CPG message %d from %s invalid: "
396 "Payload does not end at byte %llu "
397 CRM_XS " from %s[%u] to %s@%s",
398 msg->id, ais_dest(&(msg->sender)),
399 (unsigned long long) msg->size,
400 msg_type2text(msg->sender.type), msg->sender.pid,
401 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
402 return false;
403 }
404
405 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
406 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
407 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
408 ais_dest(&(msg->sender)),
409 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
410 return true;
411}
412
431char *
432pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
433 void *content, uint32_t *kind, const char **from)
434{
435 char *data = NULL;
436 pcmk__cpg_msg_t *msg = content;
437
438 if (handle != 0) {
439 // Do filtering and field massaging
440 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
441 const char *local_name = pcmk__cluster_local_node_name();
442
443 if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
444 crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
445 ": claimed nodeid=%" PRIu32,
446 sender_id, pid, msg->sender.id);
447 return NULL;
448 }
449 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
450 crm_trace("Not for us: %" PRIu32" != %" PRIu32,
451 msg->host.id, local_nodeid);
452 return NULL;
453 }
454 if ((msg->host.size > 0)
455 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
456
457 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
458 return NULL;
459 }
460
461 msg->sender.id = sender_id;
462 if (msg->sender.size == 0) {
463 const crm_node_t *peer =
464 pcmk__get_node(sender_id, NULL, NULL,
466
467 if (peer->uname == NULL) {
468 crm_err("No uname for peer with nodeid=%u", sender_id);
469
470 } else {
471 crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
472 msg->sender.size = strlen(peer->uname);
473 memset(msg->sender.uname, 0, MAX_NAME);
474 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
475 }
476 }
477 }
478
479 crm_trace("Got new%s message (size=%d, %d, %d)",
480 msg->is_compressed ? " compressed" : "",
481 msg_data_len(msg), msg->size, msg->compressed_size);
482
483 if (kind != NULL) {
484 *kind = msg->header.id;
485 }
486 if (from != NULL) {
487 *from = msg->sender.uname;
488 }
489
490 if (msg->is_compressed && (msg->size > 0)) {
491 int rc = BZ_OK;
492 char *uncompressed = NULL;
493 unsigned int new_size = msg->size + 1;
494
495 if (!check_message_sanity(msg)) {
496 goto badmsg;
497 }
498
499 crm_trace("Decompressing message data");
500 uncompressed = pcmk__assert_alloc(1, new_size);
501 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
502 msg->compressed_size, 1, 0);
503
504 rc = pcmk__bzlib2rc(rc);
505
506 if (rc != pcmk_rc_ok) {
507 crm_err("Decompression failed: %s " CRM_XS " rc=%d",
508 pcmk_rc_str(rc), rc);
509 free(uncompressed);
510 goto badmsg;
511 }
512
513 CRM_ASSERT(new_size == msg->size);
514
515 data = uncompressed;
516
517 } else if (!check_message_sanity(msg)) {
518 goto badmsg;
519
520 } else {
521 data = strdup(msg->data);
522 }
523
524 // Is this necessary?
525 pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
527
528 crm_trace("Payload: %.200s", data);
529 return data;
530
531 badmsg:
532 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
533 " min=%d, total=%d, size=%d, bz2_size=%d",
534 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
535 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
536 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
537 msg->header.size, msg->size, msg->compressed_size);
538
539 free(data);
540 return NULL;
541}
542
554static int
555cmp_member_list_nodeid(const void *first, const void *second)
556{
557 const struct cpg_address *const a = *((const struct cpg_address **) first),
558 *const b = *((const struct cpg_address **) second);
559 if (a->nodeid < b->nodeid) {
560 return -1;
561 } else if (a->nodeid > b->nodeid) {
562 return 1;
563 }
564 /* don't bother with "reason" nor "pid" */
565 return 0;
566}
567
576static const char *
577cpgreason2str(cpg_reason_t reason)
578{
579 switch (reason) {
580 case CPG_REASON_JOIN: return " via cpg_join";
581 case CPG_REASON_LEAVE: return " via cpg_leave";
582 case CPG_REASON_NODEDOWN: return " via cluster exit";
583 case CPG_REASON_NODEUP: return " via cluster join";
584 case CPG_REASON_PROCDOWN: return " for unknown reason";
585 default: break;
586 }
587 return "";
588}
589
598static inline const char *
599peer_name(const crm_node_t *peer)
600{
601 if (peer == NULL) {
602 return "unknown node";
603 } else if (peer->uname == NULL) {
604 return "peer node";
605 } else {
606 return peer->uname;
607 }
608}
609
/*!
 * \internal
 * \brief Log a process leaving a CPG group and update the peer cache
 *
 * If another member with the same node ID is still in the group (a duplicate
 * daemon instance), the departure is only logged. Otherwise, if the peer is
 * known, its CPG process status is updated via crm_update_peer_proc().
 *
 * \param[in] cpg_group_name       Name of the CPG group (for logging)
 * \param[in] event_counter        Membership event number (for logging)
 * \param[in] local_nodeid         Node ID of the local node
 * \param[in] cpg_peer             Member that left
 * \param[in] sorted_member_list   Remaining members, sorted with
 *                                 cmp_member_list_nodeid()
 * \param[in] member_list_entries  Number of entries in sorted_member_list
 */
static void
node_left(const char *cpg_group_name, int event_counter,
          uint32_t local_nodeid, const struct cpg_address *cpg_peer,
          const struct cpg_address **sorted_member_list,
          size_t member_list_entries)
{
    crm_node_t *peer =
        pcmk__search_node_caches(cpg_peer->nodeid, NULL,
    /* NOTE(review): the final (flags) argument and closing ");" of the call
     * above are missing from this copy of the source -- restore them
     */
    const struct cpg_address **rival = NULL;

    /* Most CPG-related Pacemaker code assumes that only one process on a node
     * can be in the process group, but Corosync does not impose this
     * limitation, and more than one can be a member in practice due to a
     * daemon attempting to start while another instance is already running.
     *
     * Check for any such duplicate instances, because we don't want to process
     * their leaving as if our actual peer left. If the peer that left still has
     * an entry in sorted_member_list (with a different PID), we will ignore the
     * leaving.
     *
     * @TODO Track CPG members' PIDs so we can tell exactly who left.
     */
    if (peer != NULL) {
        // The key is a pointer to a pointer, matching the comparator's input
        rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
                        sizeof(const struct cpg_address *),
                        cmp_member_list_nodeid);
    }

    if (rival == NULL) {
        crm_info("Group %s event %d: %s (node %u pid %u) left%s",
                 cpg_group_name, event_counter, peer_name(peer),
                 cpg_peer->nodeid, cpg_peer->pid,
                 cpgreason2str(cpg_peer->reason));
        if (peer != NULL) {
            crm_update_peer_proc(__func__, peer, crm_proc_cpg,
            /* NOTE(review): the status argument and closing ");" of the call
             * above are missing from this copy of the source -- restore them
             */
        }
    } else if (cpg_peer->nodeid == local_nodeid) {
        crm_warn("Group %s event %d: duplicate local pid %u left%s",
                 cpg_group_name, event_counter,
                 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
    } else {
        crm_warn("Group %s event %d: "
                 "%s (node %u) duplicate pid %u left%s (%u remains)",
                 cpg_group_name, event_counter, peer_name(peer),
                 cpg_peer->nodeid, cpg_peer->pid,
                 cpgreason2str(cpg_peer->reason), (*rival)->pid);
    }
}
671
/*!
 * \internal
 * \brief Handle a CPG configuration (membership) change
 *
 * Processes departures via node_left(), logs new joins, and updates the peer
 * cache for every current member. If the local node is no longer listed as a
 * member, sets cpg_evicted, which pcmk_cpg_dispatch() checks.
 *
 * \param[in] handle               CPG connection
 * \param[in] group_name           CPG group whose membership changed
 * \param[in] member_list          Current members
 * \param[in] member_list_entries  Number of entries in member_list
 * \param[in] left_list            Members that left
 * \param[in] left_list_entries    Number of entries in left_list
 * \param[in] joined_list          Members that joined
 * \param[in] joined_list_entries  Number of entries in joined_list
 */
void
pcmk__cpg_confchg_cb(cpg_handle_t handle,
                     const struct cpg_name *group_name,
                     const struct cpg_address *member_list,
                     size_t member_list_entries,
                     const struct cpg_address *left_list,
                     size_t left_list_entries,
                     const struct cpg_address *joined_list,
                     size_t joined_list_entries)
{
    // Membership event number, used only to correlate log messages
    static int counter = 0;

    bool found = false;
    uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
    const struct cpg_address **sorted = NULL;

    sorted = pcmk__assert_alloc(member_list_entries,
                                sizeof(const struct cpg_address *));

    for (size_t iter = 0; iter < member_list_entries; iter++) {
        sorted[iter] = member_list + iter;
    }

    // So that the cross-matching of multiply-subscribed nodes is then cheap
    qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
          cmp_member_list_nodeid);

    // NOTE(review): int index compared against a size_t count (here and below)
    for (int i = 0; i < left_list_entries; i++) {
        node_left(group_name->value, counter, local_nodeid, &left_list[i],
                  sorted, member_list_entries);
    }
    free(sorted);
    sorted = NULL;

    for (int i = 0; i < joined_list_entries; i++) {
        crm_info("Group %s event %d: node %u pid %u joined%s",
                 group_name->value, counter, joined_list[i].nodeid,
                 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
    }

    for (int i = 0; i < member_list_entries; i++) {
        crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
        /* NOTE(review): the final (flags) argument and closing ");" of the call
         * above are missing from this copy of the source -- restore them
         */

        if (member_list[i].nodeid == local_nodeid
            && member_list[i].pid != getpid()) {
            // See the note in node_left()
            crm_warn("Group %s event %d: detected duplicate local pid %u",
                     group_name->value, counter, member_list[i].pid);
            continue;
        }
        crm_info("Group %s event %d: %s (node %u pid %u) is member",
                 group_name->value, counter, peer_name(peer),
                 member_list[i].nodeid, member_list[i].pid);

        /* If the caller left auto-reaping enabled, this will also update the
         * state to member.
         */
        peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
        /* NOTE(review): the status argument and closing ");" of the call above
         * are missing from this copy of the source -- restore them
         */

        if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
            /* The node is a CPG member, but we currently think it's not a
             * cluster member. This is possible only if auto-reaping was
             * disabled. The node may be joining, and we happened to get the CPG
             * notification before the quorum notification; or the node may have
             * just died, and we are processing its final messages; or a bug
             * has affected the peer cache.
             */
            time_t now = time(NULL);

            if (peer->when_lost == 0) {
                // Track when we first got into this contradictory state
                peer->when_lost = now;

            } else if (now > (peer->when_lost + 60)) {
                // If it persists for more than a minute, update the state
                crm_warn("Node %u is member of group %s but was believed "
                         "offline",
                         member_list[i].nodeid, group_name->value);
                pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
            }
        }

        if (local_nodeid == member_list[i].nodeid) {
            found = true;
        }
    }

    if (!found) {
        crm_err("Local node was evicted from group %s", group_name->value);
        cpg_evicted = true;
    }

    counter++;
}
784
793int
794pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
795{
796 if (cluster == NULL) {
797 return EINVAL;
798 }
799 cluster->cpg.cpg_deliver_fn = fn;
800 return pcmk_rc_ok;
801}
802
811int
812pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
813{
814 if (cluster == NULL) {
815 return EINVAL;
816 }
817 cluster->cpg.cpg_confchg_fn = fn;
818 return pcmk_rc_ok;
819}
820
/*!
 * \internal
 * \brief Connect to Corosync CPG and join this daemon's process group
 *
 * The group name is derived from crm_system_name. On success, the handle is
 * stored in the cluster object (and in pcmk_cpg_handle for sending), and the
 * CPG file descriptor is added to the main loop with pcmk_cpg_dispatch() as
 * its dispatch function.
 *
 * \param[in,out] cluster  Cluster object (its CPG callbacks and destroy
 *                         function are used for the connection)
 *
 * \return pcmk_rc_ok on success, otherwise ENOTCONN
 *
 * NOTE(review): the declarator line (presumably
 * "pcmk__cpg_connect(pcmk_cluster_t *cluster)") is absent from this copy of
 * the source -- restore it.
 */
int
{
    cs_error_t rc;
    int fd = -1;
    int retries = 0;
    uint32_t id = 0;
    crm_node_t *peer = NULL;
    cpg_handle_t handle = 0;
    const char *message_name = pcmk__message_name(crm_system_name);
    uid_t found_uid = 0;
    gid_t found_gid = 0;
    pid_t found_pid = 0;
    int rv;

    struct mainloop_fd_callbacks cpg_fd_callbacks = {
        .dispatch = pcmk_cpg_dispatch,
        .destroy = cluster->destroy,
    };

    cpg_model_v1_data_t cpg_model_info = {
        .model = CPG_MODEL_V1,
        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
        .cpg_totem_confchg_fn = NULL,
        .flags = 0,
    };

    cpg_evicted = false;
    cluster->group.length = 0;
    cluster->group.value[0] = 0;

    /* group.value is char[128] */
    strncpy(cluster->group.value, message_name, 127);
    cluster->group.value[127] = 0;
    // The group length counts the terminating NUL
    cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));

    cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
    if (rc != CS_OK) {
        crm_err("Could not connect to the CPG API: %s (%d)",
                cs_strerror(rc), rc);
        // NOTE(review): bail then calls cpg_finalize() on a handle still 0
        goto bail;
    }

    rc = cpg_fd_get(handle, &fd);
    if (rc != CS_OK) {
        crm_err("Could not obtain the CPG API connection: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    /* CPG provider run as root (in given user namespace, anyway)? */
    if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
                                            &found_uid, &found_gid))) {
        crm_err("CPG provider is not authentic:"
                " process %lld (uid: %lld, gid: %lld)",
                (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        rc = CS_ERR_ACCESS;
        goto bail;
    } else if (rv < 0) {
        crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                strerror(-rv), -rv);
        rc = CS_ERR_ACCESS;
        goto bail;
    }

    id = pcmk__cpg_local_nodeid(handle);
    if (id == 0) {
        crm_err("Could not get local node id from the CPG API");
        /* NOTE(review): rc is still CS_OK here, so bail neither finalizes the
         * handle nor returns an error. The function then returns pcmk_rc_ok
         * without having joined the group. rc should probably be set to an
         * error first.
         */
        goto bail;

    }
    cluster->nodeid = id;

    retries = 0;
    cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
    if (rc != CS_OK) {
        crm_err("Could not join the CPG group '%s': %d", message_name, rc);
        goto bail;
    }

    pcmk_cpg_handle = handle;
    cluster->cpg_handle = handle;
    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);

  bail:
    if (rc != CS_OK) {
        cpg_finalize(handle);
        // @TODO Map rc to more specific Pacemaker return code
        return ENOTCONN;
    }

    peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
    /* NOTE(review): peer is assigned but not used in this copy; a statement
     * using it appears to be missing here -- restore it
     */
    return pcmk_rc_ok;
}
925
932void
934{
935 pcmk_cpg_handle = 0;
936 if (cluster->cpg_handle != 0) {
937 crm_trace("Disconnecting CPG");
938 cpg_leave(cluster->cpg_handle, &cluster->group);
939 cpg_finalize(cluster->cpg_handle);
940 cluster->cpg_handle = 0;
941
942 } else {
943 crm_info("No CPG connection");
944 }
945}
946
/*!
 * \internal
 * \brief Queue a text message for sending over CPG, then try to flush the queue
 *
 * Payloads of at least CRM_BZ2_THRESHOLD bytes are compressed if
 * pcmk__compress() succeeds; otherwise they are sent uncompressed.
 *
 * \param[in] data   Text to send (NULL is treated as an empty string)
 * \param[in] local  Value for the recipient's local flag
 *                   (pcmk__cpg_send_xml() always passes false)
 * \param[in] node   Recipient node (NULL for all nodes)
 * \param[in] dest   Recipient daemon (message type); must not be crm_msg_ais
 *
 * \return false if \p dest is crm_msg_ais, otherwise true. true means only
 *         that the message was queued, not that it was sent.
 */
static bool
send_cpg_text(const char *data, bool local, const crm_node_t *node,
              enum crm_ais_msg_types dest)
{
    // @COMPAT Drop local argument when send_cluster_text is dropped
    static int msg_id = 0;
    static int local_pid = 0;
    static int local_name_len = 0;
    static const char *local_name = NULL;

    char *target = NULL;    // Recipient description, used only for logging
    struct iovec *iov;
    pcmk__cpg_msg_t *msg = NULL;

    CRM_CHECK(dest != crm_msg_ais, return false);

    // Look up the local node name and PID once, then reuse them
    if (local_name == NULL) {
        local_name = pcmk__cluster_local_node_name();
    }
    if ((local_name_len == 0) && (local_name != NULL)) {
        local_name_len = strlen(local_name);
    }

    if (data == NULL) {
        data = "";
    }

    if (local_pid == 0) {
        local_pid = getpid();
    }

    msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));

    msg_id++;
    msg->id = msg_id;
    msg->header.id = crm_class_cluster;
    msg->header.error = CS_OK;

    msg->host.type = dest;
    msg->host.local = local;

    if (node != NULL) {
        if (node->uname != NULL) {
            target = pcmk__str_copy(node->uname);
            /* NOTE(review): host.uname is char[MAX_NAME]; a node name of
             * MAX_NAME or more bytes would overflow it here
             */
            msg->host.size = strlen(node->uname);
            memset(msg->host.uname, 0, MAX_NAME);
            memcpy(msg->host.uname, node->uname, msg->host.size);

        } else {
            target = crm_strdup_printf("%u", node->id);
        }
        msg->host.id = node->id;

    } else {
        target = pcmk__str_copy("all");
    }

    msg->sender.id = 0;
    /* NOTE(review): no visible line sets msg->sender.type, although receivers
     * read it. A line appears to be missing here; upstream presumably derives
     * it from crm_system_name (perhaps via pcmk__cluster_parse_msg_type()) --
     * confirm and restore.
     */
    msg->sender.pid = local_pid;
    msg->sender.size = local_name_len;
    memset(msg->sender.uname, 0, MAX_NAME);

    // NOTE(review): same MAX_NAME overflow risk as above for the local name
    if ((local_name != NULL) && (msg->sender.size != 0)) {
        memcpy(msg->sender.uname, local_name, msg->sender.size);
    }

    // The payload includes the terminating NUL
    msg->size = 1 + strlen(data);
    msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;

    if (msg->size < CRM_BZ2_THRESHOLD) {
        msg = pcmk__realloc(msg, msg->header.size);
        memcpy(msg->data, data, msg->size);

    } else {
        char *compressed = NULL;
        unsigned int new_size = 0;

        if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
                           &new_size) == pcmk_rc_ok) {

            msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
            msg = pcmk__realloc(msg, msg->header.size);
            memcpy(msg->data, compressed, new_size);

            msg->is_compressed = TRUE;
            msg->compressed_size = new_size;

        } else {
            // cppcheck seems not to understand the abort logic in pcmk__realloc
            // cppcheck-suppress memleak
            msg = pcmk__realloc(msg, msg->header.size);
            memcpy(msg->data, data, msg->size);
        }

        free(compressed);
    }

    // The queue takes ownership of both iov and msg (freed once sent)
    iov = pcmk__assert_alloc(1, sizeof(struct iovec));
    iov->iov_base = msg;
    iov->iov_len = msg->header.size;

    if (msg->compressed_size > 0) {
        crm_trace("Queueing CPG message %u to %s "
                  "(%llu bytes, %d bytes compressed payload): %.200s",
                  msg->id, target, (unsigned long long) iov->iov_len,
                  msg->compressed_size, data);
    } else {
        crm_trace("Queueing CPG message %u to %s "
                  "(%llu bytes, %d bytes payload): %.200s",
                  msg->id, target, (unsigned long long) iov->iov_len,
                  msg->size, data);
    }

    free(target);

    cs_message_queue = g_list_append(cs_message_queue, iov);
    crm_cs_flush(&pcmk_cpg_handle);

    return true;
}
1079
1090bool
1091pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
1092 enum crm_ais_msg_types dest)
1093{
1094 bool rc = true;
1095 GString *data = g_string_sized_new(1024);
1096
1097 pcmk__xml_string(msg, 0, data, 0);
1098
1099 rc = send_cpg_text(data->str, false, node, dest);
1100 g_string_free(data, TRUE);
1101 return rc;
1102}
1103
1104// Deprecated functions kept only for backward API compatibility
1105// LCOV_EXCL_START
1106
1107#include <crm/cluster/compat.h>
1108
1109gboolean
1111{
1112 return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
1113}
1114
1115void
1120
1121uint32_t
1122get_local_nodeid(cpg_handle_t handle)
1123{
1124 return pcmk__cpg_local_nodeid(handle);
1125}
1126
1127void
1128pcmk_cpg_membership(cpg_handle_t handle,
1129 const struct cpg_name *group_name,
1130 const struct cpg_address *member_list,
1131 size_t member_list_entries,
1132 const struct cpg_address *left_list,
1133 size_t left_list_entries,
1134 const struct cpg_address *joined_list,
1135 size_t joined_list_entries)
1136{
1137 pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
1138 left_list, left_list_entries,
1139 joined_list, joined_list_entries);
1140}
1141
1142gboolean
1143send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
1144 gboolean local, const crm_node_t *node,
1145 enum crm_ais_msg_types dest)
1146{
1147 switch (msg_class) {
1148 case crm_class_cluster:
1149 return send_cpg_text(data, local, node, dest);
1150 default:
1151 crm_err("Invalid message class: %d", msg_class);
1152 return FALSE;
1153 }
1154}
1155
1156char *
1157pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
1158 void *content, uint32_t *kind, const char **from)
1159{
1160 return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
1161}
1162
/*!
 * \brief Map a daemon or client name to a CPG message type
 *
 * \deprecated Kept only for backward API compatibility.
 *
 * \param[in] text  Daemon name (normalized with pcmk__message_name()), or a
 *                  client PID as a decimal string
 *
 * \return Matching message type. For an unrecognized name, the integer parsed
 *         from \p text is meant to be used, after the sanity check below.
 *
 * NOTE(review): in this copy the return-type line above the declarator is
 * missing, as are the assignments in several branches (marked below).
 * Restore them from upstream.
 */
text2msg_type(const char *text)
{
    int type = crm_msg_none;

    CRM_CHECK(text != NULL, return type);
    text = pcmk__message_name(text);
    if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
        type = crm_msg_ais;
    } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
        type = crm_msg_cib;
    } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
        /* NOTE(review): assignment missing (presumably crm_msg_crmd) */
    } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
        type = crm_msg_te;
    } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
        type = crm_msg_pe;
    } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
        /* NOTE(review): assignment missing (presumably crm_msg_lrmd) */
    } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
        /* NOTE(review): assignment missing (presumably crm_msg_stonithd) */
    } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
        /* NOTE(review): assignment missing (presumably crm_msg_stonith_ng) */
    } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
        /* NOTE(review): assignment missing (presumably crm_msg_attrd) */

    } else {
        /* This will normally be a transient client rather than
         * a cluster daemon. Set the type to the pid of the client
         */
        int scan_rc = sscanf(text, "%d", &type);

        if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
            /* Ensure it's sane */
            /* NOTE(review): assignment missing (presumably crm_msg_none) */
        }
    }
    return type;
}
1202
1203// LCOV_EXCL_STOP
1204// End deprecated API
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
crm_node_t * pcmk__get_node(unsigned int id, const char *uname, const char *uuid, uint32_t flags)
Definition membership.c:890
@ pcmk__node_search_cluster_member
Search for cluster nodes from membership cache.
Definition internal.h:37
crm_node_t * pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
Definition membership.c:765
@ crm_proc_cpg
Definition internal.h:28
const char * pcmk__cluster_local_node_name(void)
Definition cluster.c:325
enum crm_ais_msg_types pcmk__cluster_parse_msg_type(const char *text)
Definition cluster.c:44
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
crm_ais_msg_types
Definition cluster.h:197
@ crm_msg_stonithd
Definition cluster.h:204
@ crm_msg_none
Definition cluster.h:198
@ crm_msg_cib
Definition cluster.h:201
@ crm_msg_pe
Definition cluster.h:206
@ crm_msg_attrd
Definition cluster.h:203
@ crm_msg_ais
Definition cluster.h:199
@ crm_msg_te
Definition cluster.h:205
@ crm_msg_stonith_ng
Definition cluster.h:207
@ crm_msg_crmd
Definition cluster.h:202
@ crm_msg_lrmd
Definition cluster.h:200
#define CRM_NODE_MEMBER
Definition cluster.h:49
crm_ais_msg_class
Definition cluster.h:189
@ crm_class_cluster
Definition cluster.h:190
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:297
struct tcp_async_cb_data __attribute__
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
Deprecated Pacemaker cluster API.
uint32_t compressed_size
Definition cpg.c:8
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, const crm_node_t *node, enum crm_ais_msg_types dest)
Definition cpg.c:1143
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition cpg.c:689
pcmk__cpg_host_t host
Definition cpg.c:4
pcmk__cpg_host_t sender
Definition cpg.c:5
bool pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node, enum crm_ais_msg_types dest)
Definition cpg.c:1091
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
Definition cpg.c:794
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition cpg.c:75
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
Definition cpg.c:829
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
Definition cpg.c:812
#define CS_SEND_MAX
Definition cpg.c:191
#define msg_data_len(msg)
Definition cpg.c:79
enum crm_ais_msg_types type
Definition cpg.c:3
char uname[MAX_NAME]
Definition cpg.c:5
char data[0]
Definition cpg.c:10
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition cpg.c:1122
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition cpg.c:432
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition cpg.c:1128
uint32_t size
Definition cpg.c:4
uint32_t id
Definition cpg.c:0
gboolean is_compressed
Definition cpg.c:2
enum crm_ais_msg_types text2msg_type(const char *text)
Definition cpg.c:1164
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
Definition cpg.c:933
gboolean local
Definition cpg.c:2
gboolean cluster_connect_cpg(pcmk_cluster_t *cluster)
Definition cpg.c:1110
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition cpg.c:1157
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
Definition cpg.c:101
#define cs_repeat(rc, counter, max, code)
Definition cpg.c:81
uint32_t pid
Definition cpg.c:1
void cluster_disconnect_cpg(pcmk_cluster_t *cluster)
Definition cpg.c:1116
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition cpg.c:58
#define CRM_SYSTEM_CIB
Definition crm.h:89
#define CRM_SYSTEM_CRMD
Definition crm.h:90
#define CRM_SYSTEM_DC
Definition crm.h:87
#define CRM_SYSTEM_STONITHD
Definition crm.h:94
#define CRM_SYSTEM_LRMD
Definition crm.h:91
#define CRM_SYSTEM_TENGINE
Definition crm.h:93
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition crm.h:79
char * crm_system_name
Definition utils.c:50
#define CRM_SYSTEM_PENGINE
Definition crm.h:92
IPC interface to Pacemaker daemons.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
#define PCMK__SPECIAL_PID_AS_0(p)
#define crm_info(fmt, args...)
Definition logging.h:397
#define do_crm_log(level, fmt, args...)
Log a message.
Definition logging.h:181
#define crm_warn(fmt, args...)
Definition logging.h:392
#define CRM_XS
Definition logging.h:56
#define crm_notice(fmt, args...)
Definition logging.h:395
#define CRM_CHECK(expr, failure_action)
Definition logging.h:245
#define crm_debug(fmt, args...)
Definition logging.h:400
#define crm_err(fmt, args...)
Definition logging.h:389
#define crm_trace(fmt, args...)
Definition logging.h:402
#define LOG_TRACE
Definition logging.h:38
Wrappers for and extensions to glib mainloop.
#define G_PRIORITY_MEDIUM
Definition mainloop.h:192
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition mainloop.c:958
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition messages.c:171
#define PCMK_VALUE_OFFLINE
Definition options.h:183
#define PCMK_VALUE_ONLINE
Definition options.h:184
const char * target
Definition pcmk_fence.c:29
#define CRM_ASSERT(expr)
Definition results.h:42
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:501
@ pcmk_rc_ok
Definition results.h:162
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition results.c:906
#define pcmk__plural_s(i)
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition strings.c:837
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition strings.c:1026
@ pcmk__str_casei
#define pcmk__str_copy(str)
uint32_t nodeid
Definition cluster.h:142
void(* destroy)(gpointer)
Definition cluster.h:146
char * uname
Definition cluster.h:88
uint32_t id
Definition cluster.h:120
time_t when_lost
Definition cluster.h:121
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition mainloop.h:148
Wrappers for and extensions to libxml2.
#define CRM_BZ2_THRESHOLD
Definition xml_io.h:35
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition xml_io.c:488