Skip to content

Commit

Permalink
feat(workflow): Add files into MessageEndStreamResponse
Browse files — browse the repository at this point in the history
Branch information: laipz8200 committed Sep 26, 2024
1 parent a55f7de commit c5e6b06
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 8 deletions.
10 changes: 8 additions & 2 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import time
from collections.abc import Generator
from collections.abc import Generator, Mapping
from typing import Any, Optional, Union

from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
Expand Down Expand Up @@ -49,6 +49,7 @@
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from enums.workflow_nodes import NodeType
from events.message_event import message_was_created
from extensions.ext_database import db
from models.account import Account
Expand Down Expand Up @@ -112,6 +113,7 @@ def __init__(
self._task_state = WorkflowTaskState()

self._conversation_name_generate_thread = None
self._recorded_files: list[Mapping[str, Any]] = []

def process(self):
"""
Expand Down Expand Up @@ -289,6 +291,10 @@ def _process_stream_response(
elif isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._handle_workflow_node_execution_success(event)

# Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend(self._fetch_files_from_node_outputs(event.outputs or {}))

response = self._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
Expand Down Expand Up @@ -527,7 +533,7 @@ def _message_end_to_stream_response(self) -> MessageEndStreamResponse:
del extras["metadata"]["annotation_reply"]

return MessageEndStreamResponse(
task_id=self._application_generate_entity.task_id, id=self._message.id, **extras
task_id=self._application_generate_entity.task_id, id=self._message.id, files=self._recorded_files, **extras
)

def _handle_output_moderation_chunk(self, text: str) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions api/core/app/entities/task_entities.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Mapping
from enum import Enum
from typing import Any, Optional

Expand Down Expand Up @@ -119,6 +120,7 @@ class MessageEndStreamResponse(StreamResponse):
event: StreamEvent = StreamEvent.MESSAGE_END
id: str
metadata: dict = {}
files: Optional[list[Mapping[str, Any]]] = None


class MessageFileStreamResponse(StreamResponse):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _run(self):

value = variable.value
inputs = {"variable_selector": variable_selector}
process_data = {"value": value}
process_data = {"documents": value if isinstance(value, list) else [value]}

try:
if isinstance(value, list):
Expand Down
6 changes: 1 addition & 5 deletions api/core/workflow/nodes/list_filter/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ class ListFilterNode(BaseNode):

def _run(self):
node_data = cast(ListFilterNodeData, self.node_data)
inputs = {
"filter_by": [filter_by.model_dump() for filter_by in node_data.filter_by],
"order_by": node_data.order_by.model_dump(),
"limit": node_data.limit.model_dump(),
}
inputs = {}
process_data = {}
outputs = {}

Expand Down

0 comments on commit c5e6b06

Please sign in to comment.