Skip to content
64 changes: 63 additions & 1 deletion camel/societies/workforce/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,74 @@
# Expected response schema for task-quality evaluation. NOTE: this is a
# template shown to the LLM, not strict JSON ("0-100" and the "|" alternation
# are placeholders the model should substitute). The duplicated "reasoning"
# line has been removed so each key appears exactly once.
QUALITY_EVALUATION_RESPONSE_FORMAT = """JSON format:
{
"quality_score": 0-100,
"reasoning": "explanation (1-2 sentences)",
"issues": ["issue1", "issue2"],
"recovery_strategy": "retry|reassign|replan|decompose or null",
"modified_task_content": "new content if replan, else null"
}"""

VALIDATION_PROMPT = TextPrompt(
"""You are validating the aggregated results from subtasks to ensure they meet the original task requirements.

**ORIGINAL TASK INFORMATION:**
- Task ID: {task_id}
- Task Content: {task_content}
- Number of Subtasks: {num_subtasks}
- Subtask IDs: {subtask_ids}

**AGGREGATED RESULT:**
{aggregated_result}

**YOUR RESPONSIBILITIES:**

1. **Deduplication**: Identify and remove duplicate content across subtask results
- Look for repeated information, identical items, or overlapping content
- Count unique items vs duplicates

2. **Requirement Validation**: Check if the deduplicated results meet the original task requirements
- Extract any numerical requirements (e.g., "5 papers", "3 examples")
- Verify the unique count EXACTLY matches the requirements (not approximately)
- Set requirements_met to FALSE if even one item is missing
- Identify how many items are missing if requirements not met

3. **Quality Assessment**: Ensure the deduplicated result is coherent and complete
- Verify all unique items are valid and relevant
- Check for completeness and accuracy

**RESPONSE REQUIREMENTS:**

- **requirements_met**: MUST be false if unique_count does not EXACTLY match the required number
- **unique_count**: Total number of unique items after deduplication
- **duplicate_count**: Number of duplicate items removed
- **missing_count**: If requirements not met, how many items are still needed (0 if met)
- **deduplicated_result**: The cleaned result with duplicates removed and content merged
- **reasoning**: Clear explanation of your validation decision (2-3 sentences)
- **additional_task_guidance**: If requirements not met, provide guidance for individual subtask refinement. IMPORTANT: Multiple duplicate subtasks will be retried IN PARALLEL with the SAME guidance, so they may find the same new item again. To prevent this, your guidance should encourage DIVERSITY by suggesting exploration of different domains, time periods, methodologies, or approaches. Express guidance in SINGULAR form (what ONE subtask should do), list all items to avoid/exclude, AND suggest exploring diverse areas to maximize chances of finding different unique items
- **duplicate_subtask_ids**: List of subtask IDs that returned duplicate results (e.g., ["research_task_1.3", "research_task_1.5"] if these two had the same result). Use null if no duplicates found.

**RESPONSE FORMAT:**
{response_format}

**CRITICAL**:
- Return ONLY a valid JSON object
- Be thorough in identifying duplicates
- Provide specific guidance if additional tasks are needed
- Ensure all required fields are included
"""
)

# Expected response schema for VALIDATION_PROMPT (substituted into its
# {response_format} placeholder). This is a template shown to the LLM, not
# strict JSON. Field names mirror the ValidationResult model used to parse
# the reply — presumably the one in workforce utils; confirm if renaming.
VALIDATION_RESPONSE_FORMAT = """JSON format:
{
"requirements_met": true|false,
"unique_count": number,
"duplicate_count": number,
"missing_count": number,
"deduplicated_result": "cleaned content",
"reasoning": "explanation (2-3 sentences)",
"additional_task_guidance": "guidance string or null",
"duplicate_subtask_ids": ["subtask_id1", "subtask_id2"] or null
}"""

TASK_AGENT_SYSTEM_MESSAGE = """You are an intelligent task management assistant responsible for planning, analyzing, and quality control.

Your responsibilities include:
Expand Down
41 changes: 41 additions & 0 deletions camel/societies/workforce/structured_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
RecoveryStrategy,
TaskAnalysisResult,
TaskAssignResult,
ValidationResult,
WorkerConf,
)

Expand Down Expand Up @@ -399,6 +400,18 @@ def _fix_common_issues(
fixed_data['recovery_strategy'] = valid
break

elif schema_name == 'ValidationResult':
if 'deduplicated_result' in fixed_data:
if not isinstance(fixed_data['deduplicated_result'], str):
try:
fixed_data['deduplicated_result'] = json.dumps(
fixed_data['deduplicated_result'], indent=2
)
except (TypeError, ValueError):
fixed_data['deduplicated_result'] = str(
fixed_data['deduplicated_result']
)

return fixed_data

@staticmethod
Expand Down Expand Up @@ -428,6 +441,19 @@ def _create_default_instance(schema: Type[BaseModel]) -> BaseModel:
recovery_strategy=RecoveryStrategy.RETRY,
modified_task_content=None,
)
elif schema_name == 'ValidationResult':
return ValidationResult(
requirements_met=False,
unique_count=0,
duplicate_count=0,
missing_count=0,
deduplicated_result="",
reasoning=(
"Default validation result due to parsing error - "
"failing safe"
),
additional_task_guidance=None,
)
else:
# Try to create with empty dict and let defaults handle it
return schema()
Expand Down Expand Up @@ -502,6 +528,21 @@ def create_fallback_response(
modified_task_content=None,
)

elif schema_name == 'ValidationResult':
# Return fallback validation result - fail-safe approach
return ValidationResult(
requirements_met=False,
unique_count=0,
duplicate_count=0,
missing_count=0,
deduplicated_result="",
reasoning=(
f"Fallback validation result (failing safe): "
f"{error_message}"
),
additional_task_guidance=None,
)

else:
# Generic fallback
try:
Expand Down
52 changes: 52 additions & 0 deletions camel/societies/workforce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class RecoveryStrategy(str, Enum):
DECOMPOSE = "decompose"
CREATE_WORKER = "create_worker"
REASSIGN = "reassign"
REFINE = "refine"

def __str__(self):
return self.value
Expand Down Expand Up @@ -221,6 +222,57 @@ def quality_sufficient(self) -> bool:
)


class ValidationResult(BaseModel):
    r"""Result of validating aggregated parallel task results.

    This model is used to validate results from parallel subtasks that were
    "scattered" across multiple agents, checking for duplicates and ensuring
    the final result meets the original requirements.
    """

    # Required field (no default): parsing fails if the LLM omits it, which
    # is the fail-safe behavior for the pass/fail decision.
    requirements_met: bool = Field(
        description="Whether the aggregated results meet the original task "
        "requirements (e.g., '5 unique papers found')"
    )

    # Required field: count of distinct items remaining after deduplication.
    unique_count: int = Field(
        description="Number of unique items found after deduplication"
    )

    # Defaults to 0 so a response with no duplicates can omit the field.
    duplicate_count: int = Field(
        default=0,
        description="Number of duplicate items that were removed",
    )

    # Defaults to 0, i.e. "nothing missing" when requirements are met.
    missing_count: int = Field(
        default=0,
        description="Number of items still needed to meet requirements "
        "(e.g., if 5 required but only 3 found, missing_count=2)",
    )

    # Required field: the merged, duplicate-free result text.
    deduplicated_result: str = Field(
        description="The cleaned, deduplicated result content"
    )

    # Required field: human-readable justification of the verdict.
    reasoning: str = Field(
        description="Explanation of the validation decision and any issues "
        "found"
    )

    # Only populated when requirements_met is False; None otherwise.
    additional_task_guidance: Optional[str] = Field(
        default=None,
        description="If requirements not met, guidance for generating "
        "additional targeted subtasks (e.g., 'Find 2 more papers, "
        "excluding: [list]')",
    )

    # None when no duplicates were detected; otherwise the IDs of subtasks
    # whose results duplicated each other and should be retried with
    # refinement guidance.
    duplicate_subtask_ids: Optional[List[str]] = Field(
        default=None,
        description="List of subtask IDs that produced duplicate results. "
        "These subtasks should be retried with refinement guidance.",
    )


def check_if_running(
running: bool,
max_retries: int = 3,
Expand Down
Loading
Loading