From 35b962f0925683c5015ed549eaad8d044ae6e32a Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Thu, 5 Oct 2023 11:10:53 -0400 Subject: [PATCH 1/4] RestartSourceWithContext --- .../akka/stream/scaladsl/RestartSpec.scala | 255 ++++++++++++++++++ .../scaladsl/RestartSourceWithContext.scala | 49 ++++ 2 files changed, 304 insertions(+) create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 596b269a1cb..6590ed255af 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -1014,4 +1014,259 @@ class RestartSpec created.get() shouldEqual 1 } } + + "A restart with backoff source with context" should { + "run normally" taggedAs TimingTest in { + val created = new AtomicInteger + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + created.incrementAndGet() + SourceWithContext.fromTuples(Source.unfold(0) { state => + Some((state + 1) -> ("a" -> state)) + }) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("a" -> 1) + probe.requestNext("a" -> 2) + probe.requestNext("a" -> 3) + + created.get shouldBe 1 + + probe.cancel() + } + + "restart on completion" taggedAs TimingTest in { + val created = new AtomicInteger + + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept { + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + probe.requestNext("a" -> 2) + probe.requestNext("b" -> 3) + probe.requestNext("a" -> 4) + } + + created.get() shouldBe 3 + + probe.cancel() + } + + "restart on failure" taggedAs TimingTest in { + val created = new AtomicInteger + + val sourceFactory = { () => + SourceWithContext + .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => + (offset + 1) -> (elem -> offset) + }, _ => None)) + .map { elem: String => + if (elem == "c") throw TE("failed") + else elem + } + } + + val probe = + RestartSourceWithContext.withBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink()) + + EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept { + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + // offset 2 is "c" which blew up, triggering a restart + probe.requestNext("a" -> 3) + probe.requestNext("b" -> 4) + // offset 5 is "c", dropped in the restarting + probe.requestNext("a" -> 6) + } + + created.get() shouldBe 3 + + probe.cancel() + } + + "backoff before restart" taggedAs TimingTest in { + val created = new AtomicInteger + + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + + val deadline = (minBackoff - 1.millis).fromNow + probe.request(1) + + probe.expectNext("a" -> 2) + deadline.isOverdue() shouldBe true + + created.get() shouldBe 2 + + probe.cancel() + } + + "reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" taggedAs TimingTest in { + val created = new AtomicInteger + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + + val deadline = (minBackoff - 1.millis).fromNow + probe.request(1) + probe.expectNext("a" -> 2) + deadline.isOverdue() shouldBe true + probe.requestNext("b" -> 3) + + probe.request(1) + // The probe should now back off again with increased backoff + + // Wait for the delay, then subsequent backoff, to pass, so the restart count is reset + Thread.sleep(((minBackoff * 3) + 500.millis).toMillis) + + probe.expectNext("a" -> 4) + probe.requestNext("b" -> 5) + + probe.requestNext(2 * minBackoff - 1.milli) should be("a" -> 6) + + created.get() shouldBe 4 + + probe.cancel() + } + + "cancel the currently running SourceWithContext when canceled" taggedAs TimingTest in { + val created = new AtomicInteger() + val promise = Promise[Done]() + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + SourceWithContext.fromTuples(Source.repeat("a").map { _ -> created.getAndIncrement() }.watchTermination() { + (_, term) => + promise.completeWith(term) + }) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.cancel() + + promise.future.futureValue shouldBe Done + + // wait to ensure that it isn't restarted + Thread.sleep(200) + created.get() shouldBe 1 + } + + "not restart the SourceWithContext when cancelled while backing off" taggedAs TimingTest in { + val created = new AtomicInteger() + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + created.getAndIncrement() + SourceWithContext.fromTuples(Source.single("a" -> 1)) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 1) + probe.request(1) + // back-off delays the restart (racy...) + probe.cancel() + + // wait to ensure it isn't restarted + Thread.sleep((minBackoff + 100.millis).toMillis) + created.get() shouldBe 1 + } + + "stop on completion if it should only be restarted on failures" taggedAs TimingTest in { + val created = new AtomicInteger() + + val cgai = { () => + created.getAndIncrement() + } + + val probe = RestartSourceWithContext + .onFailuresWithBackoff(shortRestartSettings) { () => + cgai() + SourceWithContext.fromTuples(Source(Seq("a" -> cgai(), "b" -> cgai(), "c" -> cgai()))).map { + case "c" => if (created.get() <= 4) throw new TE("failed") else "c" + case other => other + } + } + .runWith(TestSink()) + + probe.requestNext("a" -> 1) + probe.requestNext("b" -> 2) + // fails and restarts + probe.requestNext("a" -> 5) + probe.requestNext("b" -> 6) + probe.requestNext("c" -> 7) + probe.expectComplete() + + created.get() shouldBe 8 + + probe.cancel() + } + + "restart on failure when only due to failures should be restarted" taggedAs TimingTest in { + val created = new AtomicInteger() + + val sourceFactory = { () => + SourceWithContext + SourceWithContext + .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => + (offset + 1) -> (elem -> offset) + }, _ => None)) + .map { elem => + if (elem == "c") throw TE("failed") + else elem + } + } + + val probe = + RestartSourceWithContext.onFailuresWithBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + // offset 2 is "c" which blew up, triggering a restart + probe.requestNext("a" -> 3) + probe.requestNext("b" -> 4) + // offset 5 is "c", dropped in the restarting + probe.requestNext("a" -> 6) + + created.get() shouldBe 3 + + probe.cancel() + } + + "not restart when maxRestarts is reached" taggedAs TimingTest in { + val created = new AtomicInteger() + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () => + SourceWithContext.fromTuples(Source.single("a").map(_ -> created.getAndIncrement())) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("a" -> 1) + probe.expectComplete() + + created.get() shouldBe 2 + + probe.cancel() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala new file mode 100644 index 00000000000..1b4e3bd6bb8 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.RestartSettings + +/** + * A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will, + * e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the [[SourceWithContext]] restarts. + */ +object RestartSourceWithContext { + + /** + * Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails or completes using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining restart configuration + * @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap + */ + def withBackoff[T, C](settings: RestartSettings)( + sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = { + // we like wrapping so much, we unwrap so that we can rewrap... since the intended usecase is for sources (like + // Kafka or whatever) which are a network hop away, this overhead is negligible + val underlyingFactory = () => sourceFactory().asSource + SourceWithContext.fromTuples( + Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false))) + } + + /** + * Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining restart configuration + * @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap + */ + def onFailuresWithBackoff[T, C](settings: RestartSettings)( + sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory().asSource + SourceWithContext.fromTuples( + Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = true))) + } +} From 5ce23dbe1cdfcfd5749a02161891e54f9d48cd9e Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Tue, 28 Nov 2023 10:10:24 -0500 Subject: [PATCH 2/4] doubled line? --- .../src/test/scala/akka/stream/scaladsl/RestartSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 6590ed255af..0865245c357 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -1068,7 +1068,7 @@ class RestartSpec .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => (offset + 1) -> (elem -> offset) }, _ => None)) - .map { elem: String => + .map { (elem: String) => if (elem == "c") throw TE("failed") else elem } @@ -1225,7 +1225,6 @@ class RestartSpec val created = new AtomicInteger() val sourceFactory = { () => - SourceWithContext SourceWithContext .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => (offset + 1) -> (elem -> offset) From f6015d15f1c67799c0c7d492528b054049d7b915 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 29 Nov 2023 10:50:17 -0500 Subject: [PATCH 3/4] Update akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Johan Andrén --- .../scala/akka/stream/scaladsl/RestartSourceWithContext.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala index 1b4e3bd6bb8..0decac18dc6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala @@ -25,8 +25,6 @@ object RestartSourceWithContext { */ def withBackoff[T, C](settings: RestartSettings)( sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = { - // we like wrapping so much, we unwrap so that we can rewrap... since the intended usecase is for sources (like - // Kafka or whatever) which are a network hop away, this overhead is negligible val underlyingFactory = () => sourceFactory().asSource SourceWithContext.fromTuples( Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false))) From 3a7bad0086030b4244e21e1a66173536ed683e0b Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 29 Nov 2023 11:15:31 -0500 Subject: [PATCH 4/4] javadsl --- .../javadsl/RestartSourceWithContext.scala | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala new file mode 100644 index 00000000000..57527ab51c3 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator +import akka.stream.RestartSettings +import akka.stream.scaladsl + +/** + * A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will, + * e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the SourceWithContext restarts. + */ +object RestartSourceWithContext { + + /** + * Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails or completes using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining the restart configuration + * @param sourceFactory A factory for producing the SourceWithContext to wrap + */ + def withBackoff[T, C]( + settings: RestartSettings, + sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory.create().asScala + new SourceWithContext(scaladsl.RestartSourceWithContext.withBackoff(settings)(underlyingFactory)) + } + + /** + * Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining the restart configuration + * @param sourceFactory A factory for producing the SourceWithContext to wrap + */ + def onFailuresWithBackoff[T, C]( + settings: RestartSettings, + sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory.create().asScala + new SourceWithContext(scaladsl.RestartSourceWithContext.onFailuresWithBackoff(settings)(underlyingFactory)) + } +}