Skip to content

Commit f671bbf

Browse files
authored
Flush timeout error handling for Kafka writer (#76)
* Flush error handling for Kafka writer. * Method documentation improvement. * Comments implementation.
1 parent 4fd2d45 commit f671bbf

File tree

4 files changed

+134
-23
lines changed

4 files changed

+134
-23
lines changed

DEVELOPER.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ cd EventGate
2828
```shell
2929
python3 -m venv .venv
3030
source .venv/bin/activate
31-
pip install -r requirements.txt
31+
pip3 install -r requirements.txt
3232
```
3333

3434
## Run Pylint Tool Locally

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ jsonschema==4.25.1
1111
PyJWT==2.10.1
1212
requests==2.32.5
1313
boto3==1.40.25
14-
confluent-kafka==2.11.1
14+
confluent-kafka==2.12.1
1515
# psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use
1616
psycopg2==2.9.10

src/writer_kafka.py

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import json
2323
import logging
2424
import os
25+
import time
2526
from typing import Any, Dict, Optional, Tuple
26-
2727
from confluent_kafka import Producer
2828

2929
try: # KafkaException may not exist in stubbed test module
@@ -35,8 +35,10 @@ class KafkaException(Exception): # type: ignore
3535

3636

3737
STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None}
38-
# Configurable flush timeout (seconds) to avoid hanging indefinitely
39-
_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "5"))
38+
# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely
39+
_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7"))
40+
_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3"))
41+
_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5"))
4042

4143

4244
def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
@@ -86,12 +88,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
8688
"""
8789
logger = STATE["logger"]
8890
producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment]
89-
9091
if producer is None:
9192
logger.debug("Kafka producer not initialized - skipping")
9293
return True, None
9394

94-
errors: list[Any] = []
95+
errors: list[str] = []
96+
has_exception = False
97+
98+
# Produce step
9599
try:
96100
logger.debug("Sending to kafka %s", topic_name)
97101
producer.produce(
@@ -100,23 +104,51 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
100104
value=json.dumps(message).encode("utf-8"),
101105
callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
102106
)
107+
except KafkaException as e:
108+
errors.append(f"Produce exception: {e}")
109+
has_exception = True
110+
111+
# Flush step (always attempted)
112+
remaining: Optional[int] = None
113+
for attempt in range(1, _MAX_RETRIES + 1):
103114
try:
104-
remaining = producer.flush(_KAFKA_FLUSH_TIMEOUT_SEC) # type: ignore[arg-type]
105-
except TypeError: # Fallback for stub producers without timeout parameter
106-
remaining = producer.flush() # type: ignore[call-arg]
107-
# remaining can be number of undelivered messages (confluent_kafka returns int)
108-
if not errors and isinstance(remaining, int) and remaining > 0:
109-
timeout_msg = f"Kafka flush timeout after {_KAFKA_FLUSH_TIMEOUT_SEC}s: {remaining} message(s) still pending"
110-
logger.error(timeout_msg)
111-
return False, timeout_msg
112-
except KafkaException as e: # narrow exception capture
113-
err_msg = f"The Kafka writer failed with unknown error: {str(e)}"
114-
logger.exception(err_msg)
115-
return False, err_msg
115+
remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
116+
except KafkaException as e:
117+
errors.append(f"Flush exception: {e}")
118+
has_exception = True
119+
120+
# Treat None (flush returns None in some stubs) as success equivalent to 0 pending
121+
if (remaining is None or remaining == 0) and not errors:
122+
break
123+
if attempt < _MAX_RETRIES:
124+
logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES)
125+
time.sleep(_RETRY_BACKOFF_SEC)
126+
127+
# Warn if messages still pending after retries
128+
if isinstance(remaining, int) and remaining > 0:
129+
logger.warning(
130+
"Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
131+
)
116132

117133
if errors:
118-
msg = "; ".join(errors)
119-
logger.error(msg)
120-
return False, msg
134+
failure_text = "Kafka writer failed: " + "; ".join(errors)
135+
(logger.exception if has_exception else logger.error)(failure_text)
136+
return False, failure_text
121137

122138
return True, None
139+
140+
141+
def flush_with_timeout(producer, timeout: float) -> Optional[int]:
142+
"""Flush the Kafka producer with a timeout, handling TypeError for stubs.
143+
144+
Args:
145+
producer: Kafka Producer instance.
146+
timeout: Timeout in seconds.
147+
Returns:
148+
Number of messages still pending after the flush call (0 means all messages were delivered).
149+
None is returned only if the underlying (stub/mock) producer.flush() does not provide a count.
150+
"""
151+
try:
152+
return producer.flush(timeout)
153+
except TypeError: # Fallback for stub producers without timeout parameter
154+
return producer.flush()

tests/test_writer_kafka.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32
from types import SimpleNamespace
43
import src.writer_kafka as wk
@@ -23,6 +22,44 @@ def produce(self, topic, key, value, callback): # noqa: D401
2322
callback("ERR", None)
2423

2524

25+
class FakeProducerFlushSequence(FakeProducerSuccess):
26+
def __init__(self, sequence): # sequence of remaining counts per flush call
27+
super().__init__()
28+
self.sequence = sequence
29+
self.flush_calls = 0
30+
31+
def flush(self, *a, **kw):
32+
# Simulate decreasing remaining messages
33+
if self.flush_calls < len(self.sequence):
34+
val = self.sequence[self.flush_calls]
35+
else:
36+
val = self.sequence[-1]
37+
self.flush_calls += 1
38+
return val
39+
40+
41+
class FakeProducerTimeout(FakeProducerSuccess):
42+
def __init__(self, remaining_value):
43+
super().__init__()
44+
self.remaining_value = remaining_value
45+
self.flush_calls = 0
46+
47+
def flush(self, *a, **kw): # always returns same remaining >0 to force timeout warning
48+
self.flush_calls += 1
49+
return self.remaining_value
50+
51+
52+
class FakeProducerTypeError(FakeProducerSuccess):
53+
def __init__(self):
54+
super().__init__()
55+
self.flush_calls = 0
56+
57+
# Intentionally omit timeout parameter causing TypeError on first attempt inside flush_with_timeout
58+
def flush(self): # noqa: D401
59+
self.flush_calls += 1
60+
return 0
61+
62+
2663
def test_write_skips_when_producer_none(monkeypatch):
2764
wk.STATE["logger"] = logging.getLogger("test")
2865
wk.STATE["producer"] = None
@@ -60,3 +97,45 @@ def produce(self, *a, **kw): # noqa: D401
6097
wk.STATE["producer"] = RaisingProducer()
6198
ok, err = wk.write("topic", {"d": 4})
6299
assert not ok and "boom" in err
100+
101+
102+
def test_write_flush_retries_until_success(monkeypatch, caplog):
103+
wk.STATE["logger"] = logging.getLogger("test")
104+
caplog.set_level(logging.WARNING)
105+
# Pin _MAX_RETRIES so the retry budget matches the flush sequence length deterministically
106+
monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False)
107+
producer = FakeProducerFlushSequence([5, 4, 3, 1, 0])
108+
wk.STATE["producer"] = producer
109+
ok, err = wk.write("topic", {"e": 5})
110+
assert ok and err is None
111+
# It should break as soon as remaining == 0 (after flush call returning 0)
112+
assert producer.flush_calls == 5 # sequence consumed until 0
113+
# A warning is logged for each attempt before success (flush_calls - 1 of them);
# the final, successful attempt does not warn
114+
warn_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING]
115+
assert any("attempt 1" in m or "attempt 2" in m for m in warn_messages)
116+
117+
118+
def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog):
119+
wk.STATE["logger"] = logging.getLogger("test")
120+
caplog.set_level(logging.WARNING)
121+
monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False)
122+
producer = FakeProducerTimeout(2)
123+
wk.STATE["producer"] = producer
124+
ok, err = wk.write("topic", {"f": 6})
125+
timeout_warnings = [
126+
r.message for r in caplog.records if "timeout" in r.message
127+
] # final warning should mention timeout
128+
assert ok and err is None # function returns success even if timeout warning
129+
assert timeout_warnings, "Expected timeout warning logged"
130+
assert producer.flush_calls == 3 # retried 3 times
131+
132+
133+
def test_flush_with_timeout_typeerror_fallback(monkeypatch):
134+
wk.STATE["logger"] = logging.getLogger("test")
135+
monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False)
136+
producer = FakeProducerTypeError()
137+
wk.STATE["producer"] = producer
138+
ok, err = wk.write("topic", {"g": 7})
139+
assert ok and err is None
140+
# Since flush returns 0 immediately, only one flush call should be needed
141+
assert producer.flush_calls == 1

0 commit comments

Comments
 (0)