"""Test Agent creation and execution basic functionality."""
|
|
|
|
import hashlib
|
|
import json
|
|
from concurrent.futures import Future
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pydantic_core
|
|
import pytest
|
|
from crewai.agent import Agent
|
|
from crewai.agents.cache import CacheHandler
|
|
from crewai.crew import Crew
|
|
from crewai.crews.crew_output import CrewOutput
|
|
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
|
from crewai.process import Process
|
|
from crewai.task import Task
|
|
from crewai.tasks.conditional_task import ConditionalTask
|
|
from crewai.tasks.output_format import OutputFormat
|
|
from crewai.tasks.task_output import TaskOutput
|
|
from crewai.types.usage_metrics import UsageMetrics
|
|
from crewai.utilities import Logger, RPMController
|
|
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
|
|
|
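# Agents shared across the tests below.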
ceo = Agent(
    role="CEO",
    goal="Make sure the writers in your company produce amazing content.",
    backstory="You're a long-time CEO of a content creation agency with a Senior Writer on the team. You're now working on a new project and want to make sure the content produced is amazing.",
    allow_delegation=True,
)

researcher = Agent(
    role="Researcher",
    goal="Make the best research and analysis on content about AI and AI agents",
    backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
    allow_delegation=False,
)

writer = Agent(
    role="Senior Writer",
    goal="Write the best content about AI and AI agents.",
    backstory="You're a senior writer, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on writing content for a new customer.",
    allow_delegation=False,
)


def test_crew_config_conditional_requirement():
    with pytest.raises(ValueError):
        Crew(process=Process.sequential)

    config = json.dumps(
        {
            "agents": [
                {
                    "role": "Senior Researcher",
                    "goal": "Make the best research and analysis on content about AI and AI agents",
                    "backstory": "You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
                },
                {
                    "role": "Senior Writer",
                    "goal": "Write the best content about AI and AI agents.",
                    "backstory": "You're a senior writer, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on writing content for a new customer.",
                },
            ],
            "tasks": [
                {
                    "description": "Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
                    "expected_output": "Bullet point list of 5 important events.",
                    "agent": "Senior Researcher",
                },
                {
                    "description": "Write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be, check references if necessary or search for more content but make sure it's unique, interesting and well written. Return the list of ideas with their paragraph and your notes.",
                    "expected_output": "A 4 paragraph article about AI.",
                    "agent": "Senior Writer",
                },
            ],
        }
    )
    parsed_config = json.loads(config)

    try:
        crew = Crew(process=Process.sequential, config=config)
    except ValueError:
        pytest.fail("Unexpected ValidationError raised")

    assert [agent.role for agent in crew.agents] == [
        agent["role"] for agent in parsed_config["agents"]
    ]
    assert [task.description for task in crew.tasks] == [
        task["description"] for task in parsed_config["tasks"]
    ]


def test_async_task_cannot_include_sequential_async_tasks_in_context():
    task1 = Task(
        description="Task 1",
        async_execution=True,
        expected_output="output",
        agent=researcher,
    )
    task2 = Task(
        description="Task 2",
        async_execution=True,
        expected_output="output",
        agent=researcher,
        context=[task1],
    )
    task3 = Task(
        description="Task 3",
        async_execution=True,
        expected_output="output",
        agent=researcher,
        context=[task2],
    )
    task4 = Task(
        description="Task 4",
        expected_output="output",
        agent=writer,
    )
    task5 = Task(
        description="Task 5",
        async_execution=True,
        expected_output="output",
        agent=researcher,
        context=[task4],
    )

    # This should raise an error because task2 is async and has task1 in its context without a sync task in between
    with pytest.raises(
        ValueError,
        match="Task 'Task 2' is asynchronous and cannot include other sequential asynchronous tasks in its context.",
    ):
        Crew(tasks=[task1, task2, task3, task4, task5], agents=[researcher, writer])

    # This should not raise an error because task5 has a sync task (task4) in its context
    try:
        Crew(tasks=[task1, task4, task5], agents=[researcher, writer])
    except ValueError:
        pytest.fail("Unexpected ValidationError raised")


def test_context_no_future_tasks():
    task2 = Task(
        description="Task 2",
        expected_output="output",
        agent=researcher,
    )
    task3 = Task(
        description="Task 3",
        expected_output="output",
        agent=researcher,
        context=[task2],
    )
    task4 = Task(
        description="Task 4",
        expected_output="output",
        agent=researcher,
    )
    task1 = Task(
        description="Task 1",
        expected_output="output",
        agent=researcher,
        context=[task4],
    )

    # This should raise an error because task1 has a context dependency on a future task (task4)
    with pytest.raises(
        ValueError,
        match="Task 'Task 1' has a context dependency on a future task 'Task 4', which is not allowed.",
    ):
        Crew(tasks=[task1, task2, task3, task4], agents=[researcher, writer])


def test_crew_config_with_wrong_keys():
    no_tasks_config = json.dumps(
        {
            "agents": [
                {
                    "role": "Senior Researcher",
                    "goal": "Make the best research and analysis on content about AI and AI agents",
                    "backstory": "You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
                }
            ]
        }
    )

    no_agents_config = json.dumps(
        {
            "tasks": [
                {
                    "description": "Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
                    "agent": "Senior Researcher",
                }
            ]
        }
    )
    with pytest.raises(ValueError):
        Crew(process=Process.sequential, config='{"wrong_key": "wrong_value"}')
    with pytest.raises(ValueError):
        Crew(process=Process.sequential, config=no_tasks_config)
    with pytest.raises(ValueError):
        Crew(process=Process.sequential, config=no_agents_config)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_creation():
    tasks = [
        Task(
            description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
            expected_output="Bullet point list of 5 important events.",
            agent=researcher,
        ),
        Task(
            description="Write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
            expected_output="A 4 paragraph article about AI.",
            agent=writer,
        ),
    ]

    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
    )

    result = crew.kickoff()

    expected_string_output = "**1. The Evolution of Autonomous AI Agents: From Simplicity to Sophistication**\n\nThe journey of autonomous AI agents spans decades, beginning with rule-based systems that followed strict, predefined instructions to accomplish tasks. Over time, these rudimentary agents have evolved into sophisticated models driven by advanced neural networks, capable of learning and making decisions independently. Significant milestones, such as the advent of machine learning, deep learning, and reinforcement learning, have propelled this evolution, enabling AI agents to transition from simple automation tools to intelligent entities performing complex functions in industries like finance, healthcare, and entertainment. As we look to the future, the potential capabilities of autonomous AI agents appear boundless, with innovations poised to usher in an era of unprecedented computational intelligence and autonomous decision-making.\n\n**2. Ethical Dimensions and Challenges of AI Agents**\n\nThe deployment of AI agents in various aspects of life presents profound ethical considerations that cannot be overlooked. Ethical challenges such as bias, transparency, and accountability have emerged, eliciting vigorous debates and necessitating thoughtful scrutiny. Case studies across different sectors reveal that AI agents can sometimes reinforce existing prejudices or make opaque decisions that are difficult to explain. To navigate these dilemmas, it's crucial to establish robust frameworks and guidelines that promote ethical AI development and application. Proposals include bias mitigation strategies, enforceable transparency standards, and accountability mechanisms to ensure AI agents act in ways that are fair, understandable, and responsible.\n\n**3. AI Agents in Healthcare: Revolutionizing Diagnosis and Treatment**\n\nThe integration of AI agents into healthcare is revolutionizing how diagnoses and treatments are handled, leading to enhanced patient outcomes and more efficient medical processes. AI agents are being harnessed to analyze vast datasets, identify patterns, and predict medical conditions with remarkable accuracy. Examples include AI-driven diagnostic tools that outperform traditional methods and virtual assistants that facilitate personalized patient care. However, integrating AI more deeply into healthcare presents unique challenges, including data privacy concerns and the need for rigorous validation of AI systems. Despite these hurdles, the opportunities for AI agents in medical research and patient care are boundless, promising a future where healthcare is both more effective and more accessible.\n\n**4. The Role of AI Agents in Personalized Consumer Experiences**\n\nAI agents are at the forefront of transforming personalized marketing and customer service, crafting unique consumer experiences across various industries. In e-commerce, AI algorithms analyze shopping behavior to recommend products tailored to individual preferences, while in entertainment, AI curates content that aligns with user tastes. The versatility of AI agents extends to customer service as well, where they handle inquiries and resolve issues with a level of efficiency and personalization that human agents struggle to match. The reception to AI-powered personalized interactions has been largely positive, reflecting an increasing consumer demand for bespoke services. As AI technology continues to advance, the scope and effectiveness of these personalized experiences are set to expand, offering a glimpse into a future where consumer engagement is more intuitive and engaging than ever before."

    assert str(result) == expected_string_output
    assert result.raw == expected_string_output
    assert isinstance(result, CrewOutput)
    assert len(result.tasks_output) == len(tasks)
    assert result.raw == expected_string_output


@pytest.mark.vcr(filter_headers=["authorization"])
def test_sync_task_execution():
    from unittest.mock import patch

    tasks = [
        Task(
            description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
            expected_output="Bullet point list of 5 important events.",
            agent=researcher,
        ),
        Task(
            description="Write an amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
            expected_output="A 4 paragraph article about AI.",
            agent=writer,
        ),
    ]

    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
    )

    mock_task_output = TaskOutput(
        description="Mock description", raw="mocked output", agent="mocked agent"
    )

    # Because we are mocking execute_sync, we never hit the underlying _execute_core
    # which sets the output attribute of the task
    for task in tasks:
        task.output = mock_task_output

    with patch.object(
        Task, "execute_sync", return_value=mock_task_output
    ) as mock_execute_sync:
        crew.kickoff()

        # Assert that execute_sync was called for each task
        assert mock_execute_sync.call_count == len(tasks)


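# In the hierarchical process a manager agent, built from `manager_llm`,
# plans the work and delegates it to the crew's agents.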
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_process():
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
        tasks=[task],
    )

    result = crew.kickoff()

    assert (
        result.raw
        == "1. **Unlocking the Potential of AI in Healthcare: Revolutionizing Diagnosis and Treatment**\n   AI is rapidly transforming healthcare by providing innovative solutions for diagnosis and treatment. From predicting patient outcomes to personalizing treatment plans, AI-powered systems are becoming indispensable tools for medical professionals. Machine learning algorithms can analyze vast amounts of medical data with unparalleled speed and accuracy, allowing for earlier detection of diseases like cancer and more effective treatment options. Additionally, AI-driven diagnostic tools can help narrow down symptoms and suggest possible conditions, facilitating a quicker and more accurate diagnosis. As AI technology continues to evolve, it holds the promise of not only improving patient care but also making healthcare more efficient and accessible for everyone.\n\n2. **The Ethical Implications of AI: Balancing Innovation and Responsibility**\n   The rapid advancement of AI presents a host of ethical challenges that must be addressed to balance innovation with societal responsibility. Issues such as bias in AI algorithms, privacy concerns, and the potential for job displacement are critical topics that require careful consideration. Ensuring that AI systems are transparent and fair is essential to maintaining public trust. Moreover, establishing robust ethical guidelines and regulatory frameworks can help mitigate risks and promote the responsible use of AI. As we continue to explore the potential of AI, it is imperative to engage in a broader conversation about its ethical implications to ensure that the technology benefits all of humanity.\n\n3. **AI Agents in Everyday Life: Enhancing Productivity and Convenience**\n   AI agents are seamlessly integrating into our daily lives, enhancing productivity and convenience in unprecedented ways. Virtual assistants like Siri, Alexa, and Google Assistant help us manage our schedules, control smart home devices, and even make shopping easier with voice commands. Meanwhile, AI-driven applications can streamline tasks such as email sorting, data analysis, and project management, allowing us to focus on more critical and creative aspects of our work. In the near future, AI agents are expected to become even more intuitive and capable, further revolutionizing how we perform routine tasks and manage our daily lives. The ongoing advancements in AI are poised to make our lives more efficient, productive, and enjoyable.\n\n4. **The Future of Work with AI: Preparing for the Workforce of Tomorrow**\n   AI is set to redefine the future of work, making it essential for individuals and organizations to prepare for the changes ahead. Automation and AI-driven technologies are transforming industries, from manufacturing to finance, by increasing efficiency and reducing the need for manual labor. Workers will need to adapt by acquiring new skills and embracing lifelong learning to stay relevant in an AI-driven economy. Companies will also have to rethink their business models and workforce strategies to leverage AI effectively. By fostering a culture of continuous improvement and innovation, we can ensure that AI augments human capabilities rather than replacing them, paving the way for a more dynamic and prosperous workforce of tomorrow.\n\n5. **AI in Environmental Sustainability: Tackling Climate Change with Intelligent Solutions**\n   AI is emerging as a powerful tool in the fight against climate change, offering intelligent solutions for environmental sustainability. Machine learning algorithms can analyze complex datasets to predict climate patterns and monitor environmental changes in real-time. AI-driven models can optimize energy consumption in smart grids, reduce waste through improved recycling processes, and enhance the efficiency of renewable energy sources like solar and wind. Additionally, AI can support conservation efforts by tracking endangered species and managing natural resources more effectively. As we grapple with the challenges of climate change, AI stands out as a promising ally that can help us develop innovative strategies for a more sustainable future."
    )


def test_manager_llm_requirement_for_hierarchical_process():
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    with pytest.raises(pydantic_core._pydantic_core.ValidationError):
        Crew(
            agents=[researcher, writer],
            process=Process.hierarchical,
            tasks=[task],
        )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_delegating_to_assigned_task_agent():
    """
    Test that the manager agent delegates to the assigned task agent.
    """
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
        tasks=[task],
    )

    crew.kickoff()

    # Check if the manager agent has the correct tools
    assert crew.manager_agent is not None
    assert crew.manager_agent.tools is not None

    assert len(crew.manager_agent.tools) == 2
    assert (
        "Delegate a specific task to one of the following coworkers: Researcher\n"
        in crew.manager_agent.tools[0].description
    )
    assert (
        "Ask a specific question to one of the following coworkers: Researcher\n"
        in crew.manager_agent.tools[1].description
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_delegating_to_all_agents():
    """
    Test that the manager agent delegates to all agents when none are specified.
    """
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
        tasks=[task],
    )

    crew.kickoff()

    assert crew.manager_agent is not None
    assert crew.manager_agent.tools is not None

    assert len(crew.manager_agent.tools) == 2
    print(
        "crew.manager_agent.tools[0].description",
        crew.manager_agent.tools[0].description,
    )
    assert (
        "Delegate a specific task to one of the following coworkers: Researcher, Senior Writer\n"
        in crew.manager_agent.tools[0].description
    )
    assert (
        "Ask a specific question to one of the following coworkers: Researcher, Senior Writer\n"
        in crew.manager_agent.tools[1].description
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_delegating_agents():
    tasks = [
        Task(
            description="Produce an amazing 1 paragraph draft of an article about AI Agents.",
            expected_output="A 4 paragraph article about AI.",
            agent=ceo,
        )
    ]

    crew = Crew(
        agents=[ceo, writer],
        process=Process.sequential,
        tasks=tasks,
    )

    result = crew.kickoff()

    assert (
        result.raw
        == "In the rapidly evolving landscape of artificial intelligence, AI Agents stand out as transformative tools capable of revolutionizing various sectors. These autonomous entities, equipped with sophisticated algorithms and machine learning capabilities, can perform complex tasks that range from customer service in the retail industry to predictive maintenance in manufacturing. For instance, AI Agents in healthcare can analyze vast datasets to identify potential health issues before they become critical, thereby saving lives and reducing costs. Their ability to process information and learn from interactions enables them to adapt and optimize their performance over time.\n\nAI Agents have many applications across diverse fields. In the financial industry, they facilitate fraud detection by analyzing transaction patterns and flagging abnormalities. In customer service, they improve user experience through chatbots that can respond to inquiries with human-like interactions, reducing wait times and increasing satisfaction. In manufacturing, predictive maintenance powered by AI Agents helps forecast equipment failures before they occur, thereby improving operational efficiency and reducing downtime. The educational sector also benefits from AI Agents through personalized learning experiences and administrative task automation.\n\nDespite their significant benefits, AI Agents are not without challenges. Data privacy remains a prominent concern as these systems often need access to large volumes of personal information to function effectively. Additionally, there is the issue of bias in AI algorithms, which can lead to unfair outcomes if not properly addressed. The need for continuous monitoring and updating of AI systems to ensure they operate correctly and ethically further complicates their implementation. Moreover, the potential for job displacement due to automation raises social and economic considerations that must be managed carefully.\n\nLooking to the future, the role of AI Agents is poised to expand even further. Advances in machine learning, natural language processing, and other AI technologies will continue to enhance their capabilities. We can expect to see AI Agents playing even more integral roles in smart cities, autonomous vehicles, and complex problem-solving across sciences. The key to unlocking the full potential of AI Agents lies in addressing existing challenges and fostering an environment of collaboration between human intelligence and artificial systems. By doing so, we pave the way for a future where AI Agents not only drive efficiency and productivity but also contribute to a higher quality of life."
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_verbose_output(capsys):
    tasks = [
        Task(
            description="Research AI advancements.",
            expected_output="A full report on AI advancements.",
            agent=researcher,
        ),
        Task(
            description="Write about AI in healthcare.",
            expected_output="A 4 paragraph article about AI.",
            agent=writer,
        ),
    ]

    crew = Crew(
        agents=[researcher, writer],
        tasks=tasks,
        process=Process.sequential,
        verbose=True,
    )

    crew.kickoff()
    captured = capsys.readouterr()
    expected_strings = [
        "[DEBUG]: == Working Agent: Researcher",
        "[INFO]: == Starting Task: Research AI advancements.",
        "[DEBUG]: == [Researcher] Task output:",
        "[DEBUG]: == Working Agent: Senior Writer",
        "[INFO]: == Starting Task: Write about AI in healthcare.",
        "[DEBUG]: == [Senior Writer] Task output:",
    ]

    for expected_string in expected_strings:
        assert expected_string in captured.out

    # Now test with verbose set to False
    crew.verbose = False
    crew._logger = Logger(verbose=False)
    crew.kickoff()
    captured = capsys.readouterr()
    assert captured.out == ""


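# Tool results go through a shared CacheHandler, so the second identical
# `multiplier` call (made by a different agent) should be served from the
# cache. `read` is patched to observe and fake the cache hits.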
@pytest.mark.vcr(filter_headers=["authorization"])
def test_cache_hitting_between_agents():
    from unittest.mock import call, patch

    from crewai_tools import tool

    @tool
    def multiplier(first_number: int, second_number: int) -> float:
        """Useful for when you need to multiply two numbers together."""
        return first_number * second_number

    tasks = [
        Task(
            description="What is 2 times 6? Return only the number.",
            expected_output="the result of multiplication",
            tools=[multiplier],
            agent=ceo,
        ),
        Task(
            description="What is 2 times 6? Return only the number.",
            expected_output="the result of multiplication",
            tools=[multiplier],
            agent=researcher,
        ),
    ]

    crew = Crew(
        agents=[ceo, researcher],
        tasks=tasks,
    )

    with patch.object(CacheHandler, "read") as read:
        read.return_value = "12"
        crew.kickoff()
        assert read.call_count == 2, "read was not called exactly twice"
        # Check if read was called with the expected arguments
        expected_calls = [
            call(tool="multiplier", input={"first_number": 2, "second_number": 6}),
            call(tool="multiplier", input={"first_number": 2, "second_number": 6}),
        ]
        read.assert_has_calls(expected_calls, any_order=False)


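# With max_rpm=2, the RPMController should throttle the repeated tool calls;
# `_wait_for_next_minute` is patched so the test doesn't actually sleep.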
@pytest.mark.vcr(filter_headers=["authorization"])
def test_api_calls_throttling(capsys):
    from unittest.mock import patch

    from crewai_tools import tool
    from langchain_openai import ChatOpenAI

    @tool
    def get_final_answer(anything) -> float:
        """Get the final answer but don't give it yet, just re-use this
        tool non-stop."""
        return 42

    agent = Agent(
        role="test role",
        goal="test goal",
        backstory="test backstory",
        max_iter=5,
        allow_delegation=False,
        verbose=True,
        llm=ChatOpenAI(model="gpt-4-0125-preview"),
    )

    task = Task(
        description="Don't give a Final Answer, instead keep using the `get_final_answer` tool.",
        expected_output="The final answer.",
        tools=[get_final_answer],
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task], max_rpm=2, verbose=True)

    with patch.object(RPMController, "_wait_for_next_minute") as moveon:
        moveon.return_value = True
        crew.kickoff()
        captured = capsys.readouterr()
        assert "Max RPM reached, waiting for next minute to start." in captured.out
        moveon.assert_called()


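# Each result returned by kickoff_for_each should carry the usage metrics
# (tokens and requests) consumed by its own run.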
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_usage_metrics():
    inputs = [
        {"topic": "dog"},
        {"topic": "cat"},
        {"topic": "apple"},
    ]

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    results = crew.kickoff_for_each(inputs=inputs)

    assert len(results) == len(inputs)
    for result in results:
        # Assert that all required keys are in usage_metrics and their values are not None
        assert result.token_usage.total_tokens > 0
        assert result.token_usage.prompt_tokens > 0
        assert result.token_usage.completion_tokens > 0
        assert result.token_usage.successful_requests > 0


def test_agents_rpm_is_never_set_if_crew_max_RPM_is_not_set():
    agent = Agent(
        role="test role",
        goal="test goal",
        backstory="test backstory",
        allow_delegation=False,
        verbose=True,
    )

    task = Task(
        description="just say hi!",
        expected_output="your greeting",
        agent=agent,
    )

    Crew(agents=[agent], tasks=[task], verbose=True)

    assert agent._rpm_controller is None


@pytest.mark.vcr(filter_headers=["authorization"])
def test_sequential_async_task_execution_completion():
    list_ideas = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher,
        async_execution=True,
    )
    list_important_history = Task(
        description="Research the history of AI and give me the 5 most important events that shaped the technology.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher,
    )
    write_article = Task(
        description="Write an article about the history of AI and its most important events.",
        expected_output="A 4 paragraph article about AI.",
        agent=writer,
        context=[list_ideas, list_important_history],
    )

    sequential_crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=[list_ideas, list_important_history, write_article],
    )

    sequential_result = sequential_crew.kickoff()
    assert sequential_result.raw.startswith(
        "The history of Artificial Intelligence (AI) is dotted with monumental events that have significantly shaped the trajectory of technological advancement."
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_single_task_with_async_execution():
    researcher_agent = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    list_ideas = Task(
        description="Generate a list of 5 interesting ideas to explore for an article, where each bulletpoint is under 15 words.",
        expected_output="Bullet point list of 5 important events. No additional commentary.",
        agent=researcher_agent,
        async_execution=True,
    )

    crew = Crew(
        agents=[researcher_agent],
        process=Process.sequential,
        tasks=[list_ideas],
    )

    result = crew.kickoff()
    assert result.raw.startswith(
        "- Future of AI in Healthcare and Personalized Treatment."
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_three_task_with_async_execution():
    researcher_agent = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    bullet_list = Task(
        description="Generate a list of 5 interesting ideas to explore for an article, where each bulletpoint is under 15 words.",
        expected_output="Bullet point list of 5 important events. No additional commentary.",
        agent=researcher_agent,
        async_execution=True,
    )
    numbered_list = Task(
        description="Generate a list of 5 interesting ideas to explore for an article, where each bulletpoint is under 15 words.",
        expected_output="Numbered list of 5 important events. No additional commentary.",
        agent=researcher_agent,
        async_execution=True,
    )
    letter_list = Task(
        description="Generate a list of 5 interesting ideas to explore for an article, where each bulletpoint is under 15 words.",
        expected_output="Numbered list using [A), B), C)] list of 5 important events. No additional commentary.",
        agent=researcher_agent,
        async_execution=True,
    )

    # Expected result is an error, because a crew can only
    # end with at most one asynchronous task
    with pytest.raises(pydantic_core._pydantic_core.ValidationError) as error:
        Crew(
            agents=[researcher_agent],
            process=Process.sequential,
            tasks=[bullet_list, numbered_list, letter_list],
        )

    assert error.value.errors()[0]["type"] == "async_task_count"
    assert (
        "The crew must end with at most one asynchronous task."
        in error.value.errors()[0]["msg"]
    )


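# `kickoff_async` is patched here, so this only verifies that
# `kickoff_for_each_async` fans out over every input and returns one result
# per input; no real LLM calls are made.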
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_crew_async_kickoff():
    inputs = [
        {"topic": "dog"},
        {"topic": "cat"},
        {"topic": "apple"},
    ]

    agent = Agent(
        role="mock agent",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    mock_task_output = (
        CrewOutput(
            raw="Test output from Crew 1",
            tasks_output=[],
            token_usage=UsageMetrics(
                total_tokens=100,
                prompt_tokens=10,
                completion_tokens=90,
                successful_requests=1,
            ),
            json_dict={"output": "crew1"},
            pydantic=None,
        ),
    )
    with patch.object(Crew, "kickoff_async", return_value=mock_task_output):
        results = await crew.kickoff_for_each_async(inputs=inputs)

        assert len(results) == len(inputs)
        for result in results:
            # Assert that all required keys are in usage_metrics and their values are not None
            assert result[0].token_usage.total_tokens > 0  # type: ignore
            assert result[0].token_usage.prompt_tokens > 0  # type: ignore
            assert result[0].token_usage.completion_tokens > 0  # type: ignore
            assert result[0].token_usage.successful_requests > 0  # type: ignore


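# Two consecutive async tasks followed by a sync task: the async ones should
# run through `execute_async` (returning Futures) and only the final task
# through `execute_sync`.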
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_task_execution_call_count():
    from unittest.mock import MagicMock, patch

    list_ideas = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher,
        async_execution=True,
    )
    list_important_history = Task(
        description="Research the history of AI and give me the 5 most important events that shaped the technology.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher,
        async_execution=True,
    )
    write_article = Task(
        description="Write an article about the history of AI and its most important events.",
        expected_output="A 4 paragraph article about AI.",
        agent=writer,
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=[list_ideas, list_important_history, write_article],
    )

    # Create a valid TaskOutput instance to mock the return value
    mock_task_output = TaskOutput(
        description="Mock description", raw="mocked output", agent="mocked agent"
    )

    # Create a MagicMock Future instance
    mock_future = MagicMock(spec=Future)
    mock_future.result.return_value = mock_task_output

    # Directly set the output attribute for each task
    list_ideas.output = mock_task_output
    list_important_history.output = mock_task_output
    write_article.output = mock_task_output

    with patch.object(
        Task, "execute_sync", return_value=mock_task_output
    ) as mock_execute_sync, patch.object(
        Task, "execute_async", return_value=mock_future
    ) as mock_execute_async:
        crew.kickoff()

        assert mock_execute_async.call_count == 2
        assert mock_execute_sync.call_count == 1


@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_single_input():
    """Tests if kickoff_for_each works with a single input."""
    inputs = [{"topic": "dog"}]

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    results = crew.kickoff_for_each(inputs=inputs)

    assert len(results) == 1


@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_multiple_inputs():
    """Tests if kickoff_for_each works with multiple inputs."""
    inputs = [
        {"topic": "dog"},
        {"topic": "cat"},
        {"topic": "apple"},
    ]

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    results = crew.kickoff_for_each(inputs=inputs)

    assert len(results) == len(inputs)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_empty_input():
    """Tests if kickoff_for_each handles an empty input list."""
    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    results = crew.kickoff_for_each(inputs=[])
    assert results == []


@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_invalid_input():
    """Tests if kickoff_for_each raises TypeError for invalid input types."""
    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])

    with pytest.raises(TypeError):
        # Pass a string instead of a list
        crew.kickoff_for_each("invalid input")


@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_error_handling():
    """Tests error handling in kickoff_for_each when kickoff raises an error."""
    from unittest.mock import patch

    inputs = [
        {"topic": "dog"},
        {"topic": "cat"},
        {"topic": "apple"},
    ]
    expected_outputs = [
        "Dogs are loyal companions and popular pets.",
        "Cats are independent and low-maintenance pets.",
        "Apples are a rich source of dietary fiber and vitamin C.",
    ]
    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])

    with patch.object(Crew, "kickoff") as mock_kickoff:
        mock_kickoff.side_effect = expected_outputs[:2] + [
            Exception("Simulated kickoff error")
        ]
        with pytest.raises(Exception, match="Simulated kickoff error"):
            crew.kickoff_for_each(inputs=inputs)


@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_kickoff_async_basic_functionality_and_output():
    """Tests the basic functionality and output of kickoff_async."""
    from unittest.mock import patch

    inputs = {"topic": "dog"}

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    # Create the crew
    crew = Crew(
        agents=[agent],
        tasks=[task],
    )

    expected_output = "This is a sample output from kickoff."
    with patch.object(Crew, "kickoff", return_value=expected_output) as mock_kickoff:
        result = await crew.kickoff_async(inputs)

        assert isinstance(result, str), "Result should be a string"
        assert result == expected_output, "Result should match expected output"
        mock_kickoff.assert_called_once_with(inputs)


@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_async_kickoff_for_each_async_basic_functionality_and_output():
    """Tests the basic functionality and output of kickoff_for_each_async."""
    inputs = [
        {"topic": "dog"},
        {"topic": "cat"},
        {"topic": "apple"},
    ]

    # Define expected outputs for each input
    expected_outputs = [
        "Dogs are loyal companions and popular pets.",
        "Cats are independent and low-maintenance pets.",
        "Apples are a rich source of dietary fiber and vitamin C.",
    ]

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    async def mock_kickoff_async(**kwargs):
        input_data = kwargs.get("inputs")
        index = [input_["topic"] for input_ in inputs].index(input_data["topic"])
        return expected_outputs[index]

    with patch.object(
        Crew, "kickoff_async", side_effect=mock_kickoff_async
    ) as mock_kickoff_async:
        crew = Crew(agents=[agent], tasks=[task])

        results = await crew.kickoff_for_each_async(inputs)

        assert len(results) == len(inputs)
        assert results == expected_outputs
        for input_data in inputs:
            mock_kickoff_async.assert_any_call(inputs=input_data)


@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_async_kickoff_for_each_async_empty_input():
    """Tests if kickoff_for_each_async handles an empty input list."""
    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="1 bullet point about {topic} that's under 15 words.",
        agent=agent,
    )

    # Create the crew
    crew = Crew(
        agents=[agent],
        tasks=[task],
    )

    # Call the function we are testing
    results = await crew.kickoff_for_each_async([])

    # Assertion
    assert results == [], "Result should be an empty list when input is empty"


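# A crew-level step_callback should be propagated to agents that don't set
# their own; the test after this one covers the already-set case.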
def test_set_agents_step_callback():
    from unittest.mock import patch

    researcher_agent = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    list_ideas = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher_agent,
        async_execution=True,
    )

    crew = Crew(
        agents=[researcher_agent],
        process=Process.sequential,
        tasks=[list_ideas],
        step_callback=lambda: None,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "ok"
        crew.kickoff()
        assert researcher_agent.step_callback is not None


def test_dont_set_agents_step_callback_if_already_set():
    from unittest.mock import patch

    def agent_callback(_):
        pass

    def crew_callback(_):
        pass

    researcher_agent = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=False,
        step_callback=agent_callback,
    )

    list_ideas = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher_agent,
        async_execution=True,
    )

    crew = Crew(
        agents=[researcher_agent],
        process=Process.sequential,
        tasks=[list_ideas],
        step_callback=crew_callback,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "ok"
        crew.kickoff()
        assert researcher_agent.step_callback is not crew_callback
        assert researcher_agent.step_callback is agent_callback


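# `function_calling_llm` should be the model used for tool calling; patching
# `Instructor.__init__` lets us inspect which llm the tool-calling layer was
# created with.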
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_function_calling_llm():
    from unittest.mock import patch

    from crewai_tools import tool
    from langchain_openai import ChatOpenAI

    from crewai.utilities import Instructor

    llm = ChatOpenAI(model="gpt-3.5-turbo-0125")

    @tool
    def learn_about_AI(topic) -> str:
        """Useful for when you need to learn about AI to write a paragraph about it."""
        return "AI is a very broad field."

    agent1 = Agent(
        role="test role",
        goal="test goal",
        backstory="test backstory",
        tools=[learn_about_AI],
        llm=ChatOpenAI(model="gpt-4-0125-preview"),
        function_calling_llm=llm,
    )

    essay = Task(
        description="Write and then review a small paragraph on AI until it's AMAZING",
        expected_output="The final paragraph.",
        agent=agent1,
    )
    tasks = [essay]
    crew = Crew(agents=[agent1], tasks=tasks)

    with patch.object(Instructor, "__init__", return_value=None) as mock_instructor:
        crew.kickoff()
        mock_instructor.assert_called()
        calls = mock_instructor.call_args_list
        assert any(
            call.kwargs.get("llm") == "gpt-3.5-turbo-0125" for call in calls
        ), "Instructor was not created with the expected model"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_with_no_arguments():
    from crewai_tools import tool

    @tool
    def return_data() -> str:
        "Useful to get the sales related data"
        return "January: 5, February: 10, March: 15, April: 20, May: 25"

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        tools=[return_data],
        allow_delegation=False,
    )

    task = Task(
        description="Look at the available data and give me a sense on the total number of sales.",
        expected_output="The total number of sales as an integer",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])

    result = crew.kickoff()
    assert result.raw == "The total number of sales is 75."


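# Setting allow_code_execution=True should make kickoff attach a
# CodeInterpreterTool to the agent's tools.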
def test_code_execution_flag_adds_code_tool_upon_kickoff():
    from crewai_tools import CodeInterpreterTool

    programmer = Agent(
        role="Programmer",
        goal="Write code to solve problems.",
        backstory="You're a programmer who loves to solve problems with code.",
        allow_delegation=False,
        allow_code_execution=True,
    )

    task = Task(
        description="How much is 2 + 2?",
        expected_output="The result of the sum as an integer.",
        agent=programmer,
    )

    crew = Crew(agents=[programmer], tasks=[task])

    with patch.object(Agent, "execute_task") as executor:
        executor.return_value = "ok"
        crew.kickoff()
        assert len(programmer.tools) == 1
        assert programmer.tools[0].__class__ == CodeInterpreterTool


@pytest.mark.vcr(filter_headers=["authorization"])
def test_delegation_is_not_enabled_if_there_is_only_one_agent():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=True,
    )

    task = Task(
        description="Look at the available data and give me a sense on the total number of sales.",
        expected_output="The total number of sales as an integer",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])

    crew.kickoff()
    assert task.tools == []


@pytest.mark.vcr(filter_headers=["authorization"])
def test_agents_do_not_get_delegation_tools_when_there_is_only_one_agent():
    agent = Agent(
        role="Researcher",
        goal="Be super empathetic.",
        backstory="You love to say howdy.",
        allow_delegation=False,
    )

    task = Task(description="say howdy", expected_output="Howdy!", agent=agent)

    crew = Crew(agents=[agent], tasks=[task])

    result = crew.kickoff()
    assert result.raw == "Howdy!"
    assert len(agent.tools) == 0


@pytest.mark.vcr(filter_headers=["authorization"])
def test_sequential_crew_creation_tasks_without_agents():
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        # agent=researcher, # not having an agent on the task should throw an error
    )

    # Expected Output: The sequential crew should fail to create because the task is missing an agent
    with pytest.raises(pydantic_core._pydantic_core.ValidationError) as exec_info:
        Crew(
            tasks=[task],
            agents=[researcher],
            process=Process.sequential,
        )

    assert exec_info.value.errors()[0]["type"] == "missing_agent_in_task"
    assert (
        "Agent is missing in the task with the following description"
        in exec_info.value.errors()[0]["msg"]
    )


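# The exact token counts asserted below are pinned to the recorded VCR
# cassette for this test.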
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_usage_metrics_are_captured_for_hierarchical_process():
    agent = Agent(
        role="Researcher",
        goal="Be super empathetic.",
        backstory="You love to say howdy.",
        allow_delegation=False,
    )

    task = Task(description="Ask the researcher to say hi!", expected_output="Howdy!")

    crew = Crew(
        agents=[agent], tasks=[task], process=Process.hierarchical, manager_llm="gpt-4o"
    )

    result = crew.kickoff()
    assert result.raw == "Howdy!"

    assert result.token_usage == UsageMetrics(
        total_tokens=2706,
        prompt_tokens=2548,
        completion_tokens=158,
        successful_requests=5,
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_agents():
    """
    Agents are not required for tasks in a hierarchical process, but sometimes they are still added.
    This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task.
    """
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Write one amazing paragraph about AI.",
        expected_output="A single paragraph with 4 sentences.",
        agent=writer,
    )

    crew = Crew(
        tasks=[task],
        agents=[writer, researcher],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(model="gpt-4o"),
    )
    crew.kickoff()

    assert crew.manager_agent is not None
    assert crew.manager_agent.tools is not None
    print("TOOL DESCRIPTION", crew.manager_agent.tools[0].description)
    assert crew.manager_agent.tools[0].description.startswith(
        "Delegate a specific task to one of the following coworkers: Senior Writer"
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_async_execution():
    """
    Agents are not required for tasks in a hierarchical process, but sometimes they are still added.
    This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task.
    """
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Write one amazing paragraph about AI.",
        expected_output="A single paragraph with 4 sentences.",
        agent=writer,
        async_execution=True,
    )

    crew = Crew(
        tasks=[task],
        agents=[writer, researcher, ceo],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(model="gpt-4o"),
    )

    crew.kickoff()
    assert crew.manager_agent is not None
    assert crew.manager_agent.tools is not None
    assert crew.manager_agent.tools[0].description.startswith(
        "Delegate a specific task to one of the following coworkers: Senior Writer\n"
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_sync_last():
    """
    Agents are not required for tasks in a hierarchical process, but sometimes they are still added.
    This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task.
    """
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Write one amazing paragraph about AI.",
        expected_output="A single paragraph with 4 sentences.",
        agent=writer,
        async_execution=True,
    )
    task2 = Task(
        description="Write one amazing paragraph about AI.",
        expected_output="A single paragraph with 4 sentences.",
        async_execution=False,
    )

    crew = Crew(
        tasks=[task, task2],
        agents=[writer, researcher, ceo],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(model="gpt-4o"),
    )

    crew.kickoff()
    assert crew.manager_agent is not None
    assert crew.manager_agent.tools is not None
    assert crew.manager_agent.tools[0].description.startswith(
        "Delegate a specific task to one of the following coworkers: Senior Writer, Researcher, CEO\n"
    )


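# `_interpolate_inputs` is invoked manually here; `kickoff(inputs=...)`
# performs the same interpolation automatically (see the next test).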
def test_crew_inputs_interpolate_both_agents_and_tasks():
    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="{points} bullet points about {topic}.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])
    inputs = {"topic": "AI", "points": 5}
    crew._interpolate_inputs(inputs=inputs)  # Manual call for now

    assert crew.tasks[0].description == "Give me an analysis around AI."
    assert crew.tasks[0].expected_output == "5 bullet points about AI."
    assert crew.agents[0].role == "AI Researcher"
    assert crew.agents[0].goal == "Express hot takes on AI."
    assert crew.agents[0].backstory == "You have a lot of experience with AI."


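# A minimal sketch of the placeholder substitution exercised above, assuming it
# follows str.format-style semantics; this helper is illustrative only and is
# not part of the crewAI API:
def _example_interpolate(template: str, inputs: dict) -> str:
    """Replace {placeholder} tokens in a template with the provided inputs."""
    return template.format(**inputs)


# e.g. _example_interpolate("Give me an analysis around {topic}.", {"topic": "AI"})
# -> "Give me an analysis around AI."

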
def test_crew_inputs_interpolate_both_agents_and_tasks_diff():
    from unittest.mock import patch

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="{points} bullet points about {topic}.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])

    with patch.object(Agent, "execute_task") as execute:
        with patch.object(
            Agent, "interpolate_inputs", wraps=agent.interpolate_inputs
        ) as interpolate_agent_inputs:
            with patch.object(
                Task, "interpolate_inputs", wraps=task.interpolate_inputs
            ) as interpolate_task_inputs:
                execute.return_value = "ok"
                crew.kickoff(inputs={"topic": "AI", "points": 5})
                interpolate_agent_inputs.assert_called()
                interpolate_task_inputs.assert_called()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_does_not_interpolate_without_inputs():
    from unittest.mock import patch

    agent = Agent(
        role="{topic} Researcher",
        goal="Express hot takes on {topic}.",
        backstory="You have a lot of experience with {topic}.",
    )

    task = Task(
        description="Give me an analysis around {topic}.",
        expected_output="{points} bullet points about {topic}.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])

    with patch.object(Agent, "interpolate_inputs") as interpolate_agent_inputs:
        with patch.object(Task, "interpolate_inputs") as interpolate_task_inputs:
            crew.kickoff()
            interpolate_agent_inputs.assert_not_called()
            interpolate_task_inputs.assert_not_called()


# def test_crew_partial_inputs():
#     agent = Agent(
#         role="{topic} Researcher",
#         goal="Express hot takes on {topic}.",
#         backstory="You have a lot of experience with {topic}.",
#     )

#     task = Task(
#         description="Give me an analysis around {topic}.",
#         expected_output="{points} bullet points about {topic}.",
#     )

#     crew = Crew(agents=[agent], tasks=[task], inputs={"topic": "AI"})
#     inputs = {"topic": "AI"}
#     crew._interpolate_inputs(inputs=inputs)  # Manual call for now

#     assert crew.tasks[0].description == "Give me an analysis around AI."
#     assert crew.tasks[0].expected_output == "{points} bullet points about AI."
#     assert crew.agents[0].role == "AI Researcher"
#     assert crew.agents[0].goal == "Express hot takes on AI."
#     assert crew.agents[0].backstory == "You have a lot of experience with AI."


# def test_crew_invalid_inputs():
#     agent = Agent(
#         role="{topic} Researcher",
#         goal="Express hot takes on {topic}.",
#         backstory="You have a lot of experience with {topic}.",
#     )

#     task = Task(
#         description="Give me an analysis around {topic}.",
#         expected_output="{points} bullet points about {topic}.",
#     )

#     crew = Crew(agents=[agent], tasks=[task], inputs={"subject": "AI"})
#     inputs = {"subject": "AI"}
#     crew._interpolate_inputs(inputs=inputs)  # Manual call for now

#     assert crew.tasks[0].description == "Give me an analysis around {topic}."
#     assert crew.tasks[0].expected_output == "{points} bullet points about {topic}."
#     assert crew.agents[0].role == "{topic} Researcher"
#     assert crew.agents[0].goal == "Express hot takes on {topic}."
#     assert crew.agents[0].backstory == "You have a lot of experience with {topic}."


def test_task_callback_on_crew():
    from unittest.mock import MagicMock, patch

    researcher_agent = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    list_ideas = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher_agent,
        async_execution=True,
    )

    mock_callback = MagicMock()

    crew = Crew(
        agents=[researcher_agent],
        process=Process.sequential,
        tasks=[list_ideas],
        task_callback=mock_callback,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "ok"
        crew.kickoff()

        assert list_ideas.callback is not None
        mock_callback.assert_called_once()
        args, _ = mock_callback.call_args
        assert isinstance(args[0], TaskOutput)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_tools_with_custom_caching():
    from unittest.mock import patch

    from crewai_tools import tool

    # NOTE: the misspelling in `multiplcation_tool` is preserved because the
    # cache assertion below matches the tool name string.
    @tool
    def multiplcation_tool(first_number: int, second_number: int) -> int:
        """Useful for when you need to multiply two numbers together."""
        return first_number * second_number

    def cache_func(args, result):
        # cache_function receives the tool's input args and its result, and
        # returns True when the result should be cached; here only even
        # products are cached.
        cache = result % 2 == 0
        return cache

    multiplcation_tool.cache_function = cache_func

    writer1 = Agent(
        role="Writer",
        goal="You write lessons of math for kids.",
        backstory="You're an expert in writing and you love to teach kids but you know nothing of math.",
        tools=[multiplcation_tool],
        allow_delegation=False,
    )

    writer2 = Agent(
        role="Writer",
        goal="You write lessons of math for kids.",
        backstory="You're an expert in writing and you love to teach kids but you know nothing of math.",
        tools=[multiplcation_tool],
        allow_delegation=False,
    )

    task1 = Task(
        description="What is 2 times 6? Return only the number after using the multiplication tool.",
        expected_output="the result of multiplication",
        agent=writer1,
    )

    task2 = Task(
        description="What is 3 times 1? Return only the number after using the multiplication tool.",
        expected_output="the result of multiplication",
        agent=writer1,
    )

    task3 = Task(
        description="What is 2 times 6? Return only the number after using the multiplication tool.",
        expected_output="the result of multiplication",
        agent=writer2,
    )

    task4 = Task(
        description="What is 3 times 1? Return only the number after using the multiplication tool.",
        expected_output="the result of multiplication",
        agent=writer2,
    )

    crew = Crew(agents=[writer1, writer2], tasks=[task1, task2, task3, task4])

    with patch.object(
        CacheHandler, "add", wraps=crew._cache_handler.add
    ) as add_to_cache:
        with patch.object(CacheHandler, "read", wraps=crew._cache_handler.read) as _:
            result = crew.kickoff()
            # Only 2 * 6 = 12 (even) passes cache_func; 3 * 1 = 3 (odd) is
            # never added to the cache.
            add_to_cache.assert_called_once_with(
                tool="multiplcation_tool",
                input={"first_number": 2, "second_number": 6},
                output=12,
            )
            assert result.raw == "3"


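# A minimal sketch of the cache gating exercised above, assuming results are
# keyed on tool name plus input and stored only when the tool's cache_function
# approves (illustrative only, not the real CacheHandler):
def _example_cached_tool_call(cache: dict, tool_name: str, func, cache_function, **kwargs):
    key = (tool_name, tuple(sorted(kwargs.items())))
    if key in cache:
        return cache[key]
    result = func(**kwargs)
    if cache_function(kwargs, result):
        cache[key] = result
    return result

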
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
    from unittest.mock import patch

    math_researcher = Agent(
        role="Researcher",
        goal="You research about math.",
        backstory="You're an expert in research and you love to learn new things.",
        allow_delegation=False,
    )

    task1 = Task(
        description="Research a topic to teach a kid aged 6 about math.",
        expected_output="A topic, explanation, angle, and examples.",
        agent=math_researcher,
    )

    crew = Crew(
        agents=[math_researcher],
        tasks=[task1],
        memory=True,
    )

    with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
        crew.kickoff()
        contextual_mem.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_disabled_memory_using_contextual_memory():
    from unittest.mock import patch

    math_researcher = Agent(
        role="Researcher",
        goal="You research about math.",
        backstory="You're an expert in research and you love to learn new things.",
        allow_delegation=False,
    )

    task1 = Task(
        description="Research a topic to teach a kid aged 6 about math.",
        expected_output="A topic, explanation, angle, and examples.",
        agent=math_researcher,
    )

    crew = Crew(
        agents=[math_researcher],
        tasks=[task1],
        memory=False,
    )

    with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
        crew.kickoff()
        contextual_mem.assert_not_called()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_log_file_output(tmp_path):
    test_file = tmp_path / "logs.txt"
    tasks = [
        Task(
            description="Say Hi",
            expected_output="The word: Hi",
            agent=researcher,
        )
    ]

    crew = Crew(agents=[researcher], tasks=tasks, output_log_file=str(test_file))
    crew.kickoff()
    assert test_file.exists()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent():
    from unittest.mock import patch

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    manager = Agent(
        role="Manager",
        goal="Manage the crew and ensure the tasks are completed efficiently.",
        backstory="You're an experienced manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
        allow_delegation=False,
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.hierarchical,
        manager_agent=manager,
        tasks=[task],
    )

    mock_task_output = TaskOutput(
        description="Mock description", raw="mocked output", agent="mocked agent"
    )

    # Because we are mocking execute_sync, we never hit the underlying _execute_core
    # which sets the output attribute of the task
    task.output = mock_task_output

    with patch.object(
        Task, "execute_sync", return_value=mock_task_output
    ) as mock_execute_sync:
        crew.kickoff()
        # Even though the manager was created with allow_delegation=False, the
        # crew enables delegation on the manager during kickoff so it can hand
        # work to the crew's agents.
        assert manager.allow_delegation is True
        mock_execute_sync.assert_called()


def test_manager_agent_in_agents_raises_exception():
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    manager = Agent(
        role="Manager",
        goal="Manage the crew and ensure the tasks are completed efficiently.",
        backstory="You're an experienced manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
        allow_delegation=False,
    )

    with pytest.raises(pydantic_core._pydantic_core.ValidationError):
        Crew(
            agents=[researcher, writer, manager],
            process=Process.hierarchical,
            manager_agent=manager,
            tasks=[task],
        )


def test_manager_agent_with_tools_raises_exception():
    from crewai_tools import tool

    @tool
    def testing_tool(first_number: int, second_number: int) -> int:
        """Useful for when you need to multiply two numbers together."""
        return first_number * second_number

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    manager = Agent(
        role="Manager",
        goal="Manage the crew and ensure the tasks are completed efficiently.",
        backstory="You're an experienced manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
        allow_delegation=False,
        tools=[testing_tool],
    )

    crew = Crew(
        agents=[researcher, writer],
        process=Process.hierarchical,
        manager_agent=manager,
        tasks=[task],
    )

    with pytest.raises(Exception):
        crew.kickoff()


@patch("crewai.crew.Crew.kickoff")
|
|
@patch("crewai.crew.CrewTrainingHandler")
|
|
@patch("crewai.crew.TaskEvaluator")
|
|
def test_crew_train_success(task_evaluator, crew_training_handler, kickoff):
|
|
task = Task(
|
|
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
|
|
expected_output="5 bullet points with a paragraph for each idea.",
|
|
agent=researcher,
|
|
)
|
|
|
|
crew = Crew(
|
|
agents=[researcher, writer],
|
|
tasks=[task],
|
|
)
|
|
crew.train(
|
|
n_iterations=2, inputs={"topic": "AI"}, filename="trained_agents_data.pkl"
|
|
)
|
|
task_evaluator.assert_has_calls(
|
|
[
|
|
mock.call(researcher),
|
|
mock.call().evaluate_training_data(
|
|
training_data=crew_training_handler().load(),
|
|
agent_id=str(researcher.id),
|
|
),
|
|
mock.call().evaluate_training_data().model_dump(),
|
|
mock.call(writer),
|
|
mock.call().evaluate_training_data(
|
|
training_data=crew_training_handler().load(),
|
|
agent_id=str(writer.id),
|
|
),
|
|
mock.call().evaluate_training_data().model_dump(),
|
|
]
|
|
)
|
|
|
|
crew_training_handler.assert_has_calls(
|
|
[
|
|
mock.call("training_data.pkl"),
|
|
mock.call().load(),
|
|
mock.call("trained_agents_data.pkl"),
|
|
mock.call().save_trained_data(
|
|
agent_id="Researcher",
|
|
trained_data=task_evaluator().evaluate_training_data().model_dump(),
|
|
),
|
|
mock.call("trained_agents_data.pkl"),
|
|
mock.call().save_trained_data(
|
|
agent_id="Senior Writer",
|
|
trained_data=task_evaluator().evaluate_training_data().model_dump(),
|
|
),
|
|
mock.call(),
|
|
mock.call().load(),
|
|
mock.call(),
|
|
mock.call().load(),
|
|
]
|
|
)
|
|
|
|
kickoff.assert_has_calls(
|
|
[mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
|
|
)
|
|
|
|
|
|
def test_crew_train_error():
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task],
    )

    with pytest.raises(TypeError) as e:
        crew.train()
        assert "train() missing 1 required positional argument: 'n_iterations'" in str(
            e
        )


def test__setup_for_training():
    researcher.allow_delegation = True
    writer.allow_delegation = True
    agents = [researcher, writer]
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

    crew = Crew(
        agents=agents,
        tasks=[task],
    )

    assert crew._train is False
    assert task.human_input is False

    for agent in agents:
        assert agent.allow_delegation is True

    crew._setup_for_training("trained_agents_data.pkl")

    assert crew._train is True
    assert task.human_input is True

    for agent in agents:
        assert agent.allow_delegation is False


@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_feature():
    list_ideas = Task(
        description="Generate a list of 5 interesting ideas to explore for an article, where each bulletpoint is under 15 words.",
        expected_output="Bullet point list of 5 important events. No additional commentary.",
        agent=researcher,
    )
    write = Task(
        description="Write a sentence about the events",
        expected_output="A sentence about the events",
        agent=writer,
        context=[list_ideas],
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[list_ideas, write],
        process=Process.sequential,
    )

    with patch.object(Task, "execute_sync") as mock_execute_task:
        mock_execute_task.return_value = TaskOutput(
            description="Mock description",
            raw="Mocked output for list of ideas",
            agent="Researcher",
            json_dict=None,
            output_format=OutputFormat.RAW,
            pydantic=None,
            summary="Mocked output for list of ideas",
        )

        crew.kickoff()
        crew.replay(str(write.id))
        # Ensure context was passed correctly: kickoff executes both tasks and
        # the replay re-executes only the `write` task, for 3 calls in total.
        assert mock_execute_task.call_count == 3


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_replay_error():
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task],
    )

    with pytest.raises(TypeError) as e:
        crew.replay()  # type: ignore purposefully throwing err
        assert "task_id is required" in str(e)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_task_db_init():
    agent = Agent(
        role="Content Writer",
        goal="Write engaging content on various topics.",
        backstory="You have a background in journalism and creative writing.",
    )

    task = Task(
        description="Write a detailed article about AI in healthcare.",
        expected_output="A 1 paragraph article about AI.",
        agent=agent,
    )

    crew = Crew(agents=[agent], tasks=[task])

    with patch.object(Task, "execute_sync") as mock_execute_task:
        mock_execute_task.return_value = TaskOutput(
            description="Write about AI in healthcare.",
            raw="Artificial Intelligence (AI) is revolutionizing healthcare by enhancing diagnostic accuracy, personalizing treatment plans, and streamlining administrative tasks.",
            agent="Content Writer",
            json_dict=None,
            output_format=OutputFormat.RAW,
            pydantic=None,
            summary="Write about AI in healthcare...",
        )

        crew.kickoff()

        # Check if this runs without raising an exception
        try:
            db_handler = TaskOutputStorageHandler()
            db_handler.load()
            assert True  # If we reach this point, no exception was raised
        except Exception as e:
            pytest.fail(f"An exception was raised: {str(e)}")


@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_task_with_context():
    agent1 = Agent(
        role="Researcher",
        goal="Research AI advancements.",
        backstory="You are an expert in AI research.",
    )
    agent2 = Agent(
        role="Writer",
        goal="Write detailed articles on AI.",
        backstory="You have a background in journalism and AI.",
    )

    task1 = Task(
        description="Research the latest advancements in AI.",
        expected_output="A detailed report on AI advancements.",
        agent=agent1,
    )
    task2 = Task(
        description="Summarize the AI advancements report.",
        expected_output="A summary of the AI advancements report.",
        agent=agent2,
    )
    task3 = Task(
        description="Write an article based on the AI advancements summary.",
        expected_output="An article on AI advancements.",
        agent=agent2,
    )
    task4 = Task(
        description="Create a presentation based on the AI advancements article.",
        expected_output="A presentation on AI advancements.",
        agent=agent2,
        context=[task1],
    )

    crew = Crew(
        agents=[agent1, agent2],
        tasks=[task1, task2, task3, task4],
        process=Process.sequential,
    )

    mock_task_output1 = TaskOutput(
        description="Research the latest advancements in AI.",
        raw="Detailed report on AI advancements...",
        agent="Researcher",
        json_dict=None,
        output_format=OutputFormat.RAW,
        pydantic=None,
        summary="Detailed report on AI advancements...",
    )
    mock_task_output2 = TaskOutput(
        description="Summarize the AI advancements report.",
        raw="Summary of the AI advancements report...",
        agent="Writer",
        json_dict=None,
        output_format=OutputFormat.RAW,
        pydantic=None,
        summary="Summary of the AI advancements report...",
    )
    mock_task_output3 = TaskOutput(
        description="Write an article based on the AI advancements summary.",
        raw="Article on AI advancements...",
        agent="Writer",
        json_dict=None,
        output_format=OutputFormat.RAW,
        pydantic=None,
        summary="Article on AI advancements...",
    )
    mock_task_output4 = TaskOutput(
        description="Create a presentation based on the AI advancements article.",
        raw="Presentation on AI advancements...",
        agent="Writer",
        json_dict=None,
        output_format=OutputFormat.RAW,
        pydantic=None,
        summary="Presentation on AI advancements...",
    )

    with patch.object(Task, "execute_sync") as mock_execute_task:
        mock_execute_task.side_effect = [
            mock_task_output1,
            mock_task_output2,
            mock_task_output3,
            mock_task_output4,
        ]

        crew.kickoff()
        db_handler = TaskOutputStorageHandler()
        assert db_handler.load() != []

    with patch.object(Task, "execute_sync") as mock_replay_task:
        mock_replay_task.return_value = mock_task_output4

        replayed_output = crew.replay(str(task4.id))
        assert replayed_output.raw == "Presentation on AI advancements..."

    db_handler.reset()


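# For reference, the rows returned by TaskOutputStorageHandler.load() are
# mocked in the tests below with this shape (fields taken directly from the
# test data; sketch only):
#
#     {
#         "task_id": "<task uuid as str>",
#         "output": {
#             "description": ..., "summary": ..., "raw": ...,
#             "pydantic": ..., "json_dict": ..., "output_format": ...,
#             "agent": ...,
#         },
#         "inputs": {},
#     }

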
@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_with_context():
    agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
    task1 = Task(
        description="Context Task", expected_output="Say Task Output", agent=agent
    )
    task2 = Task(
        description="Test Task", expected_output="Say Hi", agent=agent, context=[task1]
    )

    context_output = TaskOutput(
        description="Context Task Output",
        agent="test_agent",
        raw="context raw output",
        pydantic=None,
        json_dict={},
        output_format=OutputFormat.RAW,
    )
    task1.output = context_output

    crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)

    with patch(
        "crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
        return_value=[
            {
                "task_id": str(task1.id),
                "output": {
                    "description": context_output.description,
                    "summary": context_output.summary,
                    "raw": context_output.raw,
                    "pydantic": context_output.pydantic,
                    "json_dict": context_output.json_dict,
                    "output_format": context_output.output_format,
                    "agent": context_output.agent,
                },
                "inputs": {},
            },
            {
                "task_id": str(task2.id),
                "output": {
                    "description": "Test Task Output",
                    "summary": None,
                    "raw": "test raw output",
                    "pydantic": None,
                    "json_dict": {},
                    "output_format": "json",
                    "agent": "test_agent",
                },
                "inputs": {},
            },
        ],
    ):
        crew.replay(str(task2.id))

        assert crew.tasks[1].context[0].output.raw == "context raw output"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_with_invalid_task_id():
    agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
    task1 = Task(
        description="Context Task", expected_output="Say Task Output", agent=agent
    )
    task2 = Task(
        description="Test Task", expected_output="Say Hi", agent=agent, context=[task1]
    )

    context_output = TaskOutput(
        description="Context Task Output",
        agent="test_agent",
        raw="context raw output",
        pydantic=None,
        json_dict={},
        output_format=OutputFormat.RAW,
    )
    task1.output = context_output

    crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)

    with patch(
        "crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
        return_value=[
            {
                "task_id": str(task1.id),
                "output": {
                    "description": context_output.description,
                    "summary": context_output.summary,
                    "raw": context_output.raw,
                    "pydantic": context_output.pydantic,
                    "json_dict": context_output.json_dict,
                    "output_format": context_output.output_format,
                    "agent": context_output.agent,
                },
                "inputs": {},
            },
            {
                "task_id": str(task2.id),
                "output": {
                    "description": "Test Task Output",
                    "summary": None,
                    "raw": "test raw output",
                    "pydantic": None,
                    "json_dict": {},
                    "output_format": "json",
                    "agent": "test_agent",
                },
                "inputs": {},
            },
        ],
    ):
        with pytest.raises(
            ValueError,
            match="Task with id bf5b09c9-69bd-4eb8-be12-f9e5bae31c2d not found in the crew's tasks.",
        ):
            crew.replay("bf5b09c9-69bd-4eb8-be12-f9e5bae31c2d")


@pytest.mark.vcr(filter_headers=["authorization"])
@patch.object(Crew, "_interpolate_inputs")
def test_replay_interpolates_inputs_properly(mock_interpolate_inputs):
    agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
    task1 = Task(description="Context Task", expected_output="Say {name}", agent=agent)
    task2 = Task(
        description="Test Task",
        expected_output="Say Hi to {name}",
        agent=agent,
        context=[task1],
    )

    context_output = TaskOutput(
        description="Context Task Output",
        agent="test_agent",
        raw="context raw output",
        pydantic=None,
        json_dict={},
        output_format=OutputFormat.RAW,
    )
    task1.output = context_output

    crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
    crew.kickoff(inputs={"name": "John"})

    with patch(
        "crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
        return_value=[
            {
                "task_id": str(task1.id),
                "output": {
                    "description": context_output.description,
                    "summary": context_output.summary,
                    "raw": context_output.raw,
                    "pydantic": context_output.pydantic,
                    "json_dict": context_output.json_dict,
                    "output_format": context_output.output_format,
                    "agent": context_output.agent,
                },
                "inputs": {"name": "John"},
            },
            {
                "task_id": str(task2.id),
                "output": {
                    "description": "Test Task Output",
                    "summary": None,
                    "raw": "test raw output",
                    "pydantic": None,
                    "json_dict": {},
                    "output_format": "json",
                    "agent": "test_agent",
                },
                "inputs": {"name": "John"},
            },
        ],
    ):
        crew.replay(str(task2.id))
        assert crew._inputs == {"name": "John"}
        assert mock_interpolate_inputs.call_count == 2


@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_setup_context():
    agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
    task1 = Task(description="Context Task", expected_output="Say {name}", agent=agent)
    task2 = Task(
        description="Test Task",
        expected_output="Say Hi to {name}",
        agent=agent,
    )
    context_output = TaskOutput(
        description="Context Task Output",
        agent="test_agent",
        raw="context raw output",
        pydantic=None,
        json_dict={},
        output_format=OutputFormat.RAW,
    )
    task1.output = context_output
    crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
    with patch(
        "crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
        return_value=[
            {
                "task_id": str(task1.id),
                "output": {
                    "description": context_output.description,
                    "summary": context_output.summary,
                    "raw": context_output.raw,
                    "pydantic": context_output.pydantic,
                    "json_dict": context_output.json_dict,
                    "output_format": context_output.output_format,
                    "agent": context_output.agent,
                },
                "inputs": {"name": "John"},
            },
            {
                "task_id": str(task2.id),
                "output": {
                    "description": "Test Task Output",
                    "summary": None,
                    "raw": "test raw output",
                    "pydantic": None,
                    "json_dict": {},
                    "output_format": "json",
                    "agent": "test_agent",
                },
                "inputs": {"name": "John"},
            },
        ],
    ):
        crew.replay(str(task2.id))

    # Check if the first task's output was set correctly
    assert crew.tasks[0].output is not None
    assert isinstance(crew.tasks[0].output, TaskOutput)
    assert crew.tasks[0].output.description == "Context Task Output"
    assert crew.tasks[0].output.agent == "test_agent"
    assert crew.tasks[0].output.raw == "context raw output"
    assert crew.tasks[0].output.output_format == OutputFormat.RAW

    assert crew.tasks[1].prompt_context == "context raw output"


def test_key():
    tasks = [
        Task(
            description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
            expected_output="Bullet point list of 5 important events.",
            agent=researcher,
        ),
        Task(
            description="Write a 1 amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
            expected_output="A 4 paragraph article about AI.",
            agent=writer,
        ),
    ]
    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
    )
    # crew.key is an md5 fingerprint over the member agents' and tasks' keys.
    expected_hash = hashlib.md5(
        f"{researcher.key}|{writer.key}|{tasks[0].key}|{tasks[1].key}".encode()
    ).hexdigest()

    assert crew.key == expected_hash


def test_conditional_task_requirement_breaks_when_singular_conditional_task():
    def condition_fn(output) -> bool:
        return output.raw.startswith("Andrew Ng has!!")

    task = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        condition=condition_fn,
    )

    with pytest.raises(pydantic_core._pydantic_core.ValidationError):
        Crew(
            agents=[researcher, writer],
            tasks=[task],
        )


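# The condition callables used by the ConditionalTask tests here receive the
# previous task's TaskOutput and return a bool; a minimal sketch (illustrative
# only):
def _example_condition(output: TaskOutput) -> bool:
    return output.raw.startswith("Hi")

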
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_last_task_when_conditional_is_true():
    def condition_fn(output) -> bool:
        return True

    task1 = Task(
        description="Say Hi",
        expected_output="Hi",
        agent=researcher,
    )
    task2 = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        condition=condition_fn,
        agent=writer,
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task1, task2],
    )
    result = crew.kickoff()
    assert result.raw.startswith(
        "1. **The Rise of Autonomous AI Agents in Daily Life**"
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_last_task_when_conditional_is_false():
    def condition_fn(output) -> bool:
        return False

    task1 = Task(
        description="Say Hi",
        expected_output="Hi",
        agent=researcher,
    )
    task2 = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        condition=condition_fn,
        agent=writer,
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task1, task2],
    )
    result = crew.kickoff()
    print(result.raw)
    assert result.raw == "Hi"


def test_conditional_task_requirement_breaks_when_task_async():
    def my_condition(context):
        return context.get("some_value") > 10

    task = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        execute_async=True,
        condition=my_condition,
        agent=researcher,
    )
    task2 = Task(
        description="Say Hi",
        expected_output="Hi",
        agent=writer,
    )

    with pytest.raises(pydantic_core._pydantic_core.ValidationError):
        Crew(
            agents=[researcher, writer],
            tasks=[task, task2],
        )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_should_skip():
    task1 = Task(description="Return hello", expected_output="say hi", agent=researcher)

    condition_mock = MagicMock(return_value=False)
    task2 = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        condition=condition_mock,
        agent=writer,
    )
    crew_met = Crew(
        agents=[researcher, writer],
        tasks=[task1, task2],
    )
    with patch.object(Task, "execute_sync") as mock_execute_sync:
        mock_execute_sync.return_value = TaskOutput(
            description="Task 1 description",
            raw="Task 1 output",
            agent="Researcher",
        )

        result = crew_met.kickoff()
        # Only task1 executed; the conditional task was skipped.
        assert mock_execute_sync.call_count == 1

        assert condition_mock.call_count == 1
        assert condition_mock() is False

        assert task2.output is None
        assert result.raw.startswith("Task 1 output")


@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_should_execute():
    task1 = Task(description="Return hello", expected_output="say hi", agent=researcher)

    condition_mock = MagicMock(
        return_value=True
    )  # should execute this conditional task
    task2 = ConditionalTask(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        condition=condition_mock,
        agent=writer,
    )
    crew_met = Crew(
        agents=[researcher, writer],
        tasks=[task1, task2],
    )
    with patch.object(Task, "execute_sync") as mock_execute_sync:
        mock_execute_sync.return_value = TaskOutput(
            description="Task 1 description",
            raw="Task 1 output",
            agent="Researcher",
        )

        crew_met.kickoff()

        assert condition_mock.call_count == 1
        assert condition_mock() is True
        assert mock_execute_sync.call_count == 2


@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_function(mock_kickoff, crew_evaluator):
    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
    )
    n_iterations = 2
    crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})

    assert len(mock_kickoff.mock_calls) == n_iterations
    mock_kickoff.assert_has_calls(
        [mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
    )

    crew_evaluator.assert_has_calls(
        [
            mock.call(crew, "gpt-4o-mini"),
            mock.call().set_iteration(1),
            mock.call().set_iteration(2),
            mock.call().print_crew_evaluation_result(),
        ]
    )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_manager_agent():
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
        verbose=True,
    )

    crew.kickoff()

    assert crew.manager_agent is not None
    assert crew.manager_agent.verbose


@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_false_manager_agent():
    from langchain_openai import ChatOpenAI

    task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
        verbose=False,
    )

    crew.kickoff()

    assert crew.manager_agent is not None
    assert not crew.manager_agent.verbose