From f83b6b10fdfc61bcbe2e1d44c1da260bf6cf81b0 Mon Sep 17 00:00:00 2001 From: Hofi Date: Fri, 22 Nov 2024 21:21:36 +0100 Subject: [PATCH] logproto: fixed state handling of the new internal handshake_in_progress flag There were multiple issues here - proto change did not force re-handshake if needed - initial state handling led to a drop of the first flush event both on client and server side that e.g. led to incorrect persist state processing at restart Signed-off-by: Hofi --- lib/logproto/logproto-client.h | 13 +++++++++---- lib/logproto/logproto-server.h | 13 +++++++++---- lib/logreader.c | 15 +++++++++++---- lib/logwriter.c | 12 ++++++++++-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/lib/logproto/logproto-client.h b/lib/logproto/logproto-client.h index f571b26f82..ce242b8819 100644 --- a/lib/logproto/logproto-client.h +++ b/lib/logproto/logproto-client.h @@ -112,13 +112,18 @@ log_proto_client_validate_options(LogProtoClient *self) return self->validate_options(self); } +static inline gboolean +log_proto_client_needs_handshake(LogProtoClient *s) +{ + return s->handshake != NULL; +} + static inline LogProtoStatus log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished) { - if (s->handshake) - { - return s->handshake(s, handshake_finished); - } + if (log_proto_client_needs_handshake(s)) + return s->handshake(s, handshake_finished); + *handshake_finished = TRUE; return LPS_SUCCESS; } diff --git a/lib/logproto/logproto-server.h b/lib/logproto/logproto-server.h index 76904143cc..53b65fc3b8 100644 --- a/lib/logproto/logproto-server.h +++ b/lib/logproto/logproto-server.h @@ -99,13 +99,18 @@ log_proto_server_validate_options(LogProtoServer *self) return self->validate_options(self); } +static inline gboolean +log_proto_server_needs_handshake(LogProtoServer *s) +{ + return s->handshake != NULL; +} + static inline LogProtoStatus log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished) { - if (s->handshake) - { - return s->handshake(s, handshake_finished); - } + if (log_proto_server_needs_handshake(s)) + return s->handshake(s, handshake_finished); + *handshake_finished = TRUE; return LPS_SUCCESS; } diff --git a/lib/logreader.c b/lib/logreader.c index 0001912b15..ac119fbf4e 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -231,7 +231,12 @@ log_reader_apply_proto_and_poll_events(LogReader *self, LogProtoServer *proto, P self->proto = proto; if (self->proto) - log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self); + { + log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self); + self->handshake_in_progress = log_proto_server_needs_handshake(self->proto); + } + else + self->handshake_in_progress = FALSE; self->poll_events = poll_events; } @@ -501,11 +506,13 @@ log_reader_fetch_log(LogReader *self) if ((self->options->flags & LR_IGNORE_AUX_DATA)) aux = NULL; - log_transport_aux_data_init(aux); + if (self->handshake_in_progress) { - return log_reader_process_handshake(self); + gboolean succ = log_reader_process_handshake(self); + if (FALSE == succ || self->handshake_in_progress) + return FALSE; } /* NOTE: this loop is here to decrease the load on the main loop, we try @@ -777,7 +784,7 @@ log_reader_new(GlobalConfig *cfg) self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc; self->super.metrics.raw_bytes_enabled = TRUE; self->immediate_check = FALSE; - self->handshake_in_progress = TRUE; + self->handshake_in_progress = FALSE; log_reader_init_watches(self); g_mutex_init(&self->pending_close_lock); g_cond_init(&self->pending_close_cond); diff --git a/lib/logwriter.c b/lib/logwriter.c index 236b046837..ae18663d87 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -1334,7 +1334,11 @@ log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode) return FALSE; if (self->handshake_in_progress) - return log_writer_process_handshake(self); + { + gboolean succ = log_writer_process_handshake(self); + if (FALSE == succ || self->handshake_in_progress) + return FALSE; + } /* NOTE: in case we're reloading or exiting we flush all queued items as * long as the destination can consume it. This is not going to be an @@ -1720,7 +1724,11 @@ log_writer_set_proto(LogWriter *self, LogProtoClient *proto) log_proto_client_set_client_flow_control(self->proto, &flow_control_funcs); log_proto_client_set_options(self->proto, &self->options->proto_options.super); + + self->handshake_in_progress = log_proto_client_needs_handshake(self->proto); } + else + self->handshake_in_progress = FALSE; } static void @@ -1923,7 +1931,7 @@ log_writer_new(guint32 flags, GlobalConfig *cfg) self->flags = flags; self->line_buffer = g_string_sized_new(128); self->pollable_state = -1; - self->handshake_in_progress = TRUE; + self->handshake_in_progress = FALSE; init_sequence_number(&self->seq_num); log_writer_init_watches(self);