Skip to content

Commit

Permalink
fix: pipeline dissert error is returned directly to the user, instead…
Browse files Browse the repository at this point in the history
… of printing a warn log (#4709)

* fix: pipeline dissert error is returned directly to the user, instead of printing a warn log

* chore: add more test for pipeline
  • Loading branch information
paomian authored Sep 12, 2024
1 parent 67fb3d0 commit 36b1baf
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
21 changes: 13 additions & 8 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod value;

use ahash::HashSet;
use common_telemetry::debug;
use itertools::{merge, Itertools};
use itertools::Itertools;
use processor::{Processor, ProcessorBuilder, Processors};
use transform::{TransformBuilders, Transformer, Transforms};
use value::Value;
Expand Down Expand Up @@ -91,13 +91,18 @@ where
debug!("required_keys: {:?}", required_keys);

// intermediate keys are the keys that all processor and transformer required
let ordered_intermediate_keys: Vec<String> =
merge(processors_required_keys, transforms_required_keys)
.cloned()
.collect::<HashSet<String>>()
.into_iter()
.sorted()
.collect();
let ordered_intermediate_keys: Vec<String> = [
processors_required_keys,
transforms_required_keys,
processors_output_keys,
]
.iter()
.flat_map(|l| l.iter())
.collect::<HashSet<&String>>()
.into_iter()
.sorted()
.cloned()
.collect_vec();

let mut final_intermediate_keys = Vec::with_capacity(ordered_intermediate_keys.len());
let mut intermediate_keys_exclude_original =
Expand Down
14 changes: 5 additions & 9 deletions src/pipeline/src/etl/processor/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,16 +817,12 @@ impl Processor for DissectProcessor {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
Some(Value::String(val_str)) => match self.process(val_str) {
Ok(r) => {
for (k, v) in r {
val[k] = v;
}
}
Err(e) => {
warn!("dissect processor: {}", e);
Some(Value::String(val_str)) => {
let r = self.process(val_str)?;
for (k, v) in r {
val[k] = v;
}
},
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
Expand Down
34 changes: 34 additions & 0 deletions src/pipeline/tests/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,37 @@ transform:
Some(StringValue("key1_key2".to_string()))
);
}

#[test]
fn test_parse_failure() {
let input_str = r#"
{
"str": "key1 key2"
}"#;

let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{key1} %{key2} %{key3}"
transform:
- fields:
- key1
type: string
"#;

let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();

let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();

pipeline.prepare(input_value, &mut result).unwrap();
let row = pipeline.exec_mut(&mut result);

assert!(row.is_err());
assert_eq!(row.err().unwrap(), "No matching pattern found");
}
46 changes: 46 additions & 0 deletions src/pipeline/tests/regex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,49 @@ transform:

assert_eq!(output.rows[0].values[0].value_data, None);
}

#[test]
fn test_unuse_regex_group() {
let input_value_str = r#"
[
{
"str": "123 456"
}
]
"#;

let pipeline_yaml = r#"
processors:
- regex:
fields:
- str
pattern: "(?<id1>\\d+) (?<id2>\\d+)"
transform:
- field: str_id1
type: string
"#;

let output = common::parse_and_exec(input_value_str, pipeline_yaml);

assert_eq!(
output.schema,
vec![
common::make_column_schema(
"str_id1".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
]
);

assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
}

0 comments on commit 36b1baf

Please sign in to comment.