Replies: 1 comment
-
Yes, it's possible. See this example:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: fan-out-in-params-workflow
  namespace: argo
spec:
  templates:
    - name: fan-out-in-params-workflow
      inputs: {}
      outputs: {}
      metadata: {}
      dag:
        tasks:
          - name: generate
            template: generate-artifacts
            arguments: {}
          - name: fan-out-print
            template: fan-out-print
            arguments:
              parameters:
                - name: batch
                  value: '{{item}}' # Each item is a batch of 20 files
              artifacts:
                - name: input
                  from: '{{tasks.generate.outputs.artifacts.fan-out-artifacts}}'
            dependencies:
              - generate # Ensures this runs after the generate step
            withParam: '{{tasks.generate.outputs.result}}' # Iterates over the batches
          - name: fan-in
            template: fan-in
            arguments: {}
            dependencies:
              - fan-out-print # Ensures this runs after all fan-out tasks complete
    - name: generate-artifacts
      inputs: {}
      outputs:
        artifacts:
          - name: fan-out-artifacts
            path: /tmp/
            s3:
              key: fanout-{{workflow.name}}/
            archive:
              none: {} # Prevents compression of artifacts
      metadata: {}
      script:
        name: ''
        image: python:alpine3.6
        command:
          - python
        resources: {}
        source: |
          import json
          import sys
          import os
          files = []
          batches = []
          # Generate 100 files under /tmp (the output artifact path)
          for i in range(100):
              filename = f'file{i}.txt'
              files.append(filename)
              with open(os.path.join('/tmp', filename), 'w') as f:
                  f.write(f'hello {i}')
          # Group files into batches of 20
          for i in range(0, len(files), 20):
              batches.append(files[i:i+20])
          # Output batches as JSON (this becomes the withParam input)
          json.dump(batches, sys.stdout)
    - name: fan-out-print
      inputs:
        parameters:
          - name: batch
        artifacts:
          - name: input
            path: /tmp/input
      outputs: {}
      metadata: {}
      script:
        name: ''
        image: python:alpine3.6
        command:
          - python
        resources: {}
        source: |
          import json
          batch = json.loads('{{inputs.parameters.batch}}')
          for file in batch:
              with open(f'/tmp/input/{file}', 'r') as f:
                  filecont = f.read()
              print(f'File: {file}, Content: {filecont}') # Prints the file name and content
    - name: fan-in
      inputs:
        artifacts:
          - name: artifact-files
            path: /tmp
            s3:
              key: fanout-{{workflow.name}}
      outputs: {}
      metadata: {}
      container:
        name: ''
        image: alpine:latest
        command:
          - sh
          - '-c'
        args:
          - ls /tmp # Lists all files in the artifact directory
        resources: {}
  entrypoint: fan-out-in-params-workflow
  arguments: {}

This workflow demonstrates the desired fan-out pattern: generate-artifacts writes 100 files into a single output artifact and prints the batches of 20 filenames as JSON, fan-out-print runs once per batch (5 tasks in total, each receiving the whole artifact plus its batch list), and fan-in runs only after all of the fan-out tasks complete.
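For reference, the stdout of the generate-artifacts script, which Argo captures as {{tasks.generate.outputs.result}}, is a JSON list of five inner lists, so withParam expands fan-out-print into exactly five parallel tasks, one per batch of 20 filenames (the filenames below are simply what the example script produces):

[
  ["file0.txt", "file1.txt", ..., "file19.txt"],
  ["file20.txt", "file21.txt", ..., "file39.txt"],
  ...
  ["file80.txt", "file81.txt", ..., "file99.txt"]
]

Each {{item}} is one inner list; it is passed as the batch parameter and parsed with json.loads inside fan-out-print, while the full artifact directory is mounted at /tmp/input. Once the WorkflowTemplate is created in the argo namespace, it can be run with something like argo submit -n argo --from workflowtemplate/fan-out-in-params-workflow.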
-
Suppose I have 100 artifacts to handle: /myproject/data/[0-99].
If I use the method introduced by this article, there will be 100 tasks submitted to the cluster.
What I want is to fan out 5 tasks, with each task handling 20 artifacts.
Is it possible to implement such a fan-out pattern?
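A minimal sketch of the batching step the reply above relies on, adapted to pre-existing artifact names rather than freshly generated files; the 0-99 names and the batch size of 20 come from the question, everything else (variable names, printing the batches to stdout for withParam) is an assumption:

import json
import sys

# Names of the 100 artifacts to process, e.g. /myproject/data/0 .. /myproject/data/99
artifact_names = [str(i) for i in range(100)]
batch_size = 20

# Split into 5 batches of 20; each inner list becomes one {{item}} of the fan-out task
batches = [artifact_names[i:i + batch_size] for i in range(0, len(artifact_names), batch_size)]

# Print the JSON list of batches so it can be consumed as a withParam input
json.dump(batches, sys.stdout)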