-
Notifications
You must be signed in to change notification settings - Fork 822
Description
What problem do you want to solve?
Adding SQLAlchemy engines one at a time with this pattern (i.e., calling `SQLAlchemyInstrumentor().instrument(engine=engine_1, ...)` multiple times) does not seem to add the second engine to instrumentation:
instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py
def test_instrument_two_engines(self):
    # Both engines exist before any instrumentation happens; each one is then
    # instrumented in its own instrument() call and immediately queried once.
    first_engine = create_engine("sqlite:///:memory:")
    second_engine = create_engine("sqlite:///:memory:")

    # Hold on to every connection until the test ends so none is released early.
    open_connections = []
    for eng in (first_engine, second_engine):
        SQLAlchemyInstrumentor().instrument(
            engine=eng,
            tracer_provider=self.tracer_provider,
        )
        conn = eng.connect()
        open_connections.append(conn)
        conn.execute("SELECT 1 + 1;").fetchall()

    # One query span is expected per engine.
    finished = self.memory_exporter.get_finished_spans()
    self.assertEqual(len(finished), 2)
Describe the solution you'd like
I lazily initialize multiple SQLAlchemy engines in my app, each connected to a different database. I'd like to be able to call `SQLAlchemyInstrumentor().instrument(...)` directly, multiple times with different engines, and have every engine end up instrumented.
Describe alternatives you've considered
For now I have a placeholder (naive, not yet code-reviewed) instrumentation wrapper that seems to support multiple engines:
"""SQLAlchemy OpenTelemetry instrumentation for multiple async engines."""
import logging
import weakref
from typing import Any, ClassVar
import structlog
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import ( # type: ignore[import-untyped]
SQLAlchemyInstrumentor,
)
from opentelemetry.instrumentation.sqlalchemy.engine import ( # type: ignore[import-untyped]
EngineTracer,
)
from opentelemetry.semconv._incubating.attributes import ( # type: ignore[import-untyped]
db_attributes,
)
from sqlalchemy import Engine
from sqlalchemy.ext.asyncio import AsyncEngine
from plu.pcs.apps.services.web.backend.observability.logger import get_logger
# Key under which the database name is bound into the structlog context;
# it mirrors the OpenTelemetry semantic-convention attribute name "db.name".
DB_NAME_LOG_ATTR = "db.name"
# Standard-library logger for the library-level classes below (kept on stdlib
# logging to stay consistent with BaseInstrumentor).
_LOG = logging.getLogger(__name__)
class DatabaseNameEngineTracer(EngineTracer):  # type: ignore[misc]
    """EngineTracer that adds an explicit database name to query spans and logs.

    The base EngineTracer extracts db.name from URL.database, but ODBC
    connection strings (mssql+aioodbc:///?odbc_connect=...) don't expose the
    database in the URL object. This subclass stores the database name, sets
    it on each query span, and binds it into the structlog contextvars while
    the query runs. The binding is removed again when the statement finishes
    or fails.

    Also handles connections_usage=None by turning the pool-usage metric
    hooks into no-ops.
    """

    def __init__(
        self,
        tracer: Any,
        engine: Any,
        connections_usage: Any,
        database_name: str,
        enable_commenter: bool = False,
        commenter_options: dict[str, Any] | None = None,
        enable_attribute_commenter: bool = False,
    ) -> None:
        """Initialize the tracer and register the log-context cleanup hooks.

        Args:
            tracer: OpenTelemetry tracer instance.
            engine: Synchronous SQLAlchemy engine to trace; event listeners
                are registered on it.
            connections_usage: Connections usage metric (can be None to skip
                metrics).
            database_name: Database name to add to spans and logs. An empty
                string disables the extra attribute and log binding.
            enable_commenter: Enable SQL commenter.
            commenter_options: Commenter options.
            enable_attribute_commenter: Include commenter in span attributes.
        """
        super().__init__(  # pyright: ignore[reportUnknownMemberType]
            tracer,
            engine,
            connections_usage,
            enable_commenter=enable_commenter,
            commenter_options=commenter_options,
            enable_attribute_commenter=enable_attribute_commenter,
        )
        self._database_name = database_name

        if database_name:
            # Local import: the event API is only needed to wire these hooks.
            from sqlalchemy import event

            # Unbind the structlog key when a statement finishes *or* fails
            # (after_cursor_execute does not fire when execution raises).
            # These are dedicated listeners rather than an _after_cur_exec
            # override. NOTE(review): the base class does not appear to expose
            # _after_cur_exec as a method (calling it via super() needed a
            # pyright reportAttributeAccessIssue suppression). Such an override
            # would therefore never be invoked, and db.name would leak into
            # every later log line.
            event.listen(
                engine, "after_cursor_execute", self._unbind_db_name_after_exec
            )
            event.listen(engine, "handle_error", self._unbind_db_name_on_error)

    def _add_idle_to_connection_usage(self, value: int) -> None:
        """Forward to the base metric hook only when a usage metric exists."""
        if self.connections_usage is not None:
            super()._add_idle_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _add_used_to_connection_usage(self, value: int) -> None:
        """Forward to the base metric hook only when a usage metric exists."""
        if self.connections_usage is not None:
            super()._add_used_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _before_cur_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> Any:
        """Hook before cursor execution: let the base create the span, then tag it.

        When a database name is configured, it is set as the db.name span
        attribute (if a recording span was attached to ``context``). It is
        also bound into the structlog contextvars for logs emitted during
        the query.

        Returns:
            The base hook's return value, passed back to SQLAlchemy unchanged.
        """
        result: tuple[Any, Any] = super()._before_cur_exec(  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            conn, cursor, statement, params, context, executemany
        )
        if self._database_name:
            span = getattr(context, "_otel_span", None)
            if span is not None and span.is_recording():
                span.set_attribute(db_attributes.DB_NAME, self._database_name)
            structlog.contextvars.bind_contextvars(
                **{DB_NAME_LOG_ATTR: self._database_name}
            )
        return result

    def _unbind_db_name_after_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> None:
        """after_cursor_execute listener: drop db.name from the log context."""
        structlog.contextvars.unbind_contextvars(DB_NAME_LOG_ATTR)

    def _unbind_db_name_on_error(self, exception_context: Any) -> None:
        """handle_error listener: drop db.name when a statement fails.

        Returns None, so SQLAlchemy propagates the original exception.
        """
        structlog.contextvars.unbind_contextvars(DB_NAME_LOG_ATTR)
class PluSQLAlchemyInstrumentor(SQLAlchemyInstrumentor):
    """SQLAlchemy instrumentor with PLU-specific enhancements.

    Enhancements over base SQLAlchemyInstrumentor:
    - Supports instrumenting multiple engines, one call at a time (the base
      uses a singleton pattern whose "already instrumented" flag would
      otherwise skip every call after the first).
    - Adds db.name to all spans/logs (ODBC connection strings don't expose it).
    """

    # Engines already instrumented through this class. Weak references, so
    # garbage-collected engines drop out of the set automatically.
    _instrumented_engines: ClassVar[weakref.WeakSet[Engine]] = weakref.WeakSet()

    @classmethod
    def reset_instrumented_engines(cls) -> None:
        """Forget all tracked engines. Useful for testing."""
        cls._instrumented_engines = weakref.WeakSet()

    def instrument(
        self,
        *,
        database_name: str = "",
        engine: Engine | None = None,
        **kwargs: Any,
    ) -> None:
        """Instrument an engine, allowing multiple engines across calls.

        Instrumenting the same engine twice only logs a warning. An engine is
        recorded only after its instrumentation call returns, so a failed
        attempt can be retried.

        Args:
            database_name: Name of database for spans/logging. Used only
                together with ``engine``; ignored (with a warning) otherwise.
            engine: SQLAlchemy (sync) engine to instrument. When None, the
                call is forwarded to the base ``instrument()`` with its
                already-instrumented guard intact.
            **kwargs: Passed to base instrument(). On the database_name path,
                tracer_provider and the commenter options are read from here.
        """
        if engine is None:
            if database_name:
                _LOG.warning(
                    "database_name=%r ignored because no engine was given",
                    database_name,
                )
            # Keep the base guard: repeating a global instrument() call must
            # not re-apply the base instrumentation.
            super().instrument(engine=engine, **kwargs)
            return

        if engine in self._instrumented_engines:
            _LOG.warning(
                "Engine for %s is already instrumented, skipping",
                database_name or engine,
            )
            return

        if database_name:
            # Our tracer is attached directly. base.instrument() is not
            # involved, so its instrumented flag is left as it is.
            self._instrument_with_database_name(engine, database_name, **kwargs)
        else:
            # Reset flag so base.instrument() doesn't hit the guard and
            # attaches a tracer to this additional engine.
            self._is_instrumented_by_opentelemetry = False
            super().instrument(engine=engine, **kwargs)

        # Record only after the instrumentation call returned without raising.
        self._instrumented_engines.add(engine)

    def _instrument_with_database_name(
        self,
        engine: Engine,
        database_name: str,
        **kwargs: Any,
    ) -> None:
        """Attach a DatabaseNameEngineTracer to ``engine``.

        The tracer is obtained under this module's name from the optional
        ``tracer_provider``. No connections-usage metric is passed, so pool
        usage metrics are not recorded on this path.

        Args:
            engine: SQLAlchemy engine to instrument.
            database_name: Database name to add to all spans.
            **kwargs: Additional options (tracer_provider, enable_commenter,
                commenter_options, enable_attribute_commenter).
        """
        tracer = trace.get_tracer(
            __name__,
            tracer_provider=kwargs.get("tracer_provider"),
        )
        # The tracer registers its SQLAlchemy event listeners in __init__, so
        # the instance does not need to be kept here.
        DatabaseNameEngineTracer(
            tracer,
            engine,
            connections_usage=None,  # Skip metrics for now
            database_name=database_name,
            enable_commenter=kwargs.get("enable_commenter", False),
            commenter_options=kwargs.get("commenter_options"),
            enable_attribute_commenter=kwargs.get("enable_attribute_commenter", False),
        )
def instrument_async_engine(engine: AsyncEngine, *, database_name: str = "") -> None:
    """Attach OpenTelemetry tracing to an async SQLAlchemy engine.

    Event listeners have to be registered on the synchronous engine that backs
    the async wrapper, so the work is delegated using ``engine.sync_engine``.
    A structured log line is emitted before and after the call.

    Args:
        engine: Async SQLAlchemy engine whose queries should be traced.
        database_name: Optional database label. It is used in the log lines
            and forwarded to the instrumentor.
    """
    label = database_name or 'unknown'
    log = get_logger(__name__)

    log.info(
        "db.engine.instrumenting",
        database=database_name,
        message=f"Instrumenting SQLAlchemy engine for {label}",
    )

    instrumentor = PluSQLAlchemyInstrumentor()
    instrumentor.instrument(
        engine=engine.sync_engine,
        database_name=database_name,
    )

    log.info(
        "db.engine.instrumented",
        database=database_name,
        message=f"SQLAlchemy engine instrumented for {label}",
    )
Additional Context
I'm probably using the SQLAlchemy instrumentation wrong and missing something obvious, so feel free to tell me I should have read the docs or source code more closely. I have a lot of development experience, but not much with Python or OpenTelemetry.
Would you like to implement a fix?
Yes
Tip
React with 👍 to help prioritize this issue. Please use comments to provide useful context, avoiding +1 or me too, to help us triage it. Learn more here.