Skip to content

Commit

Permalink
Cleanup for type-safety, Fix for only one new comment being detected …
Browse files Browse the repository at this point in the history
…per user
  • Loading branch information
EthanC committed Jul 2, 2024
1 parent 67a7aa5 commit 53e531c
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 317 deletions.
2 changes: 0 additions & 2 deletions handlers/__init__.py

This file was deleted.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ build-backend = "poetry.core.masonry.api"

[tool.ruff.lint]
select = ["I"]

[tool.pyright]
reportUnknownMemberType = false
2 changes: 0 additions & 2 deletions services/__init__.py

This file was deleted.

299 changes: 159 additions & 140 deletions services/reddit.py
Original file line number Diff line number Diff line change
@@ -1,215 +1,234 @@
from os import environ
from sys import exit
from typing import Self

import praw
import praw # pyright: ignore [reportMissingTypeStubs]
from loguru import logger
from praw.reddit import Comment, Reddit, Redditor, Submission, Subreddit
from praw.reddit import ( # pyright: ignore [reportMissingTypeStubs]
Comment,
Reddit,
Redditor,
Submission,
Subreddit,
)


class RedditAPI:
"""
Class to integrate with the Reddit API and build objects specific
to the Reddit platform.
"""
def Authenticate() -> Reddit:
"""Authenticate with Reddit using the configured credentials."""

def Authenticate(self: Self) -> Reddit:
"""Authenticate with Reddit using the configured credentials."""
client: Reddit = praw.Reddit(
username=environ["REDDIT_USERNAME"],
password=environ["REDDIT_PASSWORD"],
client_id=environ["REDDIT_CLIENT_ID"],
client_secret=environ["REDDIT_CLIENT_SECRET"],
user_agent="Snoopy by u/LackingAGoodName (https://github.com/EthanC/Snoopy)",
)

self.client: Reddit | None = praw.Reddit(
username=environ.get("REDDIT_USERNAME"),
password=environ.get("REDDIT_PASSWORD"),
client_id=environ.get("REDDIT_CLIENT_ID"),
client_secret=environ.get("REDDIT_CLIENT_SECRET"),
user_agent="Snoopy by u/LackingAGoodName (https://github.com/EthanC/Snoopy)",
)
if (not client) or (client.read_only):
logger.critical("Failed to authenticate client with Reddit")
logger.debug(client)

if self.client.read_only:
logger.critical("Failed to authenticate with Reddit, client is read-only")
exit(1)

exit(1)
client.validate_on_submit = True

self.client.validate_on_submit = True
logger.trace(client.auth.limits)
logger.success(f"Authenticated client with Reddit as u/{ClientUsername(client)}")

logger.trace(self.client.auth.limits)
logger.success(
f"Authenticated with Reddit as u/{RedditAPI.ClientUsername(self)}"
)
return client


def ClientUsername(client: Reddit) -> str:
"""Fetch the username of the authenticated Reddit user."""

me: Redditor = client.user.me() # pyright: ignore [reportUnknownVariableType]

return self.client
if me and me.name:
return me.name # pyright: ignore [reportUnknownVariableType]

def ClientUsername(self: Self) -> str:
"""Fetch the username of the authenticated Reddit user."""
logger.warning("Client username is unknown")
logger.debug(client)

return self.client.user.me().name
return "Unknown"

def GetUser(self: Self, username: str) -> Redditor | None:
"""Fetch a Redditor object for the specified Reddit username."""

try:
return self.client.redditor(username)
except Exception as e:
logger.opt(exception=e).error(f"Failed to fetch Reddit user u/{username}")
def GetUser(client: Reddit, username: str) -> Redditor | None:
"""Fetch a Redditor object for the specified Reddit username."""

def GetUserPosts(
self: Self, user: Redditor, checkpoint: int, communities: list[str]
) -> list[Submission]:
"""Fetch the latest posts for the provided Reddit user."""
try:
return client.redditor(username) # pyright: ignore [reportUnknownVariableType]
except Exception as e:
logger.opt(exception=e).error(f"Failed to fetch Reddit user u/{username}")

posts: list[Submission] = []

try:
for post in user.submissions.new(limit=None):
def GetUserPosts(
user: Redditor, checkpoint: int | None, communities: list[str]
) -> list[Submission]:
"""Fetch the latest posts for the provided Reddit user."""

posts: list[Submission] = []

try:
for post in user.submissions.new(limit=None):
if checkpoint:
if int(post.created_utc) < checkpoint:
logger.debug(
f"Checkpoint reached while fetching posts for u/{user.name}"
f"Checkpoint reached while fetching posts ({len(posts):,}) for u/{user.name}"
)

break

if len(communities) > 0:
communityName: str = post.subreddit.display_name
if len(communities) > 0:
communityName: str = post.subreddit.display_name

if not communityName.lower() in communities:
logger.debug(
f"r/{communityName} is not a valid community for u/{user.name}"
)
if not communityName.lower() in communities:
logger.debug(
f"r/{communityName} is not a valid community for u/{user.name}"
)

continue
continue

posts.append(post)

logger.trace(posts)
except Exception as e:
logger.opt(exception=e).error(
f"Failed to fetch posts for Reddit user u/{user.name}"
)

posts.append(post)
logger.info(f"Fetched {len(posts):,} posts for Reddit user u/{user.name}")

logger.trace(posts)
except Exception as e:
logger.opt(exception=e).error(
f"Failed to fetch posts for Reddit user u/{user.name}"
)
return posts

return posts

def GetUserComments(
self: Self, user: Redditor, checkpoint: int, communities: list[str]
) -> list[Comment]:
"""Fetch the latest comments for the provided Reddit user."""
def GetUserComments(
user: Redditor, checkpoint: int | None, communities: list[str]
) -> list[Comment]:
"""Fetch the latest comments for the provided Reddit user."""

comments: list[Comment] = []
comments: list[Comment] = []

try:
for comment in user.comments.new(limit=None):
try:
for comment in user.comments.new(limit=None):
if checkpoint:
if int(comment.created_utc) < checkpoint:
logger.debug(
f"Checkpoint reached while fetching comments for u/{user.name}"
f"Checkpoint reached while fetching comments ({len(comments):,}) for u/{user.name}"
)

break

if len(communities) > 0:
communityName: str = comment.subreddit.display_name
if len(communities) > 0:
communityName: str = comment.subreddit.display_name

if not communityName.lower() in communities:
logger.debug(
f"r/{communityName} is not a valid community for u/{user.name}"
)
if not communityName.lower() in communities:
logger.debug(
f"r/{communityName} is not a valid community for u/{user.name}"
)

continue
continue

comments.append(comment)
comments.append(comment)

logger.trace(comments)
except Exception as e:
logger.opt(exception=e).error(
f"Failed to fetch comments for Reddit user u/{user.name}"
)
logger.trace(comments)
except Exception as e:
logger.opt(exception=e).error(
f"Failed to fetch comments for Reddit user u/{user.name}"
)

return comments
logger.info(f"Fetched {len(comments):,} comments for Reddit user u/{user.name}")

def GetPostComments(self: Self, post: Submission) -> list[Comment]:
"""Fetch all comments on the provided Reddit post."""
return comments

comments: list[Comment] = []

try:
post.comments.replace_more(limit=None)
def GetPostComments(post: Submission) -> list[Comment]:
"""Fetch all comments on the provided Reddit post."""

comments = post.comments.list()
except Exception as e:
logger.error(f"Failed to fetch comments for post {post.id}, {e}")
comments: list[Comment] = []

logger.trace(comments)
try:
post.comments.replace_more(limit=None)

return comments
comments = post.comments.list()
except Exception as e:
logger.error(f"Failed to fetch comments for post {post.id}, {e}")

def GetStickiedComment(self: Self, post: Submission) -> Comment | None:
"""Return the stickied comment object on the provided post."""
logger.trace(comments)

comments: list[Comment] = RedditAPI.GetPostComments(self, post)
return comments

for comment in comments:
if (hasattr(comment, "stickied")) and (comment.stickied):
logger.trace(comment)

return comment
def GetStickiedComment(post: Submission) -> Comment | None:
"""Return the stickied comment object on the provided post."""

def BuildURL(
self: Self, content: Submission | Comment | Redditor, context: bool = False
) -> str:
"""Return a complete URL to the provided Reddit content."""
comments: list[Comment] = GetPostComments(post)

url: str = f"https://reddit.com"
for comment in comments:
if (hasattr(comment, "stickied")) and (comment.stickied):
logger.trace(comment)

if type(content) is Submission:
url += content.permalink
elif type(content) is Comment:
url += content.permalink
return comment

if context:
url += "?context=5"
elif type(content) is Redditor:
url += f"/user/{content.name}"

logger.trace(url)
def BuildURL(content: Submission | Comment | Redditor, context: bool = False) -> str:
"""Return a complete URL to the provided Reddit content."""

return url
url: str = f"https://reddit.com"

def BuildQuote(self: Self, content: Comment, label: str | None) -> str:
"""Return a markdown-formatted quote of the provided comment."""
if type(content) is Submission:
url += content.permalink
elif type(content) is Comment:
url += content.permalink

link: str = f"[Comment]({RedditAPI.BuildURL(self, content, True)})"
author: str = (
f"[u\/{content.author.name}]({RedditAPI.BuildURL(self, content.author)})"
)
if context:
url += "?context=5"
elif type(content) is Redditor:
url += f"/user/{content.name}"

logger.trace(url)

# Escape forward slash with backward slash to avoid sending
# an unnecessary notification to the quoted user
quote: str = f"{link} by {author}"
return url

if label:
quote += f" ({label})"

quote += "\n\n"
def BuildQuote(content: Comment, label: str | None) -> str:
"""Return a markdown-formatted quote of the provided comment."""

for line in content.body.splitlines(True):
quote += f"> {line}"
link: str = f"[Comment]({BuildURL(content, True)})"

logger.trace(quote)
# Escape forward slash with backward slash to avoid sending
# an unnecessary notification to the quoted user
author: str = f"[u\\/{content.author.name}]({BuildURL(content.author)})"
quote: str = f"{link} by {author}"

return quote
if label:
quote += f" ({label})"

def IsModerator(self: Self, user: Redditor | Reddit, community: Subreddit) -> bool:
"""
Determine if the provided user is a Moderator of the
specified community.
"""
quote += "\n\n"

for line in content.body.splitlines(True):
quote += f"> {line}"

logger.trace(quote)

return quote


def IsModerator(user: Redditor | Reddit, community: Subreddit) -> bool:
"""
Determine if the provided user is a Moderator of the
specified community.
"""

# If provided a Reddit instance, use authenticated user
if type(user) is Reddit:
user = user.user.me()
# If provided a Reddit instance, use authenticated user
if isinstance(user, Reddit):
userObj: Redditor = user.user.me() # pyright: ignore [reportUnknownVariableType]
else:
userObj: Redditor = user

if user in community.moderator():
logger.trace(f"u/{user.name} is a Moderator of r/{community.display_name}")
if userObj in community.moderator():
logger.trace(f"u/{userObj.name} is a Moderator of r/{community.display_name}")

return True
return True

logger.trace(f"u/{user.name} is not a Moderator of r/{community.display_name}")
logger.trace(f"u/{userObj.name} is not a Moderator of r/{community.display_name}")

return False
return False
Loading

0 comments on commit 53e531c

Please sign in to comment.