Merge pull request #1640 from crewAIInc/gui/fix-threading

Fix threading
This commit is contained in:
Gui Vieira
2024-11-21 15:50:46 -03:00
committed by GitHub
2 changed files with 29 additions and 16 deletions

View File

@@ -1,6 +1,6 @@
import io
import logging
import sys
import threading
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
@@ -13,16 +13,25 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
)
class FilteredStream(io.StringIO):
def write(self, s):
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
or "LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True`"
in s
):
return
super().write(s)
class FilteredStream:
def __init__(self, original_stream):
self._original_stream = original_stream
self._lock = threading.Lock()
def write(self, s) -> int:
with self._lock:
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
or "LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True`"
in s
):
return 0
return self._original_stream.write(s)
def flush(self):
with self._lock:
return self._original_stream.flush()
LLM_CONTEXT_WINDOW_SIZES = {
@@ -60,8 +69,8 @@ def suppress_warnings():
# Redirect stdout and stderr
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = FilteredStream()
sys.stderr = FilteredStream()
sys.stdout = FilteredStream(old_stdout)
sys.stderr = FilteredStream(old_stderr)
try:
yield

View File

@@ -20,10 +20,10 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.i18n import I18N
@@ -208,7 +208,9 @@ class Task(BaseModel):
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
threading.Thread(
target=self._execute_task_async, args=(agent, context, tools, future)
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
).start()
return future
@@ -277,7 +279,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)