Add Python PubSub pipeline for processing gcloud logs #10

Merged · 4 commits · Dec 12, 2023
Python/pubsub/gcloud_logs_filter_with_dlq.py (new file, 133 additions)
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging

import apache_beam as beam
from apache_beam import DoFn
from apache_beam import Filter
from apache_beam import Map
from apache_beam import ParDo
from apache_beam.io import ReadFromPubSub
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions


PROCESSED_TAG = "processed"
UNPROCESSED_TAG = "unprocessed"


class PubSubOptions(PipelineOptions):

@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--input_topic",
default="projects/your-project/topics/your-input-test",
help="Input PubSub topic")
parser.add_argument(
"--output_topic",
default="projects/your-project/topics/your-output-test",
help="Output PubSub topic")
parser.add_argument(
"--dlq_topic",
default="projects/your-project/topics/your-dlq-test",
help="Dead Letter Queue PubSub topic")


def run():
"""
This Apache Beam pipeline processes log messages from a Google Cloud Pub/Sub topic.
The expected data format follows the standard Google Cloud log format,
which can be achieved by routing logs to a Pub/Sub topic via https://console.cloud.google.com/logs/router.

It performs the following steps:
1. Input Configuration:
- Reads messages from the specified input Pub/Sub topic.

2. Message Parsing:
- Parses each message as a JSON dictionary.

3. Message Splitting:
- Divides messages into two categories:
            a. PROCESSED: messages containing both 'severity' and 'jsonPayload' fields.
            b. UNPROCESSED: messages missing one or both of these fields.

4. Severity Filtering:
- For PROCESSED messages, filters out those with severity other than "ERROR".

5. Data Transformation:
- Extracts timestamp and message content from the 'jsonPayload' field for PROCESSED messages.

6. Output Handling:
- Writes transformed PROCESSED messages to a specified output Pub/Sub topic.
- Sends UNPROCESSED messages to a Dead Letter Queue (DLQ) topic.
"""

options = PubSubOptions(streaming=True)

with beam.Pipeline(options=options) as p:
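        # Read raw messages, parse each as JSON, and split into tagged outputs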
split_result = (p | "Read from PubSub" >> ReadFromPubSub(topic=options.input_topic)
| "Parse JSON" >> Map(lambda msg: json.loads(msg))
| "Split Messages" >> ParDo(SplitMessages()).with_outputs(UNPROCESSED_TAG, PROCESSED_TAG))

# Filter processed messages and write to output topic
(split_result[PROCESSED_TAG]
| "Filter by Severity" >> Filter(filter_by_severity)
| "Map to PubsubMessage for output" >> Map(to_pubsub_message_for_output)
| "Write to PubSub" >> WriteToPubSub(options.output_topic, with_attributes=True))

# Write unprocessed messages to DLQ
(split_result[UNPROCESSED_TAG]
| "Map to PubsubMessage for DLQ" >> Map(to_pubsub_message_for_dlq)
| "Write to DLQ" >> WriteToPubSub(options.dlq_topic, with_attributes=True))


class SplitMessages(DoFn):
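    """Tags each parsed log as PROCESSED or UNPROCESSED based on its fields."""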
def process(self, element):
from apache_beam.pvalue import TaggedOutput

        if ('severity' in element) and ('jsonPayload' in element):
yield TaggedOutput(PROCESSED_TAG, element)
else:
yield TaggedOutput(UNPROCESSED_TAG, element)


def filter_by_severity(log):
# Filter logs by severity level (only process logs with severity "ERROR")
return log.get("severity").upper() == "ERROR"


def to_pubsub_message_for_dlq(msg):
from apache_beam.io import PubsubMessage

return PubsubMessage(data=bytes(json.dumps(msg), "utf-8"), attributes=None)


def to_pubsub_message_for_output(log):
from apache_beam.io import PubsubMessage

# Example transformation: Extract relevant information from the log
transformed_data = {
"timestamp": log.get("timestamp"),
"message": log.get("jsonPayload").get("message")
}
data = bytes(f"Error log message: {transformed_data['message']} [{transformed_data['timestamp']}]", "utf-8")
return PubsubMessage(data=data, attributes=transformed_data)


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
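
# A sketch of a local run with the DirectRunner, assuming the three Pub/Sub
# topics already exist and Application Default Credentials are configured
# (the topic paths below are the placeholder defaults, not real resources):
#
#   python gcloud_logs_filter_with_dlq.py \
#       --input_topic=projects/your-project/topics/your-input-test \
#       --output_topic=projects/your-project/topics/your-output-test \
#       --dlq_topic=projects/your-project/topics/your-dlq-test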