Skip to content

Commit

Permalink
logreader: drop log_reader_set_immediate_check()
Browse files Browse the repository at this point in the history
This was to force the LogReader instance to immediately start reading
once at startup. This is being replaced by poll-fd-events reporting
readability if we are at the middle of the file and also by LogProto
if we have buffered data.

Signed-off-by: Balazs Scheidler <balazs.scheidler@axoflow.com>
  • Loading branch information
bazsi committed Dec 4, 2024
1 parent 8d0ea2e commit c2cf91f
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 40 deletions.
19 changes: 0 additions & 19 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@ log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr)
self->local_addr = g_sockaddr_ref(local_addr);
}

void
log_reader_set_immediate_check(LogReader *s)
{
LogReader *self = (LogReader *) s;

self->immediate_check = TRUE;
}

void
log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options,
const gchar *stats_id, StatsClusterKeyBuilder *kb)
Expand Down Expand Up @@ -160,15 +152,13 @@ log_reader_disable_watches(LogReader *self)
static void
log_reader_suspend_until_awoken(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = TRUE;
}

static void
log_reader_force_check_in_next_poll(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = FALSE;

Expand Down Expand Up @@ -326,12 +316,6 @@ log_reader_update_watches(LogReader *self)
iv_timer_register(&self->idle_timer);
}

if (self->immediate_check)
{
log_reader_force_check_in_next_poll(self);
return;
}

switch (prepare_action)
{
case LPPA_POLL_IO:
Expand Down Expand Up @@ -574,8 +558,6 @@ log_reader_fetch_log(LogReader *self)
}
log_transport_aux_data_destroy(aux);

if (msg_count == self->options->fetch_limit)
self->immediate_check = TRUE;
return 0;
}

Expand Down Expand Up @@ -785,7 +767,6 @@ log_reader_new(GlobalConfig *cfg)
self->super.wakeup = log_reader_wakeup;
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
Expand Down
3 changes: 1 addition & 2 deletions lib/logreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct _LogReader
{
LogSource super;
LogProtoServer *proto;
gboolean immediate_check, handshake_in_progress;
gboolean handshake_in_progress;
LogPipe *control;
LogReaderOptions *options;
PollEvents *poll_events;
Expand Down Expand Up @@ -99,7 +99,6 @@ void log_reader_set_follow_filename(LogReader *self, const gchar *follow_filenam
void log_reader_set_name(LogReader *s, const gchar *name);
void log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr);
void log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr);
void log_reader_set_immediate_check(LogReader *s);
void log_reader_disable_bookmark_saving(LogReader *s);
void log_reader_open(LogReader *s, LogProtoServer *proto, PollEvents *poll_events);
void log_reader_close_proto(LogReader *s);
Expand Down
23 changes: 4 additions & 19 deletions modules/affile/file-reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ _deinit_sd_logreader(FileReader *self)
}

static void
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gboolean check_immediately)
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto)
{
FileReader *self = (FileReader *) s;

Expand All @@ -172,25 +172,12 @@ _setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gbo
self->owner->super.id,
kb);

if (check_immediately)
log_reader_set_immediate_check(self->reader);

/* NOTE: if the file could not be opened, we ignore the last
* remembered file position, if the file is created in the future
* we're going to read from the start. */
log_pipe_append((LogPipe *) self->reader, s);
}

static gboolean
_is_immediate_check_needed(gboolean file_opened, gboolean open_deferred)
{
if (file_opened)
return TRUE;
else if (open_deferred)
return FALSE;
return FALSE;
}

static gboolean
_reader_open_file(LogPipe *s, gboolean recover_state)
{
Expand All @@ -214,7 +201,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
{
LogProtoServer *proto;
PollEvents *poll_events;
gboolean check_immediately;

poll_events = _construct_poll_events(self, fd);
if (!poll_events)
Expand All @@ -224,8 +210,9 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
}
proto = _construct_proto(self, fd);

check_immediately = _is_immediate_check_needed(file_opened, open_deferred);
_setup_logreader(s, poll_events, proto, check_immediately);
_setup_logreader(s, poll_events, proto);
if (recover_state)
_recover_state(s, cfg, proto);
if (!log_pipe_init((LogPipe *) self->reader))
{
msg_error("Error initializing log_reader, closing fd",
Expand All @@ -235,8 +222,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
close(fd);
return FALSE;
}
if (recover_state)
_recover_state(s, cfg, proto);
}
else
{
Expand Down

0 comments on commit c2cf91f

Please sign in to comment.