Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handshake regression #407

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/logproto/logproto-buffered-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCo
gint *timeout G_GNUC_UNUSED);
LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self);
void log_proto_buffered_server_put_state(LogProtoBufferedServer *self);
gboolean log_proto_buffered_server_restart_with_state(LogProtoServer *s,
PersistState *persist_state, const gchar *persist_name);

/* LogProtoBufferedServer */
gboolean log_proto_buffered_server_validate_options_method(LogProtoServer *s);
Expand Down
28 changes: 28 additions & 0 deletions lib/logproto/logproto-text-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,33 @@ log_proto_text_server_set_multi_line(LogProtoServer *s, MultiLineLogic *multi_li
self->multi_line = multi_line;
}

static gboolean
log_proto_text_server_restart_with_state(LogProtoServer *s, PersistState *persist_state, const gchar *persist_name)
{
LogProtoTextServer *self = (LogProtoTextServer *) s;

gboolean res = log_proto_buffered_server_restart_with_state(s, persist_state, persist_name);
if (!res)
return FALSE;

if (!self->super.buffer)
return FALSE;

LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
const guchar *buffer_start = self->super.buffer + state->pending_buffer_pos;
gsize buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;

if (buffer_bytes > 0)
{
const guchar *eom = self->find_eom(buffer_start, buffer_bytes);
if (eom)
self->cached_eol_pos = eom - self->super.buffer;
MrAnno marked this conversation as resolved.
Show resolved Hide resolved
}
log_proto_buffered_server_put_state(&self->super);

return TRUE;
}

void
log_proto_text_server_free(LogProtoServer *s)
{
Expand All @@ -306,6 +333,7 @@ log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, co
log_proto_buffered_server_init(&self->super, transport, options);
self->super.super.prepare = log_proto_text_server_prepare_method;
self->super.super.free_fn = log_proto_text_server_free;
self->super.super.restart_with_state = log_proto_text_server_restart_with_state;
self->super.fetch_from_buffer = log_proto_text_server_fetch_from_buffer;
self->super.flush = log_proto_text_server_flush;
self->find_eom = find_eom;
Expand Down
19 changes: 0 additions & 19 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@ log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr)
self->local_addr = g_sockaddr_ref(local_addr);
}

void
log_reader_set_immediate_check(LogReader *s)
{
LogReader *self = (LogReader *) s;

self->immediate_check = TRUE;
}

void
log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options,
const gchar *stats_id, StatsClusterKeyBuilder *kb)
Expand Down Expand Up @@ -160,15 +152,13 @@ log_reader_disable_watches(LogReader *self)
static void
log_reader_suspend_until_awoken(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = TRUE;
}

static void
log_reader_force_check_in_next_poll(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = FALSE;

Expand Down Expand Up @@ -326,12 +316,6 @@ log_reader_update_watches(LogReader *self)
iv_timer_register(&self->idle_timer);
}

if (self->immediate_check)
{
log_reader_force_check_in_next_poll(self);
return;
}

switch (prepare_action)
{
case LPPA_POLL_IO:
Expand Down Expand Up @@ -574,8 +558,6 @@ log_reader_fetch_log(LogReader *self)
}
log_transport_aux_data_destroy(aux);

if (msg_count == self->options->fetch_limit)
self->immediate_check = TRUE;
return 0;
}

Expand Down Expand Up @@ -785,7 +767,6 @@ log_reader_new(GlobalConfig *cfg)
self->super.wakeup = log_reader_wakeup;
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
Expand Down
3 changes: 1 addition & 2 deletions lib/logreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct _LogReader
{
LogSource super;
LogProtoServer *proto;
gboolean immediate_check, handshake_in_progress;
gboolean handshake_in_progress;
LogPipe *control;
LogReaderOptions *options;
PollEvents *poll_events;
Expand Down Expand Up @@ -99,7 +99,6 @@ void log_reader_set_follow_filename(LogReader *self, const gchar *follow_filenam
void log_reader_set_name(LogReader *s, const gchar *name);
void log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr);
void log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr);
void log_reader_set_immediate_check(LogReader *s);
void log_reader_disable_bookmark_saving(LogReader *s);
void log_reader_open(LogReader *s, LogProtoServer *proto, PollEvents *poll_events);
void log_reader_close_proto(LogReader *s);
Expand Down
23 changes: 4 additions & 19 deletions modules/affile/file-reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ _deinit_sd_logreader(FileReader *self)
}

static void
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gboolean check_immediately)
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto)
{
FileReader *self = (FileReader *) s;

Expand All @@ -172,25 +172,12 @@ _setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gbo
self->owner->super.id,
kb);

if (check_immediately)
log_reader_set_immediate_check(self->reader);

/* NOTE: if the file could not be opened, we ignore the last
* remembered file position, if the file is created in the future
* we're going to read from the start. */
log_pipe_append((LogPipe *) self->reader, s);
}

static gboolean
_is_immediate_check_needed(gboolean file_opened, gboolean open_deferred)
{
if (file_opened)
return TRUE;
else if (open_deferred)
return FALSE;
return FALSE;
}

static gboolean
_reader_open_file(LogPipe *s, gboolean recover_state)
{
Expand All @@ -214,7 +201,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
{
LogProtoServer *proto;
PollEvents *poll_events;
gboolean check_immediately;

poll_events = _construct_poll_events(self, fd);
if (!poll_events)
Expand All @@ -224,8 +210,9 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
}
proto = _construct_proto(self, fd);

check_immediately = _is_immediate_check_needed(file_opened, open_deferred);
_setup_logreader(s, poll_events, proto, check_immediately);
_setup_logreader(s, poll_events, proto);
if (recover_state)
_recover_state(s, cfg, proto);
if (!log_pipe_init((LogPipe *) self->reader))
{
msg_error("Error initializing log_reader, closing fd",
Expand All @@ -235,8 +222,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
close(fd);
return FALSE;
}
if (recover_state)
_recover_state(s, cfg, proto);
}
else
{
Expand Down
26 changes: 17 additions & 9 deletions modules/affile/poll-file-changes.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,18 @@ poll_file_changes_stop_watches(PollEvents *s)
}

static void
poll_file_changes_rearm_timer(PollFileChanges *self)
poll_file_changes_rearm_timer(PollFileChanges *self, glong delay)
{
iv_validate_now();
self->follow_timer.expires = iv_now;
timespec_add_msec(&self->follow_timer.expires, self->follow_freq);
timespec_add_msec(&self->follow_timer.expires, delay);
iv_timer_register(&self->follow_timer);
}

static gboolean
poll_file_changes_check_eof(PollFileChanges *self)
{
gint fd = self->fd;
if (fd < 0)
return FALSE;

off_t pos = lseek(fd, 0, SEEK_CUR);
if (pos == (off_t) -1)
Expand All @@ -202,22 +200,32 @@ void
poll_file_changes_update_watches(PollEvents *s, GIOCondition cond)
{
PollFileChanges *self = (PollFileChanges *) s;
gboolean check_again = TRUE;

/* we can only provide input events */
g_assert((cond & ~G_IO_IN) == 0);

poll_file_changes_stop_watches(s);

if (self->fd < 0)
{
/* file does not exist yet, go back checking after follow_freq */
poll_file_changes_rearm_timer(self, self->follow_freq);
return;
}

if (poll_file_changes_check_eof(self))
{
msg_trace("End of file, following file",
evt_tag_str("follow_filename", self->follow_filename));
check_again = poll_file_changes_on_eof(self);
if (poll_file_changes_on_eof(self))
poll_file_changes_rearm_timer(self, self->follow_freq);
}
else
{
msg_trace("File exists and contains data",
evt_tag_str("follow_filename", self->follow_filename));
poll_file_changes_rearm_timer(self, 0);
}

if (check_again)
poll_file_changes_rearm_timer(self);
}

void
Expand Down
Loading