
SQLAlchemyInstrumentor - calling instrument for multiple engines only instruments the first engine #3977

@dfaivre-pcs

Description

What problem do you want to solve?

Adding SQLAlchemy engines one at a time, i.e. calling SQLAlchemyInstrumentor().instrument(engine=engine_1, ...) once per engine, does not seem to instrument the second engine. The pattern below reproduces it:

instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py

def test_instrument_two_engines(self):
    engine_1 = create_engine("sqlite:///:memory:")
    engine_2 = create_engine("sqlite:///:memory:")

    SQLAlchemyInstrumentor().instrument(
        engine=engine_1,
        tracer_provider=self.tracer_provider,
    )
    cnx_1 = engine_1.connect()
    cnx_1.execute(text("SELECT 1 + 1;")).fetchall()

    SQLAlchemyInstrumentor().instrument(
        engine=engine_2,
        tracer_provider=self.tracer_provider,
    )
    cnx_2 = engine_2.connect()
    cnx_2.execute(text("SELECT 1 + 1;")).fetchall()

    spans = self.memory_exporter.get_finished_spans()

    self.assertEqual(len(spans), 2)
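
As far as I can tell, the second call never reaches engine_2: BaseInstrumentor is a singleton that sets an "already instrumented" flag on the first instrument() call, so the second call logs a warning and returns before registering any event listeners on the new engine. A minimal sketch of what I believe happens (run outside the test suite, no exporter configured):

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import create_engine

engine_1 = create_engine("sqlite:///:memory:")
engine_2 = create_engine("sqlite:///:memory:")

instrumentor = SQLAlchemyInstrumentor()
instrumentor.instrument(engine=engine_1)

# The singleton flag is now set, so (as I understand it) this call logs
# "Attempting to instrument while already instrumented" and returns without
# wrapping engine_2.
instrumentor.instrument(engine=engine_2)

print(instrumentor._is_instrumented_by_opentelemetry)  # True after the first call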

Describe the solution you'd like

I'm lazily initializing multiple SQLAlchemy engines in my app, each connecting to a different DB. I'd like to be able to call SQLAlchemyInstrumentor().instrument(...) directly with a different engine each time and have every engine instrumented.
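
For illustration, a minimal sketch of the lazy-initialization pattern I'd like to support (get_engine and the engine cache are made up for the example):

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import Engine, create_engine

_engines: dict[str, Engine] = {}


def get_engine(url: str) -> Engine:
    """Create and instrument an engine the first time a URL is requested."""
    if url not in _engines:
        engine = create_engine(url)
        # Desired behavior: every call instruments the engine it is given,
        # not just the first engine ever passed to instrument().
        SQLAlchemyInstrumentor().instrument(engine=engine)
        _engines[url] = engine
    return _engines[url]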

Describe alternatives you've considered

I have a placeholder/naive/not-code-reviewed instrumentation wrapper for now that seems to allow multiple engines:

"""SQLAlchemy OpenTelemetry instrumentation for multiple async engines."""

import logging
import weakref
from typing import Any, ClassVar

import structlog
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import (  # type: ignore[import-untyped]
    SQLAlchemyInstrumentor,
)
from opentelemetry.instrumentation.sqlalchemy.engine import (  # type: ignore[import-untyped]
    EngineTracer,
)
from opentelemetry.semconv._incubating.attributes import (  # type: ignore[import-untyped]
    db_attributes,
)
from sqlalchemy import Engine
from sqlalchemy.ext.asyncio import AsyncEngine

from plu.pcs.apps.services.web.backend.observability.logger import get_logger

# Stdlib logger for library-level class (consistent with BaseInstrumentor)
_LOG = logging.getLogger(__name__)

# Attribute name for db.name in logs (matches OpenTelemetry semantic conventions)
DB_NAME_LOG_ATTR = "db.name"


class DatabaseNameEngineTracer(EngineTracer):  # type: ignore[misc]
    """EngineTracer that adds database name to all spans.

    The base EngineTracer extracts db.name from URL.database, but ODBC connection
    strings (mssql+aioodbc:///?odbc_connect=...) don't expose the database in the
    URL object. This subclass stores the database name and adds it to all spans.

    Also handles connections_usage=None by overriding pool event methods to be no-ops.
    """

    def __init__(
        self,
        tracer: Any,
        engine: Any,
        connections_usage: Any,
        database_name: str,
        enable_commenter: bool = False,
        commenter_options: dict[str, Any] | None = None,
        enable_attribute_commenter: bool = False,
    ) -> None:
        """Initialize tracer with database name.

        Args:
            tracer: OpenTelemetry tracer instance.
            engine: SQLAlchemy engine to trace.
            connections_usage: Connections usage metric (can be None to skip metrics).
            database_name: Database name to add to spans.
            enable_commenter: Enable SQL commenter.
            commenter_options: Commenter options.
            enable_attribute_commenter: Include commenter in span attributes.

        """
        super().__init__(  # pyright: ignore[reportUnknownMemberType]
            tracer,
            engine,
            connections_usage,
            enable_commenter=enable_commenter,
            commenter_options=commenter_options,
            enable_attribute_commenter=enable_attribute_commenter,
        )
        self._database_name = database_name

    def _add_idle_to_connection_usage(self, value: int) -> None:
        """Override to handle None connections_usage."""
        if self.connections_usage is not None:
            super()._add_idle_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _add_used_to_connection_usage(self, value: int) -> None:
        """Override to handle None connections_usage."""
        if self.connections_usage is not None:
            super()._add_used_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _before_cur_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> Any:
        """Hook before cursor execution - creates span with db.name attribute.

        Calls base implementation then adds db.name to the span if set.
        """
        result: tuple[Any, Any] = super()._before_cur_exec(  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            conn, cursor, statement, params, context, executemany
        )

        # Add database name to span if we have one and span exists
        if self._database_name and hasattr(context, "_otel_span"):
            span = context._otel_span
            if span and span.is_recording():
                span.set_attribute(db_attributes.DB_NAME, self._database_name)

        # Bind database name to structlog context for logs during query execution
        if self._database_name:
            structlog.contextvars.bind_contextvars(**{DB_NAME_LOG_ATTR: self._database_name})

        return result

    def _after_cur_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> None:
        """Hook after cursor execution - cleans up context.

        Calls base implementation then unbinds db.name from structlog context.
        """
        super()._after_cur_exec(  # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]
            conn, cursor, statement, params, context, executemany
        )

        # Unbind database name from structlog context
        if self._database_name:
            structlog.contextvars.unbind_contextvars(DB_NAME_LOG_ATTR)


class PluSQLAlchemyInstrumentor(SQLAlchemyInstrumentor):
    """SQLAlchemy instrumentor with PLU-specific enhancements.

    Enhancements over base SQLAlchemyInstrumentor:
    - Supports instrumenting multiple engines (base uses singleton pattern)
    - Adds db.name to all spans/logs (ODBC connection strings don't expose it)
    """

    _instrumented_engines: ClassVar[weakref.WeakSet[Engine]] = weakref.WeakSet()

    @classmethod
    def reset_instrumented_engines(cls) -> None:
        """Clear tracked engines. Useful for testing."""
        cls._instrumented_engines = weakref.WeakSet()

    def instrument(
        self,
        *,
        database_name: str = "",
        engine: Engine | None = None,
        **kwargs: Any,
    ) -> None:
        """Instrument an engine, allowing multiple engines.

        Tracks engines via WeakSet to warn on duplicates. When engines are
        garbage collected, they are automatically removed from tracking.

        Args:
            database_name: Name of database for spans/logging.
            engine: SQLAlchemy engine to instrument.
            **kwargs: Passed to base instrument().

        """
        if engine is not None:
            if engine in self._instrumented_engines:
                _LOG.warning(
                    "Engine for %s is already instrumented, skipping",
                    database_name or engine,
                )
                return
            self._instrumented_engines.add(engine)

        # Reset flag so base.instrument() doesn't hit the guard
        self._is_instrumented_by_opentelemetry = False

        # If we have a database_name, use our custom tracer directly
        if database_name and engine is not None:
            self._instrument_with_database_name(engine, database_name, **kwargs)
        else:
            # Fallback to base for full logic
            super().instrument(engine=engine, **kwargs)

    def _instrument_with_database_name(
        self,
        engine: Engine,
        database_name: str,
        **kwargs: Any,
    ) -> None:
        """Instrument engine with custom tracer that includes database name.

        Args:
            engine: SQLAlchemy engine to instrument.
            database_name: Database name to add to all spans.
            **kwargs: Additional options (tracer_provider, commenter settings).

        """
        tracer_provider = kwargs.get("tracer_provider")
        tracer = trace.get_tracer(
            __name__,
            tracer_provider=tracer_provider,
        )

        # Create our custom tracer with database name
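        # EngineTracer's __init__ registers the engine event listeners as a side
        # effect, so constructing it is enough; we intentionally keep no reference.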
        DatabaseNameEngineTracer(
            tracer,
            engine,
            connections_usage=None,  # Skip metrics for now
            database_name=database_name,
            enable_commenter=kwargs.get("enable_commenter", False),
            commenter_options=kwargs.get("commenter_options"),
            enable_attribute_commenter=kwargs.get("enable_attribute_commenter", False),
        )


def instrument_async_engine(engine: AsyncEngine, *, database_name: str = "") -> None:
    """Instrument an async SQLAlchemy engine for OpenTelemetry tracing.

    Args:
        engine: Async SQLAlchemy engine to instrument.
        database_name: Optional name for logging.
    """
    logger = get_logger(__name__)

    logger.info(
        "db.engine.instrumenting",
        database=database_name,
        message=f"Instrumenting SQLAlchemy engine for {database_name or 'unknown'}",
    )

    # Use sync_engine for async engines (required for event listener registration)
    PluSQLAlchemyInstrumentor().instrument(
        engine=engine.sync_engine,
        database_name=database_name,
    )

    logger.info(
        "db.engine.instrumented",
        database=database_name,
        message=f"SQLAlchemy engine instrumented for {database_name or 'unknown'}",
    )
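
For completeness, this is roughly how I call the wrapper today (the URLs below are placeholders for illustration only):

from sqlalchemy.ext.asyncio import create_async_engine

# Placeholder URLs; the real app points at different databases.
orders_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
billing_engine = create_async_engine("sqlite+aiosqlite:///:memory:")

instrument_async_engine(orders_engine, database_name="orders")
instrument_async_engine(billing_engine, database_name="billing")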

Additional Context

I'm probably using the SQLAlchemy instrumentation wrong and missing something obvious - feel free to yell at me that I should have read the docs/source code more closely. I have a lot of dev experience, but not much with Python/OTel...

Would you like to implement a fix?

Yes
