How to control concurrency/rate limits with @experiment? #2457

@a5512167086

Description

[ ] I checked the documentation and related resources and couldn't find an answer to my question.

Your Question
I am using the new @experiment decorator flow to evaluate a dataset against Azure OpenAI, but I start hitting HTTP/1.1 429 Too Many Requests errors almost immediately.

2025-11-28 09:02:02,470 - HTTP Request: POST https://{MY_ENDPOINT}/openai/v1/chat/completions "HTTP/1.1 429 Too Many Requests"

It seems evaluate_rag.arun processes the dataset with high parallelism, exceeding the TPM/RPM limits of my Azure deployment.

I checked the documentation and see that there is a RunConfig object (for max_workers, timeout, etc.), but the documentation implies it applies only to the legacy evaluate() function.
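
For reference, this is roughly how I understand the legacy flow handles it (a sketch based on my reading of the docs; eval_dataset and metrics are placeholders for my actual objects):

from ragas import evaluate
from ragas.run_config import RunConfig

# Legacy flow: RunConfig caps the worker pool and configures timeouts/retries.
# eval_dataset and metrics below are stand-ins for my real dataset and metric list.
results = evaluate(
    eval_dataset,
    metrics=metrics,
    run_config=RunConfig(max_workers=4, timeout=120, max_retries=10),
)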

My Question: How can I control concurrency or rate limits when using the new experiment.arun() workflow? Is there a way to pass a config similar to RunConfig, or a parameter to limit the number of parallel async calls?
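
For now, the only workaround I can think of is throttling on my side with an asyncio.Semaphore inside the experiment function, roughly like this (untested sketch; MAX_CONCURRENT_ROWS is a value I would tune to my deployment's limits, and score_row is a placeholder for the scoring body of evaluate_rag shown in the Code Examples below):

import asyncio
from typing import Any, Dict

from ragas import experiment

# Hypothetical client-side throttle (untested). MAX_CONCURRENT_ROWS and score_row
# are placeholders: score_row would contain the scoring logic from evaluate_rag below.
MAX_CONCURRENT_ROWS = 4
_row_semaphore = asyncio.Semaphore(MAX_CONCURRENT_ROWS)


@experiment()
async def evaluate_rag_throttled(row: Dict[str, Any], llm, embeddings) -> Dict[str, Any]:
    # Even if experiment.arun() schedules every row as its own task,
    # at most MAX_CONCURRENT_ROWS rows hit Azure OpenAI at the same time.
    async with _row_semaphore:
        return await score_row(row, llm, embeddings)

Is something like this necessary, or is there a supported way to do it?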

Code Examples

import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

import pandas as pd
from dotenv import load_dotenv
from openai import AsyncOpenAI
from ragas import Dataset, experiment
from ragas.embeddings.base import embedding_factory
from ragas.llms import llm_factory
from ragas.metrics.collections import (
    AnswerCorrectness,
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    Faithfulness,
)

from app.configs.config import (
    AZURE_OPENAI_API_KEY,
    AZURE_OPENAI_ENDPOINT,
    EMBEDDING_MODEL,
)

load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)


def create_ragas_dataset(dataset_path: Path) -> Dataset:
    """Create a Ragas Dataset from the downloaded JSON file."""
    dataset = Dataset(name="wke_faq_testset", backend="local/jsonl", root_dir=".")

    df = pd.read_json(dataset_path)

    for _, row in df.iterrows():
        dataset.append({
            "question": row["question"],
            "contexts": row["contexts"],
            "answer": row["answer"],
            "ground_truth": row["ground_truth"],
        })

    logger.info(f"Created Ragas dataset with {len(df)} samples")
    return dataset


@experiment()
async def evaluate_rag(row: Dict[str, Any], llm, embeddings) -> Dict[str, Any]:
    question = row["question"]
    answer = row["answer"]
    contexts = row["contexts"]
    ground_truth = row["ground_truth"]
    # Each scorer below issues its own LLM (and embedding) calls for this row.
    answer_relevancy_scorer = AnswerRelevancy(llm=llm, embeddings=embeddings)
    answer_correctness_scorer = AnswerCorrectness(llm=llm, embeddings=embeddings)
    contexts_precision_scorer = ContextPrecision(llm=llm)
    context_recall_scorer = ContextRecall(llm=llm)
    faithfulness_scorer = Faithfulness(llm=llm)
    faithfulness = await faithfulness_scorer.ascore(
        user_input=question, response=answer, retrieved_contexts=contexts
    )
    contexts_precision = await contexts_precision_scorer.ascore(
        user_input=question, reference=ground_truth, retrieved_contexts=contexts
    )
    context_recall = await context_recall_scorer.ascore(
        user_input=question, reference=ground_truth, retrieved_contexts=contexts
    )
    answer_relevancy = await answer_relevancy_scorer.ascore(
        user_input=question, response=answer
    )
    answer_correctness = await answer_correctness_scorer.ascore(
        user_input=question, response=answer, reference=ground_truth
    )

    # Return evaluation results
    result = {
        **row,
        "faithfulness": faithfulness.value,
        "context_precision": contexts_precision.value,
        "context_recall": context_recall.value,
        "answer_relevancy": answer_relevancy.value,
        "answer_correctness": answer_correctness.value,
    }

    return result


async def run_experiment():
    # Prepare dataset and initialize system
    logger.info("Initializing RAG system...")
    dataset = create_ragas_dataset(
        Path("./data/eval_dataset/ragas_eval_gpt_5_mini_wke_opt.json")
    )

    endpoint = f"{AZURE_OPENAI_ENDPOINT.rstrip('/')}/openai/v1"
    client = AsyncOpenAI(base_url=endpoint, api_key=AZURE_OPENAI_API_KEY)
    experiment_results = await evaluate_rag.arun(
        dataset,
        name=f"ragas_experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        llm=llm_factory(
            "gpt-5-mini",
            client=client,
            temperature=1,
            top_p=1,
            reasoning_effort="low",
        ),
        embeddings=embedding_factory(provider="huggingface", model=EMBEDDING_MODEL),
    )

    return experiment_results


if __name__ == "__main__":
    asyncio.run(run_experiment())
