-
Notifications
You must be signed in to change notification settings - Fork 1k
Failsafe RetryPolicy instrumentation added #15255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -13,18 +13,24 @@ | |||||
|
|
||||||
| import dev.failsafe.CircuitBreaker; | ||||||
| import dev.failsafe.CircuitBreakerConfig; | ||||||
| import dev.failsafe.RetryPolicy; | ||||||
| import dev.failsafe.RetryPolicyConfig; | ||||||
| import io.opentelemetry.api.OpenTelemetry; | ||||||
| import io.opentelemetry.api.common.AttributeKey; | ||||||
| import io.opentelemetry.api.common.Attributes; | ||||||
| import io.opentelemetry.api.metrics.LongCounter; | ||||||
| import io.opentelemetry.api.metrics.LongHistogram; | ||||||
| import io.opentelemetry.api.metrics.Meter; | ||||||
| import java.util.Arrays; | ||||||
|
|
||||||
| /** Entrypoint for instrumenting Failsafe components. */ | ||||||
| public final class FailsafeTelemetry { | ||||||
| private static final String INSTRUMENTATION_NAME = "io.opentelemetry.failsafe-3.0"; | ||||||
|
|
||||||
| private static final AttributeKey<String> CIRCUIT_BREAKER_NAME = | ||||||
| AttributeKey.stringKey("failsafe.circuit_breaker.name"); | ||||||
| private static final AttributeKey<String> RETRY_POLICY_NAME = | ||||||
| AttributeKey.stringKey("failsafe.retry_policy.name"); | ||||||
|
|
||||||
| /** Returns a new {@link FailsafeTelemetry} configured with the given {@link OpenTelemetry}. */ | ||||||
| public static FailsafeTelemetry create(OpenTelemetry openTelemetry) { | ||||||
|
|
@@ -70,4 +76,41 @@ public <R> CircuitBreaker<R> createCircuitBreaker( | |||||
| .onClose(buildInstrumentedCloseListener(userConfig, stateChangesCounter, attributes)) | ||||||
| .build(); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Returns an instrumented {@link RetryPolicy} by given values. | ||||||
| * | ||||||
| * @param delegate user configured {@link RetryPolicy} to be instrumented | ||||||
| * @param retryPolicyName identifier of given {@link RetryPolicy} | ||||||
| * @param <R> {@link RetryPolicy}'s result type | ||||||
| * @return instrumented {@link RetryPolicy} | ||||||
| */ | ||||||
| public <R> RetryPolicy<R> createRetryPolicy(RetryPolicy<R> delegate, String retryPolicyName) { | ||||||
| RetryPolicyConfig<R> userConfig = delegate.getConfig(); | ||||||
| Meter meter = openTelemetry.getMeter(INSTRUMENTATION_NAME); | ||||||
| LongCounter executionCounter = | ||||||
| meter | ||||||
| .counterBuilder("failsafe.retry_policy.execution.count") | ||||||
| .setDescription( | ||||||
| "Count of execution events processed by the retry policy. " | ||||||
| + "Each event represents one complete execution flow (initial attempt + any retries). " | ||||||
| + "This metric does not count individual retry attempts - it counts each time the policy is invoked.") | ||||||
| .build(); | ||||||
| LongHistogram attemptsHistogram = | ||||||
| meter | ||||||
| .histogramBuilder("failsafe.retry_policy.attempts") | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure using a histogram for this is justified. @trask, could you provide guidance on this?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't know why I didn't thread this: #15255 (comment) |
||||||
| .setDescription("Histogram of number of attempts for each execution.") | ||||||
|
||||||
| .setDescription("Histogram of number of attempts for each execution.") | |
| .setDescription("Number of attempts for each execution.") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.instrumentation.failsafe.v3_0; | ||
|
|
||
| import static io.opentelemetry.api.common.AttributeKey.stringKey; | ||
|
|
||
| import dev.failsafe.RetryPolicyConfig; | ||
| import dev.failsafe.event.EventListener; | ||
| import dev.failsafe.event.ExecutionCompletedEvent; | ||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.common.Attributes; | ||
| import io.opentelemetry.api.metrics.LongCounter; | ||
| import io.opentelemetry.api.metrics.LongHistogram; | ||
|
|
||
| /** | ||
|  * Factory for the completion listeners installed on an instrumented retry policy. Each listener | ||
|  * records the execution metrics first and then forwards the event to the listener the user had | ||
|  * configured, if any. | ||
|  */ | ||
| final class RetryPolicyEventListenerBuilders { | ||
|   private static final AttributeKey<String> OUTCOME_KEY = | ||
|       stringKey("failsafe.retry_policy.outcome"); | ||
|
|
||
|   private RetryPolicyEventListenerBuilders() {} | ||
|
|
||
|   /** | ||
|    * Builds the listener for failed executions. | ||
|    * | ||
|    * @param userConfig configuration whose failure listener (possibly {@code null}) is delegated to | ||
|    * @param executionCounter counter incremented by one per completed execution | ||
|    * @param attemptsHistogram histogram that receives the attempt count of the execution | ||
|    * @param commonAttributes attributes to which the {@code "failure"} outcome is added | ||
|    * @param <R> result type of the retry policy | ||
|    * @return listener that records metrics and then invokes the user's failure listener | ||
|    */ | ||
|   static <R> EventListener<ExecutionCompletedEvent<R>> buildInstrumentedFailureListener( | ||
|       RetryPolicyConfig<R> userConfig, | ||
|       LongCounter executionCounter, | ||
|       LongHistogram attemptsHistogram, | ||
|       Attributes commonAttributes) { | ||
|     Attributes failureAttributes = | ||
|         commonAttributes.toBuilder().put(OUTCOME_KEY, "failure").build(); | ||
|     return recordThenDelegate( | ||
|         userConfig.getFailureListener(), executionCounter, attemptsHistogram, failureAttributes); | ||
|   } | ||
|
|
||
|   /** | ||
|    * Builds the listener for successful executions. | ||
|    * | ||
|    * @param userConfig configuration whose success listener (possibly {@code null}) is delegated to | ||
|    * @param executionCounter counter incremented by one per completed execution | ||
|    * @param attemptsHistogram histogram that receives the attempt count of the execution | ||
|    * @param commonAttributes attributes to which the {@code "success"} outcome is added | ||
|    * @param <R> result type of the retry policy | ||
|    * @return listener that records metrics and then invokes the user's success listener | ||
|    */ | ||
|   static <R> EventListener<ExecutionCompletedEvent<R>> buildInstrumentedSuccessListener( | ||
|       RetryPolicyConfig<R> userConfig, | ||
|       LongCounter executionCounter, | ||
|       LongHistogram attemptsHistogram, | ||
|       Attributes commonAttributes) { | ||
|     Attributes successAttributes = | ||
|         commonAttributes.toBuilder().put(OUTCOME_KEY, "success").build(); | ||
|     return recordThenDelegate( | ||
|         userConfig.getSuccessListener(), executionCounter, attemptsHistogram, successAttributes); | ||
|   } | ||
|
|
||
|   /** Records both metrics with the given attributes, then calls {@code userListener} if set. */ | ||
|   private static <R> EventListener<ExecutionCompletedEvent<R>> recordThenDelegate( | ||
|       EventListener<ExecutionCompletedEvent<R>> userListener, | ||
|       LongCounter executionCounter, | ||
|       LongHistogram attemptsHistogram, | ||
|       Attributes attributes) { | ||
|     return event -> { | ||
|       executionCounter.add(1, attributes); | ||
|       attemptsHistogram.record(event.getAttemptCount(), attributes); | ||
|       if (userListener == null) { | ||
|         return; | ||
|       } | ||
|       userListener.accept(event); | ||
|     }; | ||
|   } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,12 +11,20 @@ | |
| import dev.failsafe.CircuitBreaker; | ||
| import dev.failsafe.CircuitBreakerOpenException; | ||
| import dev.failsafe.Failsafe; | ||
| import dev.failsafe.RetryPolicy; | ||
| import io.opentelemetry.api.common.Attributes; | ||
| import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; | ||
| import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; | ||
| import io.opentelemetry.sdk.metrics.data.HistogramData; | ||
| import io.opentelemetry.sdk.metrics.data.HistogramPointData; | ||
| import io.opentelemetry.sdk.metrics.data.LongPointData; | ||
| import io.opentelemetry.sdk.metrics.data.SumData; | ||
| import io.opentelemetry.sdk.testing.assertj.LongPointAssert; | ||
| import java.time.Duration; | ||
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Consumer; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.extension.RegisterExtension; | ||
|
|
@@ -80,6 +88,82 @@ void captureCircuitBreakerMetrics() { | |
| 1, "failsafe.circuit_breaker.state", "closed")))); | ||
| } | ||
|
|
||
| @Test | ||
| void captureRetryPolicyMetrics() { | ||
| // given | ||
| RetryPolicy<Object> userRetryPolicy = | ||
| dev.failsafe.RetryPolicy.builder() | ||
| .handleResultIf(Objects::isNull) | ||
| .withMaxAttempts(3) | ||
| .build(); | ||
| FailsafeTelemetry failsafeTelemetry = FailsafeTelemetry.create(testing.getOpenTelemetry()); | ||
| RetryPolicy<Object> instrumentedRetryPolicy = | ||
| failsafeTelemetry.createRetryPolicy(userRetryPolicy, "testing"); | ||
|
|
||
| // when | ||
| for (int i = 0; i <= 4; i++) { | ||
| int temp = i; | ||
| AtomicInteger retry = new AtomicInteger(0); | ||
| Failsafe.with(instrumentedRetryPolicy) | ||
| .get( | ||
| () -> { | ||
| if (retry.get() < temp) { | ||
| retry.incrementAndGet(); | ||
| return null; | ||
| } else { | ||
| return new Object(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // then | ||
| testing.waitAndAssertMetrics("io.opentelemetry.failsafe-3.0"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason not to use the other style of assertions here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nothing special. I started with the same style, but at some point I felt it was overcomplicating what I was trying to do, so I switched to this style, which I found easier to read.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The usual approach, which @jaydeluca pointed out, waits until the assertion succeeds. For example, when the method is called and the metric data points aren't available yet (data is exported from a background thread), it will retry the assertions after waiting a bit. |
||
| assertThat(testing.metrics().size()).isEqualTo(2); | ||
|
|
||
| SumData<LongPointData> executionCountMetric = | ||
| testing.metrics().stream() | ||
| .filter(m -> m.getName().equals("failsafe.retry_policy.execution.count")) | ||
| .findFirst() | ||
| .get() | ||
| .getLongSumData(); | ||
| assertThat(executionCountMetric.getPoints().size()).isEqualTo(2); | ||
| assertThat(executionCountMetric.getPoints()) | ||
| .anyMatch( | ||
| p -> | ||
| p.getAttributes().equals(buildExpectedRetryPolicyAttributes("failure")) | ||
| && p.getValue() == 2); | ||
| assertThat(executionCountMetric.getPoints()) | ||
| .anyMatch( | ||
| p -> | ||
| p.getAttributes().equals(buildExpectedRetryPolicyAttributes("success")) | ||
| && p.getValue() == 3); | ||
|
|
||
| HistogramData attemptsMetric = | ||
| testing.metrics().stream() | ||
| .filter(m -> m.getName().equals("failsafe.retry_policy.attempts")) | ||
| .findFirst() | ||
| .get() | ||
| .getHistogramData(); | ||
| Collection<HistogramPointData> pointData = attemptsMetric.getPoints(); | ||
| assertThat(pointData).hasSize(2); | ||
| assertThat(pointData) | ||
| .anyMatch( | ||
| p -> | ||
| p.getCount() == 3 | ||
| && p.getMin() == 1 | ||
| && p.getMax() == 3 | ||
| && p.getAttributes().equals(buildExpectedRetryPolicyAttributes("success")) | ||
| && Arrays.equals(p.getCounts().toArray(), new Long[] {1L, 1L, 1L, 0L, 0L})); | ||
| assertThat(pointData) | ||
| .anyMatch( | ||
| p -> | ||
| p.getCount() == 2 | ||
| && p.getMin() == 3 | ||
| && p.getMax() == 3 | ||
| && p.getAttributes().equals(buildExpectedRetryPolicyAttributes("failure")) | ||
| && Arrays.equals(p.getCounts().toArray(), new Long[] {0L, 0L, 2L, 0L, 0L})); | ||
| } | ||
|
|
||
| private static Consumer<LongPointAssert> buildCircuitBreakerAssertion( | ||
| long expectedValue, String expectedAttributeKey, String expectedAttributeValue) { | ||
| return longSumAssert -> | ||
|
|
@@ -94,4 +178,11 @@ private static Consumer<LongPointAssert> buildCircuitBreakerAssertion( | |
| .build(), | ||
| attributes)); | ||
| } | ||
|
|
||
| /** | ||
|  * Builds the attribute set expected on a retry policy data point: the policy name | ||
|  * {@code "testing"} plus the given outcome. | ||
|  */ | ||
| private static Attributes buildExpectedRetryPolicyAttributes(String expectedOutcome) { | ||
|   Attributes policyAttributes = | ||
|       Attributes.builder().put("failsafe.retry_policy.name", "testing").build(); | ||
|   // Add the outcome on top of the name-only attributes. | ||
|   return policyAttributes.toBuilder() | ||
|       .put("failsafe.retry_policy.outcome", expectedOutcome) | ||
|       .build(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an idea, but perhaps we could shorten this description a bit by encoding some of this information in the unit?