diff --git a/matricula_online_scraper/cli/fetch.py b/matricula_online_scraper/cli/fetch.py
index bc80087..f77bf84 100644
--- a/matricula_online_scraper/cli/fetch.py
+++ b/matricula_online_scraper/cli/fetch.py
@@ -8,6 +8,7 @@
 from scrapy import crawler  # pylint: disable=import-error # type: ignore
 from rich import print  # pylint: disable=redefined-builtin
 from matricula_online_scraper.spiders.locations_spider import LocationsSpider
+from matricula_online_scraper.spiders.newsfeed_spider import NewsfeedSpider
 from matricula_online_scraper.spiders.parish_registers_spider import (
     ParishRegistersSpider,
 )
@@ -184,3 +185,69 @@ def parish(
     except Exception as exception:
         print("[red]An unknown error occurred while scraping.[/red]")
         raise typer.Exit(code=1) from exception
+
+
+@app.command()
+def newsfeed(
+    output_file_name: OutputFileNameArgument = Path("matricula_newsfeed"),
+    output_file_format: OutputFileFormatOption = DEFAULT_OUTPUT_FILE_FORMAT,
+    log_level: LogLevelOption = DEFAULT_SCRAPER_LOG_LEVEL,
+    silent: SilentOption = DEFAULT_SCRAPER_SILENT,
+    # options
+    last_n_days: Annotated[
+        Optional[int],
+        typer.Option(
+            "--last-n-days",
+            "-n",
+            help="Scrape news from the last n days (including today).",
+        ),
+    ] = None,
+    limit: Annotated[
+        Optional[int],
+        typer.Option(
+            help=(
+                "Maximum number of news articles to scrape "
+                "(note that this is an upper bound; fewer may be scraped depending on other parameters)."
+            )
+        ),
+    ] = 100,
+):
+    """
+    Scrape Matricula Online's Newsfeed.
+    """
+
+    output_path_str = str(output_file_name.absolute()) + "." + output_file_format
+    output_path = Path(output_path_str)
+
+    # abort if the output file already exists (this command has no '--append' option)
+    if output_path.exists():
+        print(
+            f"[red]Output file already exists: {output_path.absolute()}."
+            " Remove it or choose a different output file name.[/red]"
+        )
+        raise typer.Exit()
+
+    try:
+        process = crawler.CrawlerProcess(
+            settings={
+                "FEEDS": {
+                    str(output_path.absolute()): {
+                        "format": file_format_to_scrapy(output_file_format),
+                    }
+                },
+                "LOG_LEVEL": log_level,
+                "LOG_ENABLED": not silent,
+            }
+        )
+
+        process.crawl(NewsfeedSpider, limit=limit, last_n_days=last_n_days)
+        process.start()
+
+        print(
+            "[green]Scraping completed successfully. "
+            f"Output saved to: {output_path.absolute()}[/green]"
+        )
+
+    except Exception as exception:
+        print("[red]An unknown error occurred while scraping.[/red]")
+        raise typer.Exit(code=1) from exception
diff --git a/matricula_online_scraper/spiders/newsfeed_spider.py b/matricula_online_scraper/spiders/newsfeed_spider.py
new file mode 100644
index 0000000..b1b723b
--- /dev/null
+++ b/matricula_online_scraper/spiders/newsfeed_spider.py
@@ -0,0 +1,108 @@
+"""
+Scrapy spider to scrape Matricula Online's newsfeed.
+""" + +from datetime import date, datetime +from typing import Optional +import scrapy # pylint: disable=import-error # type: ignore +from urllib.parse import urlencode, urlparse, parse_qs, urljoin, urlunparse + +HOST = "https://data.matricula-online.eu" + + +def parse_date_str(value: str) -> date: + # example: "June 3, 2024" or "Dec. 19, 2023" + if "." in value: + # shorted month name + return datetime.strptime(value, "%b. %d, %Y").date() + + # full month name + return datetime.strptime(value, "%B %d, %Y").date() + + +def create_next_url(current: str, next_page: str) -> str: + current_url = urlparse(current) + url_parts = list(current_url) + query = parse_qs(current_url.query) + + params = {"page": next_page} + query.update(params) + + url_parts[4] = urlencode(query) + new_url = urlunparse(url_parts) + + return new_url + + +class NewsfeedSpider(scrapy.Spider): + name = "newsfeed" + + def __init__( + self, limit: Optional[int] = None, last_n_days: Optional[int] = None, **kwargs + ): + super().__init__(**kwargs) + self.start_urls = ["https://data.matricula-online.eu/en/nachrichten/"] + # TODO: this is not thread-safe (?), it seems to work though ... investigate + self.counter = 0 + + if limit is not None and limit <= 1: + self.logger.error( + f"Parameter 'limit' must be greater than 1. Received: {limit}" + ) + raise ValueError( + f"Parameter 'limit' must be greater than 1. Received: {limit}" + ) + + if last_n_days is not None and last_n_days <= 0: + self.logger.error( + f"Parameter 'last_n_days' must be greater than 0. Received: {last_n_days}" + ) + raise ValueError( + f"Parameter 'last_n_days' must be greater than 0. Received: {last_n_days}" + ) + + self.limit = limit + self.last_n_days = last_n_days + + def parse(self, response): + items = response.css('#page-main-content div[id^="news-"]') + + for news_article in items: + if self.limit is not None and self.counter >= self.limit: + self.close(self, reason="Limit reached") + break + self.counter += 1 + + headline_container = news_article.css("h3") + headline = headline_container.css("a::text").get().strip() + article_url = headline_container.css("a::attr('href')").get() + article_date_str = headline_container.css("small::text").get() + try: + article_date = parse_date_str(article_date_str) + if self.last_n_days is not None: + today = date.today() + delta = today - article_date + if delta.days > self.last_n_days: + continue + except Exception as e: + self.logger.error(f"Failed to evaluate parameter 'last_n_days': {e}") + + preview = news_article.css("p.text-justify + p::text").get() + + yield { + "headline": headline, + "date": article_date_str, + "preview": preview, + "url": urljoin(HOST, article_url), + } + + next_page = response.css( + "ul.pagination li.page-item.active + li.page-item a.page-link::attr('href')" + ).get() + + if next_page is not None: + # next_page will be a url query parameter like '?page=2' + _, page = next_page.split("=") + next_url = create_next_url(response.url, page) + self.logger.debug(f"## Next URL: {next_url}") + yield response.follow(next_url, self.parse)