Commit 8221d19: Add parallel processing

WolfwithSword committed Aug 22, 2024 (parent: a908cfb)

Showing 4 changed files with 66 additions and 23 deletions.
.github/workflows/build.yml: 1 addition & 1 deletion
```diff
@@ -10,7 +10,7 @@ on:
     paths:
       - '.github/workflows/**'
       - 'templates/**'
-      - 'main.py'
+      - '*.py'
       - 'config.ini'
       - 'requirements.txt'
     branches:
```
README.md: 6 additions & 0 deletions
```diff
@@ -78,6 +78,12 @@ client_id: `your_dev_app_client_id`
 
 client_secret: `you_dev_app_client_secret`
 
+#### [CONCURRENCY]
+
+enabled: `true/false` enables concurrent (parallel) processing.
+
+max_concurrency: `12` maximum number of concurrent lookups to run; 5-20 is recommended. If you hit rate limiting from the Twitch API, processing pauses until the limit frees up again.
+
 # FAQ
 
 - It Crashed!
```
config.ini: 5 additions & 1 deletion
```diff
@@ -11,4 +11,8 @@ max_children=60
 
 [TWITCH]
 client_id=REPLACEME
-client_secret=REPLACEME
+client_secret=REPLACEME
+
+[CONCURRENCY]
+enabled=true
+max_concurrency=12
```
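For reference, a minimal standalone sketch (not part of the commit) of how these new `[CONCURRENCY]` keys are read with Python's configparser, using the same option names and fallbacks that main.py uses:

```python
# Minimal sketch: read the [CONCURRENCY] options the way main.py does.
# The fallbacks apply when the section or key is missing from config.ini.
import configparser

config = configparser.ConfigParser()
config.read('config.ini')

CONCURRENCY = config.getboolean(section='CONCURRENCY', option='enabled', fallback=False)
MAX_CONCURRENCY = config.getint(section='CONCURRENCY', option='max_concurrency', fallback=2)

print(CONCURRENCY, MAX_CONCURRENCY)  # with the config.ini above: True 12
```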
main.py: 54 additions & 21 deletions
```diff
@@ -50,6 +50,10 @@
 BLACKLISTED = list(map(str.strip, BLACKLISTED))
 
 
+CONCURRENCY = config.getboolean(section='CONCURRENCY', option='enabled', fallback=False)
+MAX_CONCURRENCY = config.getint(section='CONCURRENCY', option='max_concurrency', fallback=2)
+
+
 class Streamer:
     def __init__(self, twitch_user: TwitchUser):
         self.twitch_user = twitch_user
```
```diff
@@ -89,6 +93,31 @@ def node_color(self):
         return "red"
 
 
+async def init_primary_user(twitch: Twitch, username: str, users: dict):
+    primary_user = await get_user_by_name(twitch, username)
+    if not primary_user:
+        return
+
+    primary = StreamerConnection(primary_user)
+    primary.color = "green"
+
+    users[primary.name] = primary
+    await scan_user(twitch=twitch, user=primary, users=users)
+
+
+async def scan_user(twitch: Twitch, user: StreamerConnection, users: dict):
+    videos = await get_videos(twitch, user.twitch_user)
+    await find_connections_from_videos(twitch, videos, user, users)
+
+
+def chunkify(li, size):
+    for i in range(0, len(li), size):
+        yield li[i:i+size]
+
+# Each user lookup is always two API requests: the first checks the user, the second checks their video archives.
+# So N=500 users means 1000 API requests.
+
+
 async def twitch_run():
     start_time = time.time()
     twitch = await Twitch(app_id=CLIENT_ID, app_secret=CLIENT_SECRET)
```
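The new helpers implement chunked fan-out: `chunkify` slices the pending usernames into groups of at most `MAX_CONCURRENCY`, and each group is awaited together with `asyncio.gather`, so no more than `MAX_CONCURRENCY` lookups are in flight at once. A self-contained sketch of the same pattern, with a dummy coroutine standing in for the two Twitch API calls per user:

```python
# Standalone sketch of the chunked asyncio.gather pattern used above.
# fetch_user() is a stand-in for the real per-user Twitch lookups.
import asyncio

def chunkify(li, size):
    # Yield successive slices of at most `size` items.
    for i in range(0, len(li), size):
        yield li[i:i + size]

async def fetch_user(name: str) -> str:
    await asyncio.sleep(0.1)  # simulate network latency
    return name

async def main():
    names = [f"user{i}" for i in range(25)]
    for chunk in chunkify(names, 12):  # 12 == max_concurrency
        # All coroutines in a chunk run concurrently; the next chunk
        # does not start until every one in this chunk has finished.
        results = await asyncio.gather(*(fetch_user(n) for n in chunk))
        print(results)

asyncio.run(main())
```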
```diff
@@ -101,20 +130,15 @@ async def twitch_run():
     else:
         primary_channel_names = [PRIMARY_CHANNEL]
 
-    for primary_username in primary_channel_names:
-
-        primary_user = await get_user_by_name(twitch, primary_username)
-        if not primary_user:
-            continue
-
-        primary = StreamerConnection(primary_user)
-        primary.color = "green"
-
-        videos = await get_videos(twitch, primary.twitch_user)
-
-        users[primary.name] = primary
-
-        await find_connections_from_videos(twitch, videos, primary, users)
+    if CONCURRENCY and len(primary_channel_names) > 1:
+        chunks = list(chunkify(primary_channel_names, MAX_CONCURRENCY))
+        for chunk in chunks:
+            if chunk:
+                chunked_users = [init_primary_user(twitch=twitch, username=user_n, users=users) for user_n in chunk]
+                await asyncio.gather(*chunked_users)
+    else:
+        for primary_username in primary_channel_names:
+            await init_primary_user(twitch, primary_username, users)
 
     if len(users) == 0:
         print("No valid primary channels were found. Please reconfigure the primary_channel(s)")
```
```diff
@@ -123,16 +147,26 @@ async def twitch_run():
     print(f"Done loading primary channels: {PRIMARY_CHANNEL}")
 
     while not all_done(users, depth):
-        for user in list(users):
-            if not users[user].processed:
-                videos = await get_videos(twitch, users[user].twitch_user)
-                #print(f"Checking {user} for children")
-                await find_connections_from_videos(twitch, videos, users[user], users)
+        non_processed_users = list([_u for _u in list(users) if not users[_u].processed])
+        if CONCURRENCY and len(non_processed_users) > 1:
+            chunks = list(chunkify(non_processed_users, MAX_CONCURRENCY))
+            for chunk in chunks:
+                if chunk:
+                    chunked_users = [scan_user(twitch=twitch, user=users[_u], users=users) for _u in chunk]
+                    await asyncio.gather(*chunked_users)
+        else:
+            for user in non_processed_users:
+                await scan_user(twitch=twitch, user=users[user], users=users)
         depth += 1
-        print(f"At depth level {depth} with {len(users)} users")
+        progress_time = "{:.2f}".format(time.time() - start_time)
+        print(f"At depth level {depth} with {len(users)} users. {progress_time}s...")
 
     print(f"Depth: {depth}, Users: {len(users)}")
 
+    ###############
+    # Build Graph #
+    ###############
+
     G = nx.Graph()
     for u in users:
         user = users[u]
```
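One property of the chunked approach is that each chunk waits for its slowest lookup before the next chunk begins. An alternative way to cap in-flight work without that per-chunk barrier, shown here only as a sketch and not something this commit does, is an asyncio.Semaphore:

```python
# Alternative sketch (not from this commit): cap concurrency with a
# semaphore so a new lookup starts as soon as any other one finishes.
import asyncio

MAX_CONCURRENCY = 12

async def fetch_user(name: str, sem: asyncio.Semaphore) -> str:
    async with sem:  # at most MAX_CONCURRENCY holders at a time
        await asyncio.sleep(0.1)  # stand-in for the per-user API calls
        return name

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    names = [f"user{i}" for i in range(25)]
    results = await asyncio.gather(*(fetch_user(n, sem) for n in names))
    print(len(results), "users fetched")

asyncio.run(main())
```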
```diff
@@ -215,7 +249,6 @@ async def find_connections_from_videos(twitch: Twitch, videos: list[Video], user
 
 
 async def get_user_by_name(twitch: Twitch, username: str):
-    # TODO: possibly optimize to fetch multiple per depth level at once
     try:
         user = await first(twitch.get_users(logins=[username]))
     except Exception as e:
```
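The deleted TODO pointed at a different optimization: `twitch.get_users` already accepts a list of logins (the Helix endpoint allows up to 100 per request), so lookups could also be batched into fewer requests rather than parallelized. A hypothetical sketch of that idea, where `get_users_by_names` is an invented helper name, not part of this commit:

```python
# Hypothetical sketch (not in this commit): batch lookups into as few
# get_users calls as possible instead of one call per username.
from twitchAPI.twitch import Twitch

async def get_users_by_names(twitch: Twitch, usernames: list[str]) -> list:
    found = []
    for i in range(0, len(usernames), 100):  # Helix allows 100 logins/request
        batch = usernames[i:i + 100]
        async for user in twitch.get_users(logins=batch):
            found.append(user)
    return found
```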
