298 changes: 298 additions & 0 deletions camel/agents/chat_agent.py
@@ -47,6 +47,8 @@
cast,
)

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from openai import (
AsyncStream,
RateLimitError,
@@ -103,6 +105,24 @@
from camel.utils.commons import dependencies_required
from camel.utils.context_utils import ContextUtility


# Define Pydantic models for request/response
class ChatMessage(BaseModel):
role: str
content: str = ""
name: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_call_id: Optional[str] = None


class ChatCompletionRequest(BaseModel):
model: Optional[str] = "camel-model"
messages: List[ChatMessage]
stream: Optional[bool] = False
functions: Optional[Any] = None
tools: Optional[Any] = None


TOKEN_LIMIT_ERROR_MARKERS = (
"context_length_exceeded",
"prompt is too long",
@@ -5208,3 +5228,281 @@
mcp_server.tool()(get_available_tools)

return mcp_server

@dependencies_required("fastapi")
def to_openai_compatible_server(self) -> Any:
r"""Create an OpenAI-compatible FastAPI server for this ChatAgent.
Returns:
FastAPI: A FastAPI application that can be served to provide
OpenAI-compatible API endpoints for this ChatAgent.
Example:
```python
agent = ChatAgent(model="gpt-4")
app = agent.to_openai_compatible_server()
# Serve with uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
```
"""

app = FastAPI(
title="CAMEL OpenAI-compatible API",
description="OpenAI-compatible API for CAMEL ChatAgent",
)

@app.post("/v1/chat/completions")
async def chat_completions(request_data: ChatCompletionRequest):
try:
logger.info("Request", request_data.model_dump_json())

messages = request_data.messages
model = request_data.model
stream = request_data.stream
functions = request_data.functions
tools = request_data.tools

self.reset()
self._external_tool_schemas.clear()

# Convert OpenAI messages to CAMEL format and record in memory
current_user_message = None
for msg in messages:
msg_role = msg.role
msg_content = msg.content or ""

if msg_role == "user":
user_msg = BaseMessage.make_user_message(
role_name="User", content=msg_content
)
# Record all but the last user message in memory
# The last user message will be passed to step()
if current_user_message is not None:
self.update_memory(
current_user_message, OpenAIBackendRole.USER
)
current_user_message = user_msg
elif msg_role == "system":
sys_msg = BaseMessage.make_system_message(
role_name="System", content=msg_content
)
self.update_memory(sys_msg, OpenAIBackendRole.SYSTEM)
self.update_system_message(msg_content, True)
elif msg_role == "assistant":
# Record previous assistant messages
assistant_msg = BaseMessage.make_assistant_message(
role_name="Assistant", content=msg_content
)
self.update_memory(
assistant_msg, OpenAIBackendRole.ASSISTANT
)
elif msg_role == "tool":
# Handle tool response messages if needed
tool_call_id = msg.tool_call_id
tool_msg = FunctionCallingMessage.make_tool_message(
role_name="Tool",
content=msg_content,
tool_call_id=tool_call_id,
)
self.update_memory(tool_msg, OpenAIBackendRole.TOOL)

# Process tools/functions if provided
if tools or functions:
tools_to_use = tools if tools else functions
# Type guard to ensure tools_to_use is not None
if tools_to_use is not None:
for tool in tools_to_use:
self._external_tool_schemas[
tool["function"]["name"]
] = tool

# Get the response from the agent
self.model_backend.model_config_dict["stream"] = stream
if current_user_message is not None:
if stream:
try:
return StreamingResponse(
_stream_response(
current_user_message, request_data
),
media_type="text/event-stream",
)
except Exception:
logger.warning(
"The Model does not support streaming output,"
"there will be no reply"
)
else:
agent_response = await self.astep(current_user_message)

# Convert CAMEL response to OpenAI format
if not agent_response.msgs:
# Empty response or error
content = "No response generated"
finish_reason = "error"
else:
content = agent_response.msgs[0].content
finish_reason = "stop"

# Check for tool calls
tool_calls_response = None
external_tool_requests = agent_response.info.get(
"external_tool_call_requests"
)
if external_tool_requests:
tool_calls_response = []
for tool_call in external_tool_requests:
tool_calls_response.append(
{
"id": (
tool_call.tool_call_id
or f"call_{int(time.time())}"
),
"type": "function",
"function": {
"name": tool_call.tool_name,
"arguments": json.dumps(
tool_call.args
),
},
}
)
finish_reason = "tool_calls"

usage = agent_response.info.get("usage") or {
"prompt_tokens": agent_response.info.get(
"prompt_tokens", 0
),
"completion_tokens": agent_response.info.get(
"completion_tokens", 0
),
"total_tokens": agent_response.info.get(
"total_tokens", 0
),
}

response = {
"id": agent_response.info.get(
"id", f"chatcmpl-{int(time.time())}"
),
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": (
content
if not tool_calls_response
else None
),
"tool_calls": tool_calls_response,
},
"finish_reason": finish_reason,
}
],
"usage": usage,
}

logger.info(
"Response: %s",
json.dumps(response, indent=2, ensure_ascii=False),
)

return response
else:
# No user message provided
return JSONResponse(
status_code=400,
content={"error": "No user message provided"},
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Internal server error: {e!s}"},

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 8 days ago

To fix this issue, the code should avoid including the actual string representation of the exception ({e!s}) in the response sent to the client. Instead, upon catching an exception, the error (including the exception message and ideally the stack trace) should be logged on the server for diagnostic purposes, and a generic error message (e.g., "An internal server error has occurred.") should be returned to the client. This keeps sensitive implementation details from being exposed to a potential attacker while still providing developers the information they need to debug the error. The required changes are as follows:

  • In the exception handler in the chat_completions route, use logger.error to log the actual exception and stack trace.
  • Change the frontend-facing response to a generic message, such as {"error": "Internal server error"}.
  • Ensure the logger is available (already present as logger in the code) and used correctly for logging exceptions.
  • No new imports are needed, as logger and FastAPI's JSONResponse are already imported.

Suggested changeset 1
camel/agents/chat_agent.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py
--- a/camel/agents/chat_agent.py
+++ b/camel/agents/chat_agent.py
@@ -5909,9 +5909,10 @@
                         content={"error": "No user message provided"},
                     )
             except Exception as e:
+                logger.error("Exception in /v1/chat/completions endpoint", exc_info=True)
                 return JSONResponse(
                     status_code=500,
-                    content={"error": f"Internal server error: {e!s}"},
+                    content={"error": "Internal server error"},
                 )
 
         async def _stream_response(
EOF
@@ -5909,9 +5909,10 @@
content={"error": "No user message provided"},
)
except Exception as e:
logger.error("Exception in /v1/chat/completions endpoint", exc_info=True)
return JSONResponse(
status_code=500,
content={"error": f"Internal server error: {e!s}"},
content={"error": "Internal server error"},
)

async def _stream_response(
Copilot is powered by AI and may make mistakes. Always verify output.
)

async def _stream_response(
message: BaseMessage, request_data: ChatCompletionRequest
):
# Start a separate task for the agent processing
agent_response = await self.astep(message)

if isinstance(agent_response, AsyncStreamingChatAgentResponse):
first_sent = False
last_content = ""

async for chunk in agent_response:
if not chunk.msgs:
continue

msg = chunk.msgs[0]
full_content = (
msg.content if hasattr(msg, "content") else ""
)

if not first_sent:
first_chunk = {
"id": f"chatcmpl-{int(time.time())}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": getattr(
request_data, "model", "camel-model"
),
"choices": [
{
"index": 0,
"delta": {"role": "assistant"},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(first_chunk)}\n\n"
first_sent = True

if full_content and len(full_content) > len(last_content):
# TODO Needs improvement.
# Since Camel returns accumulated streaming content,
# the current implementation uses diff
# to retrieve incremental content.
delta_content = full_content[len(last_content) :]
last_content = full_content
word_chunk = {
"id": f"chatcmpl-{int(time.time())}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": getattr(
request_data, "model", "camel-model"
),
"choices": [
{
"index": 0,
"delta": {"content": delta_content},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(word_chunk)}\n\n"

if getattr(chunk, "terminated", False):
break

final_chunk = {
"choices": [
{"index": 0, "delta": {}, "finish_reason": "stop"}
]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"

else:
logger.warning(
"The Model does not support streaming output, "
"there will be no reply"
)

return app
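
For context, here is a minimal client-side sketch of how the resulting server could be exercised, assuming it is served with uvicorn on localhost:8000 as in the docstring example and that the `openai` Python client is installed; the `api_key` value is a placeholder, since the endpoint performs no authentication:

```python
# Minimal client sketch (assumes the app above is running on localhost:8000;
# the api_key is a dummy value because the endpoint does not check it).
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")

# Non-streaming: the ChatAgent produces a single chat.completion response.
resp = client.chat.completions.create(
    model="camel-model",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(resp.choices[0].message.content)

# Streaming: the server emits chat.completion.chunk deltas over SSE.
stream = client.chat.completions.create(
    model="camel-model",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
```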
20 changes: 18 additions & 2 deletions camel/messages/func_message.py
@@ -13,7 +13,11 @@
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional
from typing import (
Any,
Dict,
Optional,
)

from camel.messages import (
BaseMessage,
@@ -30,7 +34,7 @@
from camel.messages.conversion.sharegpt.function_call_formatter import (
FunctionCallFormatter,
)
from camel.types import OpenAIBackendRole
from camel.types import OpenAIBackendRole, RoleType


@dataclass
@@ -58,6 +62,18 @@ class FunctionCallingMessage(BaseMessage):
tool_call_id: Optional[str] = None
mask_output: Optional[bool] = False

@classmethod
def make_tool_message(
cls, role_name: str, content: str, tool_call_id: str
) -> "FunctionCallingMessage":
return cls(
role_name=role_name,
role_type=RoleType.TOOL,
meta_dict=None,
content=content,
tool_call_id=tool_call_id,
)

def to_openai_message(
self,
role_at_backend: OpenAIBackendRole,
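
For reference, a small hypothetical usage sketch of the new `make_tool_message` helper, assuming `FunctionCallingMessage` is importable from `camel.messages.func_message` as in this file:

```python
# Hypothetical usage of the classmethod added in this diff.
from camel.messages.func_message import FunctionCallingMessage

tool_msg = FunctionCallingMessage.make_tool_message(
    role_name="Tool",
    content='{"temperature": 21, "unit": "celsius"}',
    tool_call_id="call_abc123",
)
# The resulting message carries RoleType.TOOL and can be recorded in agent
# memory, mirroring how the /v1/chat/completions endpoint above handles
# incoming "tool"-role messages.
```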