Skip to content

Commit 177b4ef

Browse files
Add TRACE logging level for verbose payload logging across event writers (#61)
* Add TRACE logging level for verbose payload logging across event writers
1 parent f671bbf commit 177b4ef

File tree

9 files changed

+529
-23
lines changed

9 files changed

+529
-23
lines changed

src/event_gate_lambda.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,24 @@
2222
import sys
2323
from typing import Any, Dict
2424

25-
import urllib3
26-
2725
import boto3
2826
import jwt
2927
import requests
28+
import urllib3
29+
from cryptography.exceptions import UnsupportedAlgorithm
3030
from cryptography.hazmat.primitives import serialization
3131
from jsonschema import validate
3232
from jsonschema.exceptions import ValidationError
3333

34-
# Added explicit import for serialization-related exceptions
35-
try: # pragma: no cover - import guard
36-
from cryptography.exceptions import UnsupportedAlgorithm # type: ignore
37-
except Exception: # pragma: no cover - very defensive
38-
UnsupportedAlgorithm = Exception # type: ignore
39-
4034
# Import writer modules with explicit ImportError fallback
4135
try:
42-
from . import writer_eventbridge, writer_kafka, writer_postgres
36+
from . import writer_eventbridge
37+
from . import writer_kafka
38+
from . import writer_postgres
4339
except ImportError: # fallback when executed outside package context
44-
import writer_eventbridge, writer_kafka, writer_postgres # type: ignore[no-redef]
40+
import writer_eventbridge # type: ignore[no-redef]
41+
import writer_kafka # type: ignore[no-redef]
42+
import writer_postgres # type: ignore[no-redef]
4543

4644
# Import configuration directory symbols with explicit ImportError fallback
4745
try:

src/logging_levels.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Custom logging levels.
2+
3+
Adds a TRACE level below DEBUG for very verbose payload logging.
4+
"""
5+
6+
from __future__ import annotations
7+
import logging
8+
9+
TRACE_LEVEL = 5
10+
11+
# Register level name only once (idempotent)
12+
if not hasattr(logging, "TRACE"):
13+
logging.addLevelName(TRACE_LEVEL, "TRACE")
14+
15+
def trace(self: logging.Logger, message: str, *args, **kws): # type: ignore[override]
16+
"""Log a message with TRACE level.
17+
18+
Args:
19+
self: Logger instance.
20+
message: Log message format string.
21+
*args: Positional arguments for message formatting.
22+
**kws: Keyword arguments passed to _log.
23+
"""
24+
if self.isEnabledFor(TRACE_LEVEL):
25+
self._log(TRACE_LEVEL, message, args, **kws) # pylint: disable=protected-access
26+
27+
logging.Logger.trace = trace # type: ignore[attr-defined]
28+
29+
__all__ = ["TRACE_LEVEL"]

src/safe_serialization.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#
2+
# Copyright 2025 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""Safe serialization utilities for logging.
18+
19+
Provides PII-safe, size-bounded JSON serialization for TRACE logging.
20+
"""
21+
22+
import json
23+
import os
24+
from typing import Any, List, Set
25+
26+
27+
def _redact_sensitive_keys(obj: Any, redact_keys: Set[str]) -> Any:
28+
"""Recursively redact sensitive keys from nested structures.
29+
30+
Args:
31+
obj: Object to redact (dict, list, or scalar).
32+
redact_keys: Set of key names to redact (case-insensitive).
33+
34+
Returns:
35+
Copy of obj with sensitive values replaced by "***REDACTED***".
36+
"""
37+
if isinstance(obj, dict):
38+
return {
39+
k: "***REDACTED***" if k.lower() in redact_keys else _redact_sensitive_keys(v, redact_keys)
40+
for k, v in obj.items()
41+
}
42+
if isinstance(obj, list):
43+
return [_redact_sensitive_keys(item, redact_keys) for item in obj]
44+
return obj
45+
46+
47+
def safe_serialize_for_log(message: Any, redact_keys: List[str] | None = None, max_bytes: int | None = None) -> str:
48+
"""Safely serialize a message for logging with redaction and size capping.
49+
50+
Args:
51+
message: Object to serialize (typically a dict).
52+
redact_keys: List of key names to redact (case-insensitive). If None, uses env TRACE_REDACT_KEYS.
53+
max_bytes: Maximum serialized output size in bytes. If None, uses env TRACE_MAX_BYTES (default 10000).
54+
55+
Returns:
56+
JSON string (redacted and truncated if needed), or empty string on serialization error.
57+
"""
58+
# Apply configuration defaults
59+
if redact_keys is None:
60+
redact_keys_str = os.environ.get("TRACE_REDACT_KEYS", "password,secret,token,key,apikey,api_key")
61+
redact_keys = [k.strip() for k in redact_keys_str.split(",") if k.strip()]
62+
if max_bytes is None:
63+
max_bytes = int(os.environ.get("TRACE_MAX_BYTES", "10000"))
64+
65+
# Normalize to case-insensitive set
66+
redact_set = {k.lower() for k in redact_keys}
67+
68+
try:
69+
# Redact sensitive keys
70+
redacted = _redact_sensitive_keys(message, redact_set)
71+
# Serialize with minimal whitespace
72+
serialized = json.dumps(redacted, separators=(",", ":"))
73+
# Truncate if needed
74+
if len(serialized.encode("utf-8")) > max_bytes:
75+
# Binary truncate to max_bytes and append marker
76+
truncated_bytes = serialized.encode("utf-8")[:max_bytes]
77+
# Ensure we don't break mid-multibyte character
78+
try:
79+
return truncated_bytes.decode("utf-8", errors="ignore") + "..."
80+
except UnicodeDecodeError: # pragma: no cover - defensive
81+
return ""
82+
return serialized
83+
except (TypeError, ValueError, OverflowError): # pragma: no cover - catch serialization errors
84+
return ""
85+
86+
87+
__all__ = ["safe_serialize_for_log"]

src/trace_logging.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#
2+
# Copyright 2025 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""Trace-level logging utilities.
18+
19+
Provides reusable TRACE-level payload logging for writer modules.
20+
"""
21+
22+
import logging
23+
from typing import Any, Dict
24+
25+
from .logging_levels import TRACE_LEVEL
26+
from .safe_serialization import safe_serialize_for_log
27+
28+
29+
def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None:
    """Log message payload at TRACE level with safe serialization.

    Args:
        logger: Logger instance to use for logging.
        writer_name: Name of the writer (e.g., "EventBridge", "Kafka", "Postgres").
        topic_name: Topic name being written to.
        message: Message payload to log.
    """
    # Skip all serialization work unless TRACE is actually enabled.
    if not logger.isEnabledFor(TRACE_LEVEL):
        return

    try:
        serialized = safe_serialize_for_log(message)
        if not serialized:
            # Serializer signalled failure with an empty string; log nothing.
            return
        logger.trace(  # type: ignore[attr-defined]
            "%s payload topic=%s payload=%s", writer_name, topic_name, serialized
        )
    except (TypeError, ValueError):  # pragma: no cover - defensive serialization guard
        logger.trace("%s payload topic=%s <unserializable>", writer_name, topic_name)  # type: ignore[attr-defined]
49+
50+
51+
__all__ = ["log_payload_at_trace"]

src/writer_eventbridge.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import boto3
2727
from botocore.exceptions import BotoCoreError, ClientError
2828

29+
from .trace_logging import log_payload_at_trace
30+
2931
STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}
3032

3133

@@ -68,6 +70,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
6870
logger.debug("EventBridge client not initialized - skipping")
6971
return True, None
7072

73+
log_payload_at_trace(logger, "EventBridge", topic_name, message)
74+
7175
try:
7276
logger.debug("Sending to eventBridge %s", topic_name)
7377
response = client.put_events(

src/writer_kafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from typing import Any, Dict, Optional, Tuple
2727
from confluent_kafka import Producer
2828

29+
from .trace_logging import log_payload_at_trace
30+
2931
try: # KafkaException may not exist in stubbed test module
3032
from confluent_kafka import KafkaException # type: ignore
3133
except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub
@@ -92,6 +94,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
9294
logger.debug("Kafka producer not initialized - skipping")
9395
return True, None
9496

97+
log_payload_at_trace(logger, "Kafka", topic_name, message)
98+
9599
errors: list[str] = []
96100
has_exception = False
97101

src/writer_postgres.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
except ImportError: # pragma: no cover - environment without psycopg2
3232
psycopg2 = None # type: ignore
3333

34+
from .trace_logging import log_payload_at_trace
35+
3436
# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing
3537
if psycopg2 is not None: # type: ignore
3638
try: # pragma: no cover - attribute presence depends on installed psycopg2 variant
@@ -47,20 +49,20 @@ class PsycopgError(Exception): # type: ignore
4749

4850

4951
# Module level globals for typing
50-
_logger: logging.Logger = logging.getLogger(__name__)
52+
logger: logging.Logger = logging.getLogger(__name__)
5153
POSTGRES: Dict[str, Any] = {"database": ""}
5254

5355

54-
def init(logger: logging.Logger) -> None:
56+
def init(logger_instance: logging.Logger) -> None:
5557
"""Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config.
5658
5759
Args:
58-
logger: Shared application logger.
60+
logger_instance: Shared application logger.
5961
"""
60-
global _logger # pylint: disable=global-statement
62+
global logger # pylint: disable=global-statement
6163
global POSTGRES # pylint: disable=global-statement
6264

63-
_logger = logger
65+
logger = logger_instance
6466

6567
secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
6668
secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
@@ -72,7 +74,7 @@ def init(logger: logging.Logger) -> None:
7274
else:
7375
POSTGRES = {"database": ""}
7476

75-
_logger.debug("Initialized POSTGRES writer")
77+
logger.debug("Initialized POSTGRES writer")
7678

7779

7880
def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
@@ -83,7 +85,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
8385
table: Target table name.
8486
message: Event payload.
8587
"""
86-
_logger.debug("Sending to Postgres - %s", table)
88+
logger.debug("Sending to Postgres - %s", table)
8789
cursor.execute(
8890
f"""
8991
INSERT INTO {table}
@@ -142,7 +144,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
142144
table_jobs: Jobs table name.
143145
message: Event payload (includes jobs array).
144146
"""
145-
_logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)
147+
logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)
146148
cursor.execute(
147149
f"""
148150
INSERT INTO {table_runs}
@@ -222,7 +224,7 @@ def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None:
222224
table: Target table name.
223225
message: Event payload.
224226
"""
225-
_logger.debug("Sending to Postgres - %s", table)
227+
logger.debug("Sending to Postgres - %s", table)
226228
cursor.execute(
227229
f"""
228230
INSERT INTO {table}
@@ -265,12 +267,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
265267
"""
266268
try:
267269
if not POSTGRES.get("database"):
268-
_logger.debug("No Postgres - skipping")
270+
logger.debug("No Postgres - skipping")
269271
return True, None
270272
if psycopg2 is None: # type: ignore
271-
_logger.debug("psycopg2 not available - skipping actual Postgres write")
273+
logger.debug("psycopg2 not available - skipping actual Postgres write")
272274
return True, None
273275

276+
log_payload_at_trace(logger, "Postgres", topic_name, message)
277+
274278
with psycopg2.connect( # type: ignore[attr-defined]
275279
database=POSTGRES["database"],
276280
host=POSTGRES["host"],
@@ -287,13 +291,13 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
287291
postgres_test_write(cursor, "public_cps_za_test", message)
288292
else:
289293
msg = f"unknown topic for postgres {topic_name}"
290-
_logger.error(msg)
294+
logger.error(msg)
291295
return False, msg
292296

293297
connection.commit() # type: ignore
294298
except (RuntimeError, PsycopgError) as e: # narrowed exception set
295299
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
296-
_logger.error(err_msg)
300+
logger.exception(err_msg)
297301
return False, err_msg
298302

299303
return True, None

0 commit comments

Comments
 (0)