Description
[ ] I checked the documentation and related resources and couldn't find an answer to my question.
Your Question
I am using the new @experiment decorator flow to evaluate a dataset with Azure OpenAI. However, I start hitting HTTP/1.1 429 Too Many Requests errors almost immediately:
2025-11-28 09:02:02,470 - HTTP Request: POST https://{MY_ENDPOINT}/openai/v1/chat/completions "HTTP/1.1 429 Too Many Requests"
It seems evaluate_rag.arun processes the dataset with high parallelism, exceeding the TPM/RPM limits of my Azure deployment.
In the documentation I found a RunConfig object (for max_workers, timeout, etc.), but it appears to apply only to the legacy evaluate() function.
My Question: How can I control concurrency or rate limits when using the new experiment.arun() workflow? Is there a way to pass a config similar to RunConfig, or a parameter to limit the number of parallel async calls?
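If there is no supported option, the only workaround I can think of is to cap concurrency myself with an asyncio.Semaphore inside the decorated function. A rough sketch of what I mean (the semaphore and its limit are my own additions, not anything from the ragas API):

import asyncio

# Self-imposed cap on how many rows may call Azure at the same time.
MAX_CONCURRENT_ROWS = 4
row_semaphore = asyncio.Semaphore(MAX_CONCURRENT_ROWS)

@experiment()
async def evaluate_rag(row, llm, embeddings):
    async with row_semaphore:
        # All LLM-backed metric calls for this row run while the semaphore is
        # held, so at most MAX_CONCURRENT_ROWS rows hit the endpoint at once.
        faithfulness = await Faithfulness(llm=llm).ascore(
            user_input=row["question"],
            response=row["answer"],
            retrieved_contexts=row["contexts"],
        )
        return {**row, "faithfulness": faithfulness.value}

This still starts one task per row, though, so I would prefer a first-class setting if one exists.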
Code Examples
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
import pandas as pd
from dotenv import load_dotenv
from openai import AsyncOpenAI
from ragas import Dataset, experiment
from ragas.embeddings.base import embedding_factory
from ragas.llms import llm_factory
from ragas.metrics.collections import (
AnswerCorrectness,
AnswerRelevancy,
ContextPrecision,
ContextRecall,
Faithfulness,
)
from app.configs.config import (
AZURE_OPENAI_API_KEY,
AZURE_OPENAI_ENDPOINT,
EMBEDDING_MODEL,
)
load_dotenv()
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)
def create_ragas_dataset(dataset_path: Path) -> Dataset:
"""Create a Ragas Dataset from the downloaded JSON file."""
dataset = Dataset(name="wke_faq_testset", backend="local/jsonl", root_dir=".")
df = pd.read_json(dataset_path)
for _, row in df.iterrows():
dataset.append({
"question": row["question"],
"contexts": row["contexts"],
"answer": row["answer"],
"ground_truth": row["ground_truth"],
})
logger.info(f"Created Ragas dataset with {len(df)} samples")
return dataset
@experiment()
async def evaluate_rag(row: Dict[str, Any], llm, embeddings) -> Dict[str, Any]:
question = row["question"]
answer = row["answer"]
contexts = row["contexts"]
ground_truth = row["ground_truth"]
answer_relevancy_scorer = AnswerRelevancy(llm=llm, embeddings=embeddings)
answer_correctness_scorer = AnswerCorrectness(llm=llm, embeddings=embeddings)
contexts_precision_scorer = ContextPrecision(llm=llm)
context_recall_scorer = ContextRecall(llm=llm)
faithfulness_scorer = Faithfulness(llm=llm)
faithfulness = await faithfulness_scorer.ascore(
user_input=question, response=answer, retrieved_contexts=contexts
)
    # ContextPrecision / ContextRecall judge the retrieved contexts against the
    # ground truth, so pass ground_truth (not the generated answer) as reference.
    contexts_precision = await contexts_precision_scorer.ascore(
        user_input=question, reference=ground_truth, retrieved_contexts=contexts
    )
    context_recall = await context_recall_scorer.ascore(
        user_input=question, reference=ground_truth, retrieved_contexts=contexts
    )
answer_relevancy = await answer_relevancy_scorer.ascore(
user_input=question, response=answer
)
answer_correctness = await answer_correctness_scorer.ascore(
user_input=question, response=answer, reference=ground_truth
)
# Return evaluation results
result = {
**row,
"faithfulness": faithfulness.value,
"context_precision": contexts_precision.value,
"context_recall": context_recall.value,
"answer_relevancy": answer_relevancy.value,
"answer_correctness": answer_correctness.value,
}
return result
async def run_experiment():
# Prepare dataset and initialize system
logger.info("Initializing RAG system...")
    dataset = create_ragas_dataset(
        Path("./data/eval_dataset/ragas_eval_gpt_5_mini_wke_opt.json")
    )
endpoint = f"{AZURE_OPENAI_ENDPOINT.rstrip('/')}/openai/v1"
client = AsyncOpenAI(base_url=endpoint, api_key=AZURE_OPENAI_API_KEY)
experiment_results = await evaluate_rag.arun(
dataset,
name=f"ragas_experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
llm=llm_factory(
"gpt-5-mini",
client=client,
temperature=1,
top_p=1,
reasoning_effort="low",
),
embeddings=embedding_factory(provider="huggingface", model=EMBEDDING_MODEL),
)
return experiment_results
if __name__ == "__main__":
asyncio.run(run_experiment())
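A partial mitigation I am also considering is raising the OpenAI client's built-in retry count, since (as far as I know) the SDK retries 429 responses with exponential backoff. That only masks the problem rather than limiting parallelism, which is why I am asking about a proper concurrency control:

# max_retries is a standard AsyncOpenAI constructor argument; it retries
# rate-limited requests with backoff but does not reduce parallelism.
client = AsyncOpenAI(
    base_url=endpoint,
    api_key=AZURE_OPENAI_API_KEY,
    max_retries=5,
)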