|
1 | | -# |
2 | | -# Copyright 2025 ABSA Group Limited |
3 | | -# |
4 | | -# Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | -# you may not use this file except in compliance with the License. |
6 | | -# You may obtain a copy of the License at |
7 | | -# |
8 | | -# http://www.apache.org/licenses/LICENSE-2.0 |
9 | | -# |
10 | | -# Unless required by applicable law or agreed to in writing, software |
11 | | -# distributed under the License is distributed on an "AS IS" BASIS, |
12 | | -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | -# See the License for the specific language governing permissions and |
14 | | -# limitations under the License. |
15 | | -# |
16 | | - |
17 | | -"""EventBridge writer module. |
18 | | -
|
19 | | -Provides initialization and write functionality for publishing events to AWS EventBridge. |
20 | | -""" |
21 | | - |
22 | | -import json |
23 | | -import logging |
24 | | -from typing import Any, Dict, Optional, Tuple, List |
25 | | - |
26 | | -import boto3 |
27 | | -from botocore.exceptions import BotoCoreError, ClientError |
28 | | - |
29 | | -from .trace_logging import log_payload_at_trace |
30 | | - |
31 | | -STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} |
32 | | - |
33 | | - |
34 | | -def init(logger: logging.Logger, config: Dict[str, Any]) -> None: |
35 | | - """Initialize the EventBridge writer. |
36 | | -
|
37 | | - Args: |
38 | | - logger: Shared application logger. |
39 | | - config: Configuration dictionary (expects optional 'event_bus_arn'). |
40 | | - """ |
41 | | - STATE["logger"] = logger |
42 | | - STATE["client"] = boto3.client("events") |
43 | | - STATE["event_bus_arn"] = config.get("event_bus_arn", "") |
44 | | - STATE["logger"].debug("Initialized EVENTBRIDGE writer") |
45 | | - |
46 | | - |
47 | | -def _format_failed_entries(entries: List[Dict[str, Any]]) -> str: |
48 | | - failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e] |
49 | | - # Keep message concise but informative |
50 | | - return json.dumps(failed) if failed else "[]" |
51 | | - |
52 | | - |
53 | | -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: |
54 | | - """Publish a message to EventBridge. |
55 | | -
|
56 | | - Args: |
57 | | - topic_name: Source topic name used as event Source. |
58 | | - message: JSON-serializable payload. |
59 | | - Returns: |
60 | | - Tuple of success flag and optional error message. |
61 | | - """ |
62 | | - logger = STATE["logger"] |
63 | | - event_bus_arn = STATE["event_bus_arn"] |
64 | | - client = STATE["client"] |
65 | | - |
66 | | - if not event_bus_arn: |
67 | | - logger.debug("No EventBus Arn - skipping") |
68 | | - return True, None |
69 | | - if client is None: # defensive |
70 | | - logger.debug("EventBridge client not initialized - skipping") |
71 | | - return True, None |
72 | | - |
73 | | - log_payload_at_trace(logger, "EventBridge", topic_name, message) |
74 | | - |
75 | | - try: |
76 | | - logger.debug("Sending to eventBridge %s", topic_name) |
77 | | - response = client.put_events( |
78 | | - Entries=[ |
79 | | - { |
80 | | - "Source": topic_name, |
81 | | - "DetailType": "JSON", |
82 | | - "Detail": json.dumps(message), |
83 | | - "EventBusName": event_bus_arn, |
84 | | - } |
85 | | - ] |
86 | | - ) |
87 | | - failed_count = response.get("FailedEntryCount", 0) |
88 | | - if failed_count > 0: |
89 | | - entries = response.get("Entries", []) |
90 | | - failed_repr = _format_failed_entries(entries) |
91 | | - msg = f"{failed_count} EventBridge entries failed: {failed_repr}" |
92 | | - logger.error(msg) |
93 | | - return False, msg |
94 | | - except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors |
95 | | - logger.exception("EventBridge put_events call failed") |
96 | | - return False, str(err) |
97 | | - |
98 | | - # Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400) |
99 | | - return True, None |
| 1 | +# |
| 2 | +# Copyright 2025 ABSA Group Limited |
| 3 | +# |
| 4 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +# you may not use this file except in compliance with the License. |
| 6 | +# You may obtain a copy of the License at |
| 7 | +# |
| 8 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +# |
| 10 | +# Unless required by applicable law or agreed to in writing, software |
| 11 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +# See the License for the specific language governing permissions and |
| 14 | +# limitations under the License. |
| 15 | +# |
| 16 | + |
| 17 | +"""EventBridge writer module. |
| 18 | +
|
| 19 | +Provides initialization and write functionality for publishing events to AWS EventBridge. |
| 20 | +""" |
| 21 | + |
| 22 | +import json |
| 23 | +import logging |
| 24 | +from typing import Any, Dict, Optional, Tuple, List |
| 25 | + |
| 26 | +import boto3 |
| 27 | +from botocore.exceptions import BotoCoreError, ClientError |
| 28 | + |
| 29 | +from src.utils.trace_logging import log_payload_at_trace |
| 30 | + |
| 31 | +STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} |
| 32 | + |
| 33 | + |
| 34 | +def init(logger: logging.Logger, config: Dict[str, Any]) -> None: |
| 35 | + """Initialize the EventBridge writer. |
| 36 | +
|
| 37 | + Args: |
| 38 | + logger: Shared application logger. |
| 39 | + config: Configuration dictionary (expects optional 'event_bus_arn'). |
| 40 | + """ |
| 41 | + STATE["logger"] = logger |
| 42 | + STATE["client"] = boto3.client("events") |
| 43 | + STATE["event_bus_arn"] = config.get("event_bus_arn", "") |
| 44 | + STATE["logger"].debug("Initialized EVENTBRIDGE writer") |
| 45 | + |
| 46 | + |
| 47 | +def _format_failed_entries(entries: List[Dict[str, Any]]) -> str: |
| 48 | + failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e] |
| 49 | + # Keep message concise but informative |
| 50 | + return json.dumps(failed) if failed else "[]" |
| 51 | + |
| 52 | + |
| 53 | +def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: |
| 54 | + """Publish a message to EventBridge. |
| 55 | +
|
| 56 | + Args: |
| 57 | + topic_name: Source topic name used as event Source. |
| 58 | + message: JSON-serializable payload. |
| 59 | + Returns: |
| 60 | + Tuple of success flag and optional error message. |
| 61 | + """ |
| 62 | + logger = STATE["logger"] |
| 63 | + event_bus_arn = STATE["event_bus_arn"] |
| 64 | + client = STATE["client"] |
| 65 | + |
| 66 | + if not event_bus_arn: |
| 67 | + logger.debug("No EventBus Arn - skipping") |
| 68 | + return True, None |
| 69 | + if client is None: # defensive |
| 70 | + logger.debug("EventBridge client not initialized - skipping") |
| 71 | + return True, None |
| 72 | + |
| 73 | + log_payload_at_trace(logger, "EventBridge", topic_name, message) |
| 74 | + |
| 75 | + try: |
| 76 | + logger.debug("Sending to eventBridge %s", topic_name) |
| 77 | + response = client.put_events( |
| 78 | + Entries=[ |
| 79 | + { |
| 80 | + "Source": topic_name, |
| 81 | + "DetailType": "JSON", |
| 82 | + "Detail": json.dumps(message), |
| 83 | + "EventBusName": event_bus_arn, |
| 84 | + } |
| 85 | + ] |
| 86 | + ) |
| 87 | + failed_count = response.get("FailedEntryCount", 0) |
| 88 | + if failed_count > 0: |
| 89 | + entries = response.get("Entries", []) |
| 90 | + failed_repr = _format_failed_entries(entries) |
| 91 | + msg = f"{failed_count} EventBridge entries failed: {failed_repr}" |
| 92 | + logger.error(msg) |
| 93 | + return False, msg |
| 94 | + except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors |
| 95 | + logger.exception("EventBridge put_events call failed") |
| 96 | + return False, str(err) |
| 97 | + |
| 98 | + # Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400) |
| 99 | + return True, None |
0 commit comments