corosync 3.1.5
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * http://www.cs.jhu.edu/~yairamir/phd.ps (ch. 4, 5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * http://citeseer.ist.psu.edu/amir95totem.html
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on a 1.6 GHz Xeon from 35% to less than 0.1% as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
/*
 * Fixed sizing limits for totemsrp queues and messages.
 * QUEUE_RTR_ITEMS_SIZE_MAX also sizes the regular and recovery sort queues
 * (see the sq_init calls in the initializer); RETRANS_MESSAGE_QUEUE_SIZE_MAX
 * sizes retrans_message_queue.
 */
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
104/*
105 * SRP address.
106 */
107struct srp_addr {
108 unsigned int nodeid;
109};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
170 struct qb_list_head list;
171 int (*callback_fn) (enum totem_callback_token_type type, const void *);
173 int delete;
174 void *data;
175};
176
177
179 int mcast;
180 int token;
181};
182
183struct mcast {
186 unsigned int seq;
189 unsigned int node_id;
191} __attribute__((packed));
192
193
194struct rtr_item {
196 unsigned int seq;
197}__attribute__((packed));
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
212}__attribute__((packed));
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
227} __attribute__((packed));
228
229
234} __attribute__((packed));
235
236
240} __attribute__((packed));
241
242
245 unsigned int aru;
246 unsigned int high_delivered;
247 unsigned int received_flg;
248}__attribute__((packed));
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
265}__attribute__((packed));
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
390 struct qb_list_head token_callback_sent_listhead;
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
401 qb_loop_timer_handle timer_pause_timeout;
402
403 qb_loop_timer_handle timer_orf_token_timeout;
404
405 qb_loop_timer_handle timer_orf_token_warning;
406
408
410
411 qb_loop_timer_handle timer_merge_detect_timeout;
412
414
416
418
419 qb_loop_timer_handle timer_heartbeat_timeout;
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
458 int endian_conversion_required);
459
461 enum totem_configuration_type configuration_type,
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
498 struct timeval tv_old;
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
539 int endian_conversion_needed);
540};
541
560};
561
/*
 * Human-readable descriptions of why the gather state was entered, indexed
 * by enum gather_state_from and looked up through gsfrom_to_msg().
 * Indices without a designated initializer are NULL (standard C rules for
 * designated initializers).
 * NOTE(review): listing line 564 is absent from this extraction; confirm the
 * index it initializes so no reachable reason code maps to NULL.
 */
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
588 int endian_conversion_needed);
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
594 int endian_conversion_needed);
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
600 int endian_conversion_needed);
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
606 int endian_conversion_needed);
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
612 int endian_conversion_needed);
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
618 int endian_conversion_needed);
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
636 int fcc_mcasts_allowed);
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667void main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
/*
 * NOTE(review): the declaration line of this initializer (listing line 678)
 * is absent from this extraction. The leading 6 presumably initializes the
 * handler count and must equal the number of handlers listed below. The
 * handlers are positioned to match MESSAGE_TYPE_ORF_TOKEN (0) through
 * MESSAGE_TYPE_TOKEN_HOLD_CANCEL (5), so the order must not change.
 */
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Log through the instance's logging callback, tagging the message with the
 * calling function, file and line. Requires a variable named `instance`
 * (struct totemsrp_instance *) in scope at the expansion site.
 *
 * The expansion intentionally has NO trailing semicolon: the caller's own
 * ';' completes the do/while(0), so the macro acts as a single statement
 * and is safe inside an unbraced if/else.
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)

/*
 * Log `fmt` followed by the strerror text and numeric value of err_num.
 * Same `instance` requirement as log_printf. err_num is evaluated twice, so
 * pass a plain value, not an expression with side effects.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
	const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
} while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
709 if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
710 return gather_state_from_desc[gsfrom];
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
/*
 * Reset an instance to its starting state: zero every field, initialize the
 * received/sent token-callback lists, and set the non-zero initial values
 * below.
 * NOTE(review): listing lines 729, 735 and 737 are absent from this
 * extraction; their statements are not shown here.
 */
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
721 qb_list_init (&instance->token_callback_received_listhead);
722
723 qb_list_init (&instance->token_callback_sent_listhead);
724
725 instance->my_received_flg = 1;
726
/*
 * One below the starting token sequence number; with SEQNO_START_TOKEN
 * defined as 0 this wraps the unsigned my_token_seq to its maximum value.
 */
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
/* set_aru is unsigned int, so -1 stores its maximum value */
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
/* Already zero after the memset above; kept explicit for readability */
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
/* commit_token is a typed view over the instance's own commit_token_storage */
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
/*
 * Detect a long process pause and flush pending messages after one.
 *
 * If more than half of token_timeout (ms) has elapsed since
 * instance->pause_timestamp, a warning is logged and the flush step inside
 * the do/while loop is repeated while it yields -1. Returns the last value
 * of res, or 0 if no pause was detected.
 * NOTE(review): listing lines 758 (the log call opener) and 764 (the loop
 * body that presumably assigns res) are absent from this extraction.
 * NOTE(review): the elapsed time is passed as unsigned int but printed with
 * %d; confirm the format specifier.
 */
748static int pause_flush (struct totemsrp_instance *instance)
749{
750 uint64_t now_msec;
751 uint64_t timestamp_msec;
752 int res = 0;
753
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
755 timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
/*
 * Token event callback that records token timestamps in instance->stats.
 *
 * It is registered for both token_recv_event_handle and
 * token_sent_event_handle. stats.token[] is used as a ring of
 * TOTEM_TOKEN_STATS_MAX slots between earliest_token and latest_token.
 *
 * On the branch opened at the (missing) listing line 778, presumably the
 * token-received event (TODO confirm), the function:
 *   - advances latest_token, wrapping at TOTEM_TOKEN_STATS_MAX;
 *   - if that collides with earliest_token, advances earliest_token too and
 *     clears that slot;
 *   - stores rx = now and tx = 0.
 * Otherwise it stores tx = now in the current latest slot.
 *
 * Timestamps are milliseconds truncated to 32 bits (uint32_t time_now).
 * Always returns 0.
 * NOTE(review): void_instance is const-qualified but is cast away and
 * written through.
 */
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
773 uint32_t time_now;
774 unsigned long long nano_secs = qb_util_nano_current_get ();
775
776 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
777
/* NOTE(review): listing line 778 (the branch condition on `type`) is absent here */
779 /* advance latest_token, wrapping at TOTEM_TOKEN_STATS_MAX */
780 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
781 instance->stats.latest_token = 0;
782 else
783 instance->stats.latest_token++;
784
785 if (instance->stats.earliest_token == instance->stats.latest_token) {
786 /* we have filled up the array, start overwriting */
787 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
788 instance->stats.earliest_token = 0;
789 else
790 instance->stats.earliest_token++;
791
792 instance->stats.token[instance->stats.earliest_token].rx = 0;
793 instance->stats.token[instance->stats.earliest_token].tx = 0;
794 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
795 }
796
797 instance->stats.token[instance->stats.latest_token].rx = time_now;
798 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
799 } else {
800 instance->stats.token[instance->stats.latest_token].tx = time_now;
801 }
802 return 0;
803}
804
/*
 * MTU-change callback handed to totemnet_initialize; context is the
 * totemsrp instance.
 *
 * Stores the new network MTU minus room for two struct mcast headers in
 * totem_config->net_mtu and logs both values.
 * NOTE(review): listing line 811 (the log call opener) is absent from this
 * extraction.
 */
805static void totempg_mtu_changed(void *context, int net_mtu)
806{
807 struct totemsrp_instance *instance = context;
808
/*
 * NOTE(review): `net_mtu - 2 * sizeof (...)` is computed in size_t
 * (unsigned). A net_mtu smaller than the subtracted amount would wrap
 * before being stored; confirm that totemnet never reports such values.
 */
809 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
810
812 "Net MTU changed to %d, new value is %d",
813 net_mtu, instance->totem_config->net_mtu);
814}
815
816/*
817 * Exported interfaces
818 */
820 qb_loop_t *poll_handle,
821 void **srp_context,
823 totempg_stats_t *stats,
824
825 void (*deliver_fn) (
826 unsigned int nodeid,
827 const void *msg,
828 unsigned int msg_len,
829 int endian_conversion_required),
830
831 void (*confchg_fn) (
832 enum totem_configuration_type configuration_type,
833 const unsigned int *member_list, size_t member_list_entries,
834 const unsigned int *left_list, size_t left_list_entries,
835 const unsigned int *joined_list, size_t joined_list_entries,
836 const struct memb_ring_id *ring_id),
837 void (*waiting_trans_ack_cb_fn) (
838 int waiting_trans_ack))
839{
840 struct totemsrp_instance *instance;
841 int res;
842
843 instance = malloc (sizeof (struct totemsrp_instance));
844 if (instance == NULL) {
845 goto error_exit;
846 }
847
848 totemsrp_instance_initialize (instance);
849
850 instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
852
853 stats->srp = &instance->stats;
854 instance->stats.latest_token = 0;
855 instance->stats.earliest_token = 0;
856
857 instance->totem_config = totem_config;
858
859 /*
860 * Configure logging
861 */
870
871 /*
872 * Configure totem store and load functions
873 */
876
877 /*
878 * Initialize local variables for totemsrp
879 */
881
882 /*
883 * Display totem configuration
884 */
886 "Token Timeout (%d ms) retransmit timeout (%d ms)",
889 uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
891 "Token warning every %d ms (%d%% of Token Timeout)",
892 token_warning_ms, totem_config->token_warning);
893 if (token_warning_ms < totem_config->token_retransmit_timeout)
895 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
896 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 token_warning_ms, totem_config->token_retransmit_timeout);
898 } else {
900 "Token warnings disabled");
901 }
903 "token hold (%d ms) retransmits before loss (%d retrans)",
906 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
910
913 "downcheck (%d ms) fail to recv const (%d msgs)",
916 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
917
919 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
921
923 "missed count const (%d messages)",
925
927 "send threads (%d threads)", totem_config->threads);
928
930 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
932 "max_network_delay (%d ms)", totem_config->max_network_delay);
933
934
935 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
936 sizeof (struct message_item), instance->threaded_mode_enabled);
937
938 sq_init (&instance->regular_sort_queue,
939 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
940
941 sq_init (&instance->recovery_sort_queue,
942 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
943
944 instance->totemsrp_poll_handle = poll_handle;
945
946 instance->totemsrp_deliver_fn = deliver_fn;
947
948 instance->totemsrp_confchg_fn = confchg_fn;
949 instance->use_heartbeat = 1;
950
951 timer_function_pause_timeout (instance);
952
955 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
956 instance->use_heartbeat = 0;
957 }
958
959 if (instance->use_heartbeat) {
960 instance->heartbeat_timeout
963
964 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
966 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
967 instance->heartbeat_timeout,
970 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
972 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
973 instance->use_heartbeat = 0;
974 }
975 else {
977 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
978 }
979 }
980
981 res = totemnet_initialize (
982 poll_handle,
983 &instance->totemnet_context,
985 stats->srp,
986 instance,
989 totempg_mtu_changed,
990 target_set_completed);
991 if (res == -1) {
992 goto error_exit;
993 }
994
995 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
996
997 /*
998 * Must have net_mtu adjusted by totemnet_initialize first
999 */
1000 cs_queue_init (&instance->new_message_queue,
1002 sizeof (struct message_item), instance->threaded_mode_enabled);
1003
1004 cs_queue_init (&instance->new_message_queue_trans,
1006 sizeof (struct message_item), instance->threaded_mode_enabled);
1007
1009 &instance->token_recv_event_handle,
1011 0,
1012 token_event_stats_collector,
1013 instance);
1015 &instance->token_sent_event_handle,
1017 0,
1018 token_event_stats_collector,
1019 instance);
1020 *srp_context = instance;
1021 return (0);
1022
1023error_exit:
1024 return (-1);
1025}
1026
1028 void *srp_context)
1029{
/*
 * Tear down the instance passed as srp_context:
 *   - send a leave message;
 *   - free the new/new-trans/retransmit message queues and the regular and
 *     recovery sort queues;
 *   - free the instance itself.
 * srp_context must not be used after this returns.
 * NOTE(review): the function header (listing line 1027) and line 1033 are
 * absent from this extraction.
 */
1030 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1031
1032 memb_leave_message_send (instance);
1034 cs_queue_free (&instance->new_message_queue);
1035 cs_queue_free (&instance->new_message_queue_trans);
1036 cs_queue_free (&instance->retrans_message_queue);
1037 sq_free (&instance->regular_sort_queue);
1038 sq_free (&instance->recovery_sort_queue);
1039 free (instance);
1040}
1041
1043 void *srp_context,
1044 unsigned int nodeid,
1045 struct totem_node_status *node_status)
1046{
/*
 * Fill node_status for nodeid:
 *   - set node_status->reachable = 1 if nodeid is in my_proc_list;
 *   - delegate the remaining fields, and the return value, to
 *     totemnet_nodestatus_get.
 * NOTE(review): reachable is never cleared here; presumably the caller or the
 * absent listing line 1050 initializes it (TODO confirm). The function header
 * (listing line 1042) is also absent from this extraction.
 */
1047 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1048 int i;
1049
1051
1052 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1053 for (i = 0; i < instance->my_proc_list_entries; i++) {
1054 if (instance->my_proc_list[i].nodeid == nodeid) {
1055 node_status->reachable = 1;
1056 }
1057 }
1058
1059 return totemnet_nodestatus_get(instance->totemnet_context, nodeid, node_status);
1060}
1061
1062
1063/*
1064 * Return the configured interfaces of nodeid. interfaces is an array of totem_ip addresses
1065 * allocated by the caller with interfaces_size items; interface_id receives the matching
1066 * interface numbers in parallel. iface_count is the final number of interfaces filled in.
1067 *
1068 * Returns 0 on success, or -2 if the interfaces array is not big enough. If no interface
1069 * is configured for nodeid, 0 is returned and *iface_count is set to 0.
1070 */
1072 void *srp_context,
1073 unsigned int nodeid,
1074 unsigned int *interface_id,
1075 struct totem_ip_address *interfaces,
1076 unsigned int interfaces_size,
1077 char ***status,
1078 unsigned int *iface_count)
1079{
1080 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1081 struct totem_ip_address *iface_ptr = interfaces;
1082 int res = 0;
1083 int i,n;
1084 int num_ifs = 0;
1085
1086 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1087 *iface_count = INTERFACE_MAX;
1088
1089 for (i=0; i<INTERFACE_MAX; i++) {
1090 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1091 if (instance->totem_config->interfaces[i].configured &&
1092 instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1093 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1094 interface_id[num_ifs] = i;
1095 iface_ptr++;
1096 if (++num_ifs > interfaces_size) {
1097 res = -2;
1098 break;
1099 }
1100 }
1101 }
1102 }
1103
1104 totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1105 *iface_count = num_ifs;
1106 return (res);
1107}
1108
1110 void *srp_context,
1111 const char *cipher_type,
1112 const char *hash_type)
1113{
/*
 * Forward the requested cipher and hash types to the network layer via
 * totemnet_crypto_set and return its result unchanged.
 * NOTE(review): the function header (listing line 1109) is absent from this
 * extraction.
 */
1114 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1115 int res;
1116
1117 res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1118
1119 return (res);
1120}
1121
1122
1124 void *srp_context)
1125{
/*
 * Return this node's id (instance->my_id.nodeid). The initializer sets it
 * from boundto.nodeid of the lowest active interface.
 * NOTE(review): the function header (listing line 1123) is absent from this
 * extraction.
 */
1126 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1127 unsigned int res;
1128
1129 res = instance->my_id.nodeid;
1130
1131 return (res);
1132}
1133
1135 void *srp_context)
1136{
/*
 * Return the address family of the address bound on the lowest active
 * interface (interfaces[lowest_active_if].boundto.family).
 * NOTE(review): the function header (listing line 1134) is absent from this
 * extraction.
 */
1137 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1138 int res;
1139
1140 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1141
1142 return (res);
1143}
1144
1145
1146/*
1147 * Set operations for use by the membership algorithm
1148 */
1149static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1150{
1151 if (a->nodeid == b->nodeid) {
1152 return 1;
1153 }
1154 return 0;
1155}
1156
1157static void srp_addr_to_nodeid (
1158 struct totemsrp_instance *instance,
1159 unsigned int *nodeid_out,
1160 struct srp_addr *srp_addr_in,
1161 unsigned int entries)
1162{
1163 unsigned int i;
1164
1165 for (i = 0; i < entries; i++) {
1166 nodeid_out[i] = srp_addr_in[i].nodeid;
1167 }
1168}
1169
1170static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1171{
1172 struct srp_addr res;
1173
1174 res.nodeid = swab32 (in.nodeid);
1175
1176 return (res);
1177}
1178
1179static void memb_consensus_reset (struct totemsrp_instance *instance)
1180{
1181 instance->consensus_list_entries = 0;
1182}
1183
1184static void memb_set_subtract (
1185 struct srp_addr *out_list, int *out_list_entries,
1186 struct srp_addr *one_list, int one_list_entries,
1187 struct srp_addr *two_list, int two_list_entries)
1188{
1189 int found = 0;
1190 int i;
1191 int j;
1192
1193 *out_list_entries = 0;
1194
1195 for (i = 0; i < one_list_entries; i++) {
1196 for (j = 0; j < two_list_entries; j++) {
1197 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1198 found = 1;
1199 break;
1200 }
1201 }
1202 if (found == 0) {
1203 out_list[*out_list_entries] = one_list[i];
1204 *out_list_entries = *out_list_entries + 1;
1205 }
1206 found = 0;
1207 }
1208}
1209
1210/*
1211 * Set consensus for a specific processor
1212 */
1213static void memb_consensus_set (
1214 struct totemsrp_instance *instance,
1215 const struct srp_addr *addr)
1216{
1217 int found = 0;
1218 int i;
1219
1220 for (i = 0; i < instance->consensus_list_entries; i++) {
1221 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1222 found = 1;
1223 break; /* found entry */
1224 }
1225 }
1226 instance->consensus_list[i].addr = *addr;
1227 instance->consensus_list[i].set = 1;
1228 if (found == 0) {
1229 instance->consensus_list_entries++;
1230 }
1231 return;
1232}
1233
1234/*
1235 * Is consensus set for a specific processor
1236 */
1237static int memb_consensus_isset (
1238 struct totemsrp_instance *instance,
1239 const struct srp_addr *addr)
1240{
1241 int i;
1242
1243 for (i = 0; i < instance->consensus_list_entries; i++) {
1244 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1245 return (instance->consensus_list[i].set);
1246 }
1247 }
1248 return (0);
1249}
1250
1251/*
1252 * Is consensus agreed upon based upon consensus database
1253 */
1254static int memb_consensus_agreed (
1255 struct totemsrp_instance *instance)
1256{
1257 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1258 int token_memb_entries = 0;
1259 int agreed = 1;
1260 int i;
1261
1262 memb_set_subtract (token_memb, &token_memb_entries,
1263 instance->my_proc_list, instance->my_proc_list_entries,
1264 instance->my_failed_list, instance->my_failed_list_entries);
1265
1266 for (i = 0; i < token_memb_entries; i++) {
1267 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1268 agreed = 0;
1269 break;
1270 }
1271 }
1272
1273 if (agreed && instance->failed_to_recv == 1) {
1274 /*
1275 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1276 * will create single ring anyway.
1277 */
1278
1279 return (agreed);
1280 }
1281
1282 assert (token_memb_entries >= 1);
1283
1284 return (agreed);
1285}
1286
1287static void memb_consensus_notset (
1288 struct totemsrp_instance *instance,
1289 struct srp_addr *no_consensus_list,
1290 int *no_consensus_list_entries,
1291 struct srp_addr *comparison_list,
1292 int comparison_list_entries)
1293{
1294 int i;
1295
1296 *no_consensus_list_entries = 0;
1297
1298 for (i = 0; i < instance->my_proc_list_entries; i++) {
1299 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1300 no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1301 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1302 }
1303 }
1304}
1305
1306/*
1307 * Is set1 equal to set2 Entries can be in different orders
1308 */
1309static int memb_set_equal (
1310 struct srp_addr *set1, int set1_entries,
1311 struct srp_addr *set2, int set2_entries)
1312{
1313 int i;
1314 int j;
1315
1316 int found = 0;
1317
1318 if (set1_entries != set2_entries) {
1319 return (0);
1320 }
1321 for (i = 0; i < set2_entries; i++) {
1322 for (j = 0; j < set1_entries; j++) {
1323 if (srp_addr_equal (&set1[j], &set2[i])) {
1324 found = 1;
1325 break;
1326 }
1327 }
1328 if (found == 0) {
1329 return (0);
1330 }
1331 found = 0;
1332 }
1333 return (1);
1334}
1335
1336/*
1337 * Is subset fully contained in fullset
1338 */
1339static int memb_set_subset (
1340 const struct srp_addr *subset, int subset_entries,
1341 const struct srp_addr *fullset, int fullset_entries)
1342{
1343 int i;
1344 int j;
1345 int found = 0;
1346
1347 if (subset_entries > fullset_entries) {
1348 return (0);
1349 }
1350 for (i = 0; i < subset_entries; i++) {
1351 for (j = 0; j < fullset_entries; j++) {
1352 if (srp_addr_equal (&subset[i], &fullset[j])) {
1353 found = 1;
1354 }
1355 }
1356 if (found == 0) {
1357 return (0);
1358 }
1359 found = 0;
1360 }
1361 return (1);
1362}
1363/*
1364 * merge subset into fullset taking care not to add duplicates
1365 */
1366static void memb_set_merge (
1367 const struct srp_addr *subset, int subset_entries,
1368 struct srp_addr *fullset, int *fullset_entries)
1369{
1370 int found = 0;
1371 int i;
1372 int j;
1373
1374 for (i = 0; i < subset_entries; i++) {
1375 for (j = 0; j < *fullset_entries; j++) {
1376 if (srp_addr_equal (&fullset[j], &subset[i])) {
1377 found = 1;
1378 break;
1379 }
1380 }
1381 if (found == 0) {
1382 fullset[*fullset_entries] = subset[i];
1383 *fullset_entries = *fullset_entries + 1;
1384 }
1385 found = 0;
1386 }
1387 return;
1388}
1389
1390static void memb_set_and_with_ring_id (
1391 struct srp_addr *set1,
1392 struct memb_ring_id *set1_ring_ids,
1393 int set1_entries,
1394 struct srp_addr *set2,
1395 int set2_entries,
1396 struct memb_ring_id *old_ring_id,
1397 struct srp_addr *and,
1398 int *and_entries)
1399{
1400 int i;
1401 int j;
1402 int found = 0;
1403
1404 *and_entries = 0;
1405
1406 for (i = 0; i < set2_entries; i++) {
1407 for (j = 0; j < set1_entries; j++) {
1408 if (srp_addr_equal (&set1[j], &set2[i])) {
1409 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1410 found = 1;
1411 }
1412 break;
1413 }
1414 }
1415 if (found) {
1416 and[*and_entries] = set1[j];
1417 *and_entries = *and_entries + 1;
1418 }
1419 found = 0;
1420 }
1421 return;
1422}
1423
1424static void memb_set_log(
1425 struct totemsrp_instance *instance,
1426 int level,
1427 const char *string,
1428 struct srp_addr *list,
1429 int list_entries)
1430{
1431 char int_buf[32];
1432 char list_str[512];
1433 int i;
1434
1435 memset(list_str, 0, sizeof(list_str));
1436
1437 for (i = 0; i < list_entries; i++) {
1438 if (i == 0) {
1439 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1440 } else {
1441 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1442 }
1443
1444 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1445 break ;
1446 }
1447 strcat(list_str, int_buf);
1448 }
1449
1450 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1451}
1452
1453static void my_leave_memb_clear(
1454 struct totemsrp_instance *instance)
1455{
1456 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1457 instance->my_leave_memb_entries = 0;
1458}
1459
1460static unsigned int my_leave_memb_match(
1461 struct totemsrp_instance *instance,
1462 unsigned int nodeid)
1463{
1464 int i;
1465 unsigned int ret = 0;
1466
1467 for (i = 0; i < instance->my_leave_memb_entries; i++){
1468 if (instance->my_leave_memb_list[i] == nodeid){
1469 ret = nodeid;
1470 break;
1471 }
1472 }
1473 return ret;
1474}
1475
/*
 * Record nodeid in instance->my_leave_memb_list unless it is already there.
 *
 * Duplicates are ignored. A new nodeid is appended only while
 * my_leave_memb_entries < PROCESSOR_COUNT_MAX - 1; otherwise it is dropped
 * and the "Cannot set LEAVE nodeid=" diagnostic text below is used.
 *
 * NOTE(review): the statement head that consumes that diagnostic text is not
 * visible in this listing; presumably a log_printf call — confirm.
 * NOTE(review): if my_leave_memb_list holds PROCESSOR_COUNT_MAX entries, the
 * "- 1" bound leaves its last slot permanently unused — confirm intent.
 */
1476static void my_leave_memb_set(
1477 struct totemsrp_instance *instance,
1478 unsigned int nodeid)
1479{
1480 int i, found = 0;
 /* Already recorded? Then there is nothing to do. */
1481 for (i = 0; i < instance->my_leave_memb_entries; i++){
1482 if (instance->my_leave_memb_list[i] == nodeid){
1483 found = 1;
1484 break;
1485 }
1486 }
1487 if (found == 1) {
1488 return;
1489 }
1490 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1491 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1492 instance->my_leave_memb_entries++;
1493 } else {
1495 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1496 }
1497}
1498
1499
1500static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1501{
1502 assert (instance != NULL);
1503 return totemnet_buffer_alloc (instance->totemnet_context);
1504}
1505
/*
 * Return a buffer obtained from totemsrp_buffer_alloc().
 *
 * NOTE(review): in this listing the body only asserts that instance is
 * non-NULL and never uses ptr, so nothing is released. Confirm that the
 * matching release call (presumably the totemnet counterpart of
 * totemnet_buffer_alloc, using instance->totemnet_context) is present;
 * otherwise every caller leaks ptr.
 */
1506static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1507{
1508 assert (instance != NULL);
1510}
1511
/*
 * (Re)arm the token retransmit timer.
 *
 * The previously scheduled timer is deleted. Then
 * timer_function_token_retransmit_timeout() is scheduled at QB_LOOP_MED
 * priority to fire after totem_config->token_retransmit_timeout
 * milliseconds, with its handle stored in
 * instance->timer_orf_token_retransmit_timeout. A qb_loop_timer_add()
 * failure is logged and not propagated.
 *
 * NOTE(review): the second argument of qb_loop_timer_del() is not visible in
 * this listing; presumably instance->timer_orf_token_retransmit_timeout —
 * confirm.
 */
1512static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1513{
1514 int32_t res;
1515
1516 qb_loop_timer_del (instance->totemsrp_poll_handle,
1518 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1519 QB_LOOP_MED,
1520 instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1521 (void *)instance,
1522 timer_function_token_retransmit_timeout,
1523 &instance->timer_orf_token_retransmit_timeout);
1524 if (res != 0) {
1525 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1526 }
1527
1528}
1529
/*
 * Arm the merge-detect timer unless my_merge_detect_timeout_outstanding
 * indicates one is already pending.
 *
 * When armed, timer_function_merge_detect_timeout() runs after
 * totem_config->merge_timeout milliseconds. A qb_loop_timer_add() failure
 * is logged only.
 *
 * NOTE(review): no statement visible in this listing sets
 * my_merge_detect_timeout_outstanding after the timer is added, which would
 * make the guard ineffective. Confirm the flag is set here; cancel code
 * presumably clears it.
 */
1530static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1531{
1532 int32_t res;
1533
1534 if (instance->my_merge_detect_timeout_outstanding == 0) {
1535 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1536 QB_LOOP_MED,
1537 instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1538 (void *)instance,
1539 timer_function_merge_detect_timeout,
1540 &instance->timer_merge_detect_timeout);
1541 if (res != 0) {
1542 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1543 }
1544
1546 }
1547}
1548
/*
 * Delete the pending merge-detect timer (instance->timer_merge_detect_timeout).
 *
 * NOTE(review): start_merge_detect_timeout() is guarded by
 * my_merge_detect_timeout_outstanding, but no reset of that flag is visible
 * in this listing — confirm it is cleared here so the timer can be re-armed.
 */
1549static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1550{
1551 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1553}
1554
1555/*
1556 * ring_state_* is used to save and restore the sort queue
1557 * state when a recovery operation fails (and enters gather)
1558 */
/*
 * Take a one-time snapshot of the current ring state.
 *
 * The snapshot consists of my_ring_id copied into my_old_ring_id and my_aru
 * copied into old_ring_state_aru. It is taken only while
 * old_ring_state_saved is 0; the flag is then set, so later calls are no-ops
 * until old_ring_state_reset() clears it.
 *
 * NOTE(review): the log text mentions my_high_seq_received, and
 * old_ring_state_high_seq_received is read elsewhere in this file, but no
 * store into it is visible in this listing — confirm it is saved here too.
 */
1559static void old_ring_state_save (struct totemsrp_instance *instance)
1560{
1561 if (instance->old_ring_state_saved == 0) {
1562 instance->old_ring_state_saved = 1;
1563 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1564 sizeof (struct memb_ring_id));
1565 instance->old_ring_state_aru = instance->my_aru;
1568 "Saving state aru %x high seq received %x",
1569 instance->my_aru, instance->my_high_seq_received);
1570 }
1571}
1572
/*
 * Roll my_aru back to the value captured by old_ring_state_save()
 * (old_ring_state_aru) and report the restored values.
 *
 * NOTE(review): the log text also reports my_high_seq_received, but no
 * assignment restoring it is visible in this listing — confirm.
 */
1573static void old_ring_state_restore (struct totemsrp_instance *instance)
1574{
1575 instance->my_aru = instance->old_ring_state_aru;
1578 "Restoring instance->my_aru %x my high seq received %x",
1579 instance->my_aru, instance->my_high_seq_received);
1580}
1581
/*
 * Clear old_ring_state_saved so that the next old_ring_state_save() takes a
 * fresh snapshot. The "Resetting old ring state" message text precedes the
 * assignment; its statement head is not visible in this listing.
 */
1582static void old_ring_state_reset (struct totemsrp_instance *instance)
1583{
1585 "Resetting old ring state");
1586 instance->old_ring_state_saved = 0;
1587}
1588
1589static void reset_pause_timeout (struct totemsrp_instance *instance)
1590{
1591 int32_t res;
1592
1593 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1594 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1595 QB_LOOP_MED,
1596 instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1597 (void *)instance,
1598 timer_function_pause_timeout,
1599 &instance->timer_pause_timeout);
1600 if (res != 0) {
1601 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1602 }
1603}
1604
1605static void reset_token_warning (struct totemsrp_instance *instance) {
1606 int32_t res;
1607
1608 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1609 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1610 QB_LOOP_MED,
1611 instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1612 (void *)instance,
1613 timer_function_orf_token_warning,
1614 &instance->timer_orf_token_warning);
1615 if (res != 0) {
1616 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1617 }
1618}
1619
1620static void reset_token_timeout (struct totemsrp_instance *instance) {
1621 int32_t res;
1622
1623 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1624 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1625 QB_LOOP_MED,
1626 instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1627 (void *)instance,
1628 timer_function_orf_token_timeout,
1629 &instance->timer_orf_token_timeout);
1630 if (res != 0) {
1631 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1632 }
1633
1634 if (instance->totem_config->token_warning)
1635 reset_token_warning(instance);
1636}
1637
1638static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1639 int32_t res;
1640
1641 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1642 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1643 QB_LOOP_MED,
1644 instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1645 (void *)instance,
1646 timer_function_heartbeat_timeout,
1647 &instance->timer_heartbeat_timeout);
1648 if (res != 0) {
1649 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1650 }
1651}
1652
1653
1654static void cancel_token_warning (struct totemsrp_instance *instance) {
1655 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1656}
1657
1658static void cancel_token_timeout (struct totemsrp_instance *instance) {
1659 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1660
1661 if (instance->totem_config->token_warning)
1662 cancel_token_warning(instance);
1663}
1664
1665static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1666 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1667}
1668
1669static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1670{
1671 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1672}
1673
1674static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1675{
1676 int32_t res;
1677
1678 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1679 QB_LOOP_MED,
1680 instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1681 (void *)instance,
1682 timer_function_token_hold_retransmit_timeout,
1683 &instance->timer_orf_token_hold_retransmit_timeout);
1684 if (res != 0) {
1685 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1686 }
1687}
1688
/*
 * Delete the pending token-hold retransmit timer.
 *
 * NOTE(review): the second argument of qb_loop_timer_del() is not visible in
 * this listing; presumably instance->timer_orf_token_hold_retransmit_timeout
 * (the handle set by start_token_hold_retransmit_timeout) — confirm.
 */
1689static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1690{
1691 qb_loop_timer_del (instance->totemsrp_poll_handle,
1693}
1694
1695static void memb_state_consensus_timeout_expired (
1696 struct totemsrp_instance *instance)
1697{
1698 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1699 int no_consensus_list_entries;
1700
1701 instance->stats.consensus_timeouts++;
1702 if (memb_consensus_agreed (instance)) {
1703 memb_consensus_reset (instance);
1704
1705 memb_consensus_set (instance, &instance->my_id);
1706
1707 reset_token_timeout (instance); // REVIEWED
1708 } else {
1709 memb_consensus_notset (
1710 instance,
1711 no_consensus_list,
1712 &no_consensus_list_entries,
1713 instance->my_proc_list,
1714 instance->my_proc_list_entries);
1715
1716 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1717 instance->my_failed_list, &instance->my_failed_list_entries);
1718 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1719 }
1720}
1721
1722static void memb_join_message_send (struct totemsrp_instance *instance);
1723
1724static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1725
1726/*
1727 * Timers used for various states of the membership algorithm
1728 */
1729static void timer_function_pause_timeout (void *data)
1730{
1731 struct totemsrp_instance *instance = data;
1732
1733 instance->pause_timestamp = qb_util_nano_current_get ();
1734 reset_pause_timeout (instance);
1735}
1736
1737static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1738{
1739 old_ring_state_restore (instance);
1740 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1741 instance->stats.recovery_token_lost++;
1742}
1743
/*
 * Token-warning timer callback (data is the totemsrp_instance).
 *
 * While totem_config->token_warning is non-zero, this callback:
 *  - computes how many milliseconds have passed since the rx timestamp of
 *    the most recently recorded token (stats.token[latest_token].rx);
 *  - uses that value in the "Token has not been received" message;
 *  - re-arms the warning timer.
 * If token_warning has been set to 0 in the meantime, the timer is cancelled
 * instead.
 *
 * NOTE(review): the statement head consuming the message text is not visible
 * in this listing. The argument is cast to unsigned int but formatted with
 * "%d"; "%u" would match the type.
 */
1744static void timer_function_orf_token_warning (void *data)
1745{
1746 struct totemsrp_instance *instance = data;
1747 uint64_t tv_diff;
1748
1749 /* need to protect against the case where token_warning is set to 0 dynamically */
1750 if (instance->totem_config->token_warning) {
1751 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1752 instance->stats.token[instance->stats.latest_token].rx;
1754 "Token has not been received in %d ms ", (unsigned int) tv_diff);
1755 reset_token_warning(instance);
1756 } else {
1757 cancel_token_warning(instance);
1758 }
1759}
1760
/*
 * Token-loss timer callback (data is the totemsrp_instance). The action
 * depends on instance->memb_state:
 *  - the "OPERATIONAL" branch logs the processor failure and enters GATHER;
 *  - GATHER: runs memb_state_consensus_timeout_expired(), then enters GATHER;
 *  - COMMIT: enters GATHER;
 *  - the "RECOVERY" branch calls memb_recovery_state_token_loss() and sets
 *    orf_token_discard.
 * Each branch bumps its own *_token_lost statistic, except RECOVERY, whose
 * counter is bumped inside memb_recovery_state_token_loss().
 *
 * NOTE(review): the case labels of the OPERATIONAL and RECOVERY branches and
 * the log_printf heads are not visible in this listing.
 * NOTE(review): when consensus has not been agreed,
 * memb_state_consensus_timeout_expired() already calls
 * memb_state_gather_enter(), so the GATHER branch enters GATHER twice on that
 * path — confirm this is intended.
 */
1761static void timer_function_orf_token_timeout (void *data)
1762{
1763 struct totemsrp_instance *instance = data;
1764
1765 switch (instance->memb_state) {
1768 "The token was lost in the OPERATIONAL state.");
1770 "A processor failed, forming new configuration:"
1771 " token timed out (%ums), waiting %ums for consensus.",
1772 instance->totem_config->token_timeout,
1773 instance->totem_config->consensus_timeout);
1775 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1776 instance->stats.operational_token_lost++;
1777 break;
1778
1779 case MEMB_STATE_GATHER:
1781 "The consensus timeout expired (%ums).",
1782 instance->totem_config->consensus_timeout);
1783 memb_state_consensus_timeout_expired (instance);
1784 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1785 instance->stats.gather_token_lost++;
1786 break;
1787
1788 case MEMB_STATE_COMMIT:
1790 "The token was lost in the COMMIT state.");
1791 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1792 instance->stats.commit_token_lost++;
1793 break;
1794
1797 "The token was lost in the RECOVERY state.");
1798 memb_recovery_state_token_loss (instance);
1799 instance->orf_token_discard = 1;
1800 break;
1801 }
1802}
1803
/*
 * Heartbeat timer callback (data is the totemsrp_instance). It reports the
 * current memb_state in the "HeartBeat Timer expired" message (statement
 * head not visible in this listing). It then handles the event exactly like
 * a token loss by calling timer_function_orf_token_timeout() with the same
 * data.
 */
1804static void timer_function_heartbeat_timeout (void *data)
1805{
1806 struct totemsrp_instance *instance = data;
1808 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1809 timer_function_orf_token_timeout(data);
1810}
1811
/*
 * Join timer callback (data is the totemsrp_instance).
 *
 * In GATHER or COMMIT, it resends the join message and re-arms itself to
 * fire again after totem_config->join_timeout milliseconds; an add failure
 * is logged only. Any other state reaching this timer is treated as a
 * programming error (assert).
 *
 * NOTE(review): the case labels preceding the assert are not visible in this
 * listing.
 */
1812static void memb_timer_function_state_gather (void *data)
1813{
1814 struct totemsrp_instance *instance = data;
1815 int32_t res;
1816
1817 switch (instance->memb_state) {
1820 assert (0); /* this should never happen */
1821 break;
1822 case MEMB_STATE_GATHER:
1823 case MEMB_STATE_COMMIT:
1824 memb_join_message_send (instance);
1825
1826 /*
1827 * Restart the join timeout
1828 */
1829 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1830
1831 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1832 QB_LOOP_MED,
1833 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1834 (void *)instance,
1835 memb_timer_function_state_gather,
1836 &instance->memb_timer_state_gather_join_timeout);
1837
1838 if (res != 0) {
1839 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1840 }
1841 break;
1842 }
1843}
1844
/*
 * Consensus timer callback armed on entry to GATHER. data is the
 * totemsrp_instance; all work is delegated to
 * memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1850
/*
 * Move messages from recovery_sort_queue into regular_sort_queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 .. my_aru are examined; absent slots
 * are skipped. Each recovery entry carries the original message directly
 * after a struct mcast header.
 *
 * The inner message is added to regular_sort_queue only when:
 *  - its ring_id equals my_old_ring_id, and
 *  - its seq slot is not already in use there.
 * The added item's mcast pointer points into the recovery entry's buffer;
 * the payload is not copied.
 *
 * NOTE(review): several lines are not visible in this listing:
 *  - the condition choosing the "encapsulated" branch (its else-branch
 *    skips the entry);
 *  - the log_printf heads;
 *  - the body of the sq_lt_compare() branch, which presumably updates
 *    old_ring_state_high_seq_received.
 * Confirm against the full source.
 */
1851static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1852{
1853 unsigned int i;
1854 struct sort_queue_item *recovery_message_item;
1855 struct sort_queue_item regular_message_item;
1856 unsigned int range = 0;
1857 int res;
1858 void *ptr;
1859 struct mcast *mcast;
1860
1862 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1863
1864 range = instance->my_aru - SEQNO_START_MSG;
1865 /*
1866 * Move messages from recovery to regular sort queue
1867 */
 /*
 * i starts at 1, so the first sequence number examined is
 * SEQNO_START_MSG + 1, matching the range reported above.
 */
1868// todo should i be initialized to 0 or 1 ?
1869 for (i = 1; i <= range; i++) {
1870 res = sq_item_get (&instance->recovery_sort_queue,
1871 i + SEQNO_START_MSG, &ptr);
1872 if (res != 0) {
1873 continue;
1874 }
1875 recovery_message_item = ptr;
1876
1877 /*
1878 * Convert recovery message into regular message
1879 */
1880 mcast = recovery_message_item->mcast;
1882 /*
1883 * Message is a recovery message encapsulated
1884 * in a new ring message
1885 */
1886 regular_message_item.mcast =
1887 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1888 regular_message_item.msg_len =
1889 recovery_message_item->msg_len - sizeof (struct mcast);
1890 mcast = regular_message_item.mcast;
1891 } else {
1892 /*
1893 * TODO this case shouldn't happen
1894 */
1895 continue;
1896 }
1897
1899 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1900 (uint64_t)mcast->seq);
1901
1902 /*
1903 * Only add this message to the regular sort
1904 * queue if it was originated with the same ring
1905 * id as the previous ring
1906 */
1907 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1908 sizeof (struct memb_ring_id)) == 0) {
1909
1910 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1911 if (res == 0) {
1912 sq_item_add (&instance->regular_sort_queue,
1913 &regular_message_item, mcast->seq);
1914 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1916 }
1917 }
1918 } else {
1920 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1921 }
1922 }
1923}
1924
1925/*
1926 * Change states in the state machine of the membership algorithm
1927 */
/*
 * Transition the membership state machine into OPERATIONAL.
 *
 * Steps, in order:
 *  1. Reset consensus and the saved old-ring state, then move recovered
 *     messages into the regular sort queue.
 *  2. Deliver old-ring messages up to old_ring_state_high_seq_received.
 *     my_aru is temporarily set to old_ring_state_aru for this and for the
 *     transitional delivery below.
 *  3. Compute the left list (old members not in the transitional
 *     membership) and the joined list (new members not in it).
 *  4. Install my_new_memb_list as my_memb_list.
 *  5. Deliver the transitional configuration and the remaining messages,
 *     then the regular configuration.
 *  6. Copy the recovery sort queue over the regular one, replace
 *     my_proc_list, and clear my_failed_list.
 *  7. Set my_high_delivered to the highest sequence number
 *     <= my_high_seq_received present in regular_sort_queue (or 0); free
 *     and release the items up to it.
 *  8. Log the new membership (joined / left / failed-without-LEAVE) and
 *     clear the LEAVE list.
 *  9. Update statistics and flags, re-arm the pause timer, and remember
 *     my_ring_id as my_old_ring_id.
 *
 * NOTE(review): several statement heads (log_printf calls and the
 * configuration-change delivery calls) are not visible in this listing.
 */
1928static void memb_state_operational_enter (struct totemsrp_instance *instance)
1929{
1930 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1931 int joined_list_entries = 0;
1932 unsigned int aru_save;
1933 unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1934 unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1935 unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1936 unsigned int left_list[PROCESSOR_COUNT_MAX];
1937 unsigned int i;
1938 unsigned int res;
1939 char left_node_msg[1024];
1940 char joined_node_msg[1024];
1941 char failed_node_msg[1024];
1942
1943 instance->originated_orf_token = 0;
1944
1945 memb_consensus_reset (instance);
1946
1947 old_ring_state_reset (instance);
1948
1949 deliver_messages_from_recovery_to_regular (instance);
1950
1952 "Delivering to app %x to %x",
1953 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1954
 /* Deliver old-ring messages using the aru captured when the old ring state was saved; my_aru is restored below. */
1955 aru_save = instance->my_aru;
1956 instance->my_aru = instance->old_ring_state_aru;
1957
1958 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1959
1960 /*
1961 * Calculate joined and left list
1962 */
1963 memb_set_subtract (instance->my_left_memb_list,
1964 &instance->my_left_memb_entries,
1965 instance->my_memb_list, instance->my_memb_entries,
1966 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1967
1968 memb_set_subtract (joined_list, &joined_list_entries,
1969 instance->my_new_memb_list, instance->my_new_memb_entries,
1970 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1971
1972 /*
1973 * Install new membership
1974 */
1975 instance->my_memb_entries = instance->my_new_memb_entries;
1976 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1977 sizeof (struct srp_addr) * instance->my_memb_entries);
1978 instance->last_released = 0;
1979 instance->my_set_retrans_flg = 0;
1980
1981 /*
1982 * Deliver transitional configuration to application
1983 */
1984 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1985 instance->my_left_memb_entries);
1986 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1987 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1989 trans_memb_list_totemip, instance->my_trans_memb_entries,
1990 left_list, instance->my_left_memb_entries,
1991 0, 0, &instance->my_ring_id);
1992 instance->waiting_trans_ack = 1;
1994
1995// TODO we need to filter to ensure we only deliver those
1996// messages which are part of instance->my_deliver_memb
1997 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1998
1999 instance->my_aru = aru_save;
2000
2001 /*
2002 * Deliver regular configuration to application
2003 */
2004 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2005 instance->my_new_memb_list, instance->my_new_memb_entries);
2006 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2007 joined_list_entries);
2009 new_memb_list_totemip, instance->my_new_memb_entries,
2010 0, 0,
2011 joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2012
2013 /*
2014 * The recovery sort queue now becomes the regular
2015 * sort queue. It is necessary to copy the state
2016 * into the regular sort queue.
2017 */
2018 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2019 instance->my_last_aru = SEQNO_START_MSG;
2020
2021 /* When making my_proc_list smaller, ensure that the
2022 * now non-used entries are zero-ed out. There are some suspect
2023 * assert's that assume that there is always 2 entries in the list.
2024 * These fail when my_proc_list is reduced to 1 entry (and the
2025 * valid [0] entry is the same as the 'unused' [1] entry).
2026 */
2027 memset(instance->my_proc_list, 0,
2028 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2029
2030 instance->my_proc_list_entries = instance->my_new_memb_entries;
2031 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2032 sizeof (struct srp_addr) * instance->my_memb_entries);
2033
2034 instance->my_failed_list_entries = 0;
2035 /*
2036 * TODO Not exactly to spec
2037 *
2038 * At the entry to this function all messages without a gap are
2039 * delivered.
2040 *
2041 * This code throw away messages from the last gap in the sort queue
2042 * to my_high_seq_received
2043 *
2044 * What should really happen is we should deliver all messages up to
2045 * a gap, then deliver the transitional configuration, then deliver
2046 * the messages between the first gap and my_high_seq_received, then
2047 * deliver a regular configuration, then deliver the regular
2048 * configuration
2049 *
2050 * Unfortunately totempg doesn't appear to like this operating mode
2051 * which needs more inspection
2052 */
 /*
 * Walk backwards from my_high_seq_received until sq_item_get() finds an
 * item (res == 0) or i reaches 0; that index becomes my_high_delivered.
 */
2053 i = instance->my_high_seq_received + 1;
2054 do {
2055 void *ptr;
2056
2057 i -= 1;
2058 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2059 if (i == 0) {
2060 break;
2061 }
2062 } while (res);
2063
2064 instance->my_high_delivered = i;
2065
 /* Free the message buffers of every stored item up to my_high_delivered, then release those queue slots. */
2066 for (i = 0; i <= instance->my_high_delivered; i++) {
2067 void *ptr;
2068
2069 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2070 if (res == 0) {
2071 struct sort_queue_item *regular_message;
2072
2073 regular_message = ptr;
2074 free (regular_message->mcast);
2075 }
2076 }
2077 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2078 instance->last_released = instance->my_high_delivered;
2079
 /*
 * NOTE(review): sptr/sptr2 accumulate snprintf()'s return value, which is
 * the length that *would* have been written. If the rendered list ever
 * exceeds the 1024-byte buffer, sptr passes the buffer size:
 *  - buf+sptr then points past the array;
 *  - sizeof(buf)-sptr converts a negative int to a huge size_t;
 * so the next snprintf() can write out of bounds. Whether this is reachable
 * depends on PROCESSOR_COUNT_MAX and the printed width of node ids. Consider
 * stopping once sptr >= sizeof(buf).
 */
2080 if (joined_list_entries) {
2081 int sptr = 0;
2082 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2083 for (i=0; i< joined_list_entries; i++) {
2084 sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2085 }
2086 }
2087 else {
2088 joined_node_msg[0] = '\0';
2089 }
2090
 /* "failed" lists left members that never announced a LEAVE (my_leave_memb_match() == 0). */
2091 if (instance->my_left_memb_entries) {
2092 int sptr = 0;
2093 int sptr2 = 0;
2094 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2095 for (i=0; i< instance->my_left_memb_entries; i++) {
2096 sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2097 }
2098 for (i=0; i< instance->my_left_memb_entries; i++) {
2099 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2100 if (sptr2 == 0) {
2101 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2102 }
 /* NOTE(review): bounded by sizeof(left_node_msg) while writing failed_node_msg; both are char[1024], so equivalent today but fragile. */
2103 sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2104 }
2105 }
2106 if (sptr2 == 0) {
2107 failed_node_msg[0] = '\0';
2108 }
2109 }
2110 else {
2111 left_node_msg[0] = '\0';
2112 failed_node_msg[0] = '\0';
2113 }
2114
2115 my_leave_memb_clear(instance);
2116
2118 "entering OPERATIONAL state.");
2120 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2121 instance->my_ring_id.rep,
2122 (uint64_t)instance->my_ring_id.seq,
2123 joined_node_msg,
2124 left_node_msg);
2125
2126 if (strlen(failed_node_msg)) {
2128 "Failed to receive the leave message.%s",
2129 failed_node_msg);
2130 }
2131
2133
2134 instance->stats.operational_entered++;
2135 instance->stats.continuous_gather = 0;
2136
2137 instance->my_received_flg = 1;
2138
2139 reset_pause_timeout (instance);
2140
2141 /*
2142 * Save ring id information from this configuration to determine
2143 * which processors are transitioning from old regular configuration
2144 * in to new regular configuration on the next configuration change
2145 */
2146 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2147 sizeof (struct memb_ring_id));
2148
2149 return;
2150}
2151
/*
 * Transition the membership state machine into GATHER.
 *
 * gather_from records why GATHER is being entered; it is included in the
 * log message via gsfrom_to_msg().
 *
 * Steps:
 *  - Discard incoming ORF tokens (orf_token_discard = 1) and clear
 *    originated_orf_token.
 *  - Make sure my_id is in my_proc_list, then send a join message.
 *  - Restart the join timer (join_timeout) and the consensus timer
 *    (consensus_timeout).
 *  - Cancel the token retransmit, token-loss and merge-detect timers.
 *  - Restart consensus with only this node set.
 *  - Set memb_state to GATHER and bump gather_entered; continuous_gather is
 *    bumped conditionally.
 *
 * NOTE(review): the following are not visible in this listing:
 *  - the second qb_loop_timer_del() argument (presumably
 *    memb_timer_state_gather_consensus_timeout);
 *  - the log_printf head;
 *  - the condition guarding continuous_gather++.
 */
2152static void memb_state_gather_enter (
2153 struct totemsrp_instance *instance,
2154 enum gather_state_from gather_from)
2155{
2156 int32_t res;
2157
2158 instance->orf_token_discard = 1;
2159
2160 instance->originated_orf_token = 0;
2161
 /* Ensure this node is always part of the processor list it proposes. */
2162 memb_set_merge (
2163 &instance->my_id, 1,
2164 instance->my_proc_list, &instance->my_proc_list_entries);
2165
2166 memb_join_message_send (instance);
2167
2168 /*
2169 * Restart the join timeout
2170 */
2171 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2172
2173 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2174 QB_LOOP_MED,
2175 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2176 (void *)instance,
2177 memb_timer_function_state_gather,
2178 &instance->memb_timer_state_gather_join_timeout);
2179 if (res != 0) {
2180 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2181 }
2182
2183 /*
2184 * Restart the consensus timeout
2185 */
2186 qb_loop_timer_del (instance->totemsrp_poll_handle,
2188
2189 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2190 QB_LOOP_MED,
2191 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2192 (void *)instance,
2193 memb_timer_function_gather_consensus_timeout,
2194 &instance->memb_timer_state_gather_consensus_timeout);
2195 if (res != 0) {
2196 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2197 }
2198
2199 /*
2200 * Cancel the token loss and token retransmission timeouts
2201 */
2202 cancel_token_retransmit_timeout (instance); // REVIEWED
2203 cancel_token_timeout (instance); // REVIEWED
2204 cancel_merge_detect_timeout (instance);
2205
2206 memb_consensus_reset (instance);
2207
2208 memb_consensus_set (instance, &instance->my_id);
2209
2211 "entering GATHER state from %d(%s).",
2212 gather_from, gsfrom_to_msg(gather_from));
2213
2214 instance->memb_state = MEMB_STATE_GATHER;
2215 instance->stats.gather_entered++;
2216
2218 /*
2219 * State 3 means gather, so we are continuously gathering.
2220 */
2221 instance->stats.continuous_gather++;
2222 }
2223
2224 return;
2225}
2226
2227static void timer_function_token_retransmit_timeout (void *data);
2228
/*
 * Completion callback whose opaque context is the totemsrp_instance. It
 * sends the commit token via memb_state_commit_token_send().
 */
static void target_set_completed (
	void *context)
{
	memb_state_commit_token_send ((struct totemsrp_instance *)context);
}
2237
/*
 * Transition the membership state machine into COMMIT.
 *
 * Steps:
 *  - Snapshot the old ring state (first call only; see
 *    old_ring_state_save), update the commit token and set its target.
 *  - Delete the GATHER join and consensus timers.
 *  - Adopt the commit token's ring id, hand it to the memb_ring_id_store
 *    callback, and record its seq in token_ring_id_seq.
 *  - Set memb_state to COMMIT and re-arm the token retransmit and
 *    token-loss timers.
 *  - Update statistics and zero the flow-control counters
 *    (my_trc, my_pbl, my_cbl).
 *
 * The commit token itself is not sent here; per the closing comment, it is
 * sent from the callback fired once the token target has been set.
 *
 * NOTE(review): the log_printf head for "entering COMMIT state." is not
 * visible in this listing.
 */
2238static void memb_state_commit_enter (
2239 struct totemsrp_instance *instance)
2240{
2241 old_ring_state_save (instance);
2242
2243 memb_state_commit_token_update (instance);
2244
2245 memb_state_commit_token_target_set (instance);
2246
2247 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2248
2250
2251 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2252
2254
2255 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2256
2257 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2258
2259 instance->token_ring_id_seq = instance->my_ring_id.seq;
2260
2262 "entering COMMIT state.");
2263
2264 instance->memb_state = MEMB_STATE_COMMIT;
2265 reset_token_retransmit_timeout (instance); // REVIEWED
2266 reset_token_timeout (instance); // REVIEWED
2267
2268 instance->stats.commit_entered++;
2269 instance->stats.continuous_gather = 0;
2270
2271 /*
2272 * reset all flow control variables since we are starting a new ring
2273 */
2274 instance->my_trc = 0;
2275 instance->my_pbl = 0;
2276 instance->my_cbl = 0;
2277 /*
2278 * commit token sent after callback that token target has been set
2279 */
2280}
2281
2282static void memb_state_recovery_enter (
2283 struct totemsrp_instance *instance,
2285{
2286 int i;
2287 int local_received_flg = 1;
2288 unsigned int low_ring_aru;
2289 unsigned int range = 0;
2290 unsigned int messages_originated = 0;
2291 const struct srp_addr *addr;
2292 struct memb_commit_token_memb_entry *memb_list;
2293 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2294
2295 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2296 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2297
2299 "entering RECOVERY state.");
2300
2301 instance->orf_token_discard = 0;
2302
2303 instance->my_high_ring_delivered = 0;
2304
2305 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2306 cs_queue_reinit (&instance->retrans_message_queue);
2307
2308 low_ring_aru = instance->old_ring_state_high_seq_received;
2309
2310 memb_state_commit_token_send_recovery (instance, commit_token);
2311
2312 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2313
2314 /*
2315 * Build regular configuration
2316 */
2318 instance->totemnet_context,
2319 commit_token->addr_entries);
2320
2321 /*
2322 * Build transitional configuration
2323 */
2324 for (i = 0; i < instance->my_new_memb_entries; i++) {
2325 memcpy (&my_new_memb_ring_id_list[i],
2326 &memb_list[i].ring_id,
2327 sizeof (struct memb_ring_id));
2328 }
2329 memb_set_and_with_ring_id (
2330 instance->my_new_memb_list,
2331 my_new_memb_ring_id_list,
2332 instance->my_new_memb_entries,
2333 instance->my_memb_list,
2334 instance->my_memb_entries,
2335 &instance->my_old_ring_id,
2336 instance->my_trans_memb_list,
2337 &instance->my_trans_memb_entries);
2338
2339 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2341 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2342 }
2343 for (i = 0; i < instance->my_new_memb_entries; i++) {
2345 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2347 "previous ringid (" CS_PRI_RING_ID ")",
2348 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2349
2351 "aru %x high delivered %x received flag %d",
2352 memb_list[i].aru,
2353 memb_list[i].high_delivered,
2354 memb_list[i].received_flg);
2355
2356 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2357 }
2358 /*
2359 * Determine if any received flag is false
2360 */
2361 for (i = 0; i < commit_token->addr_entries; i++) {
2362 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2363 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2364
2365 memb_list[i].received_flg == 0) {
2366 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2367 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2368 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2369 local_received_flg = 0;
2370 break;
2371 }
2372 }
2373 if (local_received_flg == 1) {
2374 goto no_originate;
2375 } /* Else originate messages if we should */
2376
2377 /*
2378 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2379 */
2380 for (i = 0; i < commit_token->addr_entries; i++) {
2381 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2382 instance->my_deliver_memb_list,
2383 instance->my_deliver_memb_entries) &&
2384
2385 memcmp (&instance->my_old_ring_id,
2386 &memb_list[i].ring_id,
2387 sizeof (struct memb_ring_id)) == 0) {
2388
2389 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2390
2391 low_ring_aru = memb_list[i].aru;
2392 }
2393 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2394 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2395 }
2396 }
2397 }
2398
2399 /*
2400 * Copy all old ring messages to instance->retrans_message_queue
2401 */
2402 range = instance->old_ring_state_high_seq_received - low_ring_aru;
2403 if (range == 0) {
2404 /*
2405 * No messages to copy
2406 */
2407 goto no_originate;
2408 }
2409 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2410
2412 "copying all old ring messages from %x-%x.",
2413 low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2414
2415 for (i = 1; i <= range; i++) {
2418 void *ptr;
2419 int res;
2420
2421 res = sq_item_get (&instance->regular_sort_queue,
2422 low_ring_aru + i, &ptr);
2423 if (res != 0) {
2424 continue;
2425 }
2426 sort_queue_item = ptr;
2427 messages_originated++;
2428 memset (&message_item, 0, sizeof (struct message_item));
2429 // TODO LEAK
2430 message_item.mcast = totemsrp_buffer_alloc (instance);
2431 assert (message_item.mcast);
2432 memset(message_item.mcast, 0, sizeof (struct mcast));
2436 message_item.mcast->system_from = instance->my_id;
2438
2440 assert (message_item.mcast->header.nodeid);
2441 memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2442 sizeof (struct memb_ring_id));
2443 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2444 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2447 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2448 }
2450 "Originated %d messages in RECOVERY.", messages_originated);
2451 goto originated;
2452
2453no_originate:
2455 "Did not need to originate any messages in recovery.");
2456
2457originated:
2458 instance->my_aru = SEQNO_START_MSG;
2459 instance->my_aru_count = 0;
2460 instance->my_seq_unchanged = 0;
2462 instance->my_install_seq = SEQNO_START_MSG;
2463 instance->last_released = SEQNO_START_MSG;
2464
2465 reset_token_timeout (instance); // REVIEWED
2466 reset_token_retransmit_timeout (instance); // REVIEWED
2467
2468 instance->memb_state = MEMB_STATE_RECOVERY;
2469 instance->stats.recovery_entered++;
2470 instance->stats.continuous_gather = 0;
2471
2472 return;
2473}
2474
2475void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2476{
2477 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2478
2479 token_hold_cancel_send (instance);
2480
2481 return;
2482}
2483
/*
 * Queue a caller message for multicast.
 *
 * The iov_len iovec segments are concatenated after a zeroed struct mcast
 * header, in a buffer obtained from totemsrp_buffer_alloc(). The result is
 * appended to new_message_queue_trans while waiting_trans_ack is set, and
 * to new_message_queue otherwise.
 *
 * Returns 0 when the message is queued. Returns -1 when the selected queue
 * is full or the buffer allocation fails.
 *
 * NOTE(review): this listing skips source lines 2484, 2492, 2522-2525,
 * 2527 and 2530. Judging by the surrounding code, these hold the
 * function-name line, the message_item declaration and part of the header
 * setup. guarantee is not referenced in the visible lines; presumably it is
 * stored by one of the missing lines. Compare with upstream before editing.
 */
2485 void *srp_context,
2486 struct iovec *iovec,
2487 unsigned int iov_len,
2488 int guarantee)
2489{
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491 int i;
2493 char *addr;
2494 unsigned int addr_idx;
2495 struct cs_queue *queue_use;
2496
/* Pick the transitional queue while a transitional ack is outstanding. */
2497 if (instance->waiting_trans_ack) {
2498 queue_use = &instance->new_message_queue_trans;
2499 } else {
2500 queue_use = &instance->new_message_queue;
2501 }
2502
2503 if (cs_queue_is_full (queue_use)) {
2504 log_printf (instance->totemsrp_log_level_debug, "queue full");
2505 return (-1);
2506 }
2507
2508 memset (&message_item, 0, sizeof (struct message_item));
2509
2510 /*
2511 * Allocate pending item
2512 */
2513 message_item.mcast = totemsrp_buffer_alloc (instance);
2514 if (message_item.mcast == 0) {
2515 goto error_mcast;
2516 }
2517
2518 /*
2519 * Set mcast header
2520 */
2521 memset(message_item.mcast, 0, sizeof (struct mcast));
2526
2528 assert (message_item.mcast->header.nodeid);
2529
2531 message_item.mcast->system_from = instance->my_id;
2532
/*
 * Payload goes directly after the header. msg_len therefore includes
 * sizeof (struct mcast).
 * NOTE(review): nothing here checks that the header plus the summed
 * iov_len values fits in the buffer from totemsrp_buffer_alloc().
 * Presumably callers bound the message size; confirm.
 */
2533 addr = (char *)message_item.mcast;
2534 addr_idx = sizeof (struct mcast);
2535 for (i = 0; i < iov_len; i++) {
2536 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2537 addr_idx += iovec[i].iov_len;
2538 }
2539
2540 message_item.msg_len = addr_idx;
2541
2542 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2543 instance->stats.mcast_tx++;
2544 cs_queue_item_add (queue_use, &message_item);
2545
2546 return (0);
2547
/* Only reached when buffer allocation failed, so there is nothing to free. */
2548error_mcast:
2549 return (-1);
2550}
2551
2552/*
2553 * Determine if there is room to queue a new message
2554 */
2555int totemsrp_avail (void *srp_context)
2556{
2557 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2558 int avail;
2559 struct cs_queue *queue_use;
2560
2561 if (instance->waiting_trans_ack) {
2562 queue_use = &instance->new_message_queue_trans;
2563 } else {
2564 queue_use = &instance->new_message_queue;
2565 }
2566 cs_queue_avail (queue_use, &avail);
2567
2568 return (avail);
2569}
2570
2571/*
2572 * ORF Token Management
2573 */
2574/*
2575 * Re-multicast a message to the mcast group if it is still available
2576 */
/*
 * Re-multicast the message with sequence number seq.
 *
 * The message is taken from the sort queue for the current membership
 * state: recovery_sort_queue in RECOVERY, regular_sort_queue otherwise.
 *
 * Returns -1 if seq is outside the queue's range or its slot is empty.
 * Otherwise returns 0 after handing the item to the send call.
 *
 * NOTE(review): listing lines 2581 and 2609/2611-2612 are missing.
 * Presumably they hold the sort_queue_item declaration and the totemnet
 * send call whose middle argument survives at 2610.
 */
2577static int orf_token_remcast (
2578 struct totemsrp_instance *instance,
2579 int seq)
2580{
2582 int res;
2583 void *ptr;
2584
2585 struct sq *sort_queue;
2586
2587 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2588 sort_queue = &instance->recovery_sort_queue;
2589 } else {
2590 sort_queue = &instance->regular_sort_queue;
2591 }
2592
2593 res = sq_in_range (sort_queue, seq);
2594 if (res == 0) {
2595 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2596 return (-1);
2597 }
2598
2599 /*
2600 * Get RTR item at seq, if not available, return
2601 */
2602 res = sq_item_get (sort_queue, seq, &ptr);
2603 if (res != 0) {
2604 return -1;
2605 }
2606
2607 sort_queue_item = ptr;
2608
2610 instance->totemnet_context,
2613
2614 return (0);
2615}
2616
2617
2618/*
2619 * Free all freeable messages from ring
2620 */
/*
 * Release messages from regular_sort_queue up to a computed point.
 *
 * release_to is the earliest, in sq_lt_compare ordering, of token_aru,
 * my_last_aru and my_high_delivered.
 *
 * If release_to is before last_released, nothing happens. Otherwise each
 * slot in (last_released, release_to] is processed in turn:
 *  - its mcast buffer, if present, is returned via totemsrp_buffer_release();
 *  - the slot is released from the sort queue.
 * Finally last_released advances to release_to.
 *
 * NOTE(review): line 2670 (the start of the log call) is missing here.
 */
2621static void messages_free (
2622 struct totemsrp_instance *instance,
2623 unsigned int token_aru)
2624{
2625 struct sort_queue_item *regular_message;
2626 unsigned int i;
2627 int res;
2628 int log_release = 0;
2629 unsigned int release_to;
2630 unsigned int range = 0;
2631
/* Take the minimum of the three candidate release points. */
2632 release_to = token_aru;
2633 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2634 release_to = instance->my_last_aru;
2635 }
2636 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2637 release_to = instance->my_high_delivered;
2638 }
2639
2640 /*
2641 * Ensure we don't try to release before an already released point
2642 */
2643 if (sq_lt_compare (release_to, instance->last_released)) {
2644 return;
2645 }
2646
2647 range = release_to - instance->last_released;
2648 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2649
2650 /*
2651 * Release retransmit list items if group aru indicates they are transmitted
2652 */
2653 for (i = 1; i <= range; i++) {
2654 void *ptr;
2655
2656 res = sq_item_get (&instance->regular_sort_queue,
2657 instance->last_released + i, &ptr);
2658 if (res == 0) {
2659 regular_message = ptr;
2660 totemsrp_buffer_release (instance, regular_message->mcast);
2661 }
2662 sq_items_release (&instance->regular_sort_queue,
2663 instance->last_released + i);
2664
2665 log_release = 1;
2666 }
2667 instance->last_released += range;
2668
2669 if (log_release) {
2671 "releasing messages up to and including %x", release_to);
2672 }
2673}
2674
2675static void update_aru (
2676 struct totemsrp_instance *instance)
2677{
2678 unsigned int i;
2679 int res;
2680 struct sq *sort_queue;
2681 unsigned int range;
2682 unsigned int my_aru_saved = 0;
2683
2684 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2685 sort_queue = &instance->recovery_sort_queue;
2686 } else {
2687 sort_queue = &instance->regular_sort_queue;
2688 }
2689
2690 range = instance->my_high_seq_received - instance->my_aru;
2691
2692 my_aru_saved = instance->my_aru;
2693 for (i = 1; i <= range; i++) {
2694
2695 void *ptr;
2696
2697 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2698 /*
2699 * If hole, stop updating aru
2700 */
2701 if (res != 0) {
2702 break;
2703 }
2704 }
2705 instance->my_aru += i - 1;
2706}
2707
2708/*
2709 * Multicasts pending messages onto the ring (requires orf_token possession)
2710 */
/*
 * Move up to fcc_mcasts_allowed pending messages onto the ring while
 * holding the token.
 *
 * Queue selection depends on the membership state:
 *  - RECOVERY: retrans_message_queue feeds recovery_sort_queue, and the
 *    token retransmit timer is re-armed.
 *  - Otherwise: new_message_queue_trans (while waiting_trans_ack) or
 *    new_message_queue feeds regular_sort_queue.
 *
 * For each message:
 *  - it gets seq = ++token->seq and the next global_seqno;
 *  - its ring_id is set from my_ring_id;
 *  - it is added to the sort queue at that seq and sent (send call partly
 *    elided);
 *  - it is removed from the pending queue, and my_high_seq_received is set
 *    to token->seq.
 * update_aru() runs afterwards.
 *
 * NOTE(review): lines 2719, 2750-2751, 2753, 2762 and 2764-2765 are
 * missing from this listing.
 */
2711static int orf_token_mcast (
2712 struct totemsrp_instance *instance,
2713 struct orf_token *token,
2714 int fcc_mcasts_allowed)
2715{
2716 struct message_item *message_item = 0;
2717 struct cs_queue *mcast_queue;
2718 struct sq *sort_queue;
2720 struct mcast *mcast;
2721 unsigned int fcc_mcast_current;
2722
2723 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2724 mcast_queue = &instance->retrans_message_queue;
2725 sort_queue = &instance->recovery_sort_queue;
2726 reset_token_retransmit_timeout (instance); // REVIEWED
2727 } else {
2728 if (instance->waiting_trans_ack) {
2729 mcast_queue = &instance->new_message_queue_trans;
2730 } else {
2731 mcast_queue = &instance->new_message_queue;
2732 }
2733
2734 sort_queue = &instance->regular_sort_queue;
2735 }
2736
/*
 * NOTE(review): fcc_mcast_current is unsigned and fcc_mcasts_allowed is
 * int. A negative limit would therefore compare as a huge unsigned value.
 */
2737 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2738 if (cs_queue_is_empty (mcast_queue)) {
2739 break;
2740 }
2741 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2742
2743 message_item->mcast->seq = ++token->seq;
2744 message_item->mcast->this_seqno = instance->global_seqno++;
2745
2746 /*
2747 * Build IO vector
2748 */
2749 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2752
2754
2755 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2756
2757 /*
2758 * Add message to retransmit queue
2759 */
2760 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2761
2763 instance->totemnet_context,
2766
2767 /*
2768 * Delete item from pending queue
2769 */
2770 cs_queue_item_remove (mcast_queue);
2771
2772 /*
2773 * If messages mcasted, deliver any new messages to totempg
2774 */
2775 instance->my_high_seq_received = token->seq;
2776 }
2777
2778 update_aru (instance);
2779
2780 /*
2781 * Return 1 if more messages are available for single node clusters
2782 */
/*
 * NOTE(review): despite the comment above, the value returned is the
 * number of messages taken from the queue in this call
 * (0..fcc_mcasts_allowed), not a 0/1 flag.
 */
2783 return (fcc_mcast_current);
2784}
2785
2786/*
2787 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2788 * Modify's orf_token's rtr to include retransmits required by this process
2789 */
/*
 * Service the token's retransmit (RTR) list, then extend it with this
 * node's own gaps.
 *
 * 1. Log the current RTR list (log-call lines are elided).
 * 2. Re-send entries while fewer than *fcc_allowed re-sends have been done.
 *    Entries whose ring_id differs from my_ring_id are skipped. An entry
 *    successfully re-multicast by orf_token_remcast() is removed by
 *    shifting the tail of the list down. Afterwards *fcc_allowed is reduced
 *    by the number re-sent.
 * 3. Walk sequence numbers after my_aru up to token->seq in the active sort
 *    queue, stopping at the first one outside the queue's range. A seq is
 *    appended to the list, tagged with my_ring_id, when all of these hold:
 *     - it is not in use;
 *     - its miss count (sq_item_miss_count) is at least miss_count_const;
 *     - it is not already listed.
 *
 * Returns the number of messages re-sent (instance->fcc_remcast_current).
 *
 * NOTE(review): lines 2810, 2813-2814, 2821, 2847, 2868, 2911 and 2914 are
 * missing. Judging by context, they hold the rtr_list initialisation, the
 * log calls, the rtr_list_entries updates and the second loop's header.
 */
2790static int orf_token_rtr (
2791 struct totemsrp_instance *instance,
2792 struct orf_token *orf_token,
2793 unsigned int *fcc_allowed)
2794{
2795 unsigned int res;
2796 unsigned int i, j;
2797 unsigned int found;
2798 struct sq *sort_queue;
2799 struct rtr_item *rtr_list;
2800 unsigned int range = 0;
2801 char retransmit_msg[1024];
2802 char value[64];
2803
2804 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2805 sort_queue = &instance->recovery_sort_queue;
2806 } else {
2807 sort_queue = &instance->regular_sort_queue;
2808 }
2809
2811
/*
 * NOTE(review): retransmit_msg is 1024 bytes and is filled with unbounded
 * strcat(). This assumes rtr_list_entries stays small (presumably capped by
 * RETRANSMIT_ENTRIES_MAX); confirm.
 */
2812 strcpy (retransmit_msg, "Retransmit List: ");
2815 "Retransmit List %d", orf_token->rtr_list_entries);
2816 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2817 sprintf (value, "%x ", rtr_list[i].seq);
2818 strcat (retransmit_msg, value);
2819 }
/* NOTE(review): appending "" is a no-op. */
2820 strcat (retransmit_msg, "");
2822 "%s", retransmit_msg);
2823 }
2824
2825 /*
2826 * Retransmit messages on orf_token's RTR list from RTR queue
2827 */
2828 for (instance->fcc_remcast_current = 0, i = 0;
2829 instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2830
2831 /*
2832 * If this retransmit request isn't from this configuration,
2833 * try next rtr entry
2834 */
2835 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2836 sizeof (struct memb_ring_id)) != 0) {
2837
2838 i += 1;
2839 continue;
2840 }
2841
2842 res = orf_token_remcast (instance, rtr_list[i].seq);
2843 if (res == 0) {
2844 /*
2845 * Multicasted message, so no need to copy to new retransmit list
2846 */
/*
 * Served entry: the tail is shifted over slot i and i is not advanced.
 * The element count here presumably relies on rtr_list_entries having
 * been decremented on the elided line 2847.
 */
2848 assert (orf_token->rtr_list_entries >= 0);
2849 memmove (&rtr_list[i], &rtr_list[i + 1],
2850 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2851
2852 instance->stats.mcast_retx++;
2853 instance->fcc_remcast_current++;
2854 } else {
2855 i += 1;
2856 }
2857 }
2858 *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2859
2860 /*
2861 * Add messages to retransmit to RTR list
2862 * but only retry if there is room in the retransmit list
2863 */
2864
2865 range = orf_token->seq - instance->my_aru;
2866 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2867
2869 (i <= range); i++) {
2870
2871 /*
2872 * Ensure message is within the sort queue range
2873 */
2874 res = sq_in_range (sort_queue, instance->my_aru + i);
2875 if (res == 0) {
2876 break;
2877 }
2878
2879 /*
2880 * Find if a message is missing from this processor
2881 */
2882 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2883 if (res == 0) {
2884 /*
2885 * Determine how many times we have missed receiving
2886 * this sequence number. sq_item_miss_count increments
2887 * a counter for the sequence number. The miss count
2888 * will be returned and compared. This allows time for
2889 * delayed multicast messages to be received before
2890 * declaring the message is missing and requesting a
2891 * retransmit.
2892 */
2893 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2894 if (res < instance->totem_config->miss_count_const) {
2895 continue;
2896 }
2897
2898 /*
2899 * Determine if missing message is already in retransmit list
2900 */
2901 found = 0;
2902 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2903 if (instance->my_aru + i == rtr_list[j].seq) {
2904 found = 1;
2905 }
2906 }
2907 if (found == 0) {
2908 /*
2909 * Missing message not found in current retransmit list so add it
2910 */
2912 &instance->my_ring_id, sizeof (struct memb_ring_id));
2913 rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2915 }
2916 }
2917 }
2918 return (instance->fcc_remcast_current);
2919}
2920
/*
 * Re-send the most recently saved token image: orf_token_retransmit with
 * length orf_token_retransmit_size.
 *
 * token_send() and the commit-token send functions refresh that copy each
 * time they send.
 *
 * NOTE(review): line 2923 (the start of the send call) is missing here.
 */
2921static void token_retransmit (struct totemsrp_instance *instance)
2922{
2924 instance->orf_token_retransmit,
2925 instance->orf_token_retransmit_size);
2926}
2927
2928/*
2929 * Retransmit the regular token: if no mcast or token has
2930 * been received within the token retransmit period, resend
2931 * the token to the next processor.
2932 */
/*
 * Timer callback for token retransmission.
 *
 * In GATHER it does nothing. For COMMIT and the states whose case labels
 * are elided, it re-sends the saved token image via token_retransmit() and
 * re-arms the retransmit timer.
 *
 * NOTE(review): listing lines 2941-2942 are missing. Presumably they hold
 * further case labels; confirm whether COMMIT really falls through to the
 * retransmit.
 */
2933static void timer_function_token_retransmit_timeout (void *data)
2934{
2935 struct totemsrp_instance *instance = data;
2936
2937 switch (instance->memb_state) {
2938 case MEMB_STATE_GATHER:
2939 break;
2940 case MEMB_STATE_COMMIT:
2943 token_retransmit (instance);
2944 reset_token_retransmit_timeout (instance); // REVIEWED
2945 break;
2946 }
2947}
2948
/*
 * Timer callback fired while the token is being held.
 *
 * GATHER and COMMIT do nothing. The states on the elided case labels re-send
 * the saved token image via token_retransmit(). Unlike the regular
 * retransmit timer, this callback does not re-arm any timer.
 *
 * NOTE(review): listing lines 2958-2959 (presumably the remaining case
 * labels) are missing.
 */
2949static void timer_function_token_hold_retransmit_timeout (void *data)
2950{
2951 struct totemsrp_instance *instance = data;
2952
2953 switch (instance->memb_state) {
2954 case MEMB_STATE_GATHER:
2955 break;
2956 case MEMB_STATE_COMMIT:
2957 break;
2960 token_retransmit (instance);
2961 break;
2962 }
2963}
2964
/*
 * Merge-detect timer callback.
 *
 * Under the case label elided at line 2972, a node that is the ring's
 * representative (my_ring_id.rep == my_id.nodeid) sends a merge-detect
 * message. GATHER and COMMIT take no visible action.
 *
 * NOTE(review): lines 2969, 2972 and 2979 are missing from this listing.
 * Line 2969 may update timer/flag state before the switch; confirm upstream.
 */
2965static void timer_function_merge_detect_timeout(void *data)
2966{
2967 struct totemsrp_instance *instance = data;
2968
2970
2971 switch (instance->memb_state) {
2973 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2974 memb_merge_detect_transmit (instance);
2975 }
2976 break;
2977 case MEMB_STATE_GATHER:
2978 case MEMB_STATE_COMMIT:
2980 break;
2981 }
2982}
2983
2984/*
2985 * Send orf_token to next member (requires orf_token)
2986 */
/*
 * Stamp orf_token with this node's id and save a copy for retransmission.
 *
 * The copy (fixed header plus rtr_list_entries rtr_item entries) goes into
 * orf_token_retransmit, which token_retransmit() re-sends. The token itself
 * is sent only when forward_token is non-zero.
 *
 * Returns 0 when not forwarding. Otherwise returns res, presumably set by
 * the send call whose first line (3007) is missing from this listing.
 */
2987static int token_send (
2988 struct totemsrp_instance *instance,
2989 struct orf_token *orf_token,
2990 int forward_token)
2991{
2992 int res = 0;
2993 unsigned int orf_token_size;
2994
2995 orf_token_size = sizeof (struct orf_token) +
2996 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2997
2998 orf_token->header.nodeid = instance->my_id.nodeid;
2999 memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3000 instance->orf_token_retransmit_size = orf_token_size;
3001 assert (orf_token->header.nodeid);
3002
/* Callers that only need the retransmit copy updated stop here. */
3003 if (forward_token == 0) {
3004 return (0);
3005 }
3006
3008 orf_token,
3009 orf_token_size);
3010
3011 return (res);
3012}
3013
/*
 * Cancel a token hold, if there is one.
 *
 * When this node currently holds the token (my_token_held != 0), clear that
 * flag, build a token_hold_cancel message carrying my_ring_id, count it in
 * stats.token_hold_cancel_tx and send it. Otherwise do nothing.
 *
 * Always returns 0.
 *
 * NOTE(review): lines 3016, 3029-3033, 3036 and 3040 are missing. Judging by
 * context, they hold the message declaration, the header fill-in and the
 * send call.
 */
3014static int token_hold_cancel_send (struct totemsrp_instance *instance)
3015{
3017
3018 /*
3019 * Only cancel if the token is currently held
3020 */
3021 if (instance->my_token_held == 0) {
3022 return (0);
3023 }
3024 instance->my_token_held = 0;
3025
3026 /*
3027 * Build message
3028 */
3034 memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3035 sizeof (struct memb_ring_id));
3037
3038 instance->stats.token_hold_cancel_tx++;
3039
3041 sizeof (struct token_hold_cancel));
3042
3043 return (0);
3044}
3045
/*
 * Build the initial ORF token and send it through token_send() with
 * forwarding enabled. Returns token_send()'s result.
 *
 * Visible fields: aru 0, aru_addr = this node, ring_id = my_ring_id,
 * fcc 0, backlog 0.
 *
 * Also sets my_set_retrans_flg to 0 when retrans_message_queue is empty and
 * to 1 otherwise, and counts the send in stats.orf_token_tx.
 *
 * NOTE(review): lines 3051-3054, 3057-3059, 3064, 3067, 3072 and 3079 are
 * missing from this listing. They likely hold header fields, seq fields and
 * the token's retrans_flg assignments.
 */
3046static int orf_token_send_initial (struct totemsrp_instance *instance)
3047{
3048 struct orf_token orf_token;
3049 int res;
3050
3055 orf_token.header.nodeid = instance->my_id.nodeid;
3056 assert (orf_token.header.nodeid);
/* NOTE(review): this value is overwritten on both branches of the if/else below. */
3060 instance->my_set_retrans_flg = 1;
3061 instance->stats.orf_token_tx++;
3062
3063 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3065 instance->my_set_retrans_flg = 0;
3066 } else {
3068 instance->my_set_retrans_flg = 1;
3069 }
3070
3071 orf_token.aru = 0;
3073 orf_token.aru_addr = instance->my_id.nodeid;
3074
3075 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3076 orf_token.fcc = 0;
3077 orf_token.backlog = 0;
3078
3080
3081 res = token_send (instance, &orf_token, 1);
3082
3083 return (res);
3084}
3085
3086static void memb_state_commit_token_update (
3087 struct totemsrp_instance *instance)
3088{
3089 struct srp_addr *addr;
3090 struct memb_commit_token_memb_entry *memb_list;
3091 unsigned int high_aru;
3092 unsigned int i;
3093
3094 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3095 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3096
3097 memcpy (instance->my_new_memb_list, addr,
3098 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3099
3100 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3101
3102 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3103 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3104
3105 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3106 /*
3107 * TODO high delivered is really instance->my_aru, but with safe this
3108 * could change?
3109 */
3110 instance->my_received_flg =
3111 (instance->my_aru == instance->my_high_seq_received);
3112
3113 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3114
3115 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3116 /*
3117 * find high aru up to current memb_index for all matching ring ids
3118 * if any ring id matching memb_index has aru less then high aru set
3119 * received flag for that entry to false
3120 */
3121 high_aru = memb_list[instance->commit_token->memb_index].aru;
3122 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3123 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3124 &memb_list[i].ring_id,
3125 sizeof (struct memb_ring_id)) == 0) {
3126
3127 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3128 high_aru = memb_list[i].aru;
3129 }
3130 }
3131 }
3132
3133 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3134 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3135 &memb_list[i].ring_id,
3136 sizeof (struct memb_ring_id)) == 0) {
3137
3138 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3139 memb_list[i].received_flg = 0;
3140 if (i == instance->commit_token->memb_index) {
3141 instance->my_received_flg = 0;
3142 }
3143 }
3144 }
3145 }
3146
3147 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3148 instance->commit_token->memb_index += 1;
3149 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3150 assert (instance->commit_token->header.nodeid);
3151}
3152
/*
 * Point the transport's token target at the member that should receive the
 * commit token next.
 *
 * That member is addr[memb_index % addr_entries] in the commit token's
 * address list. Only its nodeid is passed.
 *
 * NOTE(review): line 3161 (the start of the totemnet call) is missing here.
 */
3153static void memb_state_commit_token_target_set (
3154 struct totemsrp_instance *instance)
3155{
3156 struct srp_addr *addr;
3157
3158 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3159
3160 /* Totemnet just looks at the node id */
3162 instance->totemnet_context,
3163 addr[instance->commit_token->memb_index %
3164 instance->commit_token->addr_entries].nodeid);
3165}
3166
/*
 * Send the given commit token, which is not necessarily
 * instance->commit_token.
 *
 * Steps:
 *  - Bump token_seq and stamp header.nodeid with this node's id.
 *  - Copy the whole token into orf_token_retransmit. The size is the fixed
 *    header plus addr_entries * (srp_addr + member entry).
 *  - Count it in stats.memb_commit_token_tx and send it.
 *  - Re-arm the token retransmit timer.
 *
 * Always returns 0.
 *
 * NOTE(review): line 3186 (the first line of the send call) is missing here.
 */
3167static int memb_state_commit_token_send_recovery (
3168 struct totemsrp_instance *instance,
3169 struct memb_commit_token *commit_token)
3170{
3171 unsigned int commit_token_size;
3172
3173 commit_token->token_seq++;
3174 commit_token->header.nodeid = instance->my_id.nodeid;
3175 commit_token_size = sizeof (struct memb_commit_token) +
3176 ((sizeof (struct srp_addr) +
3177 sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3178 /*
3179 * Make a copy for retransmission if necessary
3180 */
3181 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3182 instance->orf_token_retransmit_size = commit_token_size;
3183
3184 instance->stats.memb_commit_token_tx++;
3185
3187 commit_token,
3188 commit_token_size);
3189
3190 /*
3191 * Request retransmission of the commit token in case it is lost
3192 */
3193 reset_token_retransmit_timeout (instance);
3194 return (0);
3195}
3196
/*
 * Send instance->commit_token.
 *
 * Steps:
 *  - Bump token_seq and stamp header.nodeid.
 *  - Keep a full copy in orf_token_retransmit for token_retransmit().
 *  - Count it in stats.memb_commit_token_tx and send it.
 *  - Re-arm the token retransmit timer.
 *
 * Always returns 0.
 *
 * NOTE(review): the visible body duplicates
 * memb_state_commit_token_send_recovery() applied to instance->commit_token.
 * Once the elided send lines (3215 here, 3186 there) are confirmed
 * identical, this could simply delegate to it.
 */
3197static int memb_state_commit_token_send (
3198 struct totemsrp_instance *instance)
3199{
3200 unsigned int commit_token_size;
3201
3202 instance->commit_token->token_seq++;
3203 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3204 commit_token_size = sizeof (struct memb_commit_token) +
3205 ((sizeof (struct srp_addr) +
3206 sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3207 /*
3208 * Make a copy for retransmission if necessary
3209 */
3210 memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3211 instance->orf_token_retransmit_size = commit_token_size;
3212
3213 instance->stats.memb_commit_token_tx++;
3214
3216 instance->commit_token,
3217 commit_token_size);
3218
3219 /*
3220 * Request retransmission of the commit token in case it is lost
3221 */
3222 reset_token_retransmit_timeout (instance);
3223 return (0);
3224}
3225
3226
3227static int memb_lowest_in_config (struct totemsrp_instance *instance)
3228{
3229 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3230 int token_memb_entries = 0;
3231 int i;
3232 unsigned int lowest_nodeid;
3233
3234 memb_set_subtract (token_memb, &token_memb_entries,
3235 instance->my_proc_list, instance->my_proc_list_entries,
3236 instance->my_failed_list, instance->my_failed_list_entries);
3237
3238 /*
3239 * find representative by searching for smallest identifier
3240 */
3241 assert(token_memb_entries > 0);
3242
3243 lowest_nodeid = token_memb[0].nodeid;
3244 for (i = 1; i < token_memb_entries; i++) {
3245 if (lowest_nodeid > token_memb[i].nodeid) {
3246 lowest_nodeid = token_memb[i].nodeid;
3247 }
3248 }
3249 return (lowest_nodeid == instance->my_id.nodeid);
3250}
3251
3252static int srp_addr_compare (const void *a, const void *b)
3253{
3254 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3255 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3256
3257 if (srp_a->nodeid < srp_b->nodeid) {
3258 return -1;
3259 } else if (srp_a->nodeid > srp_b->nodeid) {
3260 return 1;
3261 } else {
3262 return 0;
3263 }
3264}
3265
/*
 * Build a fresh commit token in instance->commit_token. Per the log message,
 * this is done when this node is the representative.
 *
 * Contents:
 *  - Member list: my_proc_list minus my_failed_list, sorted by nodeid with
 *    srp_addr_compare(), copied right after the fixed header.
 *  - One zeroed member entry per member, following the address list.
 *  - ring_id = (this node, token_ring_id_seq + 4).
 *  - memb_index = 0.
 *
 * NOTE(review): listing lines 3274 and 3282-3284 are missing. Presumably
 * they are the log call and the header magic/version/type assignments.
 * NOTE(review): the reason for the "+ 4" step in ring_id.seq is not
 * explained here; confirm against the protocol description.
 */
3266static void memb_state_commit_token_create (
3267 struct totemsrp_instance *instance)
3268{
3269 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3270 struct srp_addr *addr;
3271 struct memb_commit_token_memb_entry *memb_list;
3272 int token_memb_entries = 0;
3273
3275 "Creating commit token because I am the rep.");
3276
3277 memb_set_subtract (token_memb, &token_memb_entries,
3278 instance->my_proc_list, instance->my_proc_list_entries,
3279 instance->my_failed_list, instance->my_failed_list_entries);
3280
3281 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3285 instance->commit_token->header.encapsulated = 0;
3286 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3287 assert (instance->commit_token->header.nodeid);
3288
3289 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3290 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3291
3292 /*
3293 * This qsort is necessary to ensure the commit token traverses
3294 * the ring in the proper order
3295 */
3296 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3297 srp_addr_compare);
3298
3299 instance->commit_token->memb_index = 0;
3300 instance->commit_token->addr_entries = token_memb_entries;
3301
/* Variable part: addr_entries srp_addr values, then the same number of member entries. */
3302 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3303 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3304
3305 memcpy (addr, token_memb,
3306 token_memb_entries * sizeof (struct srp_addr));
3307 memset (memb_list, 0,
3308 sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3309}
3310
/*
 * Build and send a join message.
 *
 * The message is built in a 40000-byte stack buffer and contains:
 *  - this node's id and my_ring_id.seq;
 *  - my_proc_list, followed by my_failed_list.
 *
 * If the message would not fit in the buffer, it is logged and not sent.
 * When send_join_timeout is non-zero, the function first sleeps
 * random() % (send_join_timeout * 1000) microseconds, which blocks the
 * caller.
 *
 * NOTE(review): listing lines 3319-3322, 3330, 3337-3338 and 3368 are
 * missing. Presumably they hold the header fields, the log call, the
 * list-entry counts and the send call.
 * NOTE(review): memb_join points into a char array whose alignment is not
 * guaranteed to suit struct memb_join. Confirm the struct is packed.
 */
3311static void memb_join_message_send (struct totemsrp_instance *instance)
3312{
3313 char memb_join_data[40000];
3314 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3315 char *addr;
3316 unsigned int addr_idx;
3317 size_t msg_len;
3318
3323 memb_join->header.nodeid = instance->my_id.nodeid;
3324 assert (memb_join->header.nodeid);
3325
3326 msg_len = sizeof(struct memb_join) +
3327 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3328
3329 if (msg_len > sizeof(memb_join_data)) {
3331 "memb_join_message too long. Ignoring message.");
3332
3333 return ;
3334 }
3335
3336 memb_join->ring_seq = instance->my_ring_id.seq;
3339 memb_join->system_from = instance->my_id;
3340
3341 /*
3342 * This mess adds the joined and failed processor lists into the join
3343 * message
3344 */
3345 addr = (char *)memb_join;
3346 addr_idx = sizeof (struct memb_join);
3347 memcpy (&addr[addr_idx],
3348 instance->my_proc_list,
3349 instance->my_proc_list_entries *
3350 sizeof (struct srp_addr));
3351 addr_idx +=
3352 instance->my_proc_list_entries *
3353 sizeof (struct srp_addr);
3354 memcpy (&addr[addr_idx],
3355 instance->my_failed_list,
3356 instance->my_failed_list_entries *
3357 sizeof (struct srp_addr));
3358 addr_idx +=
3359 instance->my_failed_list_entries *
3360 sizeof (struct srp_addr);
3361
/* Optional random delay before sending; blocks the calling thread. */
3362 if (instance->totem_config->send_join_timeout) {
3363 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3364 }
3365
3366 instance->stats.memb_join_tx++;
3367
3369 instance->totemnet_context,
3370 memb_join,
3371 addr_idx);
3372}
3373
/*
 * Announce that this node is leaving, using a join-format message.
 *
 * Steps:
 *  - Merge my_id into my_failed_list. This change persists even if the
 *    message is then dropped as too long.
 *  - Build a join message, in a 40000-byte stack buffer, whose proc list is
 *    my_proc_list without my_id and whose failed list is the updated
 *    my_failed_list.
 *  - Skip sending, after logging, if the message would not fit.
 *  - When send_join_timeout is non-zero, sleep a random delay (blocking)
 *    before sending.
 *
 * NOTE(review): listing lines 3384, 3403, 3409-3413, 3417 and 3448 are
 * missing. Judging by context, they hold the log calls, the header fields,
 * failed_list_entries and the send call.
 */
3374static void memb_leave_message_send (struct totemsrp_instance *instance)
3375{
3376 char memb_join_data[40000];
3377 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3378 char *addr;
3379 unsigned int addr_idx;
3380 int active_memb_entries;
3381 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3382 size_t msg_len;
3383
3385 "sending join/leave message");
3386
3387 /*
3388 * add us to the failed list, and remove us from
3389 * the members list
3390 */
3391 memb_set_merge(
3392 &instance->my_id, 1,
3393 instance->my_failed_list, &instance->my_failed_list_entries);
3394
3395 memb_set_subtract (active_memb, &active_memb_entries,
3396 instance->my_proc_list, instance->my_proc_list_entries,
3397 &instance->my_id, 1);
3398
3399 msg_len = sizeof(struct memb_join) +
3400 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3401
/* NOTE(review): my_failed_list was already modified above, even on this early-return path. */
3402 if (msg_len > sizeof(memb_join_data)) {
3404 "memb_leave message too long. Ignoring message.");
3405
3406 return ;
3407 }
3408
3414
3415 memb_join->ring_seq = instance->my_ring_id.seq;
3416 memb_join->proc_list_entries = active_memb_entries;
3418 memb_join->system_from = instance->my_id;
3419
3420 // TODO: CC Maybe use the actual join send routine.
3421 /*
3422 * This mess adds the joined and failed processor lists into the join
3423 * message
3424 */
3425 addr = (char *)memb_join;
3426 addr_idx = sizeof (struct memb_join);
3427 memcpy (&addr[addr_idx],
3428 active_memb,
3429 active_memb_entries *
3430 sizeof (struct srp_addr));
3431 addr_idx +=
3432 active_memb_entries *
3433 sizeof (struct srp_addr);
3434 memcpy (&addr[addr_idx],
3435 instance->my_failed_list,
3436 instance->my_failed_list_entries *
3437 sizeof (struct srp_addr));
3438 addr_idx +=
3439 instance->my_failed_list_entries *
3440 sizeof (struct srp_addr);
3441
3442
3443 if (instance->totem_config->send_join_timeout) {
3444 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3445 }
3446 instance->stats.memb_join_tx++;
3447
3449 instance->totemnet_context,
3450 memb_join,
3451 addr_idx);
3452}
3453
/*
 * Send a merge-detect message carrying my_ring_id, and count it in
 * stats.memb_merge_detect_tx.
 *
 * NOTE(review): most of this function (lines 3456, 3458-3463, 3466 and
 * 3469-3470) is missing from this listing, including the message
 * declaration, the header setup and the send call.
 */
3454static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3455{
3457
3464 memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3465 sizeof (struct memb_ring_id));
3467
3468 instance->stats.memb_merge_detect_tx++;
3471 sizeof (struct memb_merge_detect));
3472}
3473
3474static void memb_ring_id_set (
3475 struct totemsrp_instance *instance,
3476 const struct memb_ring_id *ring_id)
3477{
3478
3479 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3480}
3481
/*
 * Register a token callback.
 *
 * Steps:
 *  - Send a token-hold cancel first (token_hold_cancel_send()).
 *  - Allocate a struct token_callback_instance holding callback_fn, data,
 *    type and the delete flag.
 *  - Return it through *handle_out.
 *  - Link it onto token_callback_received_listhead or
 *    token_callback_sent_listhead, depending on type.
 *
 * Returns 0 on success. Returns -1 if malloc fails; *handle_out is then left
 * untouched.
 *
 * NOTE(review): lines 3482, 3485, 3506 and 3509 are missing from this
 * listing: the function-name line, the type parameter and the two case
 * labels.
 * NOTE(review): the switch has no default case. A type matching neither
 * label would leave the allocated handle unlinked; confirm callers only pass
 * the two known types.
 */
3483 void *srp_context,
3484 void **handle_out,
3486 int delete,
3487 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3488 const void *data)
3489{
3490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3491 struct token_callback_instance *callback_handle;
3492
3493 token_hold_cancel_send (instance);
3494
3495 callback_handle = malloc (sizeof (struct token_callback_instance));
3496 if (callback_handle == 0) {
3497 return (-1);
3498 }
3499 *handle_out = (void *)callback_handle;
3500 qb_list_init (&callback_handle->list);
3501 callback_handle->callback_fn = callback_fn;
3502 callback_handle->data = (void *) data;
3503 callback_handle->callback_type = type;
3504 callback_handle->delete = delete;
3505 switch (type) {
3507 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3508 break;
3510 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3511 break;
3512 }
3513
3514 return (0);
3515}
3516
3517void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3518{
3519 struct token_callback_instance *h;
3520
3521 if (*handle_out) {
3522 h = (struct token_callback_instance *)*handle_out;
3523 qb_list_del (&h->list);
3524 free (h);
3525 h = NULL;
3526 *handle_out = 0;
3527 }
3528}
3529
/*
 * Run the callbacks registered for one token event type.
 *
 * The list is chosen by type; any other type trips assert (0).
 *
 * Entries whose delete flag is 1 are unlinked before their callback runs
 * (the call itself is on elided lines). If such a callback returns -1, the
 * entry is re-added at the head of the list so it is tried again on the
 * next token. Otherwise it is presumably freed on the missing line 3567.
 *
 * NOTE(review): if built with NDEBUG, the default case leaves
 * callback_listhead NULL, and the list walk below would dereference it.
 * NOTE(review): lines 3532, 3536, 3541, 3544, 3553, 3558-3560 and 3567 are
 * missing from this listing.
 */
3530static void token_callbacks_execute (
3531 struct totemsrp_instance *instance,
3533{
3534 struct qb_list_head *list, *tmp_iter;
3535 struct qb_list_head *callback_listhead = 0;
3537 int res;
3538 int del;
3539
3540 switch (type) {
3542 callback_listhead = &instance->token_callback_received_listhead;
3543 break;
3545 callback_listhead = &instance->token_callback_sent_listhead;
3546 break;
3547 default:
3548 assert (0);
3549 }
3550
/* The _safe iterator allows unlinking the current entry inside the loop. */
3551 qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3552 token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
3554 if (del == 1) {
3555 qb_list_del (list);
3556 }
3557
3561 /*
3562 * This callback failed to execute, try it again on the next token
3563 */
3564 if (res == -1 && del == 1) {
3565 qb_list_add (list, callback_listhead);
3566 } else if (del) {
3568 }
3569 }
3570}
3571
3572/*
3573 * Flow control functions
3574 */
3575static unsigned int backlog_get (struct totemsrp_instance *instance)
3576{
3577 unsigned int backlog = 0;
3578 struct cs_queue *queue_use = NULL;
3579
3580 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3581 if (instance->waiting_trans_ack) {
3582 queue_use = &instance->new_message_queue_trans;
3583 } else {
3584 queue_use = &instance->new_message_queue;
3585 }
3586 } else
3587 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3588 queue_use = &instance->retrans_message_queue;
3589 }
3590
3591 if (queue_use != NULL) {
3592 backlog = cs_queue_used (queue_use);
3593 }
3594
3595 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3596 return (backlog);
3597}
3598
3599static int fcc_calculate (
3600 struct totemsrp_instance *instance,
3601 struct orf_token *token)
3602{
3603 unsigned int transmits_allowed;
3604 unsigned int backlog_calc;
3605
3606 transmits_allowed = instance->totem_config->max_messages;
3607
3608 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3609 transmits_allowed = instance->totem_config->window_size - token->fcc;
3610 }
3611
3612 instance->my_cbl = backlog_get (instance);
3613
3614 /*
3615 * Only do backlog calculation if there is a backlog otherwise
3616 * we would result in div by zero
3617 */
3618 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3619 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3620 (token->backlog + instance->my_cbl - instance->my_pbl);
3621 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3622 transmits_allowed = backlog_calc;
3623 }
3624 }
3625
3626 return (transmits_allowed);
3627}
3628
3629/*
3630 * don't overflow the RTR sort queue
3631 */
3632static void fcc_rtr_limit (
3633 struct totemsrp_instance *instance,
3634 struct orf_token *token,
3635 unsigned int *transmits_allowed)
3636{
3637 int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3638 check -= (*transmits_allowed + instance->totem_config->window_size);
3639 assert (check >= 0);
3640 if (sq_lt_compare (instance->last_released +
3641 QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3642 instance->totem_config->window_size,
3643
3644 token->seq)) {
3645
3646 *transmits_allowed = 0;
3647 }
3648}
3649
3650static void fcc_token_update (
3651 struct totemsrp_instance *instance,
3652 struct orf_token *token,
3653 unsigned int msgs_transmitted)
3654{
3655 token->fcc += msgs_transmitted - instance->my_trc;
3656 token->backlog += instance->my_cbl - instance->my_pbl;
3657 instance->my_trc = msgs_transmitted;
3658 instance->my_pbl = instance->my_cbl;
3659}
3660
3661/*
3662 * Sanity checkers
3663 */
/*
 * Length-check a received orf_token before it is parsed.
 *
 * Returns 0 when msg_len covers the fixed struct orf_token plus
 * rtr_list_entries trailing struct rtr_item records. The count is
 * byte-swapped first when endian_conversion_needed is set. Otherwise the
 * function logs and returns -1.
 *
 * NOTE(review): rtr_entries is neither bounded by RETRANSMIT_ENTRIES_MAX nor
 * rejected when negative.
 *   - message_handler_orf_token() converts and copies the token into fixed
 *     1500-byte stack buffers (token_convert / token_storage).
 *   - orf_token_endian_convert() loops over rtr_list_entries while writing
 *     into token_convert.
 *   - So a large count with a matching msg_len could overrun that buffer.
 *   - A negative count makes the size_t arithmetic for required_len wrap,
 *     so it can come out smaller than sizeof(struct orf_token) and pass.
 * Consider also rejecting rtr_entries < 0 || rtr_entries > RETRANSMIT_ENTRIES_MAX.
 */
3664static int check_orf_token_sanity(
3665 const struct totemsrp_instance *instance,
3666 const void *msg,
3667 size_t msg_len,
3668 int endian_conversion_needed)
3669{
3670 int rtr_entries;
3671 const struct orf_token *token = (const struct orf_token *)msg;
3672 size_t required_len;
3673
3674 if (msg_len < sizeof(struct orf_token)) {
3676 "Received orf_token message is too short... ignoring.");
3677
3678 return (-1);
3679 }
3680
	/* The retransmit-list count must be read in host byte order. */
3681 if (endian_conversion_needed) {
3682 rtr_entries = swab32(token->rtr_list_entries);
3683 } else {
3684 rtr_entries = token->rtr_list_entries;
3685 }
3686
	/* The retransmit list trails the fixed header and must fit in msg_len too. */
3687 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3688 if (msg_len < required_len) {
3690 "Received orf_token message is too short... ignoring.");
3691
3692 return (-1);
3693 }
3694
3695 return (0);
3696}
3697
3698static int check_mcast_sanity(
3699 struct totemsrp_instance *instance,
3700 const void *msg,
3701 size_t msg_len,
3702 int endian_conversion_needed)
3703{
3704
3705 if (msg_len < sizeof(struct mcast)) {
3707 "Received mcast message is too short... ignoring.");
3708
3709 return (-1);
3710 }
3711
3712 return (0);
3713}
3714
3715static int check_memb_merge_detect_sanity(
3716 struct totemsrp_instance *instance,
3717 const void *msg,
3718 size_t msg_len,
3719 int endian_conversion_needed)
3720{
3721
3722 if (msg_len < sizeof(struct memb_merge_detect)) {
3724 "Received memb_merge_detect message is too short... ignoring.");
3725
3726 return (-1);
3727 }
3728
3729 return (0);
3730}
3731
3732static int check_memb_join_sanity(
3733 struct totemsrp_instance *instance,
3734 const void *msg,
3735 size_t msg_len,
3736 int endian_conversion_needed)
3737{
3738 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3739 unsigned int proc_list_entries;
3740 unsigned int failed_list_entries;
3741 size_t required_len;
3742
3743 if (msg_len < sizeof(struct memb_join)) {
3745 "Received memb_join message is too short... ignoring.");
3746
3747 return (-1);
3748 }
3749
3752
3753 if (endian_conversion_needed) {
3756 }
3757
3758 required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3759 if (msg_len < required_len) {
3761 "Received memb_join message is too short... ignoring.");
3762
3763 return (-1);
3764 }
3765
3766 return (0);
3767}
3768
/*
 * Length-check a received memb_commit_token before it is parsed.
 *
 * The fixed struct memb_commit_token is followed by addr_entries
 * struct srp_addr records and addr_entries
 * struct memb_commit_token_memb_entry records. When those trailing arrays do
 * not fit in msg_len, the function logs and returns -1.
 *
 * NOTE(review): the first check (message shorter than the fixed header)
 * logs "ignoring" but returns 0, not -1. The other handlers in this file
 * only reject a message when their sanity check returns -1. If the
 * commit-token handler follows the same pattern (not visible here —
 * confirm), a truncated token is accepted and addr_entries is later read
 * past the end of msg. This looks like it should be return (-1).
 */
3769static int check_memb_commit_token_sanity(
3770 struct totemsrp_instance *instance,
3771 const void *msg,
3772 size_t msg_len,
3773 int endian_conversion_needed)
3774{
3775 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3776 unsigned int addr_entries;
3777 size_t required_len;
3778
3779 if (msg_len < sizeof(struct memb_commit_token)) {
3781 "Received memb_commit_token message is too short... ignoring.");
3782
3783 return (0);
3784 }
3785
3786 addr_entries= mct_msg->addr_entries;
3787 if (endian_conversion_needed) {
3789 }
3790
	/* Each address entry carries one srp_addr plus one memb entry. */
3791 required_len = sizeof(struct memb_commit_token) +
3792 (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3793 if (msg_len < required_len) {
3795 "Received memb_commit_token message is too short... ignoring.");
3796
3797 return (-1);
3798 }
3799
3800 return (0);
3801}
3802
3803static int check_token_hold_cancel_sanity(
3804 struct totemsrp_instance *instance,
3805 const void *msg,
3806 size_t msg_len,
3807 int endian_conversion_needed)
3808{
3809
3810 if (msg_len < sizeof(struct token_hold_cancel)) {
3812 "Received token_hold_cancel message is too short... ignoring.");
3813
3814 return (-1);
3815 }
3816
3817 return (0);
3818}
3819
3820/*
3821 * Message Handlers
3822 */
3823
/*
 * Timestamp (qb_util_nano_current_get()) of the previous token event. It is
 * read and updated by the GIVEINFO timing diagnostics in
 * message_handler_orf_token().
 */
3824unsigned long long int tv_old;
3825/*
3826 * message handler called when TOKEN message type received
3827 */
3828static int message_handler_orf_token (
3829 struct totemsrp_instance *instance,
3830 const void *msg,
3831 size_t msg_len,
3832 int endian_conversion_needed)
3833{
3834 char token_storage[1500];
3835 char token_convert[1500];
3836 struct orf_token *token = NULL;
3837 int forward_token;
3838 unsigned int transmits_allowed;
3839 unsigned int mcasted_retransmit;
3840 unsigned int mcasted_regular;
3841 unsigned int last_aru;
3842
3843#ifdef GIVEINFO
3844 unsigned long long tv_current;
3845 unsigned long long tv_diff;
3846
3847 tv_current = qb_util_nano_current_get ();
3848 tv_diff = tv_current - tv_old;
3849 tv_old = tv_current;
3850
3852 "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3853#endif
3854
3855 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3856 return (0);
3857 }
3858
3859 if (instance->orf_token_discard) {
3860 return (0);
3861 }
3862#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3863 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3864 return (0);
3865 }
3866#endif
3867
3868 if (endian_conversion_needed) {
3869 orf_token_endian_convert ((struct orf_token *)msg,
3870 (struct orf_token *)token_convert);
3871 msg = (struct orf_token *)token_convert;
3872 }
3873
3874 /*
3875 * Make copy of token and retransmit list in case we have
3876 * to flush incoming messages from the kernel queue
3877 */
3878 token = (struct orf_token *)token_storage;
3879 memcpy (token, msg, sizeof (struct orf_token));
3880 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3881 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3882
3883
3884 /*
3885 * Handle merge detection timeout
3886 */
3887 if (token->seq == instance->my_last_seq) {
3888 start_merge_detect_timeout (instance);
3889 instance->my_seq_unchanged += 1;
3890 } else {
3891 cancel_merge_detect_timeout (instance);
3892 cancel_token_hold_retransmit_timeout (instance);
3893 instance->my_seq_unchanged = 0;
3894 }
3895
3896 instance->my_last_seq = token->seq;
3897
3898#ifdef TEST_RECOVERY_MSG_COUNT
3899 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3900 return (0);
3901 }
3902#endif
3903 instance->flushing = 1;
3905 instance->flushing = 0;
3906
3907 /*
3908 * Determine if we should hold (in reality drop) the token
3909 */
3910 instance->my_token_held = 0;
3911 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3912 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3913 instance->my_token_held = 1;
3914 } else {
3915 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3916 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3917 instance->my_token_held = 1;
3918 }
3919 }
3920
3921 /*
3922 * Hold onto token when there is no activity on ring and
3923 * this processor is the ring rep
3924 */
3925 forward_token = 1;
3926 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3927 if (instance->my_token_held) {
3928 forward_token = 0;
3929 }
3930 }
3931
3932 switch (instance->memb_state) {
3933 case MEMB_STATE_COMMIT:
3934 /* Discard token */
3935 break;
3936
3938 messages_free (instance, token->aru);
3939 /*
3940 * Do NOT add break, this case should also execute code in gather case.
3941 */
3942
3943 case MEMB_STATE_GATHER:
3944 /*
3945 * DO NOT add break, we use different free mechanism in recovery state
3946 */
3947
3949 /*
3950 * Discard tokens from another configuration
3951 */
3952 if (memcmp (&token->ring_id, &instance->my_ring_id,
3953 sizeof (struct memb_ring_id)) != 0) {
3954
3955 if ((forward_token)
3956 && instance->use_heartbeat) {
3957 reset_heartbeat_timeout(instance);
3958 }
3959 else {
3960 cancel_heartbeat_timeout(instance);
3961 }
3962
3963 return (0); /* discard token */
3964 }
3965
3966 /*
3967 * Discard retransmitted tokens
3968 */
3969 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3970 return (0); /* discard token */
3971 }
3972
3973 /*
3974 * Token is valid so trigger callbacks
3975 */
3976 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3977
3978 last_aru = instance->my_last_aru;
3979 instance->my_last_aru = token->aru;
3980
3981 transmits_allowed = fcc_calculate (instance, token);
3982 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3983
3984 if (instance->my_token_held == 1 &&
3985 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3986 instance->my_token_held = 0;
3987 forward_token = 1;
3988 }
3989
3990 fcc_rtr_limit (instance, token, &transmits_allowed);
3991 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3992/*
3993if (mcasted_regular) {
3994printf ("mcasted regular %d\n", mcasted_regular);
3995printf ("token seq %d\n", token->seq);
3996}
3997*/
3998 fcc_token_update (instance, token, mcasted_retransmit +
3999 mcasted_regular);
4000
4001 if (sq_lt_compare (instance->my_aru, token->aru) ||
4002 instance->my_id.nodeid == token->aru_addr ||
4003 token->aru_addr == 0) {
4004
4005 token->aru = instance->my_aru;
4006 if (token->aru == token->seq) {
4007 token->aru_addr = 0;
4008 } else {
4009 token->aru_addr = instance->my_id.nodeid;
4010 }
4011 }
4012 if (token->aru == last_aru && token->aru_addr != 0) {
4013 instance->my_aru_count += 1;
4014 } else {
4015 instance->my_aru_count = 0;
4016 }
4017
4018 /*
4019 * We really don't follow specification there. In specification, OTHER nodes
4020 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4021 * to failed list (so node never mark itself as failed)
4022 */
4023 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4024 token->aru_addr == instance->my_id.nodeid) {
4025
4027 "FAILED TO RECEIVE");
4028
4029 instance->failed_to_recv = 1;
4030
4031 memb_set_merge (&instance->my_id, 1,
4032 instance->my_failed_list,
4033 &instance->my_failed_list_entries);
4034
4035 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4036 } else {
4037 instance->my_token_seq = token->token_seq;
4038 token->token_seq += 1;
4039
4040 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4041 /*
4042 * instance->my_aru == instance->my_high_seq_received means this processor
4043 * has recovered all messages it can recover
4044 * (ie: its retrans queue is empty)
4045 */
4046 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4047
4048 if (token->retrans_flg == 0) {
4049 token->retrans_flg = 1;
4050 instance->my_set_retrans_flg = 1;
4051 }
4052 } else
4053 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4054 token->retrans_flg = 0;
4055 instance->my_set_retrans_flg = 0;
4056 }
4058 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4059 token->retrans_flg, instance->my_set_retrans_flg,
4060 cs_queue_is_empty (&instance->retrans_message_queue),
4061 instance->my_retrans_flg_count, token->aru);
4062 if (token->retrans_flg == 0) {
4063 instance->my_retrans_flg_count += 1;
4064 } else {
4065 instance->my_retrans_flg_count = 0;
4066 }
4067 if (instance->my_retrans_flg_count == 2) {
4068 instance->my_install_seq = token->seq;
4069 }
4071 "install seq %x aru %x high seq received %x",
4072 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4073 if (instance->my_retrans_flg_count >= 2 &&
4074 instance->my_received_flg == 0 &&
4075 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4076 instance->my_received_flg = 1;
4077 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4078 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4079 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4080 }
4081 if (instance->my_retrans_flg_count >= 3 &&
4082 sq_lte_compare (instance->my_install_seq, token->aru)) {
4083 instance->my_rotation_counter += 1;
4084 } else {
4085 instance->my_rotation_counter = 0;
4086 }
4087 if (instance->my_rotation_counter == 2) {
4089 "retrans flag count %x token aru %x install seq %x aru %x %x",
4090 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4091 instance->my_aru, token->seq);
4092
4093 memb_state_operational_enter (instance);
4094 instance->my_rotation_counter = 0;
4095 instance->my_retrans_flg_count = 0;
4096 }
4097 }
4098
4100 token_send (instance, token, forward_token);
4101
4102#ifdef GIVEINFO
4103 tv_current = qb_util_nano_current_get ();
4104 tv_diff = tv_current - tv_old;
4105 tv_old = tv_current;
4107 "I held %0.4f ms",
4108 ((float)tv_diff) / 1000000.0);
4109#endif
4110 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4111 messages_deliver_to_app (instance, 0,
4112 instance->my_high_seq_received);
4113 }
4114
4115 /*
4116 * Deliver messages after token has been transmitted
4117 * to improve performance
4118 */
4119 reset_token_timeout (instance); // REVIEWED
4120 reset_token_retransmit_timeout (instance); // REVIEWED
4121 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4122 instance->my_token_held == 1) {
4123
4124 start_token_hold_retransmit_timeout (instance);
4125 }
4126
4127 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4128 }
4129 break;
4130 }
4131
4132 if ((forward_token)
4133 && instance->use_heartbeat) {
4134 reset_heartbeat_timeout(instance);
4135 }
4136 else {
4137 cancel_heartbeat_timeout(instance);
4138 }
4139
4140 return (0);
4141}
4142
4143static void messages_deliver_to_app (
4144 struct totemsrp_instance *instance,
4145 int skip,
4146 unsigned int end_point)
4147{
4148 struct sort_queue_item *sort_queue_item_p;
4149 unsigned int i;
4150 int res;
4151 struct mcast *mcast_in;
4152 struct mcast mcast_header;
4153 unsigned int range = 0;
4154 int endian_conversion_required;
4155 unsigned int my_high_delivered_stored = 0;
4156 struct srp_addr aligned_system_from;
4157
4158 range = end_point - instance->my_high_delivered;
4159
4160 if (range) {
4162 "Delivering %x to %x", instance->my_high_delivered,
4163 end_point);
4164 }
4165 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4166 my_high_delivered_stored = instance->my_high_delivered;
4167
4168 /*
4169 * Deliver messages in order from rtr queue to pending delivery queue
4170 */
4171 for (i = 1; i <= range; i++) {
4172
4173 void *ptr = 0;
4174
4175 /*
4176 * If out of range of sort queue, stop assembly
4177 */
4178 res = sq_in_range (&instance->regular_sort_queue,
4179 my_high_delivered_stored + i);
4180 if (res == 0) {
4181 break;
4182 }
4183
4184 res = sq_item_get (&instance->regular_sort_queue,
4185 my_high_delivered_stored + i, &ptr);
4186 /*
4187 * If hole, stop assembly
4188 */
4189 if (res != 0 && skip == 0) {
4190 break;
4191 }
4192
4193 instance->my_high_delivered = my_high_delivered_stored + i;
4194
4195 if (res != 0) {
4196 continue;
4197
4198 }
4199
4200 sort_queue_item_p = ptr;
4201
4202 mcast_in = sort_queue_item_p->mcast;
4203 assert (mcast_in != (struct mcast *)0xdeadbeef);
4204
4205 endian_conversion_required = 0;
4206 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4207 endian_conversion_required = 1;
4208 mcast_endian_convert (mcast_in, &mcast_header);
4209 } else {
4210 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4211 }
4212
4213 aligned_system_from = mcast_header.system_from;
4214
4215 /*
4216 * Skip messages not originated in instance->my_deliver_memb
4217 */
4218 if (skip &&
4219 memb_set_subset (&aligned_system_from,
4220 1,
4221 instance->my_deliver_memb_list,
4222 instance->my_deliver_memb_entries) == 0) {
4223
4224 instance->my_high_delivered = my_high_delivered_stored + i;
4225
4226 continue;
4227 }
4228
4229 /*
4230 * Message found
4231 */
4233 "Delivering MCAST message with seq %x to pending delivery queue",
4234 mcast_header.seq);
4235
4236 /*
4237 * Message is locally originated multicast
4238 */
4239 instance->totemsrp_deliver_fn (
4240 mcast_header.header.nodeid,
4241 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4242 sort_queue_item_p->msg_len - sizeof (struct mcast),
4243 endian_conversion_required);
4244 }
4245}
4246
4247/*
4248 * recv message handler called when MCAST message type received
4249 */
4250static int message_handler_mcast (
4251 struct totemsrp_instance *instance,
4252 const void *msg,
4253 size_t msg_len,
4254 int endian_conversion_needed)
4255{
4257 struct sq *sort_queue;
4258 struct mcast mcast_header;
4259 struct srp_addr aligned_system_from;
4260
4261 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4262 return (0);
4263 }
4264
4265 if (endian_conversion_needed) {
4266 mcast_endian_convert (msg, &mcast_header);
4267 } else {
4268 memcpy (&mcast_header, msg, sizeof (struct mcast));
4269 }
4270
4271 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4272 sort_queue = &instance->recovery_sort_queue;
4273 } else {
4274 sort_queue = &instance->regular_sort_queue;
4275 }
4276
4277 assert (msg_len <= FRAME_SIZE_MAX);
4278
4279#ifdef TEST_DROP_MCAST_PERCENTAGE
4280 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4281 return (0);
4282 }
4283#endif
4284
4285 /*
4286 * If the message is foreign execute the switch below
4287 */
4288 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4289 sizeof (struct memb_ring_id)) != 0) {
4290
4291 aligned_system_from = mcast_header.system_from;
4292
4293 switch (instance->memb_state) {
4295 memb_set_merge (
4296 &aligned_system_from, 1,
4297 instance->my_proc_list, &instance->my_proc_list_entries);
4298 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4299 break;
4300
4301 case MEMB_STATE_GATHER:
4302 if (!memb_set_subset (
4303 &aligned_system_from,
4304 1,
4305 instance->my_proc_list,
4306 instance->my_proc_list_entries)) {
4307
4308 memb_set_merge (&aligned_system_from, 1,
4309 instance->my_proc_list, &instance->my_proc_list_entries);
4310 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4311 return (0);
4312 }
4313 break;
4314
4315 case MEMB_STATE_COMMIT:
4316 /* discard message */
4317 instance->stats.rx_msg_dropped++;
4318 break;
4319
4321 /* discard message */
4322 instance->stats.rx_msg_dropped++;
4323 break;
4324 }
4325 return (0);
4326 }
4327
4329 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4330 mcast_header.ring_id.rep,
4331 (uint64_t)mcast_header.ring_id.seq,
4332 mcast_header.seq);
4333
4334 /*
4335 * Add mcast message to rtr queue if not already in rtr queue
4336 * otherwise free io vectors
4337 */
4338 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4339 sq_in_range (sort_queue, mcast_header.seq) &&
4340 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4341
4342 /*
4343 * Allocate new multicast memory block
4344 */
4345// TODO LEAK
4346 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4347 if (sort_queue_item.mcast == NULL) {
4348 return (-1); /* error here is corrected by the algorithm */
4349 }
4350 memcpy (sort_queue_item.mcast, msg, msg_len);
4351 sort_queue_item.msg_len = msg_len;
4352
4353 if (sq_lt_compare (instance->my_high_seq_received,
4354 mcast_header.seq)) {
4355 instance->my_high_seq_received = mcast_header.seq;
4356 }
4357
4358 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4359 }
4360
4361 update_aru (instance);
4362 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4363 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4364 }
4365
4366/* TODO remove from retrans message queue for old ring in recovery state */
4367 return (0);
4368}
4369
4370static int message_handler_memb_merge_detect (
4371 struct totemsrp_instance *instance,
4372 const void *msg,
4373 size_t msg_len,
4374 int endian_conversion_needed)
4375{
4377 struct srp_addr aligned_system_from;
4378
4379 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4380 return (0);
4381 }
4382
4383 if (endian_conversion_needed) {
4384 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4385 } else {
4386 memcpy (&memb_merge_detect, msg,
4387 sizeof (struct memb_merge_detect));
4388 }
4389
4390 /*
4391 * do nothing if this is a merge detect from this configuration
4392 */
4393 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4394 sizeof (struct memb_ring_id)) == 0) {
4395
4396 return (0);
4397 }
4398
4399 aligned_system_from = memb_merge_detect.system_from;
4400
4401 /*
4402 * Execute merge operation
4403 */
4404 switch (instance->memb_state) {
4406 memb_set_merge (&aligned_system_from, 1,
4407 instance->my_proc_list, &instance->my_proc_list_entries);
4408 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4409 break;
4410
4411 case MEMB_STATE_GATHER:
4412 if (!memb_set_subset (
4413 &aligned_system_from,
4414 1,
4415 instance->my_proc_list,
4416 instance->my_proc_list_entries)) {
4417
4418 memb_set_merge (&aligned_system_from, 1,
4419 instance->my_proc_list, &instance->my_proc_list_entries);
4420 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4421 return (0);
4422 }
4423 break;
4424
4425 case MEMB_STATE_COMMIT:
4426 /* do nothing in commit */
4427 break;
4428
4430 /* do nothing in recovery */
4431 break;
4432 }
4433 return (0);
4434}
4435
4436static void memb_join_process (
4437 struct totemsrp_instance *instance,
4438 const struct memb_join *memb_join)
4439{
4440 struct srp_addr *proc_list;
4441 struct srp_addr *failed_list;
4442 int gather_entered = 0;
4443 int fail_minus_memb_entries = 0;
4444 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4445 struct srp_addr aligned_system_from;
4446
4447 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4448 failed_list = proc_list + memb_join->proc_list_entries;
4449 aligned_system_from = memb_join->system_from;
4450
4451 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4452 memb_set_log(instance, instance->totemsrp_log_level_trace,
4453 "proclist", proc_list, memb_join->proc_list_entries);
4454 memb_set_log(instance, instance->totemsrp_log_level_trace,
4455 "faillist", failed_list, memb_join->failed_list_entries);
4456 memb_set_log(instance, instance->totemsrp_log_level_trace,
4457 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4458 memb_set_log(instance, instance->totemsrp_log_level_trace,
4459 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4460
4462 if (instance->flushing) {
4465 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4467 if (memb_join->failed_list_entries > 0) {
4468 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4469 }
4470 } else {
4472 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4473 }
4474 return;
4475 } else {
4478 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4479 if (memb_join->failed_list_entries > 0) {
4480 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4481 }
4482 }
4483 }
4484
4485 }
4486
4487 if (memb_set_equal (proc_list,
4489 instance->my_proc_list,
4490 instance->my_proc_list_entries) &&
4491
4492 memb_set_equal (failed_list,
4494 instance->my_failed_list,
4495 instance->my_failed_list_entries)) {
4496
4498 memb_consensus_set (instance, &aligned_system_from);
4499 }
4500
4501 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4502 instance->failed_to_recv = 0;
4503 instance->my_proc_list[0] = instance->my_id;
4504 instance->my_proc_list_entries = 1;
4505 instance->my_failed_list_entries = 0;
4506
4507 memb_state_commit_token_create (instance);
4508
4509 memb_state_commit_enter (instance);
4510 return;
4511 }
4512 if (memb_consensus_agreed (instance) &&
4513 memb_lowest_in_config (instance)) {
4514
4515 memb_state_commit_token_create (instance);
4516
4517 memb_state_commit_enter (instance);
4518 } else {
4519 goto out;
4520 }
4521 } else
4522 if (memb_set_subset (proc_list,
4524 instance->my_proc_list,
4525 instance->my_proc_list_entries) &&
4526
4527 memb_set_subset (failed_list,
4529 instance->my_failed_list,
4530 instance->my_failed_list_entries)) {
4531
4532 goto out;
4533 } else
4534 if (memb_set_subset (&aligned_system_from, 1,
4535 instance->my_failed_list, instance->my_failed_list_entries)) {
4536
4537 goto out;
4538 } else {
4539 memb_set_merge (proc_list,
4541 instance->my_proc_list, &instance->my_proc_list_entries);
4542
4543 if (memb_set_subset (
4544 &instance->my_id, 1,
4545 failed_list, memb_join->failed_list_entries)) {
4546
4547 memb_set_merge (
4548 &aligned_system_from, 1,
4549 instance->my_failed_list, &instance->my_failed_list_entries);
4550 } else {
4551 if (memb_set_subset (
4552 &aligned_system_from, 1,
4553 instance->my_memb_list,
4554 instance->my_memb_entries)) {
4555
4556 if (memb_set_subset (
4557 &aligned_system_from, 1,
4558 instance->my_failed_list,
4559 instance->my_failed_list_entries) == 0) {
4560
4561 memb_set_merge (failed_list,
4563 instance->my_failed_list, &instance->my_failed_list_entries);
4564 } else {
4565 memb_set_subtract (fail_minus_memb,
4566 &fail_minus_memb_entries,
4567 failed_list,
4569 instance->my_memb_list,
4570 instance->my_memb_entries);
4571
4572 memb_set_merge (fail_minus_memb,
4573 fail_minus_memb_entries,
4574 instance->my_failed_list,
4575 &instance->my_failed_list_entries);
4576 }
4577 }
4578 }
4579 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4580 gather_entered = 1;
4581 }
4582
4583out:
4584 if (gather_entered == 0 &&
4585 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4586
4587 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4588 }
4589}
4590
4591static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4592{
4593 int i;
4594 struct srp_addr *in_proc_list;
4595 struct srp_addr *in_failed_list;
4596 struct srp_addr *out_proc_list;
4597 struct srp_addr *out_failed_list;
4598
4601 out->header.type = in->header.type;
4602 out->header.nodeid = swab32 (in->header.nodeid);
4603 out->system_from = srp_addr_endian_convert(in->system_from);
4606 out->ring_seq = swab64 (in->ring_seq);
4607
4608 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4609 in_failed_list = in_proc_list + out->proc_list_entries;
4610 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4611 out_failed_list = out_proc_list + out->proc_list_entries;
4612
4613 for (i = 0; i < out->proc_list_entries; i++) {
4614 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4615 }
4616 for (i = 0; i < out->failed_list_entries; i++) {
4617 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4618 }
4619}
4620
4621static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4622{
4623 int i;
4624 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4625 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4626 struct memb_commit_token_memb_entry *in_memb_list;
4627 struct memb_commit_token_memb_entry *out_memb_list;
4628
4629 out->header.magic = TOTEM_MH_MAGIC;
4630 out->header.version = TOTEM_MH_VERSION;
4631 out->header.type = in->header.type;
4632 out->header.nodeid = swab32 (in->header.nodeid);
4633 out->token_seq = swab32 (in->token_seq);
4634 out->ring_id.rep = swab32(in->ring_id.rep);
4635 out->ring_id.seq = swab64 (in->ring_id.seq);
4636 out->retrans_flg = swab32 (in->retrans_flg);
4637 out->memb_index = swab32 (in->memb_index);
4638 out->addr_entries = swab32 (in->addr_entries);
4639
4640 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4641 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4642 for (i = 0; i < out->addr_entries; i++) {
4643 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4644
4645 /*
4646 * Only convert the memb entry if it has been set
4647 */
4648 if (in_memb_list[i].ring_id.rep != 0) {
4649 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4650
4651 out_memb_list[i].ring_id.seq =
4652 swab64 (in_memb_list[i].ring_id.seq);
4653 out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4654 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4655 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4656 }
4657 }
4658}
4659
4660static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4661{
4662 int i;
4663
4666 out->header.type = in->header.type;
4667 out->header.nodeid = swab32 (in->header.nodeid);
4668 out->seq = swab32 (in->seq);
4669 out->token_seq = swab32 (in->token_seq);
4670 out->aru = swab32 (in->aru);
4671 out->ring_id.rep = swab32(in->ring_id.rep);
4672 out->aru_addr = swab32(in->aru_addr);
4673 out->ring_id.seq = swab64 (in->ring_id.seq);
4674 out->fcc = swab32 (in->fcc);
4675 out->backlog = swab32 (in->backlog);
4676 out->retrans_flg = swab32 (in->retrans_flg);
4678 for (i = 0; i < out->rtr_list_entries; i++) {
4679 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4680 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4681 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4682 }
4683}
4684
4685static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4686{
4689 out->header.type = in->header.type;
4690 out->header.nodeid = swab32 (in->header.nodeid);
4692
4693 out->seq = swab32 (in->seq);
4694 out->this_seqno = swab32 (in->this_seqno);
4695 out->ring_id.rep = swab32(in->ring_id.rep);
4696 out->ring_id.seq = swab64 (in->ring_id.seq);
4697 out->node_id = swab32 (in->node_id);
4698 out->guarantee = swab32 (in->guarantee);
4699 out->system_from = srp_addr_endian_convert(in->system_from);
4700}
4701
4702static void memb_merge_detect_endian_convert (
4703 const struct memb_merge_detect *in,
4704 struct memb_merge_detect *out)
4705{
4708 out->header.type = in->header.type;
4709 out->header.nodeid = swab32 (in->header.nodeid);
4710 out->ring_id.rep = swab32(in->ring_id.rep);
4711 out->ring_id.seq = swab64 (in->ring_id.seq);
4712 out->system_from = srp_addr_endian_convert (in->system_from);
4713}
4714
4715static int ignore_join_under_operational (
4716 struct totemsrp_instance *instance,
4717 const struct memb_join *memb_join)
4718{
4719 struct srp_addr *proc_list;
4720 struct srp_addr *failed_list;
4721 unsigned long long ring_seq;
4722 struct srp_addr aligned_system_from;
4723
4724 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4725 failed_list = proc_list + memb_join->proc_list_entries;
4727 aligned_system_from = memb_join->system_from;
4728
4729 if (memb_set_subset (&instance->my_id, 1,
4730 failed_list, memb_join->failed_list_entries)) {
4731 return (1);
4732 }
4733
4734 /*
4735 * In operational state, my_proc_list is exactly the same as
4736 * my_memb_list.
4737 */
4738 if ((memb_set_subset (&aligned_system_from, 1,
4739 instance->my_memb_list, instance->my_memb_entries)) &&
4740 (ring_seq < instance->my_ring_id.seq)) {
4741 return (1);
4742 }
4743
4744 return (0);
4745}
4746
4747static int message_handler_memb_join (
4748 struct totemsrp_instance *instance,
4749 const void *msg,
4750 size_t msg_len,
4751 int endian_conversion_needed)
4752{
4753 const struct memb_join *memb_join;
4754 struct memb_join *memb_join_convert = alloca (msg_len);
4755 struct srp_addr aligned_system_from;
4756
4757 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4758 return (0);
4759 }
4760
4761 if (endian_conversion_needed) {
4762 memb_join = memb_join_convert;
4763 memb_join_endian_convert (msg, memb_join_convert);
4764
4765 } else {
4766 memb_join = msg;
4767 }
4768
4769 aligned_system_from = memb_join->system_from;
4770
4771 /*
4772 * If the process paused because it wasn't scheduled in a timely
4773 * fashion, flush the join messages because they may be queued
4774 * entries
4775 */
4776 if (pause_flush (instance)) {
4777 return (0);
4778 }
4779
4780 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4782 }
4783 switch (instance->memb_state) {
4785 if (!ignore_join_under_operational (instance, memb_join)) {
4786 memb_join_process (instance, memb_join);
4787 }
4788 break;
4789
4790 case MEMB_STATE_GATHER:
4791 memb_join_process (instance, memb_join);
4792 break;
4793
4794 case MEMB_STATE_COMMIT:
4795 if (memb_set_subset (&aligned_system_from,
4796 1,
4797 instance->my_new_memb_list,
4798 instance->my_new_memb_entries) &&
4799
4800 memb_join->ring_seq >= instance->my_ring_id.seq) {
4801
4802 memb_join_process (instance, memb_join);
4803 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4804 }
4805 break;
4806
4808 if (memb_set_subset (&aligned_system_from,
4809 1,
4810 instance->my_new_memb_list,
4811 instance->my_new_memb_entries) &&
4812
4813 memb_join->ring_seq >= instance->my_ring_id.seq) {
4814
4815 memb_join_process (instance, memb_join);
4816 memb_recovery_state_token_loss (instance);
4817 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4818 }
4819 break;
4820 }
4821 return (0);
4822}
4823
4824static int message_handler_memb_commit_token (
4825 struct totemsrp_instance *instance,
4826 const void *msg,
4827 size_t msg_len,
4828 int endian_conversion_needed)
4829{
4830 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4832 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4833 int sub_entries;
4834
4835 struct srp_addr *addr;
4836
4838 "got commit token");
4839
4840 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4841 return (0);
4842 }
4843
4844 if (endian_conversion_needed) {
4845 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4846 } else {
4847 memcpy (memb_commit_token_convert, msg, msg_len);
4848 }
4849 memb_commit_token = memb_commit_token_convert;
4851
4852#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4853 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4854 return (0);
4855 }
4856#endif
4857 switch (instance->memb_state) {
4859 /* discard token */
4860 break;
4861
4862 case MEMB_STATE_GATHER:
4863 memb_set_subtract (sub, &sub_entries,
4864 instance->my_proc_list, instance->my_proc_list_entries,
4865 instance->my_failed_list, instance->my_failed_list_entries);
4866
4867 if (memb_set_equal (addr,
4869 sub,
4870 sub_entries) &&
4871
4873 memcpy (instance->commit_token, memb_commit_token, msg_len);
4874 memb_state_commit_enter (instance);
4875 }
4876 break;
4877
4878 case MEMB_STATE_COMMIT:
4879 /*
4880 * If retransmitted commit tokens are sent on this ring
4881 * filter them out and only enter recovery once the
4882 * commit token has traversed the array. This is
4883 * determined by :
4884 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4885 */
4886 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4888 memb_state_recovery_enter (instance, memb_commit_token);
4889 }
4890 break;
4891
4893 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4894
4895 /* Filter out duplicated tokens */
4896 if (instance->originated_orf_token) {
4897 break;
4898 }
4899
4900 instance->originated_orf_token = 1;
4901
4903 "Sending initial ORF token");
4904
4905 // TODO convert instead of initiate
4906 orf_token_send_initial (instance);
4907 reset_token_timeout (instance); // REVIEWED
4908 reset_token_retransmit_timeout (instance); // REVIEWED
4909 }
4910 break;
4911 }
4912 return (0);
4913}
4914
4915static int message_handler_token_hold_cancel (
4916 struct totemsrp_instance *instance,
4917 const void *msg,
4918 size_t msg_len,
4919 int endian_conversion_needed)
4920{
4921 const struct token_hold_cancel *token_hold_cancel = msg;
4922
4923 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4924 return (0);
4925 }
4926
4927 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4928 sizeof (struct memb_ring_id)) == 0) {
4929
4930 instance->my_seq_unchanged = 0;
4931 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4932 timer_function_token_retransmit_timeout (instance);
4933 }
4934 }
4935 return (0);
4936}
4937
4938static int check_message_header_validity(
4939 void *context,
4940 const void *msg,
4941 unsigned int msg_len,
4942 const struct sockaddr_storage *system_from)
4943{
4944 struct totemsrp_instance *instance = context;
4945 const struct totem_message_header *message_header = msg;
4946 const char *guessed_str;
4947 const char *msg_byte = msg;
4948
4949 if (msg_len < sizeof (struct totem_message_header)) {
4951 "Message received from %s is too short... Ignoring %u.",
4952 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4953 return (-1);
4954 }
4955
4956 if (message_header->magic != TOTEM_MH_MAGIC &&
4957 message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4958 /*
4959 * We've received ether Knet, old version of Corosync,
4960 * or something else. Do some guessing to display (hopefully)
4961 * helpful message
4962 */
4963 guessed_str = NULL;
4964
4965 if (message_header->magic == 0xFFFF) {
4966 /*
4967 * Corosync 2.2 used header with two UINT8_MAX
4968 */
4969 guessed_str = "Corosync 2.2";
4970 } else if (message_header->magic == 0xFEFE) {
4971 /*
4972 * Corosync 2.3+ used header with two UINT8_MAX - 1
4973 */
4974 guessed_str = "Corosync 2.3+";
4975 } else if (msg_byte[0] == 0x01) {
4976 /*
4977 * Knet has stable1 with first byte of message == 1
4978 */
4979 guessed_str = "unencrypted Kronosnet";
4980 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4981 /*
4982 * Unencrypted Corosync 1.x/OpenAIS has first byte
4983 * 0-5. Collision with Knet (but still worth the try)
4984 */
4985 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4986 } else {
4987 /*
4988 * Encrypted Kronosned packet has a hash at the end of
4989 * the packet and nothing specific at the beginning of the
4990 * packet (just encrypted data).
4991 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
4992 * is in the beginning of the packet.
4993 *
4994 * So it's not possible to reliably detect ether of them.
4995 */
4996 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4997 }
4998
5000 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5001 totemip_sa_print((struct sockaddr *)system_from),
5002 guessed_str);
5003
5004 return (-1);
5005 }
5006
5007 if (message_header->version != TOTEM_MH_VERSION) {
5009 "Message received from %s has unsupported version %u... Ignoring",
5010 totemip_sa_print((struct sockaddr *)system_from),
5011 message_header->version);
5012
5013 return (-1);
5014 }
5015
5016 return (0);
5017}
5018
5019
5021 void *context,
5022 const void *msg,
5023 unsigned int msg_len,
5024 const struct sockaddr_storage *system_from)
5025{
5026 struct totemsrp_instance *instance = context;
5027 const struct totem_message_header *message_header = msg;
5028
5029 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5030 return ;
5031 }
5032
5033 switch (message_header->type) {
5035 instance->stats.orf_token_rx++;
5036 break;
5037 case MESSAGE_TYPE_MCAST:
5038 instance->stats.mcast_rx++;
5039 break;
5041 instance->stats.memb_merge_detect_rx++;
5042 break;
5044 instance->stats.memb_join_rx++;
5045 break;
5047 instance->stats.memb_commit_token_rx++;
5048 break;
5050 instance->stats.token_hold_cancel_rx++;
5051 break;
5052 default:
5054 "Message received from %s has wrong type... ignoring %d.\n",
5055 totemip_sa_print((struct sockaddr *)system_from),
5056 (int)message_header->type);
5057
5058 instance->stats.rx_msg_dropped++;
5059 return;
5060 }
5061 /*
5062 * Handle incoming message
5063 */
5064 totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5065 instance,
5066 msg,
5067 msg_len,
5068 message_header->magic != TOTEM_MH_MAGIC);
5069}
5070
5072 void *context,
5073 const struct totem_ip_address *interface_addr,
5074 unsigned short ip_port,
5075 unsigned int iface_no)
5076{
5077 struct totemsrp_instance *instance = context;
5078 int res;
5079
5080 totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5081
5082 res = totemnet_iface_set (
5083 instance->totemnet_context,
5084 interface_addr,
5085 ip_port,
5086 iface_no);
5087
5088 return (res);
5089}
5090
5091/* Contrary to its name, this only gets called when the interface is enabled */
5093 void *context,
5094 const struct totem_ip_address *iface_addr,
5095 unsigned int iface_no)
5096{
5097 struct totemsrp_instance *instance = context;
5098 int num_interfaces;
5099 int i;
5100
5101 if (!instance->my_id.nodeid) {
5102 instance->my_id.nodeid = iface_addr->nodeid;
5103 }
5104 totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5105
5106 if (instance->iface_changes++ == 0) {
5107 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5108 /*
5109 * Increase the ring_id sequence number. This doesn't follow specification.
5110 * Solves problem with restarted leader node (node with lowest nodeid) before
5111 * rest of the cluster forms new membership and guarantees unique ring_id for
5112 * new singleton configuration.
5113 */
5114 instance->my_ring_id.seq++;
5115
5116 instance->token_ring_id_seq = instance->my_ring_id.seq;
5117 log_printf (
5118 instance->totemsrp_log_level_debug,
5119 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5120 instance->my_ring_id.rep,
5121 (uint64_t)instance->my_ring_id.seq);
5122
5123 if (instance->totemsrp_service_ready_fn) {
5124 instance->totemsrp_service_ready_fn ();
5125 }
5126
5127 }
5128
5129 num_interfaces = 0;
5130 for (i = 0; i < INTERFACE_MAX; i++) {
5131 if (instance->totem_config->interfaces[i].configured) {
5132 num_interfaces++;
5133 }
5134 }
5135
5136 if (instance->iface_changes >= num_interfaces) {
5137 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5138 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5139 assert(instance->totem_config->orig_interfaces != NULL);
5140 memset(instance->totem_config->orig_interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX);
5141
5143
5144 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5145 free(instance->totem_config->orig_interfaces);
5146 }
5147}
5148
5150 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5151}
5152
5154 void *context,
5155 void (*totem_service_ready) (void))
5156{
5157 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5158
5159 instance->totemsrp_service_ready_fn = totem_service_ready;
5160}
5161
5163 void *context,
5164 const struct totem_ip_address *member,
5165 int iface_no)
5166{
5167 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5168 int res;
5169
5170 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5171
5172 return (res);
5173}
5174
5176 void *context,
5177 const struct totem_ip_address *member,
5178 int iface_no)
5179{
5180 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5181 int res;
5182
5183 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5184
5185 return (res);
5186}
5187
5189{
5190 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5191
5192 instance->threaded_mode_enabled = 1;
5193}
5194
5195void totemsrp_trans_ack (void *context)
5196{
5197 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5198
5199 instance->waiting_trans_ack = 0;
5201}
5202
5203
5205{
5206 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5207 int res;
5208
5210 return (res);
5211}
5212
5214{
5215 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5216 int res;
5217
5219 return (res);
5220}
5221
5222void totemsrp_stats_clear (void *context, int flags)
5223{
5224 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5225
5226 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5229 }
5230}
5231
/*
 * Force the membership protocol to start a new gather round by running
 * the ORF token timeout handler directly, as if its timer had fired.
 * NOTE(review): the "new gather round" effect is inferred from this
 * function's name; confirm against timer_function_orf_token_timeout().
 */
void totemsrp_force_gather (void *context)
{
	void *srp_context = context;

	timer_function_orf_token_timeout (srp_context);
}
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:264
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:219
unsigned int heartbeat_failures_allowed
Definition totem.h:213
unsigned int token_timeout
Definition totem.h:181
unsigned int window_size
Definition totem.h:217
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:207
unsigned int downcheck_timeout
Definition totem.h:199
unsigned int miss_count_const
Definition totem.h:241
struct totem_interface * interfaces
Definition totem.h:165
unsigned int fail_to_recv_const
Definition totem.h:201
unsigned int merge_timeout
Definition totem.h:197
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:209
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:247
unsigned int token_retransmits_before_loss_const
Definition totem.h:189
unsigned int max_network_delay
Definition totem.h:215
unsigned int seqno_unchanged_const
Definition totem.h:203
unsigned int consensus_timeout
Definition totem.h:195
unsigned int threads
Definition totem.h:211
unsigned int send_join_timeout
Definition totem.h:193
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:251
unsigned int token_retransmit_timeout
Definition totem.h:185
unsigned int token_warning
Definition totem.h:183
unsigned int join_timeout
Definition totem.h:191
unsigned int token_hold_timeout
Definition totem.h:187
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
void(*) in log_level_security)
Definition totem.h:110
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
uint8_t reachable
Definition totem.h:265
uint32_t version
Definition totem.h:263
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
void(*) enum memb_stat memb_state)
Definition totemsrp.c:446
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:287
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:261
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
void totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition totemnet.c:317
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1134
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
unsigned long long int tv_old
Definition totemsrp.c:3824
unsigned int seq
Definition totemsrp.c:2
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5232
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5153
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:819
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3482
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5188
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5149
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1042
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5092
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1109
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2555
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5222
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5071
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1027
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5195
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5213
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1123
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2475
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3517
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5162
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1071
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5204
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5175
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2484
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5020
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89