Skip to content

Commit

Permalink
feat: add newsfeed scraping functionality (#37)
Browse files Browse the repository at this point in the history
* This commit adds the `newsfeed` subcommand to the `fetch` command. The `newsfeed` command allows users to scrape Matricula Online's Newsfeed. Besides the common options, one can set a limit (`--limit`) and fetch only the news articles from the last n days (`--last-n-days`).
  • Loading branch information
lsg551 committed Jun 4, 2024
1 parent 40fb6e4 commit 79e9081
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 1 deletion.
69 changes: 68 additions & 1 deletion matricula_online_scraper/cli/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from scrapy import crawler # pylint: disable=import-error # type: ignore
from rich import print # pylint: disable=redefined-builtin
from matricula_online_scraper.spiders.locations_spider import LocationsSpider
from matricula_online_scraper.spiders.newsfeed_spider import NewsfeedSpider
from matricula_online_scraper.spiders.parish_registers_spider import (
ParishRegistersSpider,
)
Expand Down Expand Up @@ -135,7 +136,7 @@ def parish(
List[URL],
typer.Option("--url", "-u", parser=URL, help="One ore more URLs to scrape."),
],
output_file_name: OutputFileNameArgument = Path("matricula_parishes"),
output_file_name: OutputFileNameArgument = Path("matricula-newsfeed"),
output_file_format: OutputFileFormatOption = DEFAULT_OUTPUT_FILE_FORMAT,
append: AppendOption = DEFAUL_APPEND,
log_level: LogLevelOption = DEFAULT_SCRAPER_LOG_LEVEL,
Expand Down Expand Up @@ -184,3 +185,69 @@ def parish(
except Exception as exception:
print("[red]An unknown error occurred while scraping.[/red]")
raise typer.Exit(code=1) from exception


@app.command()
def newsfeed(
    # Default name is specific to this command so that running it with no
    # arguments does not write into the parish command's default output file.
    output_file_name: OutputFileNameArgument = Path("matricula-newsfeed"),
    output_file_format: OutputFileFormatOption = DEFAULT_OUTPUT_FILE_FORMAT,
    log_level: LogLevelOption = DEFAULT_SCRAPER_LOG_LEVEL,
    silent: SilentOption = DEFAULT_SCRAPER_SILENT,
    # options
    last_n_days: Annotated[
        Optional[int],
        typer.Option(
            "--last-n-days",
            "-n",
            help="Scrape news from the last n days (including today).",
        ),
    ] = None,
    limit: Annotated[
        Optional[int],
        typer.Option(
            help=(
                "Limit the number of max. news articles to scrape "
                "(note that this is an upper bound, it might be less depending on other parameters)."
            )
        ),
    ] = 100,
):
    """
    Scrape Matricula Online's Newsfeed.
    """
    # NOTE: the docstring above is kept short on purpose; Typer displays it as
    # the command's help text, so implementation notes live in comments.
    #
    # Parameters:
    #   output_file_name   -- output path without extension
    #   output_file_format -- appended to the path as the extension and mapped
    #                         to a Scrapy feed format via file_format_to_scrapy
    #   log_level / silent -- forwarded to Scrapy's LOG_LEVEL / LOG_ENABLED
    #   last_n_days, limit -- forwarded unchanged to NewsfeedSpider
    #
    # Exits with status 1 if the output file already exists or if scraping
    # raises an exception.

    output_path = Path(str(output_file_name.absolute()) + "." + output_file_format)

    # Refuse to overwrite an existing file. This command has no '--append'
    # option, so the message must not recommend one.
    if output_path.exists():
        print(
            f"[red]Output file already exists: {output_path.absolute()}."
            " Remove it or choose a different output file name.[/red]"
        )
        # Non-zero status: nothing was scraped, so callers and scripts must be
        # able to detect the failure.
        raise typer.Exit(code=1)

    try:
        process = crawler.CrawlerProcess(
            settings={
                "FEEDS": {
                    str(output_path.absolute()): {
                        "format": file_format_to_scrapy(output_file_format),
                    }
                },
                "LOG_LEVEL": log_level,
                "LOG_ENABLED": not silent,
            }
        )

        process.crawl(NewsfeedSpider, limit=limit, last_n_days=last_n_days)
        process.start()

        print(
            "[green]Scraping completed successfully. "
            f"Output saved to: {output_path.absolute()}[/green]"
        )

    except Exception as exception:
        # Top-level CLI boundary: report the failure and exit non-zero while
        # keeping the original exception chained for debugging.
        print("[red]An unknown error occurred while scraping.[/red]")
        raise typer.Exit(code=1) from exception
108 changes: 108 additions & 0 deletions matricula_online_scraper/spiders/newsfeed_spider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Scrapy spider to scrape news articles from Matricula Online's newsfeed.
"""

from datetime import date, datetime
from typing import Optional
import scrapy # pylint: disable=import-error # type: ignore
from urllib.parse import urlencode, urlparse, parse_qs, urljoin, urlunparse

HOST = "https://data.matricula-online.eu"


def parse_date_str(value: str) -> date:
    """Parse a date string as displayed in the newsfeed into a ``date``.

    Accepted forms are a full month name (``"June 3, 2024"``) or an
    abbreviated month name followed by a dot (``"Dec. 19, 2023"``).
    Surrounding and repeated whitespace is ignored.

    Args:
        value: The date text taken from the page.

    Returns:
        The parsed calendar date.

    Raises:
        ValueError: If the text does not match either accepted form.
    """
    # Text nodes scraped from HTML often carry stray whitespace/newlines,
    # which strptime would reject.
    text = " ".join(value.split())

    if "." in text:
        # Abbreviated month name. Abbreviations are not always three letters
        # (e.g. "Sept."), while %b only understands the three-letter form, so
        # reduce the month to its first three letters before parsing.
        month, rest = text.split(".", 1)
        return datetime.strptime(f"{month[:3]} {rest.strip()}", "%b %d, %Y").date()

    # Full month name.
    return datetime.strptime(text, "%B %d, %Y").date()


def create_next_url(current: str, next_page: str) -> str:
    """Return ``current`` with its ``page`` query parameter set to ``next_page``.

    All other components of the URL, including any other query parameters,
    are preserved.

    Args:
        current: The URL of the page currently being processed.
        next_page: The value to use for the ``page`` query parameter.

    Returns:
        The URL pointing to the requested page.
    """
    current_url = urlparse(current)
    url_parts = list(current_url)

    # parse_qs maps every key to a *list* of values.
    query = parse_qs(current_url.query)
    query["page"] = [next_page]

    # Index 4 is the query component of the 6-tuple returned by urlparse.
    # doseq=True is required so list values are encoded as repeated
    # parameters instead of as the repr of a Python list.
    url_parts[4] = urlencode(query, doseq=True)

    return urlunparse(url_parts)


class NewsfeedSpider(scrapy.Spider):
    """Spider that scrapes the articles listed on Matricula Online's newsfeed.

    Every yielded item is a dict with the keys ``headline``, ``date`` (the
    date text exactly as found on the page), ``preview`` and ``url``.

    Args:
        limit: Upper bound on the number of articles to yield. ``None``
            disables the bound.
        last_n_days: Only yield articles from the last n days, where today
            counts as the first day. ``None`` disables the filter.

    Raises:
        ValueError: If ``limit`` or ``last_n_days`` is given but not positive.
    """

    name = "newsfeed"

    def __init__(
        self, limit: Optional[int] = None, last_n_days: Optional[int] = None, **kwargs
    ):
        super().__init__(**kwargs)
        self.start_urls = ["https://data.matricula-online.eu/en/nachrichten/"]
        # Number of articles yielded so far, compared against `limit`.
        # TODO: plain int, not protected against concurrent access; only one
        # page request is scheduled at a time here — investigate if that changes.
        self.counter = 0

        if limit is not None and limit < 1:
            self.logger.error(
                f"Parameter 'limit' must be greater than 0. Received: {limit}"
            )
            raise ValueError(
                f"Parameter 'limit' must be greater than 0. Received: {limit}"
            )

        if last_n_days is not None and last_n_days <= 0:
            self.logger.error(
                f"Parameter 'last_n_days' must be greater than 0. Received: {last_n_days}"
            )
            raise ValueError(
                f"Parameter 'last_n_days' must be greater than 0. Received: {last_n_days}"
            )

        self.limit = limit
        self.last_n_days = last_n_days

    def _limit_reached(self) -> bool:
        """Return True once as many articles as ``limit`` allows were yielded."""
        return self.limit is not None and self.counter >= self.limit

    def parse(self, response):
        """Yield one item per article on the page, then follow the next page.

        Stops without requesting further pages as soon as ``limit`` articles
        have been yielded.
        """
        for news_article in response.css('#page-main-content div[id^="news-"]'):
            if self._limit_reached():
                # Return instead of break: breaking would fall through to the
                # pagination below and keep requesting every remaining page.
                self.logger.info("Limit reached")
                return

            headline_container = news_article.css("h3")
            headline = headline_container.css("a::text").get()
            article_url = headline_container.css("a::attr('href')").get()
            article_date_str = headline_container.css("small::text").get()

            if self.last_n_days is not None:
                try:
                    age_in_days = (date.today() - parse_date_str(article_date_str)).days
                except Exception as e:
                    # Deliberate best effort: an article whose date cannot be
                    # evaluated is kept rather than dropped.
                    self.logger.error(f"Failed to evaluate parameter 'last_n_days': {e}")
                else:
                    # Today has an age of 0, so "the last n days including
                    # today" covers ages 0 .. n-1.
                    if age_in_days >= self.last_n_days:
                        continue

            # Count only articles that are actually emitted, so filtered-out
            # articles do not consume the limit.
            self.counter += 1

            yield {
                # A missing headline must not abort parsing of the whole page.
                "headline": headline.strip() if headline is not None else None,
                "date": article_date_str,
                "preview": news_article.css("p.text-justify + p::text").get(),
                "url": urljoin(HOST, article_url),
            }

        if self._limit_reached():
            # The limit was hit exactly on the last article of this page.
            return

        next_page = response.css(
            "ul.pagination li.page-item.active + li.page-item a.page-link::attr('href')"
        ).get()

        if next_page is None:
            return

        # next_page will be a url query parameter like '?page=2'; parse it
        # properly so additional parameters in the link do not break it.
        page_values = parse_qs(urlparse(next_page).query).get("page")
        if not page_values:
            self.logger.warning(
                f"Could not determine the next page from link: {next_page}"
            )
            return

        next_url = create_next_url(response.url, page_values[0])
        self.logger.debug(f"## Next URL: {next_url}")
        yield response.follow(next_url, self.parse)

0 comments on commit 79e9081

Please sign in to comment.