Skip to content

Commit

Permalink
feat(geo): Add GeoIp lookup to light normalization (#2229)
Browse files Browse the repository at this point in the history
This change introduces the GeoIP lookup to the light normalization process,
which allows us to populate the `User` with correct geo information as
early as possible. It also allows us to set the correct tag during metrics
extraction when the metrics extraction happens on our side (in the
PoPs in this case).

The GeoIP lookup is still kept in the store normalization, since it is
also used in a few other places, most notably in `relay-cabi`, which
exposes a few different functions to the Python library.

The lookup is never triggered if the geo information was either
provided by the user or already set by an earlier step.
  • Loading branch information
olksdr committed Jun 21, 2023
1 parent ae7f9bf commit bdb2a15
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Features**:

- Drop profiles without a transaction in the same envelope. ([#2169](https://github.com/getsentry/relay/pull/2169))
- Use GeoIP lookup also in non-processing Relays. From now on, the lookup also runs during light normalization. ([#2229](https://github.com/getsentry/relay/pull/2229))

## 23.6.1

Expand Down
1 change: 1 addition & 0 deletions relay-cabi/src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub unsafe extern "C" fn relay_store_normalizer_normalize_event(
scrub_span_descriptions: false,
light_normalize_spans: false,
span_description_rules: None,
geoip_lookup: None, // only supported in relay
};
light_normalize_event(&mut event, light_normalization_config)?;
process_value(&mut event, &mut *processor, ProcessingState::root())?;
Expand Down
15 changes: 14 additions & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,13 @@ pub struct AwsConfig {
pub runtime_api: Option<String>,
}

/// GeoIP database configuration options.
///
/// Stored in the `geoip` field of `ConfigValues`, which is marked
/// `#[serde(default)]`, so a config without this section yields the default
/// value (no path configured).
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoIpConfig {
/// The path to the GeoIP database file.
///
/// When this is `None`, `Config::geoip_path` falls back to
/// `processing.geoip_path`.
path: Option<PathBuf>,
}

#[derive(Serialize, Deserialize, Debug, Default)]
struct ConfigValues {
#[serde(default)]
Expand Down Expand Up @@ -1216,6 +1223,8 @@ struct ConfigValues {
auth: AuthConfig,
#[serde(default)]
aws: AwsConfig,
#[serde(default)]
geoip: GeoIpConfig,
}

impl ConfigObject for ConfigValues {
Expand Down Expand Up @@ -1896,7 +1905,11 @@ impl Config {

/// The path to the GeoIp database used for GeoIP lookups.
///
/// The `geoip.path` setting takes precedence; if it is not set, the
/// `processing.geoip_path` setting is used instead. Returns `None` when
/// neither of the two is configured.
pub fn geoip_path(&self) -> Option<&Path> {
// NOTE(review): the next line looks like the removed (pre-change) body
// rendered by the diff view — confirm it is not part of the final file.
self.values.processing.geoip_path.as_deref()
self.values
.geoip
.path
.as_deref()
.or(self.values.processing.geoip_path.as_deref())
}

/// Maximum future timestamp of ingested data.
Expand Down
29 changes: 21 additions & 8 deletions relay-general/src/store/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,18 @@ fn normalize_device_class(event: &mut Event) {
}
}

/// Infers `user.geo` from `user.ip_address` using the given GeoIP lookup.
///
/// Geo information that is already present on the user is never overwritten.
/// If the user has no IP address, or the lookup fails or finds nothing, the
/// user is left unchanged.
fn normalize_user_geoinfo(geoip_lookup: &GeoIpLookup, user: &mut User) {
    // Existing geo information always wins over an inferred one.
    if user.geo.value().is_some() {
        return;
    }

    // Lookup errors are deliberately treated the same as "no match".
    let inferred = user
        .ip_address
        .value()
        .and_then(|ip| geoip_lookup.lookup(ip.as_str()).ok().flatten());

    if let Some(geo) = inferred {
        user.geo.set_value(Some(geo));
    }
}

#[derive(Clone, Default, Debug)]
pub struct LightNormalizationConfig<'a> {
pub client_ip: Option<&'a IpAddr>,
Expand All @@ -737,6 +749,7 @@ pub struct LightNormalizationConfig<'a> {
pub scrub_span_descriptions: bool,
pub light_normalize_spans: bool,
pub span_description_rules: Option<&'a Vec<SpanDescriptionRule>>,
pub geoip_lookup: Option<&'a GeoIpLookup>,
}

pub fn light_normalize_event(
Expand Down Expand Up @@ -773,6 +786,12 @@ pub fn light_normalize_event(
config.client_ip,
);

if let Some(geoip_lookup) = config.geoip_lookup {
if let Some(user) = event.user.value_mut() {
normalize_user_geoinfo(geoip_lookup, user)
}
}

// Validate the basic attributes we extract metrics from
event.release.apply(|release, meta| {
if protocol::validate_release(release).is_ok() {
Expand Down Expand Up @@ -937,14 +956,8 @@ impl<'a> Processor for NormalizeProcessor<'a> {
user.process_child_values(self, state)?;

// Infer user.geo from user.ip_address
if user.geo.value().is_none() {
if let Some(geoip_lookup) = self.geoip_lookup {
if let Some(ip_address) = user.ip_address.value() {
if let Ok(Some(geo)) = geoip_lookup.lookup(ip_address.as_str()) {
user.geo.set_value(Some(geo));
}
}
}
if let Some(geoip_lookup) = self.geoip_lookup {
normalize_user_geoinfo(geoip_lookup, user)
}

Ok(())
Expand Down
79 changes: 69 additions & 10 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::net::IpAddr as NetIPAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Duration as SignedDuration, Utc};
Expand All @@ -17,6 +18,7 @@ use relay_profiling::ProfileError;
use serde_json::Value as SerdeValue;
use tokio::sync::Semaphore;

use crate::service::ServiceError;
use relay_auth::RelayVersion;
use relay_common::{ProjectId, ProjectKey, UnixTimestamp};
use relay_config::{Config, HttpEncoding};
Expand All @@ -31,6 +33,7 @@ use relay_general::protocol::{
SessionAttributes, SessionStatus, SessionUpdate, Timestamp, TraceContext, UserReport, Values,
};
use relay_general::protocol::{Contexts, TransactionSource};
use relay_general::store::GeoIpLookup;
use relay_general::store::{
ClockDriftProcessor, LightNormalizationConfig, MeasurementsConfig, TransactionNameConfig,
};
Expand All @@ -47,11 +50,9 @@ use relay_system::{Addr, FromMessage, NoResponse, Service};
use {
crate::actors::envelopes::SendMetrics,
crate::actors::project_cache::UpdateRateLimits,
crate::service::ServiceError,
crate::utils::{EnvelopeLimiter, MetricsLimiter},
anyhow::Context,
relay_general::protocol::{Context as SentryContext, ProfileContext},
relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
relay_general::store::{StoreConfig, StoreProcessor},
relay_quotas::{RateLimitingError, RedisRateLimiter},
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};
Expand Down Expand Up @@ -537,7 +538,6 @@ pub struct EnvelopeProcessorService {
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")]
rate_limiter: Option<RedisRateLimiter>,
#[cfg(feature = "processing")]
geoip_lookup: Option<GeoIpLookup>,
}

Expand All @@ -551,13 +551,13 @@ impl EnvelopeProcessorService {
project_cache: Addr<ProjectCache>,
upstream_relay: Addr<UpstreamRelay>,
) -> anyhow::Result<Self> {
let geoip_lookup = match config.geoip_path() {
Some(p) => Some(GeoIpLookup::open(p).context(ServiceError::GeoIp)?),
None => None,
};

#[cfg(feature = "processing")]
{
let geoip_lookup = match config.geoip_path() {
Some(p) => Some(GeoIpLookup::open(p).context(ServiceError::GeoIp)?),
None => None,
};

let rate_limiter =
_redis.map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit()));

Expand All @@ -575,6 +575,7 @@ impl EnvelopeProcessorService {
#[cfg(not(feature = "processing"))]
Ok(Self {
config,
geoip_lookup,
envelope_manager,
outcome_aggregator,
project_cache,
Expand Down Expand Up @@ -2384,6 +2385,7 @@ impl EnvelopeProcessorService {
is_renormalize: false,
light_normalize_spans,
span_description_rules: state.project_state.config.span_description_rules.as_ref(),
geoip_lookup: self.geoip_lookup.as_ref(),
};

metric!(timer(RelayTimers::EventProcessingLightNormalization), {
Expand Down Expand Up @@ -3110,7 +3112,6 @@ mod tests {
upstream_relay,
#[cfg(feature = "processing")]
rate_limiter: None,
#[cfg(feature = "processing")]
geoip_lookup: None,
}
}
Expand Down Expand Up @@ -3856,4 +3857,62 @@ mod tests {
"Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
);
}

/// Checks that light normalization fills in `user.geo` from `user.ip_address`
/// when a GeoIP lookup is passed in the normalization config.
#[test]
fn test_geo_in_light_normalize() {
    let payload = r###"
{
"type": "transaction",
"transaction": "/foo/",
"timestamp": 946684810.0,
"start_timestamp": 946684800.0,
"contexts": {
"trace": {
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
"span_id": "fa90fdead5f74053",
"op": "http.server",
"type": "trace"
}
},
"transaction_info": {
"source": "url"
},
"user": {
"ip_address": "2.125.160.216"
}
}
"###;
    let mut event = Annotated::<Event>::from_json(payload).unwrap();

    let lookup =
        GeoIpLookup::open("../relay-general/tests/fixtures/GeoIP2-Enterprise-Test.mmdb")
            .unwrap();
    let config = LightNormalizationConfig {
        geoip_lookup: Some(&lookup),
        ..Default::default()
    };

    // Before normalization the user carries an IP address but no geo data.
    let user_before = event.value().unwrap().user.value().unwrap();
    assert!(user_before.geo.value().is_none());

    relay_general::store::light_normalize_event(&mut event, config).unwrap();

    // After normalization the geo data must be resolved from the IP address.
    let user_after = event.value().unwrap().user.value().unwrap();
    let geo = user_after.geo.value().unwrap();

    assert_eq!(geo.country_code.value().unwrap(), "GB");
    assert_eq!(geo.city.value().unwrap(), "Boxford");
}
}
1 change: 0 additions & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::utils::BufferGuard;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
pub enum ServiceError {
/// GeoIp construction failed.
#[cfg(feature = "processing")]
#[error("could not load the Geoip Db")]
GeoIp,

Expand Down

0 comments on commit bdb2a15

Please sign in to comment.