-
Notifications
You must be signed in to change notification settings - Fork 1.6k
feat: duplicated task handling #3395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Wendong-Fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Saedbhati , left some comments below, i think the design could be more tidy and now there's some redundant code/design, also could you add test code and example file for this? example task for workforce could be: search 10 different papers related to llm agent and write a html report about them
| max_refinement_iterations (int, optional): Maximum number of | ||
| iterative refinement attempts for parallel task results. When | ||
| parallel subtasks produce duplicate content or don't meet | ||
| requirements, the workforce will automatically validate and | ||
| generate additional targeted subtasks to fill gaps. This | ||
| parameter limits the refinement loop to prevent excessive | ||
| iterations. Set to 0 to disable refinement validation. | ||
| (default: :obj:`2`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be better not to set default max value? since if still have duplicated content that means the task hasn't been success, we should continue refine it
we also need to add this argument to def clone
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The agent might get stuck in a refinement loop without a max limit, so keeping this parameter helps control and reduce the overall task completion cost
hesamsheikh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the PR @Saedbhati
I added a few comments.
| def _validate_aggregated_result( | ||
| self, task: Task, aggregated_result: str | ||
| ) -> ValidationResult: | ||
| r"""Validate aggregated results from parallel subtasks. | ||
| This method uses the Task Planner Agent to: | ||
| 1. Deduplicate content across parallel subtask results | ||
| 2. Verify requirements are met (e.g., "5 unique papers") | ||
| 3. Provide guidance for additional tasks if needed | ||
| Args: | ||
| task (Task): The parent task containing subtasks | ||
| aggregated_result (str): The concatenated results from all subtasks | ||
| Returns: | ||
| ValidationResult: Validation result with deduplication and | ||
| requirement checking | ||
| """ | ||
| num_subtasks = len(task.subtasks) if task.subtasks else 0 | ||
|
|
||
| subtask_ids = ( | ||
| [sub.id for sub in task.subtasks] if task.subtasks else [] | ||
| ) | ||
| subtask_ids_str = ", ".join(subtask_ids) if subtask_ids else "None" | ||
|
|
||
| validation_prompt = str( | ||
| VALIDATION_PROMPT.format( | ||
| task_id=task.id, | ||
| task_content=task.content, | ||
| num_subtasks=num_subtasks, | ||
| subtask_ids=subtask_ids_str, | ||
| aggregated_result=aggregated_result, | ||
| response_format=VALIDATION_RESPONSE_FORMAT, | ||
| ) | ||
| ) | ||
|
|
||
| # Fallback values if parsing fails - fail-safe approach | ||
| fallback_values = { | ||
| "requirements_met": False, | ||
| "unique_count": 0, | ||
| "duplicate_count": 0, | ||
| "missing_count": num_subtasks, | ||
| "deduplicated_result": aggregated_result, | ||
| "reasoning": ( | ||
| "Validation failed - could not parse response. " | ||
| "Failing safe to prevent accepting potentially " | ||
| "invalid results." | ||
| ), | ||
| "additional_task_guidance": None, | ||
| "duplicate_subtask_ids": None, | ||
| } | ||
|
|
||
| examples = [ | ||
| { | ||
| "requirements_met": True, | ||
| "unique_count": 5, | ||
| "duplicate_count": 2, | ||
| "missing_count": 0, | ||
| "deduplicated_result": "Deduplicated content here...", | ||
| "reasoning": ( | ||
| "Found 5 unique papers after removing 2 duplicates. " | ||
| "Requirements fully met." | ||
| ), | ||
| "additional_task_guidance": None, | ||
| "duplicate_subtask_ids": ["task_1.3", "task_1.5"], | ||
| }, | ||
| { | ||
| "requirements_met": False, | ||
| "unique_count": 3, | ||
| "duplicate_count": 4, | ||
| "missing_count": 2, | ||
| "deduplicated_result": "3 unique papers found...", | ||
| "reasoning": ( | ||
| "Only 3 unique papers found after deduplication. " | ||
| "Need 2 more to meet requirement of 5." | ||
| ), | ||
| "additional_task_guidance": ( | ||
| "Find ONE unique research paper on the topic, avoiding: " | ||
| "Paper A, Paper B, Paper C. To ensure diversity across " | ||
| "parallel retries, consider exploring different " | ||
| "publication years, research methodologies, or " | ||
| "application domains." | ||
| ), | ||
| "duplicate_subtask_ids": [ | ||
| "task_1.2", | ||
| "task_1.4", | ||
| "task_1.5", | ||
| "task_1.6", | ||
| ], | ||
| }, | ||
| ] | ||
|
|
||
| try: | ||
| if self.use_structured_output_handler: | ||
| enhanced_prompt = ( | ||
| self.structured_handler.generate_structured_prompt( | ||
| base_prompt=validation_prompt, | ||
| schema=ValidationResult, | ||
| examples=examples, | ||
| ) | ||
| ) | ||
| response = self.task_agent.step(enhanced_prompt) | ||
|
|
||
| result = self.structured_handler.parse_structured_response( | ||
| response.msg.content if response.msg else "", | ||
| schema=ValidationResult, | ||
| fallback_values=fallback_values, | ||
| ) | ||
|
|
||
| if isinstance(result, ValidationResult): | ||
| return result | ||
| elif isinstance(result, dict): | ||
| return ValidationResult.model_validate(result) | ||
| else: | ||
| return ValidationResult.model_validate(fallback_values) | ||
| else: | ||
| response = self.task_agent.step( | ||
| validation_prompt, response_format=ValidationResult | ||
| ) | ||
| return response.msg.parsed | ||
|
|
||
| except Exception as e: | ||
| logger.warning( | ||
| f"Error during validation for task {task.id}: {e}, " | ||
| f"using fallback" | ||
| ) | ||
| return ValidationResult.model_validate(fallback_values) | ||
|
|
||
| def _build_task_content_with_refinement( | ||
| self, | ||
| base_content: str, | ||
| refinement_guidance: str, | ||
| refinement_iteration: int, | ||
| ) -> str: | ||
| r"""Build task content with refinement guidance. | ||
| Args: | ||
| base_content (str): The base task content | ||
| refinement_guidance (str): The refinement guidance from validation | ||
| refinement_iteration (int): The current refinement iteration number | ||
| Returns: | ||
| str: The complete task content including refinement guidance | ||
| """ | ||
| return ( | ||
| f"{base_content}\n\n" | ||
| f"IMPORTANT - REFINEMENT ITERATION {refinement_iteration}:\n" | ||
| f"{refinement_guidance}\n\n" | ||
| f"Your previous result was a duplicate. You MUST find a " | ||
| f"completely DIFFERENT and UNIQUE item this time." | ||
| ) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any possibility to put these methods in the utils.py?
| batch_result = await self._find_assignee([task]) | ||
| assignment = batch_result.assignments[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few comments here are removed, which were kind of helpful.
|
Hey @Saedbhati |
waleedalzarooni
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice PR @Saedbhati, just a couple of comments
| task = Task( | ||
| content=( | ||
| "Find 5 unique research papers on NLP systems. " | ||
| "For each paper, provide: title, authors, year, and a brief " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line may be incomplete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should do the job! Do you have any suggestions on how I can improve the task prompt?
| ) | ||
| iteration_num = refinement_iteration + 1 | ||
| subtask.content = ( | ||
| f"{subtask.additional_info['base_content'],}\n\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just checking whether the comma after ['base_content'] is on purpose as it will create a tuple formatting ('content',) instead of just 'content'
Co-authored-by: Waleed Alzarooni <[email protected]>
Description
Describe your changes in detail (optional if the linked issue already contains a detailed description of the changes).
fixes #3332
Checklist
Go over all the following points, and put an
xin all the boxes that apply.Fixes #issue-numberin the PR description (required)pyproject.tomlanduv lockIf you are unsure about any of these, don't hesitate to ask. We are here to help!