Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
d58a24950c Fix import sorting in test_kickoff_for_each_parallel.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-19 17:12:18 +00:00
Devin AI
15d59cfc34 Improve kickoff_for_each_parallel based on PR feedback
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-19 17:12:09 +00:00
Devin AI
fd18bdfabb Add test cassettes for kickoff_for_each_parallel tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-19 17:05:25 +00:00
Devin AI
c77ac31db6 Add kickoff_for_each_parallel method using ThreadPoolExecutor to fix issue #2406
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-19 17:05:14 +00:00
5 changed files with 867 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import json
import re
import uuid
@@ -6,7 +7,7 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -707,6 +708,65 @@ class Crew(BaseModel):
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
def kickoff_for_each_parallel(self, inputs: Sequence[Dict[str, Any]], max_workers: Optional[int] = None) -> List[CrewOutput]:
"""Executes the Crew's workflow for each input in the list in parallel using ThreadPoolExecutor.
Args:
inputs: Sequence of input dictionaries to be passed to each crew execution.
max_workers: Maximum number of worker threads to use. If None, uses the default
ThreadPoolExecutor behavior (typically min(32, os.cpu_count() + 4)).
Returns:
List of CrewOutput objects, one for each input.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
if not isinstance(inputs, (list, tuple)):
raise TypeError(f"Inputs must be a list of dictionaries. Received {type(inputs).__name__} instead.")
if not inputs:
return []
results: List[CrewOutput] = []
# Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics()
# Create a copy of the crew for each input to avoid state conflicts
crew_copies = [self.copy() for _ in inputs]
# Execute each crew in parallel
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks to the executor
future_to_crew = {
executor.submit(crew_copies[i].kickoff, inputs[i]): i
for i in range(len(inputs))
}
# Process results as they complete
for future in as_completed(future_to_crew):
crew_index = future_to_crew[future]
try:
output = future.result()
results.append(output)
# Aggregate usage metrics
if crew_copies[crew_index].usage_metrics:
total_usage_metrics.add_usage_metrics(crew_copies[crew_index].usage_metrics)
except Exception as exc:
# Re-raise the exception to maintain consistent behavior with kickoff_for_each
raise exc
finally:
# Clean up to assist garbage collection
crew_copies.clear()
# Set the aggregated metrics on the parent crew
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
def _handle_crew_planning(self):
"""Handles the Crew planning."""

View File

@@ -0,0 +1,254 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are dog Researcher. You
have a lot of experience with dog.\nYour personal goal is: Express hot takes
on dog.\nTo give my best complete final answer to the task respond using the
exact following format:\n\nThought: I now can give a great answer\nFinal Answer:
Your final answer must be the great and the most complete as possible, it must
be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{"role": "user", "content": "\nCurrent Task: Give me an analysis around dog.\n\nThis
is the expected criteria for your final answer: 1 bullet point about dog that''s
under 15 words.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '914'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88ad5d8d2805-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:49 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_9f1aa8c42b31f8139e80f67a71b11af3
http_version: HTTP/1.1
status_code: 401
- request:
body: '{"messages": [{"role": "system", "content": "You are apple Researcher.
You have a lot of experience with apple.\nYour personal goal is: Express hot
takes on apple.\nTo give my best complete final answer to the task respond using
the exact following format:\n\nThought: I now can give a great answer\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\n\nI MUST use these formats, my job depends on
it!"}, {"role": "user", "content": "\nCurrent Task: Give me an analysis around
apple.\n\nThis is the expected criteria for your final answer: 1 bullet point
about apple that''s under 15 words.\nyou MUST return the actual complete content
as the final answer, not a summary.\n\nBegin! This is VERY important to you,
use the tools available and give your best Final Answer, your job depends on
it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '924'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88adcdfe2805-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:49 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_2a94d77754a61f5f38f74f0648bf38af
http_version: HTTP/1.1
status_code: 401
- request:
body: '{"messages": [{"role": "system", "content": "You are cat Researcher. You
have a lot of experience with cat.\nYour personal goal is: Express hot takes
on cat.\nTo give my best complete final answer to the task respond using the
exact following format:\n\nThought: I now can give a great answer\nFinal Answer:
Your final answer must be the great and the most complete as possible, it must
be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{"role": "user", "content": "\nCurrent Task: Give me an analysis around cat.\n\nThis
is the expected criteria for your final answer: 1 bullet point about cat that''s
under 15 words.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '914'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88adabe3f8d9-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:49 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_e06cdb71cffc47ee28b267ed083c3a6b
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,254 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are dog Researcher. You
have a lot of experience with dog.\nYour personal goal is: Express hot takes
on dog.\nTo give my best complete final answer to the task respond using the
exact following format:\n\nThought: I now can give a great answer\nFinal Answer:
Your final answer must be the great and the most complete as possible, it must
be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{"role": "user", "content": "\nCurrent Task: Give me an analysis around dog.\n\nThis
is the expected criteria for your final answer: 1 bullet point about dog that''s
under 15 words.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '914'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88a8e9432805-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:48 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_d00f966d049ee99fde6e9e7bf652afa2
http_version: HTTP/1.1
status_code: 401
- request:
body: '{"messages": [{"role": "system", "content": "You are apple Researcher.
You have a lot of experience with apple.\nYour personal goal is: Express hot
takes on apple.\nTo give my best complete final answer to the task respond using
the exact following format:\n\nThought: I now can give a great answer\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\n\nI MUST use these formats, my job depends on
it!"}, {"role": "user", "content": "\nCurrent Task: Give me an analysis around
apple.\n\nThis is the expected criteria for your final answer: 1 bullet point
about apple that''s under 15 words.\nyou MUST return the actual complete content
as the final answer, not a summary.\n\nBegin! This is VERY important to you,
use the tools available and give your best Final Answer, your job depends on
it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '924'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88a91a59c4c8-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:48 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_6e9a5fd1d83c6eac8070b4e9b9b94f10
http_version: HTTP/1.1
status_code: 401
- request:
body: '{"messages": [{"role": "system", "content": "You are cat Researcher. You
have a lot of experience with cat.\nYour personal goal is: Express hot takes
on cat.\nTo give my best complete final answer to the task respond using the
exact following format:\n\nThought: I now can give a great answer\nFinal Answer:
Your final answer must be the great and the most complete as possible, it must
be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{"role": "user", "content": "\nCurrent Task: Give me an analysis around cat.\n\nThis
is the expected criteria for your final answer: 1 bullet point about cat that''s
under 15 words.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '914'
content-type:
- application/json
cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
_cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88a91f2ef8d9-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:48 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_d0ddaee2449c771a230de2b6362c8d99
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,89 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are dog Researcher. You
have a lot of experience with dog.\nYour personal goal is: Express hot takes
on dog.\nTo give my best complete final answer to the task respond using the
exact following format:\n\nThought: I now can give a great answer\nFinal Answer:
Your final answer must be the great and the most complete as possible, it must
be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{"role": "user", "content": "\nCurrent Task: Give me an analysis around dog.\n\nThis
is the expected criteria for your final answer: 1 bullet point about dog that''s
under 15 words.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '914'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 922e88a1b9ef2805-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 19 Mar 2025 17:01:47 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=xOCSjxqrC.2f2I2kISNioXWWErqIIGNKPkTsoEzcVRY-1742403707-1.0.1.1-bG9XNEXc7EBBNdNl9ff975bzPeeXgfmzAyYN1AesFIuQ2gEcmP67jHmnN6ABv.JJVDhw9oF5SREuZ3PUkYA8Xu6UQeWnIQxKi10X_40KN38;
path=/; expires=Wed, 19-Mar-25 17:31:47 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=RypiiyRYeZ0ZoVTLFkoT3v1Yzk.qiNfGEnh9gk4.bYg-1742403707485-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_5980db57b01be333b0f977bb03a15bca
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,209 @@
"""Test for the kickoff_for_each_parallel method in Crew class."""
import concurrent.futures
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.task import Task
def test_kickoff_for_each_parallel_single_input():
"""Tests if kickoff_for_each_parallel works with a single input."""
inputs = [{"topic": "dog"}]
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
# Mock the kickoff method to avoid API calls
expected_output = CrewOutput(raw="Dogs are loyal companions.")
with patch.object(Crew, "kickoff", return_value=expected_output):
results = crew.kickoff_for_each_parallel(inputs=inputs)
assert len(results) == 1
assert results[0].raw == "Dogs are loyal companions."
def test_kickoff_for_each_parallel_multiple_inputs():
"""Tests if kickoff_for_each_parallel works with multiple inputs."""
inputs = [
{"topic": "dog"},
{"topic": "cat"},
{"topic": "apple"},
]
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
# Mock the kickoff method to avoid API calls
expected_outputs = [
CrewOutput(raw="Dogs are loyal companions."),
CrewOutput(raw="Cats are independent pets."),
CrewOutput(raw="Apples are nutritious fruits."),
]
with patch.object(Crew, "copy") as mock_copy:
# Setup mock crew copies
crew_copies = []
for i in range(len(inputs)):
crew_copy = MagicMock()
crew_copy.kickoff.return_value = expected_outputs[i]
crew_copies.append(crew_copy)
mock_copy.side_effect = crew_copies
results = crew.kickoff_for_each_parallel(inputs=inputs)
assert len(results) == len(inputs)
# Since ThreadPoolExecutor returns results in completion order, not input order,
# we just check that all expected outputs are in the results
result_texts = [result.raw for result in results]
expected_texts = [output.raw for output in expected_outputs]
for expected_text in expected_texts:
assert expected_text in result_texts
def test_kickoff_for_each_parallel_empty_input():
"""Tests if kickoff_for_each_parallel handles an empty input list."""
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
results = crew.kickoff_for_each_parallel(inputs=[])
assert results == []
def test_kickoff_for_each_parallel_invalid_input():
"""Tests if kickoff_for_each_parallel raises TypeError for invalid input types."""
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
# No need to mock here since we're testing input validation which happens before any API calls
with pytest.raises(TypeError):
# Pass a string instead of a list
crew.kickoff_for_each_parallel("invalid input")
def test_kickoff_for_each_parallel_error_handling():
"""Tests error handling in kickoff_for_each_parallel when kickoff raises an error."""
inputs = [
{"topic": "dog"},
{"topic": "cat"},
{"topic": "apple"},
]
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
with patch.object(Crew, "copy") as mock_copy:
# Setup mock crew copies
crew_copies = []
for i in range(len(inputs)):
crew_copy = MagicMock()
# Make the third crew copy raise an exception
if i == 2:
crew_copy.kickoff.side_effect = Exception("Simulated kickoff error")
else:
crew_copy.kickoff.return_value = f"Output for {inputs[i]['topic']}"
crew_copies.append(crew_copy)
mock_copy.side_effect = crew_copies
with pytest.raises(Exception, match="Simulated kickoff error"):
crew.kickoff_for_each_parallel(inputs=inputs)
def test_kickoff_for_each_parallel_max_workers():
"""Tests if kickoff_for_each_parallel respects the max_workers parameter."""
inputs = [
{"topic": "dog"},
{"topic": "cat"},
{"topic": "apple"},
]
agent = Agent(
role="{topic} Researcher",
goal="Express hot takes on {topic}.",
backstory="You have a lot of experience with {topic}.",
)
task = Task(
description="Give me an analysis around {topic}.",
expected_output="1 bullet point about {topic} that's under 15 words.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
# Mock both ThreadPoolExecutor and crew.copy to avoid API calls
with patch.object(concurrent.futures, "ThreadPoolExecutor", wraps=concurrent.futures.ThreadPoolExecutor) as mock_executor:
with patch.object(Crew, "copy") as mock_copy:
# Setup mock crew copies
crew_copies = []
for _ in range(len(inputs)):
crew_copy = MagicMock()
crew_copy.kickoff.return_value = CrewOutput(raw="Test output")
crew_copies.append(crew_copy)
mock_copy.side_effect = crew_copies
crew.kickoff_for_each_parallel(inputs=inputs, max_workers=2)
mock_executor.assert_called_once_with(max_workers=2)