
question: aggregate node should not be recalculating window, as input is windowed #752

Open
zidaye opened this issue Oct 8, 2024 · 1 comment
Labels
bug Something isn't working sql Related to the DataFusion SQL integration

Comments

zidaye commented Oct 8, 2024

My business logic is as follows: after a windowed join, I need to perform aggregation operations.

CREATE TABLE test_table1 ( 
    `attacker_ip` STRING,
    `victim_ip` STRING,
    `vuln_type` STRING,
    `vuln_name` STRING,
    `attack_result_cd` STRING,
    `occur_time` Timestamp
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    topic = 'test_table1',
    'source.group_id' = 'arroyo',
    'source.read_mode' = 'read_committed',
    'source.offset' = 'group'
);

CREATE TABLE test_table2( 
    `attacker_ip` STRING,
    `victim_ip` STRING,
    `vuln_type` STRING,
    `vuln_name` STRING,
    `attack_result_cd` STRING,
    `occur_time` Timestamp
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    topic = 'test_table2',
    'source.group_id' = 'arroyo',
    'source.read_mode' = 'read_committed',
    'source.offset' = 'group'
);

CREATE VIEW union_data AS (SELECT * FROM test_table1 UNION ALL SELECT * FROM test_table2);

CREATE VIEW hit_data_result AS (
    SELECT left_tb.window_time AS window_time,
           left_tb.attacker_ip AS attacker_ip,
           left_tb.victim_ip AS victim_ip,
           left_tb.attack_result_cd AS attack_result_cd,
           left_tb.occur_time AS occur_time
    FROM (
        SELECT tumble(interval '10 second') AS window_time,
               attacker_ip, victim_ip, attack_result_cd, occur_time
        FROM union_data
        WHERE vuln_type LIKE '%type_A%' AND vuln_name LIKE '%name_A%' AND attack_result_cd != '4'
        GROUP BY 1, 2, 3, 4, 5
    ) left_tb
    JOIN (
        SELECT tumble(interval '10 second') AS window_time,
               attacker_ip, victim_ip, attack_result_cd, occur_time
        FROM union_data
        WHERE vuln_type LIKE '%type_B%' OR vuln_name LIKE '%name_B%'
        GROUP BY 1, 2, 3, 4, 5
    ) right_tb
    ON left_tb.attacker_ip = right_tb.attacker_ip
       AND left_tb.victim_ip = right_tb.victim_ip
);

SELECT window_time,
       max(occur_time) AS log_end_time,
       min(occur_time) AS log_start_time,
       array_agg(attacker_ip) AS attacker_ip_list,
       array_agg(victim_ip) AS victim_ip_list,
       last_value(attack_result_cd) AS attack_result_cd
FROM hit_data_result
GROUP BY 1
HAVING count(*) >= 1;

An error occurs: Error during planning: aggregate node should not be recalculating window, as input is windowed.
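The failing pattern can be reduced to a minimal sketch (hypothetical table and column names, same shape as the query above): a tumbled subquery whose output is then aggregated by its window column.

```sql
-- Minimal repro sketch. Assumes a hypothetical source table `events`
-- with a column `ts TIMESTAMP`; `tumble()` is the window function used above.
CREATE VIEW windowed AS (
    SELECT tumble(interval '10 second') AS window_time, ts
    FROM events
    GROUP BY 1, 2
);

-- Aggregating over the already-windowed view's window column triggers:
--   Error during planning: aggregate node should not be recalculating
--   window, as input is windowed
SELECT window_time, count(*)
FROM windowed
GROUP BY 1;
```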

How should I rewrite my SQL? Any help would be much appreciated.


mwylde commented Oct 14, 2024

Thanks for reporting the issue. After investigating, this does seem to be a bug in the planner, which is not correctly identifying the window that should be aggregated over in the select statement. We'll take a look at this.

@mwylde mwylde added bug Something isn't working sql Related to the DataFusion SQL integration labels Oct 14, 2024