Skip to content

Commit

Permalink
outputs: add UnixDatagram output
Browse files Browse the repository at this point in the history
The UnixDatagram output can be used to forward events to a local output,
for example, syslog-ng. This is useful when events need to be processed
further (transformed, filtered, etc.).

The output is datagram-oriented in order to support multi-line raw events
as well.
  • Loading branch information
MrAnno committed Nov 13, 2023
1 parent cd6ad50 commit f6a61ca
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 4 deletions.
5 changes: 5 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ async fn main() {
)
.arg(arg!(--filename <FILENAME> "Name of the file where logs will be written.").default_value("messages"))
)
.subcommand(
Command::new("unixdatagram")
.about("UnixDatagram output")
.arg(arg!(<path> "Path"))
)
)
.subcommand(
Command::new("delete")
Expand Down
15 changes: 14 additions & 1 deletion cli/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use common::{
subscription::{
ContentFormat, FileConfiguration, KafkaConfiguration, PrincsFilter, PrincsFilterOperation,
SubscriptionData, SubscriptionMachineState, SubscriptionOutput, SubscriptionOutputFormat,
TcpConfiguration, RedisConfiguration,
TcpConfiguration, RedisConfiguration, UnixDatagramConfiguration,
},
};
use roxmltree::{Document, Node};
Expand Down Expand Up @@ -637,6 +637,9 @@ async fn outputs_add(subscription: &mut SubscriptionData, matches: &ArgMatches)
Some(("files", matches)) => {
SubscriptionOutput::Files(format, outputs_add_files(matches)?, true)
}
Some(("unixdatagram", matches)) => {
SubscriptionOutput::UnixDatagram(format, outputs_add_unix_datagram(matches)?, true)
}
_ => {
bail!("Missing output type")
}
Expand Down Expand Up @@ -715,6 +718,16 @@ fn outputs_add_files(matches: &ArgMatches) -> Result<FileConfiguration> {
Ok(config)
}

fn outputs_add_unix_datagram(matches: &ArgMatches) -> Result<UnixDatagramConfiguration> {
let path = matches
.get_one::<String>("path")
.ok_or_else(|| anyhow!("Missing UnixDatagram path"))?
.to_owned();

info!("Adding UnixDatagram output : {}", path);
Ok(UnixDatagramConfiguration::new(path))
}

async fn outputs_delete(subscription: &mut SubscriptionData, matches: &ArgMatches) -> Result<()> {
let index = matches
.get_one::<usize>("index")
Expand Down
28 changes: 28 additions & 0 deletions common/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,29 @@ impl FileConfiguration {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UnixDatagramConfiguration {
path: String,
}

impl UnixDatagramConfiguration {
pub fn new(path: String) -> Self {
Self { path }
}

pub fn path(&self) -> &str {
self.path.as_ref()
}
}

#[derive(Debug, Serialize, Clone, Eq, PartialEq, Deserialize)]
pub enum SubscriptionOutput {
// The last bool indicates whether the output is enabled or not.
Files(SubscriptionOutputFormat, FileConfiguration, bool),
Kafka(SubscriptionOutputFormat, KafkaConfiguration, bool),
Tcp(SubscriptionOutputFormat, TcpConfiguration, bool),
Redis(SubscriptionOutputFormat, RedisConfiguration, bool),
UnixDatagram(SubscriptionOutputFormat, UnixDatagramConfiguration, bool),
}

impl SubscriptionOutput {
Expand All @@ -137,6 +153,7 @@ impl SubscriptionOutput {
SubscriptionOutput::Kafka(format, _, _) => format,
SubscriptionOutput::Tcp(format, _, _) => format,
SubscriptionOutput::Redis(format, _, _) => format,
SubscriptionOutput::UnixDatagram(format, _, _) => format,
}
}

Expand All @@ -146,6 +163,7 @@ impl SubscriptionOutput {
SubscriptionOutput::Kafka(_, _, enabled) => *enabled,
SubscriptionOutput::Tcp(_, _, enabled) => *enabled,
SubscriptionOutput::Redis(_, _, enabled) => *enabled,
SubscriptionOutput::UnixDatagram(_, _, enabled) => *enabled,
}
}

Expand All @@ -163,6 +181,9 @@ impl SubscriptionOutput {
SubscriptionOutput::Redis(format, config, _) => {
SubscriptionOutput::Redis(format.clone(), config.clone(), value)
}
SubscriptionOutput::UnixDatagram(format, config, _) => {
SubscriptionOutput::UnixDatagram(format.clone(), config.clone(), value)
}
}
}
}
Expand Down Expand Up @@ -198,6 +219,13 @@ impl Display for SubscriptionOutput {
enabled, format, config
)
}
SubscriptionOutput::UnixDatagram(format, config, enabled) => {
write!(
f,
"Enabled: {:?}, Format: {}, Output: UnixDatagram({:?})",
enabled, format, config
)
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion server/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use common::subscription::{FileConfiguration, KafkaConfiguration, SubscriptionOutput, RedisConfiguration};
use common::subscription::{FileConfiguration, KafkaConfiguration, SubscriptionOutput, RedisConfiguration, UnixDatagramConfiguration};

use crate::{event::EventMetadata, formatter::Format};

Expand All @@ -12,6 +12,7 @@ pub enum OutputType {
Kafka(Format, KafkaConfiguration, bool),
Redis(Format, RedisConfiguration, bool),
Tcp(Format, String, u16, bool),
UnixDatagram(Format, UnixDatagramConfiguration, bool),
}

impl From<&SubscriptionOutput> for OutputType {
Expand All @@ -32,6 +33,9 @@ impl From<&SubscriptionOutput> for OutputType {
config.port(),
*enabled,
),
SubscriptionOutput::UnixDatagram(sof, config, enabled) => {
OutputType::UnixDatagram(sof.into(), config.clone(), *enabled)
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/outputs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod file;
pub mod kafka;
pub mod tcp;
pub mod redis;
pub mod redis;
pub mod unix;
149 changes: 149 additions & 0 deletions server/src/outputs/unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::sync::Arc;

use crate::{event::EventMetadata, formatter::Format, output::Output};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use common::subscription::UnixDatagramConfiguration;
use log::{debug, info, warn};
use std::path::Path;
use tokio::{
net::UnixDatagram,
sync::{mpsc, oneshot},
};

use tokio_util::sync::CancellationToken;

#[derive(Debug)]
pub struct WriteUnixDatagramMessage {
events: Arc<Vec<Arc<String>>>,
resp: oneshot::Sender<Result<()>>,
}

fn send_response(sender: oneshot::Sender<Result<()>>, msg: Result<()>) {
if let Err(e) = sender.send(msg) {
warn!(
"Failed to send UnixDatagram write result because the receiver dropped. Result was: {:?}",
e
);
}
}

pub async fn run(
path: String,
mut task_rx: mpsc::Receiver<WriteUnixDatagramMessage>,
cancellation_token: CancellationToken,
) {
let mut dgram_opt: Option<UnixDatagram> = None;
'mainloop: loop {
tokio::select! {
Some(message) = task_rx.recv() => {
if dgram_opt.is_none() {
let dgram = match UnixDatagram::unbound() {
Ok(dgram) => dgram,
Err(e) => {
warn!("Failed to create UnixDatagram socket: {}", e);
send_response(message.resp, Err(anyhow!(format!("Failed to create UnixDatagram socket: {}", e))));
continue;
}
};

match dgram.connect(Path::new(&path)) {
Ok(_) => {
dgram_opt = Some(dgram);
},
Err(e) => {
warn!("Failed to connect to {}: {}", path, e);
send_response(message.resp, Err(anyhow!(format!("Failed to connect to {}: {}", path, e))));
continue;
}
};
}

// This should never fail
let dgram = match dgram_opt.as_mut() {
Some(dgram) => dgram,
None => {
warn!("UnixDatagram is unset !");
send_response(message.resp, Err(anyhow!(format!("UnixDatagram of {} is unset!", path))));
continue;
}
};

for event in message.events.iter() {
if let Err(e) = dgram.send(event.as_bytes()).await {
dgram_opt = None;
send_response(message.resp, Err(anyhow!(format!("Failed to write to UnixDatagram ({}): {}", path, e))));
continue 'mainloop;
}
}

send_response(message.resp, Ok(()));
},
_ = cancellation_token.cancelled() => {
break;
}
};
}
info!("Exiting UnixDatagram output task ({})", path);
}

pub struct OutputUnixDatagram {
format: Format,
path: String,
task_tx: mpsc::Sender<WriteUnixDatagramMessage>,
task_ct: CancellationToken,
}

impl OutputUnixDatagram {
pub fn new(format: Format, config: &UnixDatagramConfiguration) -> Result<Self> {
debug!("Initialize UnixDatagram output with format {:?} and path {}", format, config.path());

let (task_tx, task_rx) = mpsc::channel(32);

let task_ct = CancellationToken::new();
let cloned_task_ct = task_ct.clone();

let path = config.path().to_string();

tokio::spawn(async move { run(path, task_rx, cloned_task_ct).await });

Ok(OutputUnixDatagram {
format,
path: config.path().to_string(),
task_tx,
task_ct,
})
}
}

#[async_trait]
impl Output for OutputUnixDatagram {
async fn write(
&self,
_metadata: Arc<EventMetadata>,
events: Arc<Vec<Arc<String>>>,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.task_tx
.send(WriteUnixDatagramMessage { events, resp: tx })
.await?;

rx.await??;

Ok(())
}

fn describe(&self) -> String {
format!("UnixDatagram ({})", self.path)
}

fn format(&self) -> &Format {
&self.format
}
}

impl Drop for OutputUnixDatagram {
fn drop(&mut self) {
self.task_ct.cancel();
}
}
5 changes: 4 additions & 1 deletion server/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{
use crate::{
formatter::Format,
output::Output,
outputs::{file::OutputFile, kafka::OutputKafka, tcp::OutputTcp},
outputs::{file::OutputFile, kafka::OutputKafka, tcp::OutputTcp, unix::OutputUnixDatagram},
};

use crate::outputs::redis::OutputRedis;
Expand Down Expand Up @@ -79,6 +79,9 @@ impl Subscription {
SubscriptionOutput::Redis(format, config, _) => {
Arc::new(Box::new(OutputRedis::new(Format::from(format), config)?))
}
SubscriptionOutput::UnixDatagram(format, config, _) => {
Arc::new(Box::new(OutputUnixDatagram::new(Format::from(format), config)?))
}
});
}
}
Expand Down

0 comments on commit f6a61ca

Please sign in to comment.