Skip to content
78 changes: 77 additions & 1 deletion camel/societies/workforce/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,88 @@
QUALITY_EVALUATION_RESPONSE_FORMAT = """JSON format:
{
"quality_score": 0-100,
"reasoning": "explanation (1-2 sentences)",
"reasoning": "explanation (1-2 sentences)",
"issues": ["issue1", "issue2"],
"recovery_strategy": "retry|reassign|replan|decompose or null",
"modified_task_content": "new content if replan, else null"
}"""

VALIDATION_PROMPT = TextPrompt(
"""You are validating the aggregated results from subtasks to ensure they meet the original task requirements.

**ORIGINAL TASK INFORMATION:**
- Task ID: {task_id}
- Task Content: {task_content}
- Number of Subtasks: {num_subtasks}
- Subtask IDs: {subtask_ids}

**AGGREGATED RESULT:**
{aggregated_result}

**YOUR RESPONSIBILITIES:**

1. **Deduplication**: Identify and remove EXACT duplicate content across subtask results
- A duplicate means the SAME item appears multiple times (e.g., same paper title, same product name, same entity)
- Do NOT consider items as duplicates just because they are similar, related, or in the same category
- Only mark as duplicate if they are truly IDENTICAL (same title/name/identifier)
- Examples:
* DUPLICATE: Two subtasks both return "Paper: Attention Is All You Need"
* NOT DUPLICATE: "Attention Is All You Need" and "BERT: Pre-training of Deep Bidirectional Transformers" (different papers, both about NLP)
* NOT DUPLICATE: "iPhone 14" and "iPhone 15" (different products, both phones)
- Count unique items vs actual duplicates

2. **Requirement Validation**: Check if the deduplicated results meet the original task requirements
- Extract any numerical requirements (e.g., "5 papers", "3 examples")
- Verify the unique count EXACTLY matches the requirements (not approximately)
- Set requirements_met to FALSE if even one item is missing
- Identify how many items are missing if requirements not met

3. **Quality Assessment**: Ensure the deduplicated result is coherent and complete
- Verify all unique items are valid and relevant
- Check for completeness and accuracy

**RESPONSE REQUIREMENTS:**

- **requirements_met**: MUST be false if unique_count does not EXACTLY match the required number
- **unique_count**: Total number of DISTINCT items after removing EXACT duplicates (items with different titles/names/identifiers are NOT duplicates)
- **duplicate_count**: Number of EXACT duplicate items removed (same title/name appearing multiple times)
- **missing_count**: If requirements not met, how many items are still needed (0 if met)
- **deduplicated_result**: The cleaned result with EXACT duplicates removed and content merged
- **reasoning**: Clear explanation of your validation decision (2-3 sentences). If marking items as duplicates, explicitly state which items are IDENTICAL.
- **additional_task_guidance**: If requirements not met, provide guidance for individual subtask refinement. Express guidance in SINGULAR form (what ONE subtask should do to find ONE unique item), list all items to avoid/exclude, AND suggest exploring diverse areas (different domains, time periods, methodologies, or approaches) to maximize chances of finding different unique items. The system will retry only missing_count subtasks, selecting from duplicate_subtask_ids.
- **duplicate_subtask_ids**: List ONLY the subtask IDs that should be RETRIED due to producing duplicate results. For each set of duplicates, keep ONE result and list the OTHER task IDs for retry.
* Example: If task_1.3, task_1.5, and task_1.7 all returned "Paper A", keep one (e.g., task_1.3) and list the others: ["research_task_1.5", "research_task_1.7"]
* Example: If task_1.2 and task_1.4 both returned "Paper B", keep one and list the other: ["research_task_1.4"]
* Only include tasks that need to be retried because they produced duplicate content
* Do NOT include task IDs that returned unique, distinct results
* Use null if NO exact duplicates found (all results are distinct)

**RESPONSE FORMAT:**
{response_format}

**CRITICAL**:
- Return ONLY a valid JSON object
- Be thorough in identifying EXACT duplicates (same title/name/identifier)
- Do NOT mark similar or related items as duplicates
- duplicate_subtask_ids should ONLY contain task IDs to retry (exclude one kept result per duplicate set)
- If all results are distinct, set duplicate_subtask_ids to null
- Provide specific guidance if additional tasks are needed
- Ensure all required fields are included
"""
)

VALIDATION_RESPONSE_FORMAT = """JSON format:
{
"requirements_met": true|false,
"unique_count": number,
"duplicate_count": number,
"missing_count": number,
"deduplicated_result": "cleaned content",
"reasoning": "explanation (2-3 sentences)",
"additional_task_guidance": "guidance string or null",
"duplicate_subtask_ids": ["subtask_id1", "subtask_id2"] or null
}"""

TASK_AGENT_SYSTEM_MESSAGE = """You are an intelligent task management assistant responsible for planning, analyzing, and quality control.

Your responsibilities include:
Expand Down
42 changes: 42 additions & 0 deletions camel/societies/workforce/structured_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
RecoveryStrategy,
TaskAnalysisResult,
TaskAssignResult,
ValidationResult,
WorkerConf,
)

Expand Down Expand Up @@ -391,6 +392,7 @@ def _fix_common_issues(
'decompose',
'create_worker',
'reassign',
'refine',
]
if strategy not in valid_strategies:
# Try to match partial
Expand All @@ -399,6 +401,18 @@ def _fix_common_issues(
fixed_data['recovery_strategy'] = valid
break

elif schema_name == 'ValidationResult':
if 'deduplicated_result' in fixed_data:
if not isinstance(fixed_data['deduplicated_result'], str):
try:
fixed_data['deduplicated_result'] = json.dumps(
fixed_data['deduplicated_result'], indent=2
)
except (TypeError, ValueError):
fixed_data['deduplicated_result'] = str(
fixed_data['deduplicated_result']
)

return fixed_data

@staticmethod
Expand Down Expand Up @@ -428,6 +442,19 @@ def _create_default_instance(schema: Type[BaseModel]) -> BaseModel:
recovery_strategy=RecoveryStrategy.RETRY,
modified_task_content=None,
)
elif schema_name == 'ValidationResult':
return ValidationResult(
requirements_met=False,
unique_count=0,
duplicate_count=0,
missing_count=0,
deduplicated_result="",
reasoning=(
"Default validation result due to parsing error - "
"failing safe"
),
additional_task_guidance=None,
)
else:
# Try to create with empty dict and let defaults handle it
return schema()
Expand Down Expand Up @@ -502,6 +529,21 @@ def create_fallback_response(
modified_task_content=None,
)

elif schema_name == 'ValidationResult':
# Return fallback validation result - fail-safe approach
return ValidationResult(
requirements_met=False,
unique_count=0,
duplicate_count=0,
missing_count=0,
deduplicated_result="",
reasoning=(
f"Fallback validation result (failing safe): "
f"{error_message}"
),
additional_task_guidance=None,
)

else:
# Generic fallback
try:
Expand Down
52 changes: 52 additions & 0 deletions camel/societies/workforce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class RecoveryStrategy(str, Enum):
DECOMPOSE = "decompose"
CREATE_WORKER = "create_worker"
REASSIGN = "reassign"
REFINE = "refine"

def __str__(self):
return self.value
Expand Down Expand Up @@ -221,6 +222,57 @@ def quality_sufficient(self) -> bool:
)


class ValidationResult(BaseModel):
r"""Result of validating aggregated parallel task results.

This model is used to validate results from parallel subtasks that were
"scattered" across multiple agents, checking for duplicates and ensuring
the final result meets the original requirements.
"""

requirements_met: bool = Field(
description="Whether the aggregated results meet the original task "
"requirements (e.g., '5 unique papers found')"
)

unique_count: int = Field(
description="Number of unique items found after deduplication"
)

duplicate_count: int = Field(
default=0,
description="Number of duplicate items that were removed",
)

missing_count: int = Field(
default=0,
description="Number of items still needed to meet requirements "
"(e.g., if 5 required but only 3 found, missing_count=2)",
)

deduplicated_result: str = Field(
description="The cleaned, deduplicated result content"
)

reasoning: str = Field(
description="Explanation of the validation decision and any issues "
"found"
)

additional_task_guidance: Optional[str] = Field(
default=None,
description="If requirements not met, guidance for generating "
"additional targeted subtasks (e.g., 'Find 2 more papers, "
"excluding: [list]')",
)

duplicate_subtask_ids: Optional[List[str]] = Field(
default=None,
description="List of subtask IDs that produced duplicate results. "
"These subtasks should be retried with refinement guidance.",
)


def check_if_running(
running: bool,
max_retries: int = 3,
Expand Down
Loading
Loading