refactor: update step callback methods to support asynchronous invocation (#4633)

* refactor: update step callback methods to support asynchronous invocation

- Replaced synchronous step callback invocations with asynchronous counterparts in the CrewAgentExecutor class.
- Introduced a new async method _ainvoke_step_callback to handle step callbacks in an async context, improving responsiveness and performance in asynchronous workflows.

* chore: bump version to 1.10.1b1 across multiple files

- Updated version strings from 1.10.1b to 1.10.1b1 in various project files including pyproject.toml and __init__.py files.
- Adjusted dependency specifications to reflect the new version in relevant templates and modules.
This commit is contained in:
João Moura
2026-02-27 02:35:03 -08:00
committed by GitHub
parent 979aa26c3d
commit 1bdb9496a3
12 changed files with 35 additions and 17 deletions

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.10.1b"
__version__ = "1.10.1b1"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.10.1b",
"crewai==1.10.1b1",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.10.1b"
__version__ = "1.10.1b1"

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.10.1b",
"crewai-tools==1.10.1b1",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.10.1b"
__version__ = "1.10.1b1"
_telemetry_submitted = False

View File

@@ -1259,7 +1259,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer, tool_result
)
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
await self._ainvoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr]
except OutputParserError as e:
@@ -1374,7 +1374,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
await self._ainvoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
@@ -1386,7 +1386,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
output=answer,
text=output_json,
)
self._invoke_step_callback(formatted_answer)
await self._ainvoke_step_callback(formatted_answer)
self._append_message(output_json)
self._show_logs(formatted_answer)
return formatted_answer
@@ -1397,7 +1397,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
await self._ainvoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
@@ -1491,7 +1491,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish
) -> None:
"""Invoke step callback.
"""Invoke step callback (sync context).
Args:
formatted_answer: Current agent response.
@@ -1501,6 +1501,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
async def _ainvoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish
) -> None:
"""Invoke step callback (async context).
Args:
formatted_answer: Current agent response.
"""
if self.step_callback:
cb_result = self.step_callback(formatted_answer)
if inspect.iscoroutine(cb_result):
await cb_result
def _append_message(
self, text: str, role: Literal["user", "assistant", "system"] = "assistant"
) -> None:

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1b"
"crewai[tools]==1.10.1b1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1b"
"crewai[tools]==1.10.1b1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1b"
"crewai[tools]==1.10.1b1"
]
[tool.crewai]

View File

@@ -16,7 +16,7 @@ from collections.abc import (
Sequence,
ValuesView,
)
from concurrent.futures import Future
from concurrent.futures import Future, ThreadPoolExecutor
import copy
import enum
import inspect
@@ -1739,7 +1739,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _run_flow() -> Any:
return await self.kickoff_async(inputs, input_files)
return asyncio.run(_run_flow())
try:
asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
async def kickoff_async(
self,

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.10.1b"
__version__ = "1.10.1b1"

View File

@@ -200,7 +200,7 @@ def add_docs_version(docs_json_path: Path, version: str) -> bool:
Args:
docs_json_path: Path to docs/docs.json.
version: Version string (e.g., "1.10.1b").
version: Version string (e.g., "1.10.1b1").
Returns:
True if docs.json was updated, False otherwise.