Compare commits

...

9 Commits

Author SHA1 Message Date
Lucas Gomide
ebcda8b2ec Merge branch 'main' into lg-support-set-task-context 2025-05-13 18:13:35 -03:00
Lucas Gomide
fed397f745 refactor: move logic to fetch agent to utilities file (#2822)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-05-13 09:51:21 -04:00
Lucas Gomide
d55e596800 feat: support to load an Agent from a repository (#2816)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* feat: support to load an Agent from a repository

* test: fix get_auth_token test
2025-05-12 16:08:57 -04:00
Lucas Gomide
f700e014c9 fix: address race condition in FilteredStream by using context managers (#2818)
During the sys.stdout = FilteredStream(old_stdout) assignment, if any code (including logging, print, or internal library output) writes to sys.stdout immediately, and that write happens before __init__ completes, the write() method is called on a not-fully-initialized object, hence _lock doesn’t exist yet.
2025-05-12 15:05:14 -04:00
Lucas Gomide
fced8ba47f sytle: fix linter issues 2025-05-12 11:53:57 -03:00
Lucas Gomide
7204910da4 Merge branch 'main' into lg-support-set-task-context 2025-05-12 09:47:52 -03:00
Vidit Ostwal
4e496d7a20 Added link to github issue (#2810)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-05-12 08:27:18 -04:00
Lucas Gomide
8663c7e1c2 Enable ALL Ruff rules set by default (#2775)
* style: use Ruff default linter rules

* ci: check linter files over changed ones
2025-05-12 08:10:31 -04:00
Lucas Gomide
971a90f534 feat: support to set an empty context to the Task 2025-05-10 09:46:21 -03:00
16 changed files with 260 additions and 62 deletions

View File

@@ -5,12 +5,29 @@ on: [pull_request]
jobs:
lint:
runs-on: ubuntu-latest
env:
TARGET_BRANCH: ${{ github.event.pull_request.base.ref }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install Requirements
- name: Fetch Target Branch
run: git fetch origin $TARGET_BRANCH --depth=1
- name: Install Ruff
run: pip install ruff
- name: Get Changed Python Files
id: changed-files
run: |
pip install ruff
merge_base=$(git merge-base origin/"$TARGET_BRANCH" HEAD)
changed_files=$(git diff --name-only --diff-filter=ACMRTUB "$merge_base" | grep '\.py$' || true)
echo "files<<EOF" >> $GITHUB_OUTPUT
echo "$changed_files" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Run Ruff Linter
run: ruff check
- name: Run Ruff on Changed Files
if: ${{ steps.changed-files.outputs.files != '' }}
run: |
echo "${{ steps.changed-files.outputs.files }}" | tr " " "\n" | xargs -I{} ruff check "{}"

View File

@@ -2,8 +2,3 @@ exclude = [
"templates",
"__init__.py",
]
[lint]
select = [
"I", # isort rules
]

View File

@@ -700,4 +700,11 @@ recent_news = SpaceNewsKnowledgeSource(
- Configure appropriate embedding models
- Consider using local embedding providers for faster processing
</Accordion>
<Accordion title="One Time Knowledge">
- With the typical file structure provided by CrewAI, knowledge sources are embedded every time the kickoff is triggered.
- If the knowledge sources are large, this leads to inefficiency and increased latency, as the same data is embedded each time.
- To resolve this, directly initialize the knowledge parameter instead of the knowledge_sources parameter.
- For the full discussion, see the [GitHub issue](https://github.com/crewAIInc/crewAI/issues/2755).
</Accordion>
</AccordionGroup>

View File

@@ -20,6 +20,7 @@ from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities.agent_utils import (
get_tool_names,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
)
@@ -134,6 +135,16 @@ class Agent(BaseAgent):
default=None,
description="Knowledge search query for the agent dynamically generated by the agent.",
)
from_repository: Optional[str] = Field(
default=None,
description="The Agent's role to be used from your repository.",
)
@model_validator(mode="before")
@classmethod
def validate_from_repository(cls, v):
    """Merge attributes loaded from the agent repository into the raw input.

    Runs before field validation. When the raw input is a dict carrying a
    truthy ``from_repository`` value, the attributes returned by
    ``load_agent_from_repository`` are used as a base and the explicitly
    passed values are layered on top, so caller-supplied values win.

    Args:
        v: The raw, not-yet-validated input given to the model.

    Returns:
        The merged dict when ``from_repository`` is set, otherwise ``v``
        unchanged.
    """
    # A "before" validator receives whatever was handed to the model; only a
    # dict can be inspected with ``.get`` and merged, so anything else is
    # passed through untouched instead of failing with AttributeError here.
    if not isinstance(v, dict):
        return v

    from_repository = v.get("from_repository")
    if not from_repository:
        return v

    # Explicit arguments come last so they override the repository values.
    return {**load_agent_from_repository(from_repository), **v}
@model_validator(mode="after")
def post_init_setup(self):

View File

@@ -5,5 +5,5 @@ def get_auth_token() -> str:
"""Get the authentication token."""
access_token = TokenManager().get_token()
if not access_token:
raise Exception()
raise Exception("No token found, make sure you are logged in")
return access_token

View File

@@ -14,6 +14,7 @@ class PlusAPI:
TOOLS_RESOURCE = "/crewai_plus/api/v1/tools"
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -37,6 +38,9 @@ class PlusAPI:
def get_tool(self, handle: str):
return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}")
def get_agent(self, handle: str):
    """Request a single agent, identified by ``handle``, from the agents resource.

    Issues a GET through ``self._make_request`` and returns its result as-is.
    """
    agent_endpoint = f"{self.AGENTS_RESOURCE}/{handle}"
    return self._make_request("GET", agent_endpoint)
def publish_tool(
self,
handle: str,

View File

@@ -52,7 +52,7 @@ from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.crew_events import (
@@ -478,7 +478,7 @@ class Crew(FlowTrackable, BaseModel):
separated by a synchronous task.
"""
for i, task in enumerate(self.tasks):
if task.async_execution and task.context:
if task.async_execution and isinstance(task.context, list):
for context_task in task.context:
if context_task.async_execution:
for j in range(i - 1, -1, -1):
@@ -496,7 +496,7 @@ class Crew(FlowTrackable, BaseModel):
task_indices = {id(task): i for i, task in enumerate(self.tasks)}
for task in self.tasks:
if task.context:
if isinstance(task.context, list):
for context_task in task.context:
if id(context_task) not in task_indices:
continue # Skip context tasks not in the main tasks list
@@ -1034,11 +1034,14 @@ class Crew(FlowTrackable, BaseModel):
)
return cast(List[BaseTool], tools)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str:
if not task.context:
return ""
context = (
aggregate_raw_outputs_from_tasks(task.context)
if task.context
else aggregate_raw_outputs_from_task_outputs(task_outputs)
aggregate_raw_outputs_from_task_outputs(task_outputs)
if task.context is NOT_SPECIFIED
else aggregate_raw_outputs_from_tasks(task.context)
)
return context
@@ -1226,7 +1229,7 @@ class Crew(FlowTrackable, BaseModel):
task_mapping[task.key] = cloned_task
for cloned_task, original_task in zip(cloned_tasks, self.tasks):
if original_task.context:
if isinstance(original_task.context, list):
cloned_context = [
task_mapping[context_task.key]
for context_task in original_task.context

View File

@@ -5,8 +5,7 @@ import sys
import threading
import warnings
from collections import defaultdict
from contextlib import contextmanager
from types import SimpleNamespace
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from typing import (
Any,
DefaultDict,
@@ -31,7 +30,6 @@ from crewai.utilities.events.llm_events import (
LLMCallType,
LLMStreamChunkEvent,
)
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
@@ -45,6 +43,9 @@ with warnings.catch_warnings():
from litellm.utils import supports_response_schema
import io
from typing import TextIO
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -54,12 +55,17 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
class FilteredStream:
def __init__(self, original_stream):
class FilteredStream(io.TextIOBase):
_lock = None
def __init__(self, original_stream: TextIO):
self._original_stream = original_stream
self._lock = threading.Lock()
def write(self, s) -> int:
def write(self, s: str) -> int:
if not self._lock:
self._lock = threading.Lock()
with self._lock:
# Filter out extraneous messages from LiteLLM
if (
@@ -214,15 +220,11 @@ def suppress_warnings():
)
# Redirect stdout and stderr
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = FilteredStream(old_stdout)
sys.stderr = FilteredStream(old_stderr)
try:
with (
redirect_stdout(FilteredStream(sys.stdout)),
redirect_stderr(FilteredStream(sys.stderr)),
):
yield
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
class Delta(TypedDict):

View File

@@ -2,7 +2,6 @@ import datetime
import inspect
import json
import logging
import re
import threading
import uuid
from concurrent.futures import Future
@@ -41,6 +40,7 @@ from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.events import (
TaskCompletedEvent,
@@ -97,7 +97,7 @@ class Task(BaseModel):
)
context: Optional[List["Task"]] = Field(
description="Other tasks that will have their output used as context for this task.",
default=None,
default=NOT_SPECIFIED,
)
async_execution: Optional[bool] = Field(
description="Whether the task should be executed asynchronously or not.",
@@ -643,7 +643,7 @@ class Task(BaseModel):
cloned_context = (
[task_mapping[context_task.key] for context_task in self.context]
if self.context
if isinstance(self.context, list)
else None
)

View File

@@ -10,6 +10,18 @@ from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from crewai.telemetry.constants import (
CREWAI_TELEMETRY_BASE_URL,
CREWAI_TELEMETRY_SERVICE_NAME,
@@ -25,18 +37,6 @@ def suppress_warnings():
yield
from opentelemetry import trace # noqa: E402
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import ( # noqa: E402
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
@@ -232,7 +232,7 @@ class Telemetry:
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
if isinstance(task.context, list)
else None
),
"tools_names": [
@@ -748,7 +748,7 @@ class Telemetry:
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
if isinstance(task.context, list)
else None
),
"tools_names": [

View File

@@ -16,6 +16,7 @@ from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
from crewai.utilities.errors import AgentRepositoryError
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
@@ -428,3 +429,36 @@ def show_agent_logs(
printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
    """Fetch an agent definition from the repository and turn it into attributes.

    The agent is requested through ``PlusAPI.get_agent`` using the token from
    ``get_auth_token``. Every key of the JSON payload is copied to the result,
    except ``"tools"``: each listed name is looked up on the ``crewai_tools``
    module and instantiated with no arguments.

    Args:
        from_repository: Handle of the agent to load. A falsy value yields an
            empty dict without contacting the API.

    Returns:
        A dict of attributes built from the repository payload.

    Raises:
        AgentRepositoryError: If the API does not answer with status 200, or
            if a listed tool cannot be imported, found, or instantiated.
    """
    if not from_repository:
        return {}

    # Kept function-local as in the original -- presumably to avoid import
    # cycles and a hard dependency on the CLI / crewai_tools; confirm before
    # hoisting these to module level.
    import importlib

    from crewai.cli.authentication.token import get_auth_token
    from crewai.cli.plus_api import PlusAPI

    client = PlusAPI(api_key=get_auth_token())
    response = client.get_agent(from_repository)
    if response.status_code != 200:
        raise AgentRepositoryError(
            f"Agent {from_repository} could not be loaded: {response.text}"
        )

    attributes: Dict[str, Any] = {}
    tools_module = None
    for key, value in response.json().items():
        if key != "tools":
            attributes[key] = value
            continue

        tools = []
        # A null "tools" entry is treated as "no tools" rather than crashing.
        for tool_name in value or []:
            try:
                # Import once, but inside the try so that a missing
                # crewai_tools package is still reported per tool.
                if tools_module is None:
                    tools_module = importlib.import_module("crewai_tools")
                tools.append(getattr(tools_module, tool_name)())
            except Exception as e:
                # Instantiating an arbitrary tool can fail in many ways; all
                # of them are surfaced as a repository error with the cause.
                raise AgentRepositoryError(
                    f"Tool {tool_name} could not be loaded: {e}"
                ) from e
        attributes[key] = tools

    return attributes

View File

@@ -5,3 +5,14 @@ KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
EMITTER_COLOR = "bold_blue"
class _NotSpecified:
    """Type of the ``NOT_SPECIFIED`` sentinel.

    The sentinel is compared by identity (``value is NOT_SPECIFIED``), so the
    copy hooks return the instance itself: an object holding the sentinel can
    be copied or deep-copied without the identity check silently failing.
    """

    def __repr__(self):
        return "NOT_SPECIFIED"

    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self


# Sentinel value used to detect when no value has been explicitly provided.
# Unlike `None`, which might be a valid value from the user, `NOT_SPECIFIED` allows
# us to distinguish between "not passed at all" and "explicitly passed None" or "[]".
NOT_SPECIFIED = _NotSpecified()

View File

@@ -1,4 +1,5 @@
"""Error message definitions for CrewAI database operations."""
from typing import Optional
@@ -37,3 +38,9 @@ class DatabaseError:
The formatted error message
"""
return template.format(str(error))
class AgentRepositoryError(Exception):
    """Raised when an agent, or one of its tools, cannot be loaded from the repository."""

View File

@@ -1,6 +1,6 @@
import re
from typing import TYPE_CHECKING, List
if TYPE_CHECKING:
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -17,6 +17,11 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) ->
def aggregate_raw_outputs_from_tasks(tasks: List["Task"]) -> str:
"""Generate string context from the tasks."""
task_outputs = [task.output for task in tasks if task.output is not None]
task_outputs = (
[task.output for task in tasks if task.output is not None]
if isinstance(tasks, list)
else []
)
return aggregate_raw_outputs_from_task_outputs(task_outputs)

View File

@@ -2,7 +2,7 @@
import os
from unittest import mock
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import pytest
@@ -18,6 +18,7 @@ from crewai.tools import tool
from crewai.tools.tool_calling import InstructorToolCalling
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities import RPMController
from crewai.utilities.errors import AgentRepositoryError
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent
@@ -308,9 +309,7 @@ def test_cache_hitting():
def handle_tool_end(source, event):
received_events.append(event)
with (
patch.object(CacheHandler, "read") as read,
):
with (patch.object(CacheHandler, "read") as read,):
read.return_value = "0"
task = Task(
description="What is 2 times 6? Ignore correctness and just return the result of the multiplication tool, you must use the tool.",
@@ -1040,7 +1039,7 @@ def test_agent_human_input():
CrewAgentExecutor,
"_invoke_loop",
return_value=AgentFinish(output="Hello", thought="", text=""),
) as mock_invoke_loop,
),
):
# Execute the task
output = agent.execute_task(task)
@@ -2025,3 +2024,86 @@ def test_get_knowledge_search_query():
},
]
)
@pytest.fixture
def mock_get_auth_token():
    """Replace ``get_auth_token`` with a stub returning ``"test_token"`` for the test's duration."""
    token_patch = patch(
        "crewai.cli.authentication.token.get_auth_token", return_value="test_token"
    )
    with token_patch:
        yield
@patch("crewai.cli.plus_api.PlusAPI.get_agent")
def test_agent_from_repository(mock_get_agent, mock_get_auth_token):
    """An agent created only from ``from_repository`` takes its attributes from the payload."""
    from crewai_tools import SerperDevTool

    repository_payload = {
        "role": "test role",
        "goal": "test goal",
        "backstory": "test backstory",
        "tools": ["SerperDevTool"],
    }
    mock_get_agent.return_value = MagicMock(
        status_code=200, **{"json.return_value": repository_payload}
    )

    agent = Agent(from_repository="test_agent")

    assert agent.role == "test role"
    assert agent.goal == "test goal"
    assert agent.backstory == "test backstory"
    # The tool name from the payload must have been turned into an instance.
    assert len(agent.tools) == 1
    assert isinstance(agent.tools[0], SerperDevTool)
@patch("crewai.cli.plus_api.PlusAPI.get_agent")
def test_agent_from_repository_override_attributes(mock_get_agent, mock_get_auth_token):
    """Attributes passed explicitly take precedence over the repository payload."""
    from crewai_tools import SerperDevTool

    repository_payload = {
        "role": "test role",
        "goal": "test goal",
        "backstory": "test backstory",
        "tools": ["SerperDevTool"],
    }
    mock_get_agent.return_value = MagicMock(
        status_code=200, **{"json.return_value": repository_payload}
    )

    agent = Agent(from_repository="test_agent", role="Custom Role")

    # Only the explicitly passed role differs from the repository values.
    assert agent.role == "Custom Role"
    assert agent.goal == "test goal"
    assert agent.backstory == "test backstory"
    assert len(agent.tools) == 1
    assert isinstance(agent.tools[0], SerperDevTool)
@patch("crewai.cli.plus_api.PlusAPI.get_agent")
def test_agent_from_repository_with_invalid_tools(mock_get_agent, mock_get_auth_token):
    """A tool name missing from ``crewai_tools`` surfaces as ``AgentRepositoryError``."""
    repository_payload = {
        "role": "test role",
        "goal": "test goal",
        "backstory": "test backstory",
        "tools": ["DoesNotExist"],
    }
    mock_get_agent.return_value = MagicMock(
        status_code=200, **{"json.return_value": repository_payload}
    )

    expected_message = "Tool DoesNotExist could not be loaded: module 'crewai_tools' has no attribute 'DoesNotExist'"
    with pytest.raises(AgentRepositoryError, match=expected_message):
        Agent(from_repository="test_agent")
@patch("crewai.cli.plus_api.PlusAPI.get_agent")
def test_agent_from_repository_agent_not_found(mock_get_agent, mock_get_auth_token):
    """A non-200 response from the repository surfaces as ``AgentRepositoryError``."""
    mock_get_agent.return_value = MagicMock(status_code=404, text="Agent not found")

    expected_message = "Agent NOT_FOUND could not be loaded: Agent not found"
    with pytest.raises(AgentRepositoryError, match=expected_message):
        Agent(from_repository="NOT_FOUND")

View File

@@ -2,22 +2,18 @@
import hashlib
import json
import os
import tempfile
from concurrent.futures import Future
from unittest import mock
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch
import pydantic_core
import pytest
from crewai.agent import Agent
from crewai.agents import CacheHandler
from crewai.agents.cache import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow import Flow, listen, start
from crewai.flow import Flow, start
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
@@ -3141,6 +3137,30 @@ def test_replay_with_context():
assert crew.tasks[1].context[0].output.raw == "context raw output"
def test_replay_with_context_set_to_nullable():
    """Tasks with an explicitly empty context (``[]`` or ``None``) get ``""`` as context.

    Despite its name, this test drives ``crew.kickoff()`` with ``Task.execute_sync``
    patched out, and inspects the keyword arguments the tasks were executed with.
    """
    agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
    task1 = Task(
        description="Context Task", expected_output="Say Task Output", agent=agent
    )
    task2 = Task(
        description="Test Task", expected_output="Say Hi", agent=agent, context=[]
    )
    task3 = Task(
        description="Test Task 3", expected_output="Say Hi", agent=agent, context=None
    )

    crew = Crew(agents=[agent], tasks=[task1, task2, task3], process=Process.sequential)
    with patch("crewai.task.Task.execute_sync") as mock_execute_task:
        mock_execute_task.return_value = TaskOutput(
            description="Test Task Output",
            raw="test raw output",
            agent="test_agent",
        )
        crew.kickoff()

        # assert_called_with only inspects the most recent call (task3), so
        # the recorded calls are checked as well to cover the `context=[]`
        # task (task2) that ran just before it.
        explicit_empty_calls = mock_execute_task.call_args_list[-2:]
        assert len(explicit_empty_calls) == 2
        for recorded_call in explicit_empty_calls:
            assert recorded_call.kwargs["context"] == ""

        mock_execute_task.assert_called_with(agent=ANY, context="", tools=ANY)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_with_invalid_task_id():
agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")