From 2c5963512f7131a52aa09e724fd20c80b418102d Mon Sep 17 00:00:00 2001 From: Fredrik Olsson Date: Thu, 22 Jun 2023 10:58:28 +0200 Subject: [PATCH] Adding a limit binary --- README.md | 15 +++++++++- bin/limit | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 bin/limit diff --git a/README.md b/README.md index 32dcb53..9aec8bf 100644 --- a/README.md +++ b/README.md @@ -7,10 +7,19 @@ TODO -## Schematic overview +## Overview + +### Schematic ![schematic](./porla.svg) +### Tech stack + +TODO + +### Performance + +TODO ## Usage @@ -58,6 +67,10 @@ services: Rearrange, deduct or add content to each line using two (one for the input and one for the output) format specifications. Expects two arguments, the `input_format_specification` and the `output_format_specification`. +* **limit** + + Rate limit the flow through a pipe on a line-by-line basis. Expects a single required argument, `interval`, and an optional argument, `--key` with a format specification of how to find the key of each line whereby to "group" the flow. + ### 3rd-party tools * **socat** diff --git a/bin/limit b/bin/limit new file mode 100644 index 0000000..b3b155c --- /dev/null +++ b/bin/limit @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +""" +Command line utility tool for rate limiting the flow through a pipeline on a line-by-line basis. + +""" + +# pylint: disable=duplicate-code + +import sys +import time +import logging +import warnings +import argparse +from collections import defaultdict + +import parse + +# Parse cli arguments +parser = argparse.ArgumentParser() +parser.add_argument( + "--log-level", type=lambda level: getattr(logging, level), default=logging.WARNING +) +parser.add_argument( + "interval", type=float, help="Minimum allowed interval to go through" +) + +parser.add_argument( + "--key", + type=str, + default=None, + help="Example: '{key} {} {}'," + "See https://github.com/r1chardj0n3s/parse#format-specification", +) + + +args = parser.parse_args() + +# Setup logger +logging.basicConfig( + format="%(asctime)s %(levelname)s %(name)s %(message)s", level=args.log_level +) +logging.captureWarnings(True) +warnings.filterwarnings("once") + +logger = logging.getLogger("limit") + + +# Compile pattern +pattern = parse.compile(args.key) if args.key else None + + +def _get_key(line: str): + if not args.key: + return "fixed" + + res = pattern.parse(line.rstrip()) + + if not res: + logger.error( + "Could not parse line: %s according to the specification: %s", + line, + args.key, + ) + return None + + return res["key"] + + +# Initialize buffer +buffer = defaultdict(int) # Will default to 0 (zero) + +# Start processing +for line in sys.stdin: + now = time.monotonic() + logger.debug(line) + + if key := _get_key(line): + last_seen = buffer[key] + + if now - last_seen > args.interval: + buffer[key] = now + + sys.stdout.write(line) + sys.stdout.flush() + + # else: drop line + # else: drop line