Skip to content

Commit

Permalink
internal schema changes, add new stdout category for serverstate
Browse files Browse the repository at this point in the history
  • Loading branch information
circlesabound committed Apr 21, 2024
1 parent e7e2f09 commit 2682713
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 136 deletions.
18 changes: 9 additions & 9 deletions src/agent/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,17 +899,17 @@ impl AgentController {
server_state,
player_count,
} => match server_state {
server::InternalServerState::Ready
| server::InternalServerState::PreparedToHostGame
| server::InternalServerState::CreatingGame => ServerStatus::PreGame,
server::InternalServerState::InGame
| server::InternalServerState::InGameSavingMap => {
InternalServerState::Ready
| InternalServerState::PreparedToHostGame
| InternalServerState::CreatingGame => ServerStatus::PreGame,
InternalServerState::InGame
| InternalServerState::InGameSavingMap => {
ServerStatus::InGame { player_count }
}
server::InternalServerState::DisconnectingScheduled
| server::InternalServerState::Disconnecting
| server::InternalServerState::Disconnected
| server::InternalServerState::Closed => ServerStatus::PostGame,
InternalServerState::DisconnectingScheduled
| InternalServerState::Disconnecting
| InternalServerState::Disconnected
| InternalServerState::Closed => ServerStatus::PostGame,
},
};
self.reply_success(AgentOutMessage::ServerStatus(status), operation_id)
Expand Down
19 changes: 1 addition & 18 deletions src/agent/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@ use std::{
sync::atomic::{AtomicU32, Ordering},
};

use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use regex::Regex;
use strum_macros::EnumString;
use tokio::io::AsyncBufReadExt;
use tokio::process::*;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

use crate::error::{Error, Result};
use fctrl::schema::ServerStartSaveFile;
use fctrl::schema::*;

use settings::*;

Expand Down Expand Up @@ -290,17 +287,3 @@ impl StartableShortLivedInstance {
pub struct StoppedShortLivedInstance {
pub exit_status: ExitStatus,
}

/// Internal state of the Factorio multiplayer server as tracked by output logs
#[derive(Clone, Debug, EnumString)]
pub enum InternalServerState {
Ready,
PreparedToHostGame,
CreatingGame,
InGame,
InGameSavingMap,
DisconnectingScheduled,
Disconnecting,
Disconnected,
Closed,
}
7 changes: 1 addition & 6 deletions src/agent/server/mods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use factorio_file_parser::ModSettings;
use futures::future;
use lazy_static::lazy_static;
use log::{debug, error, info};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::fs;

Expand All @@ -18,7 +17,7 @@ use crate::{
util::downloader,
};

use fctrl::schema::*;
use fctrl::schema::{regex::*, *};

use super::settings::Secrets;

Expand Down Expand Up @@ -269,10 +268,6 @@ impl Mod {
// Per https://wiki.factorio.com/Tutorial:Mod_structure, mod zip files must be named with the pattern:
// {mod-name}_{version-number}.zip
// No support for unzipped mods (yet?)
lazy_static! {
static ref MOD_FILENAME_RE: Regex = Regex::new(r"^(.+)_(\d+\.\d+\.\d+)\.zip$").unwrap();
}

if let Some(captures) = MOD_FILENAME_RE.captures(s) {
let name = captures.get(1).unwrap().as_str().to_string();
let version = captures.get(2).unwrap().as_str().to_string();
Expand Down
22 changes: 1 addition & 21 deletions src/agent/server/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
*,
},
};
use fctrl::schema::regex::*;

pub struct ProcessManager {
running_instance: Arc<Mutex<Option<StartedInstance>>>,
Expand Down Expand Up @@ -185,11 +186,6 @@ pub async fn parse_process_stdout(
Ok(line_opt) => {
if let Some(line) = line_opt {
// Parse for internal server state (whether the game is running, stopped, etc)
lazy_static! {
static ref STATE_CHANGE_RE: Regex =
Regex::new(r"changing state from\(([a-zA-Z]+)\) to\(([a-zA-Z]+)\)")
.unwrap();
}
if let Some(captures) = STATE_CHANGE_RE.captures(&line) {
if let Ok(from) =
InternalServerState::from_str(captures.get(1).unwrap().as_str())
Expand Down Expand Up @@ -217,29 +213,13 @@ pub async fn parse_process_stdout(
}

// Parse for player join / leave, update counter
lazy_static! {
static ref JOIN_RE: Regex = Regex::new(
r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[JOIN\] ([^:]+) joined the game$"
)
.unwrap();
static ref LEAVE_RE: Regex = Regex::new(
r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[LEAVE\] ([^:]+) left the game$"
)
.unwrap();
}
if JOIN_RE.is_match(&line) {
player_count.fetch_add(1, Ordering::Relaxed);
} else if LEAVE_RE.is_match(&line) {
player_count.fetch_sub(1, Ordering::Relaxed);
}

// If not already open, parse for "RCON ready message", then attempt to connect
lazy_static! {
static ref RCON_READY_RE: Regex = Regex::new(
r"Starting RCON interface at IP ADDR:\(\{\d+\.\d+\.\d+\.\d+:(\d+)\}\)"
)
.unwrap();
}
if !rcon_initialised {
if let Some(captures) = RCON_READY_RE.captures(&line) {
match u16::from_str(captures.get(1).unwrap().as_str()) {
Expand Down
94 changes: 51 additions & 43 deletions src/mgmt-server/clients.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use std::{
collections::HashMap,
pin::Pin,
sync::{
collections::HashMap, pin::Pin, str::FromStr, sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
time::Duration,
}, time::Duration
};

use chrono::Utc;
use fctrl::schema::{
AgentOutMessage, AgentRequest, AgentRequestWithId, AgentResponseWithId, AgentStreamingMessage, AgentStreamingMessageInner, FactorioVersion, ModObject, ModSettingsBytes, OperationId, OperationStatus, RconConfig, Save, SecretsObject, ServerSettingsConfig, ServerStartSaveFile, ServerStatus, WhitelistObject
regex::*,
*,
};
use futures::{future, pin_mut, Future, SinkExt, Stream, StreamExt};
use lazy_static::lazy_static;
use log::{error, info, trace, warn};
use regex::Regex;
use stream_cancel::Valved;
use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::tungstenite::Message;
Expand All @@ -24,7 +20,8 @@ use uuid::Uuid;
use crate::{
error::{Error, Result},
events::{
broker::EventBroker, Event, TopicName, CHAT_TOPIC_NAME, JOIN_TOPIC_NAME, LEAVE_TOPIC_NAME, OPERATION_TOPIC_NAME, RPC_TOPIC_NAME, STDOUT_TOPIC_CHAT_CATEGORY, STDOUT_TOPIC_CHAT_DISCORD_ECHO_CATEGORY, STDOUT_TOPIC_JOINLEAVE_CATEGORY, STDOUT_TOPIC_NAME, STDOUT_TOPIC_RPC, STDOUT_TOPIC_SYSTEMLOG_CATEGORY
broker::EventBroker,
*,
},
};

Expand Down Expand Up @@ -374,7 +371,7 @@ impl AgentApiClient {
};
let mut tags = HashMap::new();
tags.insert(
TopicName(OUTGOING_TOPIC_NAME.to_owned()),
TopicName::new(OUTGOING_TOPIC_NAME),
self.ws_addr.to_string(),
);
let timestamp = Utc::now();
Expand All @@ -388,7 +385,7 @@ impl AgentApiClient {
let id_clone = id.clone();
let subscriber = self
.event_broker
.subscribe(TopicName(OPERATION_TOPIC_NAME.to_owned()), move |v| {
.subscribe(TopicName::new(OPERATION_TOPIC_NAME), move |v| {
v == id_clone.0
})
.await;
Expand Down Expand Up @@ -439,7 +436,7 @@ pub async fn connect(ws_addr: url::Url, event_broker: Arc<EventBroker>) -> Resul
let (ws_write, mut ws_read) = ws_stream.split();

let outgoing_stream = event_broker
.subscribe(TopicName(OUTGOING_TOPIC_NAME.to_owned()), move |s| {
.subscribe(TopicName::new(OUTGOING_TOPIC_NAME), move |s| {
ws_addr.to_string() == s
})
.await;
Expand Down Expand Up @@ -533,7 +530,7 @@ fn tag_incoming_message(s: String) -> Option<Event> {
if let Ok(response_with_id) = serde_json::from_str::<AgentResponseWithId>(&s) {
let mut tags = HashMap::new();
tags.insert(
TopicName(OPERATION_TOPIC_NAME.to_string()),
TopicName::new(OPERATION_TOPIC_NAME),
response_with_id.operation_id.into(),
);
let event = Event {
Expand Down Expand Up @@ -637,67 +634,78 @@ fn fuse_agent_response_stream(s: impl Stream<Item = Event>) -> impl Stream<Item
}

fn tag_server_stdout_message(message: &str, tags: &mut HashMap<TopicName, String>) {
lazy_static! {
static ref CHAT_DISCORD_ECHO_RE: Regex =
Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[CHAT\] <server>: \[Discord\] (.+)$").unwrap();
static ref CHAT_RE: Regex =
Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[CHAT\] ([^:]+): (.+)$").unwrap();
static ref JOIN_RE: Regex =
Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[JOIN\] ([^:]+) joined the game$")
.unwrap();
static ref LEAVE_RE: Regex =
Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[LEAVE\] ([^:]+) left the game$")
.unwrap();
static ref RPC_RE: Regex = Regex::new(r"^FCTRL_RPC (.+)$").unwrap();
}

if let Some(chat_captures) = CHAT_DISCORD_ECHO_RE.captures(message) {
// echo from achievement-preserve setting discord chat link
// tag separately and not as regular chat
let _timestamp = chat_captures.get(1).unwrap().as_str().to_string();
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_CHAT_DISCORD_ECHO_CATEGORY.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::ChatDiscordEcho.to_string(),
);
} else if let Some(chat_captures) = CHAT_RE.captures(message) {
let _timestamp = chat_captures.get(1).unwrap().as_str().to_string();
let user = chat_captures.get(2).unwrap().as_str().to_string();
let msg = chat_captures.get(3).unwrap().as_str().to_string();
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_CHAT_CATEGORY.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::Chat.to_string(),
);
tags.insert(
TopicName(CHAT_TOPIC_NAME.to_string()),
TopicName::new(CHAT_TOPIC_NAME),
format!("{}: {}", user, msg),
);
} else if let Some(join_captures) = JOIN_RE.captures(message) {
let _timestamp = join_captures.get(1).unwrap().as_str().to_string();
let user = join_captures.get(2).unwrap().as_str().to_string();
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_JOINLEAVE_CATEGORY.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::JoinLeave.to_string(),
);
tags.insert(
TopicName::new(JOIN_TOPIC_NAME),
user
);
tags.insert(TopicName(JOIN_TOPIC_NAME.to_string()), user);
} else if let Some(leave_captures) = LEAVE_RE.captures(message) {
let _timestamp = leave_captures.get(1).unwrap().as_str().to_string();
let user = leave_captures.get(2).unwrap().as_str().to_string();
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_JOINLEAVE_CATEGORY.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::JoinLeave.to_string(),
);
tags.insert(
TopicName::new(LEAVE_TOPIC_NAME),
user
);
tags.insert(TopicName(LEAVE_TOPIC_NAME.to_string()), user);
} else if let Some(rpc_captures) = RPC_RE.captures(message) {
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_RPC.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::Rpc.to_string(),
);
let rpc_command = rpc_captures.get(1).unwrap().as_str().to_string();
tags.insert(TopicName(RPC_TOPIC_NAME.to_string()), rpc_command);
tags.insert(
TopicName::new(RPC_TOPIC_NAME),
rpc_command
);
} else if let Some(state_change_captures) = STATE_CHANGE_RE.captures(message) {
if let Ok(to) = InternalServerState::from_str(state_change_captures.get(2).unwrap().as_str()) {
// bad cases already logged on agent side, can ignore
tags.insert(
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::ServerState.to_string(),
);
tags.insert(
TopicName::new(SERVERSTATE_TOPIC_NAME),
to.as_ref().to_string(),
);
tags.insert(
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::SystemLog.to_string(),
);
}
} else {
tags.insert(
TopicName(STDOUT_TOPIC_NAME.to_string()),
STDOUT_TOPIC_SYSTEMLOG_CATEGORY.to_string(),
TopicName::new(STDOUT_TOPIC_NAME),
StdoutTopicCategory::SystemLog.to_string(),
);
}
}
12 changes: 6 additions & 6 deletions src/mgmt-server/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,14 @@ impl DiscordClient {
let leave_tx = send_msg_tx;

let chat_sub = event_broker
.subscribe(TopicName(CHAT_TOPIC_NAME.to_string()), |_| true)
.subscribe(TopicName::new(CHAT_TOPIC_NAME), |_| true)
.await;
tokio::spawn(async move {
pin_mut!(chat_sub);
while let Some(event) = chat_sub.next().await {
let message = event
.tags
.get(&TopicName(CHAT_TOPIC_NAME.to_string()))
.get(&TopicName::new(CHAT_TOPIC_NAME))
.unwrap();
if let Err(e) = chat_tx.send(message.clone()) {
error!("Error sending line through mpsc channel: {:?}", e);
Expand All @@ -206,14 +206,14 @@ impl DiscordClient {
});

let join_sub = event_broker
.subscribe(TopicName(JOIN_TOPIC_NAME.to_string()), |_| true)
.subscribe(TopicName::new(JOIN_TOPIC_NAME), |_| true)
.await;
tokio::spawn(async move {
pin_mut!(join_sub);
while let Some(event) = join_sub.next().await {
let user = event
.tags
.get(&TopicName(JOIN_TOPIC_NAME.to_string()))
.get(&TopicName::new(JOIN_TOPIC_NAME))
.unwrap();
let message = format!("**{} has joined the server**", user);
if let Err(e) = join_tx.send(message) {
Expand All @@ -226,14 +226,14 @@ impl DiscordClient {
});

let leave_sub = event_broker
.subscribe(TopicName(LEAVE_TOPIC_NAME.to_string()), |_| true)
.subscribe(TopicName::new(LEAVE_TOPIC_NAME), |_| true)
.await;
tokio::spawn(async move {
pin_mut!(leave_sub);
while let Some(event) = leave_sub.next().await {
let user = event
.tags
.get(&TopicName(LEAVE_TOPIC_NAME.to_string()))
.get(&TopicName::new(LEAVE_TOPIC_NAME))
.unwrap();
let message = format!("**{} has left the server**", user);
if let Err(e) = leave_tx.send(message) {
Expand Down
Loading

0 comments on commit 2682713

Please sign in to comment.