Does watermark work with events from the past? #756

Open
ahassany opened this issue Oct 15, 2024 · 1 comment

Comments

@ahassany

I'm trying to do some streaming analytics on historical data pushed to Kafka. The Kafka message timestamp is recent, but the actual event time is older (over a year).

CREATE TABLE input_table (
    ts TIMESTAMP NOT NULL,
    sourceIPv6Address VARCHAR,
    destinationIPv6Address VARCHAR,
    octetDeltaCount BIGINT,
    watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '30' SECOND) STORED
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-1:19091',
  topic = 'input-topic',
  type = 'source',
  'source.offset' = 'earliest',
  'source.read_mode' = 'read_uncommitted',
  event_time_field = 'ts',
  watermark_field = 'watermark'
);

CREATE VIEW AGG AS SELECT window.start as window_start, "sourceIPv6Address", "destinationIPv6Address", octetDeltaCount
FROM (
    SELECT TUMBLE(interval '1 minute') as window, "sourceIPv6Address", "destinationIPv6Address", SUM("octetDeltaCount") AS octetDeltaCount
    FROM input_table
    GROUP BY window, "sourceIPv6Address", "destinationIPv6Address"
);

CREATE TABLE agg_sink (
    window_start TIMESTAMP,
    sourceIPv6Address VARCHAR,
    destinationIPv6Address VARCHAR,
    octetDeltaCount BIGINT
) WITH (
    'connector' = 'kafka',
    'bootstrap_servers' = 'kafka-1:19091',
    'type' = 'sink',
    'topic' = 'arroyo-agg',
    'format' = 'json'
);

INSERT INTO agg_sink SELECT window_start, "sourceIPv6Address", "destinationIPv6Address", octetDeltaCount FROM AGG;

However, nothing is inserted into the output Kafka topic.

@Cirr0e

Cirr0e commented Nov 28, 2024

Hey there! I understand you're trying to process historical data with watermarks, but nothing is showing up in the output topic. This is actually a common issue when dealing with historical data processing.

The problem is likely related to how watermarks work with event timestamps. Looking at your SQL, I see you've set a 30-second watermark:

watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '30' SECOND) STORED

For historical data processing, this poses a challenge because:

  1. Watermarks are used to track the completeness of your data stream and are based on event timestamps
  2. When processing historical data with old timestamps, the watermark might immediately fall behind the current processing time, causing events to be considered "late" and dropped

Based on our documentation (from https://doc.arroyo.dev/concepts):

A watermark represents a claim that no more events with a timestamp earlier than the watermark will arrive in the future. Late events (which arrive after the watermark has passed) are dropped.
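
To make that definition concrete, here is a minimal Python sketch of a watermark computed as "largest event timestamp seen so far minus a fixed delay". This is a simplified model for illustration only, not Arroyo's actual implementation; the function name `split_late_events` and the sample timestamps are made up for the example.

```python
from datetime import datetime, timedelta

def split_late_events(event_times, delay):
    """Return (accepted, dropped) under a max-timestamp-minus-delay watermark.

    Simplified model (an assumption, not Arroyo's code): an event is late
    when its timestamp is earlier than the current watermark.
    """
    watermark = None
    accepted, dropped = [], []
    for ts in event_times:
        if watermark is not None and ts < watermark:
            dropped.append(ts)  # arrived after the watermark passed it
            continue
        accepted.append(ts)
        candidate = ts - delay
        # The watermark only ever moves forward.
        if watermark is None or candidate > watermark:
            watermark = candidate
    return accepted, dropped

base = datetime(2023, 1, 1, 12, 0, 0)
events = [
    base,
    base + timedelta(seconds=60),
    base + timedelta(seconds=10),  # 50 s behind the newest event
    base + timedelta(seconds=45),  # 15 s behind the newest event
]
accepted, dropped = split_late_events(events, timedelta(seconds=30))
print("accepted:", accepted)
print("dropped:", dropped)
```

With a 30-second delay, the event that is 50 seconds behind the newest timestamp is dropped, while the one that is 15 seconds behind is kept.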

To fix this, you have a couple of options:

  1. Increase the watermark interval to account for the historical nature of your data:
watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '2 years') STORED
  2. Or process the historical data without watermarks first, then switch to watermarking for real-time data

For your use case, since the events are over a year old, I'd recommend going with option 1 and setting a watermark interval that's larger than the age of your historical data.

Let me know if you need help adjusting the watermark settings or if you have any questions about how watermarks work with historical data processing.

References:

  • Watermark behavior documentation: https://doc.arroyo.dev/concepts (Time-oriented processing section)
  • Similar watermark timing issues have been reported in previous tickets

Things to keep in mind:

  • Watermark intervals that are too large can increase memory usage
  • Your watermark interval should cover the full range of your historical data
  • Consider whether you'll need to process both historical and real-time data in the same pipeline
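
The memory point can be illustrated with a rough Python sketch that counts how many tumbling windows remain buffered for different watermark delays. It assumes a window is released once the watermark reaches its end and that events arrive in timestamp order. Both are simplifications, and `open_window_count` is a hypothetical helper, not part of Arroyo.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def open_window_count(event_times, window, delay):
    """Count tumbling windows still buffered after all events are read.

    Assumptions: events arrive in timestamp order, the watermark is the
    largest timestamp seen minus `delay`, and a window is released once
    the watermark reaches the window's end.
    """
    watermark = None
    open_windows = set()
    for ts in event_times:
        # Align the event to the start of its tumbling window.
        start = EPOCH + ((ts - EPOCH) // window) * window
        open_windows.add(start)
        candidate = ts - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
        # Keep only windows whose end is still ahead of the watermark.
        open_windows = {s for s in open_windows if s + window > watermark}
    return len(open_windows)

base = datetime(2023, 1, 1, 12, 0, 0)
events = [base + timedelta(minutes=i) for i in range(10)]  # one event per minute
one_minute = timedelta(minutes=1)

short_delay = open_window_count(events, one_minute, timedelta(seconds=30))
medium_delay = open_window_count(events, one_minute, timedelta(minutes=5))
huge_delay = open_window_count(events, one_minute, timedelta(days=730))
print(short_delay, medium_delay, huge_delay)
```

In this model, a longer delay keeps more windows in state at once, and with a delay of roughly two years none of the ten one-minute windows is released while the events span only ten minutes.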

Let me know if you need any clarification or run into other issues!
