-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #64 from MikroElektronika/improvement/teamup-calen…
…dar-automation Improvement/teamup calendar automation
- Loading branch information
Showing
4 changed files
with
459 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
name: Update Release Calendar | ||
|
||
on: | ||
# Trigger the workflow every Monday at 07:00 Belgrade time (adjusted for UTC) | ||
schedule: | ||
- cron: '0 6 * * 1' | ||
workflow_dispatch: | ||
# Can also be triggered manually if needed | ||
inputs: | ||
calendar_name: | ||
type: string | ||
description: Calendar Name | ||
default: "Product - Update" | ||
force_update: | ||
type: boolean | ||
description: "If checked, will update an existing event with new data. Otherwise, skips it." | ||
default: false | ||
|
||
jobs: | ||
notify: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- name: Checkout code | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: '3.x' | ||
|
||
- name: Install Dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install pytz | ||
pip install requests | ||
pip install py7zr | ||
pip install chardet | ||
- name: Determine Calendar Name | ||
id: determine_calendar | ||
run: | | ||
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then | ||
echo "CALENDAR_NAME=${{ github.event.inputs.calendar_name }}" >> $GITHUB_OUTPUT | ||
else | ||
echo "CALENDAR_NAME=Product - Update" >> $GITHUB_OUTPUT | ||
fi | ||
- name: Set Force Update Flag | ||
id: set_force_update # Will be set to false for every CRON trigger | ||
run: | | ||
if [[ "${{ github.event_name }}" == "workflow_dispatch" && "${{ github.event.inputs.force_update }}" == "true" ]]; then | ||
echo "FORCE_UPDATE_FLAG=--force_update" >> $GITHUB_OUTPUT | ||
else | ||
echo "FORCE_UPDATE_FLAG=" >> $GITHUB_OUTPUT | ||
fi | ||
- name: Update Release Calendar | ||
env: | ||
TEAM_UP_API_KEY: ${{ secrets.TEAM_UP_API_KEY }} | ||
TEAM_UP_CALENDAR_LINK: ${{ secrets.TEAM_UP_CALENDAR_LINK }} | ||
RELEASES_SPREADSHEET: ${{ secrets.RELEASES_SPREADSHEET }} | ||
run: | | ||
python -u scripts/release_calendar.py "$TEAM_UP_API_KEY" "NECTO DAILY UPDATE" "${{ steps.determine_calendar.outputs.CALENDAR_NAME }}" "$TEAM_UP_CALENDAR_LINK" "$RELEASES_SPREADSHEET" "${{ steps.set_force_update.outputs.FORCE_UPDATE_FLAG }}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import os, json, urllib.request | ||
from datetime import datetime, timedelta | ||
|
||
class events_json(): | ||
@staticmethod | ||
def get_data(link, calendar_title, saveToFile=None): | ||
""" | ||
Fetches data from the provided link, processes the release information, and merges events with the same start date. | ||
:param link: URL to fetch the CSV data. | ||
:param saveToFile: Optional; path to save the processed JSON data. | ||
:return: A list of merged events based on their start date. | ||
""" | ||
try: | ||
## Fetch the data from the provided link | ||
with urllib.request.urlopen(link) as f: | ||
html = f.read().decode('utf-8') | ||
## Save fetched data temporarily to a file named 'releases.txt' | ||
with open(os.path.join(os.path.dirname(__file__), 'releases.txt'), 'w') as releases: | ||
releases.write(html) | ||
releases.close() | ||
except Exception as e: | ||
## Handle errors that may occur during the data fetch process | ||
print(f"Error fetching data: {e}") | ||
|
||
## Read lines from the temporary file | ||
with open(os.path.join(os.path.dirname(__file__), 'releases.txt'), 'r') as releases: | ||
all_releases = releases.readlines() | ||
releases.close() | ||
|
||
## Remove the temporary file after reading its content | ||
if os.path.exists(os.path.join(os.path.dirname(__file__), "releases.txt")): | ||
os.remove(os.path.join(os.path.dirname(__file__), "releases.txt")) | ||
|
||
## List to store formatted event data | ||
formatted_array = [] | ||
for each_line in all_releases: | ||
## Split the line into individual parts based on commas | ||
parts = each_line.split(',') | ||
|
||
## Skip empty lines or headers | ||
if parts[0] == '' or parts[0] == 'Product name': | ||
continue | ||
|
||
## Extract the board name and release plan date | ||
board_name = parts[0] | ||
try: | ||
## Parse the release date from the format 'dd.mm.yyyy' | ||
release_date = datetime.strptime(parts[3], "%d.%m.%Y") | ||
except ValueError: | ||
## Skip lines with incorrect date format | ||
continue | ||
|
||
## Create a dictionary for each event and append to the formatted array | ||
formatted_array.append( | ||
{ | ||
"all_day": True, | ||
"title": calendar_title, | ||
"notes": f"<ul>\n<li>{board_name}</li>\n</ul>", | ||
"readonly": False, | ||
"tz": "Europe/Belgrade", | ||
"start_dt": release_date.strftime("%Y-%m-%dT00:00:00"), | ||
"end_dt": (release_date + timedelta(days=1) - timedelta(minutes=1)).strftime("%Y-%m-%dT23:59:00") | ||
} | ||
) | ||
|
||
## Dictionary to merge nodes based on the start date | ||
merged_nodes = {} | ||
|
||
## Merge events with the same start date by combining their notes | ||
for value in formatted_array: | ||
start_dt = value['start_dt'] | ||
if start_dt not in merged_nodes: | ||
## Add new event if the start date does not exist | ||
merged_nodes[start_dt] = value | ||
else: | ||
## Combine the notes of events with the same start date | ||
existing_notes = merged_nodes[start_dt]['notes'] | ||
new_notes = value['notes'].replace('<ul>', '').replace('</ul>', '') ## Strip outer tags to merge correctly | ||
merged_nodes[start_dt]['notes'] = existing_notes.replace('</ul>', '') + new_notes + '</ul>' | ||
|
||
## Convert the merged_nodes dictionary back into a list of merged events | ||
merged_events = list(merged_nodes.values()) | ||
|
||
## Save the merged events to a file if saveToFile path is provided | ||
if saveToFile: | ||
try: | ||
with open(saveToFile, 'w') as json_file: | ||
json_file.write(json.dumps(merged_events, indent=4)) | ||
except IOError as e: | ||
## Handle errors during file saving | ||
print(f"Error saving to file: {e}") | ||
|
||
## Return the list of merged events | ||
return merged_events | ||
|
||
def __init__(self, release_table_link, calendar_title): | ||
""" | ||
Initializes the events_json class with the provided release table link. | ||
:param release_table_link: Google Sheets link identifier for exporting the data as CSV. | ||
""" | ||
self.file_out = { | ||
calendar_title: { | ||
"events": [] | ||
} | ||
} | ||
|
||
self.table = release_table_link | ||
self.calendar_title = calendar_title | ||
|
||
def fetch_data(self, save_to_file=None): | ||
""" | ||
Fetches data from the release table link and stores the processed events. | ||
:param save_to_file: Optional; path to save the processed JSON data. | ||
""" | ||
## Fetch data from the Google Sheets CSV export link and populate events | ||
self.file_out[self.calendar_title]["events"] = \ | ||
self.get_data( | ||
f'https://docs.google.com/spreadsheets/d/{self.table}/export?format=csv', | ||
self.calendar_title, | ||
save_to_file | ||
) | ||
|
||
def generate_file(self, file_out_path): | ||
""" | ||
Generates a JSON file containing the processed event data. | ||
:param file_out_path: Path to save the final output JSON file. | ||
""" | ||
try: | ||
with open(file_out_path, 'w') as file: | ||
file.write(json.dumps(self.file_out, indent=4)) | ||
except IOError as e: | ||
## Handle errors that may occur during file generation | ||
print(f"Error generating file: {e}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
## IMPORT SECTION | ||
import os | ||
import json | ||
import requests | ||
|
||
from enum import Enum | ||
from datetime import datetime, timedelta | ||
|
||
## CODE SECTION | ||
class ReleaseCalendar: | ||
class RequestRetVal(Enum): | ||
'''Enum for response statuses that represent success values for creating and updating events''' | ||
CREATE_SUCCESS = 201 | ||
UPDATE_SUCCESS = 200 | ||
|
||
@staticmethod | ||
def fetch_dynamic_event_types(): | ||
'''Simulate fetching event types dynamically. You can replace this with an API call if necessary''' | ||
return { | ||
"Blog" : "Blog", | ||
"Newsletter" : "Newsletter", | ||
"Offer" : "Offer", | ||
"Product - InMaking" : "Product - InMaking", | ||
"Product - Ready" : "Product - Ready", | ||
"Product - Text Ready" : "Product - Text Ready", | ||
"Product - Update" : "Product - Update", | ||
"Website" : "Website", | ||
"Test Calendar" : "Test Calendar" ## Left in for testing purposes | ||
} | ||
|
||
@staticmethod | ||
def create_dynamic_enum(name, dynamic_values): | ||
'''Dynamically create an Enum class for event types''' | ||
return Enum(name, dynamic_values) | ||
|
||
def __init__(self, key, title, event_json_path, implement_check, calendar_name, calendar_id, start_date_str=None, end_date_str=None): | ||
'''Initialize the class with API credentials and calendar information''' | ||
self.api_key = key ## Fetch the API key | ||
self.calendar_id = calendar_id ## Fetch the calendar ID | ||
|
||
## If set to true will enforce strict calendar name checks | ||
self.implement_check = implement_check | ||
|
||
## Event title | ||
self.title = title | ||
|
||
## Calendar name | ||
self.calendar_name = calendar_name | ||
|
||
## Path to the JSON file containing event details | ||
self.event_json_path = event_json_path | ||
|
||
## Create dynamic enum for event types | ||
self.EventType = self.create_dynamic_enum('EventType', self.fetch_dynamic_event_types()) | ||
if self.implement_check: | ||
if calendar_name not in self.EventType: | ||
raise ValueError('%s not found in predefined calendars.' % calendar_name) | ||
|
||
## If no start date is provided, use the current date | ||
if start_date_str: | ||
self.start_date = datetime.strptime(start_date_str, '%Y-%m-%d') | ||
else: | ||
self.start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) # Set time to midnight | ||
|
||
## If no end date is provided, add 3 weeks to the start date | ||
if end_date_str: | ||
self.end_date = datetime.strptime(end_date_str, '%Y-%m-%d') | ||
else: | ||
self.end_date = self.start_date + timedelta(weeks=3) | ||
|
||
def fetch_subcalendars(self): | ||
'''Fetch subcalendars based on the provided calendar name''' | ||
url = f"https://api.teamup.com/{self.calendar_id}/subcalendars" | ||
headers = {"Teamup-Token": self.api_key} | ||
|
||
## Make a GET request to retrieve subcalendar information | ||
response = requests.get(url, headers=headers) | ||
if response.status_code == self.RequestRetVal.UPDATE_SUCCESS.value: | ||
## Filter the subcalendar by name and store it in the class | ||
if self.implement_check: | ||
self.subcalendars = [ | ||
calendar for calendar in response.json()['subcalendars'] | ||
if calendar["name"] == self.EventType[self.calendar_name].value | ||
] | ||
else: | ||
self.subcalendars = [ | ||
calendar for calendar in response.json()['subcalendars'] | ||
if calendar["name"] == self.calendar_name | ||
] | ||
print(json.dumps(self.subcalendars, indent=4)) ## Display all sub-calendars with their IDs | ||
return self.subcalendars | ||
else: | ||
print(f"Failed to retrieve subcalendars. Status code: {response.status_code}, Response: {response.text}") | ||
return None | ||
|
||
def fetch_events(self): | ||
'''Fetch events for the specified date range''' | ||
url = f"https://api.teamup.com/{self.calendar_id}/events?startDate={self.start_date.strftime('%Y-%m-%d')}&endDate={self.end_date.strftime('%Y-%m-%d')}" | ||
|
||
# Headers for the request | ||
headers = { | ||
"Teamup-Token": self.api_key | ||
} | ||
|
||
## Make a GET request to get sub-calendar information | ||
response = requests.get(url, headers=headers) | ||
|
||
if response.status_code == 200: | ||
self.calendar = response.json() ## Store the fetched calendar events | ||
print(json.dumps(self.calendar, indent=4)) ## Pretty-print the calendar information | ||
else: | ||
print(f"Failed to retrieve calendars. Status code: {response.status_code}, Response: {response.text}") | ||
|
||
def fetch_events_from_json(self): | ||
'''Load events from the provided JSON file''' | ||
if os.path.exists(self.event_json_path): | ||
if os.path.isfile(self.event_json_path): | ||
with open(self.event_json_path, 'r') as file: | ||
self.events = json.load(file) | ||
if not self.title in self.events: | ||
raise ValueError('Provided json has no events for %s.' % self.title) | ||
return | ||
raise ValueError('Provided wrong path for event json file.') | ||
|
||
def fetch_matching_events(self, current_date): | ||
'''Fetch events with start_dt matching the current date''' | ||
return [ | ||
event for event in self.events[self.title]["events"] | ||
if event["start_dt"].startswith(current_date) | ||
] | ||
|
||
def create_event(self, events): | ||
'''Create new events in the Teamup calendar''' | ||
url = f"https://api.teamup.com/{self.calendar_id}/events" | ||
headers ={ | ||
"Teamup-Token": self.api_key, | ||
"Content-Type": "application/json" | ||
} | ||
|
||
for each_event in events: | ||
## Update the event with the relevant subcalendar IDs | ||
each_event.update({"subcalendar_ids": [subcalendar['id'] for subcalendar in self.subcalendars]}) | ||
## Make a POST request to add the event | ||
response = requests.post(url, headers=headers, data=json.dumps(each_event)) | ||
|
||
## Check the response | ||
if response.status_code == self.RequestRetVal.CREATE_SUCCESS.value: | ||
print("Event created successfully!") | ||
else: | ||
print(f"Failed to create event. Status code: {response.status_code}, Response: {response.text}") | ||
|
||
def update_event(self, events, event_to_update): | ||
'''Update existing events in the calendar''' | ||
headers ={ | ||
"Teamup-Token": self.api_key, | ||
"Content-Type": "application/json" | ||
} | ||
|
||
for each_event in events: | ||
## Update only if title is the same and date is the same | ||
if (event_to_update['title'] == each_event['title']) and \ | ||
(event_to_update['start_dt'] == each_event['start_dt']): | ||
|
||
## Iterate all keys and fetch new values | ||
for each_key in event_to_update: | ||
if each_key in each_event: | ||
event_to_update[each_key] = each_event[each_key] | ||
|
||
## Create the specific event URL | ||
url = f"https://api.teamup.com/{self.calendar_id}/events/{event_to_update['id']}" | ||
|
||
## Make a POST request to add the event | ||
response = requests.put(url, headers=headers, data=json.dumps(event_to_update)) | ||
|
||
## Check the response | ||
if response.status_code == self.RequestRetVal.UPDATE_SUCCESS.value: | ||
print("Event updated successfully!") | ||
else: | ||
print(f"Failed to update event. Status code: {response.status_code}, Response: {response.text}") |
Oops, something went wrong.