
Commit

Merge pull request #31 from sabotack/28-env-variable-for-number-of-cpu-threads-to-use

Add env variable for number of CPU threads to use
ViktorPlatz authored May 8, 2024
2 parents 5d71c96 + 3cd435f commit 86def80
Showing 3 changed files with 23 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .vsls.json
@@ -0,0 +1,4 @@
+{
+    "$schema": "http://json.schemastore.org/vsls",
+    "gitignore":"none"
+}
2 changes: 1 addition & 1 deletion p6/main.py
@@ -105,7 +105,7 @@ def main():
     links = dataUtils.readLinks()
     traffic = dataUtils.readTraffic(DATA_DAY)

-    with mp.Pool() as pool:
+    with mp.Pool(processes=dataUtils.CPU_THREADS) as pool:
         results = pool.starmap(
             process_flows_hour,
             [
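
For readers unfamiliar with the pattern, a minimal standalone sketch of what the new argument does: multiprocessing.Pool caps its worker count at the processes value and falls back to os.cpu_count() when it is omitted. The square worker and inputs below are illustrative only; the real code maps process_flows_hour over per-hour arguments with starmap.

import multiprocessing as mp


def square(x):
    # Illustrative worker; the real code uses process_flows_hour.
    return x * x


if __name__ == "__main__":
    # Explicit worker count, mirroring mp.Pool(processes=dataUtils.CPU_THREADS).
    with mp.Pool(processes=4) as pool:
        print(pool.map(square, range(8)))  # [0, 1, 4, 9, 16, 25, 36, 49]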
31 changes: 18 additions & 13 deletions p6/utils/data.py
@@ -19,16 +19,22 @@
 DATA_OUTPUT_DIR = os.getenv("DATA_OUTPUT_DIR")
 RATIOS_OUTPUT_DIR = os.getenv("RATIOS_OUTPUT_DIR")

+CPU_THREADS = os.getenv("CPU_THREADS")
+if CPU_THREADS is not None and CPU_THREADS.isdigit() and int(CPU_THREADS) > 0:
+    CPU_THREADS = int(CPU_THREADS)
+else:
+    CPU_THREADS = mp.cpu_count()
+

-def _process_group(chunk, group_func):
+def _processGroup(chunk, group_func):
     return chunk.groupby(["timestamp", "pathName"])["path"].apply(group_func)


-def _group_func(x):
+def _groupFunc(x):
     return [path[1:-1].split(";") for path in x]


-def _merge_results(results):
+def _mergeResults(results):
     return {k: v for result in results for k, v in result.items()}
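
As a reference for the block above, a small self-contained sketch of how the CPU_THREADS resolution behaves (the example values in the comments are hypothetical): any unset, non-numeric, or non-positive value falls back to mp.cpu_count().

import multiprocessing as mp
import os


def resolve_cpu_threads():
    # Mirrors the parsing added above: positive integers are honoured,
    # anything else falls back to the machine's CPU count.
    value = os.getenv("CPU_THREADS")
    if value is not None and value.isdigit() and int(value) > 0:
        return int(value)
    return mp.cpu_count()


# CPU_THREADS=8   -> 8
# CPU_THREADS=-2  -> mp.cpu_count()  ("-2".isdigit() is False)
# CPU_THREADS=abc -> mp.cpu_count()
# unset           -> mp.cpu_count()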


@@ -65,28 +71,27 @@ def readFlows(day):
     logger.debug("Grouping paths...")

     # Splitting data into chunks for multiprocessing
-    cpu_count = mp.cpu_count()
-    chunk_size = len(dataFlows) // cpu_count
+    chunkSize = len(dataFlows) // CPU_THREADS
     logger.info(
-        f"Grouping in parallel | CPUs: {cpu_count} | chunk_size: {chunk_size} | len(dataFlows): {len(dataFlows)}"
+        f"Grouping using CPU threads: {CPU_THREADS} | chunkSize: {chunkSize} | len(dataFlows): {len(dataFlows)}"
     )
     chunks = [
         (
             dataFlows[i:]
-            if rangeIndex == cpu_count - 1
-            else dataFlows[i : i + chunk_size]
+            if rangeIndex == CPU_THREADS - 1
+            else dataFlows[i : i + chunkSize]
         )
-        for rangeIndex, i in enumerate([i * chunk_size for i in range(cpu_count)])
+        for rangeIndex, i in enumerate([i * chunkSize for i in range(CPU_THREADS)])
     ]

-    partial_process_group = partial(_process_group, group_func=_group_func)
+    partialProcessGroup = partial(_processGroup, group_func=_groupFunc)

     # Create a pool of processes and apply the process_group function to each chunk
-    with mp.Pool() as pool:
-        results = pool.map(partial_process_group, chunks)
+    with mp.Pool(processes=CPU_THREADS) as pool:
+        results = pool.map(partialProcessGroup, chunks)

     # Merge the results from all processes
-    grouped_flows = _merge_results(results)
+    grouped_flows = _mergeResults(results)

     # grouped_flows = dataFlows.groupby(['timestamp', 'pathName'])['path'].apply(lambda x: [path[1:-1].split(';') for path in x]).to_dict()
     logger.debug("Finished grouping paths")
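
As a side note, a minimal sketch of the chunking scheme above, using a hypothetical 10-element dataset and 3 workers: every chunk gets len(data) // CPU_THREADS elements, and the final chunk absorbs the remainder so no rows are dropped.

data = list(range(10))             # hypothetical stand-in for dataFlows
workers = 3                        # hypothetical CPU_THREADS value
chunk_size = len(data) // workers  # 3

chunks = [
    (data[i:] if idx == workers - 1 else data[i : i + chunk_size])
    for idx, i in enumerate([n * chunk_size for n in range(workers)])
]

print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]] -- last chunk takes the remainder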
