reddit-query.py
#!/usr/bin/env python3
"""Query Pushshift and Reddit data.

Pull from the Pushshift and Reddit APIs and generate a file with columns for each
submission's author and message deletion status, both at the time of Pushshift's
indexing (often within 24 hours) and in Reddit's current version. This permits one to
answer the question: what proportion of people on a subreddit delete their posts?
"""

__author__ = "Joseph Reagle"
__copyright__ = "Copyright (C) 2009-2023 Joseph Reagle"
__license__ = "GPLv3"
__version__ = "1.0"
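
# Example invocation (parameters are illustrative):
#   ./reddit-query.py --subreddit AmItheAsshole --after 2022-01-01 --before 2022-02-01 --limit 500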
import argparse  # http://docs.python.org/dev/library/argparse.html
import collections
import logging as log
import pathlib as pl
import shelve
import sys
import typing as typ

import pandas as pd
import pendulum  # https://pendulum.eustace.io/docs/
import praw  # type: ignore # https://praw.readthedocs.io/en/latest
import tqdm  # progress bar https://github.com/tqdm/tqdm

import reddit_sample as rs
import web_utils

# https://github.com/pushshift/api
# import psaw  # Pushshift API https://github.com/dmarx/psaw no exclude:not

NOW = pendulum.now("UTC")
NOW_STR = NOW.format("YYYYMMDD")
PUSHSHIFT_LIMIT = 100
REDDIT_LIMIT = 100

REDDIT = praw.Reddit(
    user_agent=web_utils.get_credential("Reddit_API", "REDDIT_USER_AGENT"),
    client_id=web_utils.get_credential("Reddit_API", "REDDIT_CLIENT_ID"),
    client_secret=web_utils.get_credential("Reddit_API", "REDDIT_CLIENT_SECRET"),
    ratelimit_seconds=600,
)


# This is duplicated in reddit-query.py and reddit-message.py
def is_throwaway(user_name: str) -> bool:
    """Determine if a user name is a throwaway account."""
    name = user_name.lower()
    # "throwra" is a common throwaway prefix in (relationship) advice subreddits
    return ("throw" in name and "away" in name) or ("throwra" in name)
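# For example, is_throwaway("ThrowRA_advice") returns True; is_throwaway("ordinary_user") returns False.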


def prefetch_reddit_posts(ids_req: list[str]) -> shelve.Shelf[typ.Any]:
    """Use praw's info() method to pre-fetch and cache Reddit info."""
    # TODO if key already in shelf continue, otherwise grab
    # Break up into 100s
    shelf = shelve.open("shelf-reddit.dbm")
    ids_shelved = set(shelf.keys())
    ids_needed = set(ids_req) - ids_shelved
    t3_ids = [i if i.startswith("t3_") else f"t3_{i}" for i in ids_needed]
    submissions = REDDIT.info(fullnames=t3_ids)
    print("pre-fetch: storing in shelf")
    for submission in tqdm.tqdm(submissions, total=len(t3_ids)):
        # print(f"{count: <3} {submission.id} {submission.title}")
        shelf[submission.id] = submission
    return shelf
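# Note: shelf-reddit.dbm persists on disk between runs, so already-fetched
# submissions are reused rather than re-requested from Reddit.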


def get_reddit_info(
    shelf: shelve.Shelf[typ.Any], id_: str, author_pushshift: str
) -> tuple[str, bool, bool]:
    """Given id, returns info from reddit."""
    author_reddit = "NA"
    is_deleted = False
    is_removed = False
    if args.skip:
        log.debug(f"reddit skipped because args.skip {author_pushshift=}")
    elif args.throwaway_only and not is_throwaway(author_pushshift):
        log.debug(
            "reddit skipped because args.throwaway_only but not throwaway "
            + f"{author_pushshift=}"
        )
    else:
        author_reddit = "[deleted]"
        # submission = REDDIT.submission(id=id_)
        if id_ in shelf:
            submission = shelf[id_]
        else:
            # These instances are very rare 0.001%
            # https://www.reddit.com/r/pushshift/comments/vby7c2/rare_pushshift_has_a_submission_id_reddit_returns/icbbtkr/?context=3
            print(f"WARNING: {id_=} not in shelf")
            return "[deleted]", False, False
        author_reddit = "[deleted]" if not submission.author else submission.author
        log.debug(f"reddit found {author_pushshift=}")
        log.debug(f"{submission=}")
        # https://www.reddit.com/r/pushshift/comments/v6vrmo/was_this_message_removed_or_deleted/
        is_removed = submission.selftext == "[removed]"
        if (
            submission.selftext == "[deleted]"
            or submission.title == "[deleted by user]"
        ):
            is_deleted = True
        # when removed and then deleted, set deleted as well
        if submission.removed_by_category == "deleted":
            is_deleted = True
    return author_reddit, is_deleted, is_removed
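# Note: a selftext of "[removed]" generally means moderators or Reddit removed the post,
# while "[deleted]" means the author deleted it (see the r/pushshift thread linked above).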


def construct_df(pushshift_total: int, pushshift_results: list[dict]) -> typ.Any:
    """Given pushshift query results, return dataframe of info about submissions.

    https://github.com/pushshift/api
    https://github.com/dmarx/psaw
    https://www.reddit.com/dev/api/
    https://praw.readthedocs.io/en/latest
    """
    # Use these for manual confirmation of results
    # PUSHSHIFT_API_URL = (
    #     "https://api.pushshift.io/reddit/submission/search?ids="
    # )
    # REDDIT_API_URL = "https://api.reddit.com/api/info/?id=t3_"
    results_row = []
    ids_counter = collections.Counter()
    ids_all = [message["id"] for message in pushshift_results]
    shelf = prefetch_reddit_posts(ids_all)
    for pr in tqdm.tqdm(pushshift_results, total=len(ids_all)):
        log.debug(f"{pr['id']=} {pr['author']=} {pr['title']=}\n")
        ids_counter[pr["id"]] += 1
        created_utc = pendulum.from_timestamp(pr["created_utc"]).format(
            "YYYYMMDD HH:mm:ss"
        )
        elapsed_hours = round((pr["retrieved_on"] - pr["created_utc"]) / 3600)
        author_r, is_deleted_r, is_removed_r = get_reddit_info(
            shelf, pr["id"], pr["author"]
        )
        results_row.append(
            (  # comments correspond to headings in dataframe below
                pr["subreddit"],
                pushshift_total,  # total_p: total range if sampling
                author_r,  # author_r(eddit)
                pr["author"],  # author_p(ushshift)
                pr["author"] == "[deleted]",  # del_author_p(ushshift)
                author_r == "[deleted]",  # del_author_r(eddit)
                pr["id"],  # id (pushshift)
                pr["title"],  # title (pushshift)
                created_utc,
                elapsed_hours,  # elapsed hours when pushshift indexed
                pr["score"],  # at time of ingest
                pr["num_comments"],  # updated as comments ingested
                pr.get("selftext", "") == "[deleted]",  # del_text_p(ushshift)
                is_deleted_r,  # del_text_r(eddit)
                is_removed_r,  # rem_text_r(eddit)
                # the following using
                pr["full_link"] != pr.get("url"),  # crosspost
                pr["full_link"],  # url
                # PUSHSHIFT_API_URL + r["id"],
                # REDDIT_API_URL + r["id"],
            )
        )
    log.debug(results_row)
    posts_df = pd.DataFrame(
        results_row,
        columns=[
            "subreddit",
            "total_p",
            "author_r",
            "author_p",
            "del_author_p",  # on pushshift
            "del_author_r",  # on reddit
            "id",
            "title",
            "created_utc",
            "elapsed_hours",
            "score_p",
            "comments_num_p",
            "del_text_p",
            "del_text_r",
            "rem_text_r",
            "crosspost",
            "url",
            # "url_api_p",
            # "url_api_r",
        ],
    )
    ids_repeating = [m_id for m_id, count in ids_counter.items() if count > 1]
    if ids_repeating:
        print(f"WARNING: repeat IDs = {ids_repeating=}")
    return posts_df


def query_pushshift(
    limit: int,
    after: pendulum.DateTime,
    before: pendulum.DateTime,
    subreddit: str,
    query: str = "",
    comments_num: str = ">0",
) -> typ.Any:
    """Given search parameters, query pushshift and return JSON."""
    # https://github.com/pushshift/api
    # No need to pass a limit param beyond 100 (Pushshift's maximum),
    # as it creates unnecessary keys in the get_JSON cache.
    if limit >= PUSHSHIFT_LIMIT:
        limit_param = f"limit={PUSHSHIFT_LIMIT}&"
    else:
        limit_param = f"limit={limit}&"
    after_human = after.format("YYYY-MM-DD HH:mm:ss")
    before_human = before.format("YYYY-MM-DD HH:mm:ss")
    log.critical(f"******* between {after_human} and {before_human}")
    after_timestamp = after.int_timestamp
    before_timestamp = before.int_timestamp
    log.debug(f"******* between {after_timestamp} and {before_timestamp}")
    optional_params = ""
    if after:
        optional_params += f"&after={after_timestamp}"
    if before:
        optional_params += f"&before={before_timestamp}"
    if comments_num:
        # I prefer `comments_num`, but Reddit uses the poorly
        # named `num_comments`
        optional_params += f"&num_comments={comments_num}"
    # This can be used to remove any message with "removed";
    # see earlier commits for full functionality.
    # optional_params += f"&selftext:not=[removed]"
    pushshift_url = (
        "https://api.pushshift.io/reddit/submission/search/"
        + f"?{limit_param}subreddit={subreddit}{optional_params}"
    )
    print(f"{pushshift_url=}")
    pushshift_data = web_utils.get_JSON(pushshift_url)["data"]
    if len(pushshift_data) != PUSHSHIFT_LIMIT:
        print(f"short on some entries {len(pushshift_data)}")
        # breakpoint()
    return pushshift_data
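# A constructed query URL looks like the following (values illustrative):
#   https://api.pushshift.io/reddit/submission/search/?limit=100&subreddit=AmItheAsshole&after=1640995200&before=1643673600&num_comments=>0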


def collect_pushshift_results(
    limit: int,
    after: pendulum.DateTime,
    before: pendulum.DateTime,
    subreddit: str,
    query: str = "",
    comments_num: str = ">0",
) -> tuple[int, typ.Any]:
    """Break up queries by PUSHSHIFT_LIMIT.

    Pushshift is limited to PUSHSHIFT_LIMIT (100) results, so multiple queries
    are needed to collect results in the date range, up to or sampled at limit.
    """
    log.info(f"{after=}, {before=}")
    log.info(f"{after.timestamp()=}, {before.timestamp()=}")
    if args.sample:  # collect PUSHSHIFT_LIMIT at offsets
        # TODO/BUG: comments_num won't work with sampling estimates
        # because they'll throw off the estimates
        results_total = rs.get_pushshift_total(subreddit, after, before)
        offsets = rs.get_offsets(subreddit, after, before, limit, PUSHSHIFT_LIMIT)
        log.info(f"{offsets=}")
        results_found = []
        for query_iteration, after_offset in enumerate(offsets):
            log.info(f"{after_offset=}, {before=}")
            log.critical(f"{query_iteration}")
            results = query_pushshift(
                limit, after_offset, before, subreddit, query, comments_num
            )
            results_found.extend(results)
    else:  # collect only the first messages starting at after, up to limit
        # I need an initial query to see if there's anything in results
        results_total = rs.get_pushshift_total(subreddit, after, before)
        query_iteration = 1
        results = results_found = query_pushshift(
            limit, after, before, subreddit, query, comments_num
        )
        while len(results) != 0 and len(results_found) < limit:
            log.critical(f"{query_iteration=}")
            query_iteration += 1
            after_new = pendulum.from_timestamp(results[-1]["created_utc"])
            results = query_pushshift(
                limit, after_new, before, subreddit, query, comments_num
            )
            results_found.extend(results)
        results_found = results_found[0:limit]
    print(f"returning {len(results_found)} (first) posts in range\n")
    log.info(f"{results_total=}")
    log.info(f"{results_found=}")
    return results_total, results_found


def export_df(name, df) -> None:
    """Export dataframe to CSV."""
    df.to_csv(f"{name}.csv", encoding="utf-8-sig", index=False)
    print(f"saved dataframe of shape {df.shape} to '{name}.csv'")


def main(argv) -> argparse.Namespace:
    """Process arguments."""
    arg_parser = argparse.ArgumentParser(description="Query Pushshift and Reddit APIs.")
    # optional arguments
    arg_parser.add_argument(
        "-a",
        "--after",
        type=str,
        default=False,
        help="""submissions after: Y-m-d (any pendulum parsable)."""
        + """ Using it without --before runs up to the present.""",
    )
    arg_parser.add_argument(
        "-b",
        "--before",
        type=str,
        default=False,
        help="""submissions before: Y-m-d (any pendulum parsable)."""
        + """ Must be used together with --after.""",
    )
    # # TODO: add cache clearing mechanism
    # arg_parser.add_argument(
    #     "-c",
    #     "--clear_cache",
    #     type=bool,
    #     default=False,
    #     help="""clear web I/O cache (default: %(default)s).""",
    # )
    arg_parser.add_argument(
        "-l",
        "--limit",
        type=int,
        default=5,
        help="limit to (default: %(default)s) results",
    )
    arg_parser.add_argument(
        "-c",
        "--comments_num",
        type=str,
        default=False,
        help="""number of comments threshold """
        + r"""'[<>]\d+' (default: %(default)s). """
        + """Note: this is updated as Pushshift ingests, `score` is not.""",
    )
    arg_parser.add_argument(
        "-r",
        "--subreddit",
        type=str,
        default="AmItheAsshole",
        help="subreddit to query (default: %(default)s)",
    )
    arg_parser.add_argument(
        "--sample",
        action="store_true",
        default=False,
        help="""sample complete date range up to limit, rather than """
        + """first submissions within limit""",
    )
    arg_parser.add_argument(
        "--skip",
        action="store_true",
        default=False,
        help="skip all Reddit fetches; pushshift only",
    )
    arg_parser.add_argument(
        "-t",
        "--throwaway-only",
        action="store_true",
        default=False,
        help=(
            """only throwaway accounts ('throw' and 'away') get"""
            + """ fetched from Reddit"""
        ),
    )
    arg_parser.add_argument(
        "-L",
        "--log-to-file",
        action="store_true",
        default=False,
        help="log to file %(prog)s.log",
    )
    arg_parser.add_argument(
        "-V",
        "--verbose",
        action="count",
        default=0,
        help="increase verbosity from critical through error, warning, info, and debug",
    )
    arg_parser.add_argument("--version", action="version", version="0.4")
    args = arg_parser.parse_args(argv)

    log_level = log.CRITICAL - (args.verbose * 10)
    LOG_FORMAT = "%(levelname).4s %(funcName).10s:%(lineno)-4d| %(message)s"
    if args.log_to_file:
        print("logging to file")
        log.basicConfig(
            filename=f"{pl.PurePath(__file__).name!s}.log",
            filemode="w",
            level=log_level,
            format=LOG_FORMAT,
        )
    else:
        log.basicConfig(level=log_level, format=LOG_FORMAT)
    return args


if __name__ == "__main__":
    args = main(sys.argv[1:])

    # syntactical tweaks to filename
    if args.after and args.before:
        after: pendulum.DateTime = pendulum.parse(args.after)
        before: pendulum.DateTime = pendulum.parse(args.before)
        date_str = f"{after.format('YYYYMMDD')}-{before.format('YYYYMMDD')}"
    elif args.after:
        after = pendulum.parse(args.after)
        before = NOW  # no --before given, so the range runs up to the present
        date_str = f"{after.format('YYYYMMDD')}-{NOW_STR}"
    elif args.before:
        raise RuntimeError("--before cannot be used without --after")

    if args.comments_num:
        comments_num = args.comments_num
        if comments_num[0] == ">":
            comments_num = comments_num[1:] + "+"
        elif comments_num[0] == "<":
            comments_num = comments_num[1:] + "-"
        comments_num = "_c" + comments_num
    else:
        comments_num = ""
    sample = "_sampled" if args.sample else ""
    throwaway = "_throwaway" if args.throwaway_only else ""

    query = {
        "limit": args.limit,
        "before": before,
        "after": after,
        "subreddit": args.subreddit,
        "comments_num": args.comments_num,
    }
    print(f"{query=}")
    ps_total, ps_results = collect_pushshift_results(**query)
    posts_df = construct_df(ps_total, ps_results)
    number_results = len(posts_df)
    result_name = (
        f"""reddit_{date_str}_{args.subreddit}{comments_num}"""
        + f"""_l{args.limit}_n{number_results}{sample}{throwaway}"""
    )
    export_df(result_name, posts_df)
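    # e.g. a run with --after 2022-01-01 --before 2022-02-01 --limit 500 might save
    # reddit_20220101-20220201_AmItheAsshole_l500_n500.csv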