-
Notifications
You must be signed in to change notification settings - Fork 0
/
app_flask.py
37 lines (31 loc) · 1.03 KB
/
app_flask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import logging
from flask import Flask, request, jsonify
from pydantic import BaseModel
from src.osu_api import Api
from src.p03_1_app import BGApp
# Flask application instance; the @app.route decorators in this module
# register their handlers on it.
app = Flask(__name__)
# Define the Pydantic model for the task payload
class Event(BaseModel):
    """Schema of the JSON body accepted by the ``/task`` route.

    Every field carries a default, so a payload that omits a field (or an
    empty JSON object) still validates and falls back to the sample values
    below.
    """

    # Bounds of the requested interval as integers
    # (presumably Unix epoch seconds -- TODO confirm with BGApp).
    start_ts: int = 1677112070
    end_ts: int = 1677115068

    # Numeric asset identifier (meaning is defined by BGApp -- not visible here).
    asset_id: int = 123456789

    # Task name forwarded to BGApp inside the event dict.
    task: str = "return_cache"
@app.route('/')
def home():
    """Health-check route: reply with a fixed OK status mapping."""
    status = {"health_check": "OK"}
    return status
@app.route("/task", methods=["POST"])
def task():
    """Validate the posted event and run ``BGApp`` on it.

    The request body is read as JSON and validated against ``Event``; fields
    missing from the payload fall back to the model defaults.

    Returns:
        The JSON-encoded value returned by ``BGApp(api, event_dict).run()``,
        or a ``400`` response with ``{"error": "Invalid payload format"}``
        when the body cannot be parsed as JSON or fails ``Event`` validation.
    """
    try:
        event_data = request.json
        event = Event(**event_data)
    except Exception as exc:
        # Deliberately broad: Flask's JSON parsing errors, a non-object body
        # (``**`` on None/list raises TypeError) and pydantic validation
        # errors all map to the same 400 reply. Log the cause so a rejected
        # request is diagnosable instead of being silently discarded.
        logging.warning("Rejected /task payload: %s", exc)
        return jsonify({"error": "Invalid payload format"}), 400

    # NOTE(review): this message is emitted *before* BGApp runs, so
    # "executed successfully" only means the payload validated -- confirm
    # whether the wording should change. Text kept as-is; %-args defer the
    # formatting until a handler actually emits the record.
    logging.info("Lambda function executed successfully with event %s", event)

    api = Api()
    # NOTE(review): ``.dict()`` is deprecated in pydantic v2 in favour of
    # ``model_dump()``; kept because the installed version is not visible here.
    event_dict = event.dict()
    returned_value = BGApp(api, event_dict).run()
    return jsonify(returned_value)
# Example event payload for POST /task (this note originally said "fast api"; this module uses Flask):
# {"start_ts": 1677112070,"end_ts": 1677115068, "asset_id": 123456789, "task": "return_cache"}