Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1014,4 +1014,258 @@ class RestartSpec
created.get() shouldEqual 1
}
}

"A restart with backoff source with context" should {
"run normally" taggedAs TimingTest in {
val created = new AtomicInteger
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
SourceWithContext.fromTuples(Source.unfold(0) { state =>
Some((state + 1) -> ("a" -> state))
})
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("a" -> 1)
probe.requestNext("a" -> 2)
probe.requestNext("a" -> 3)

created.get shouldBe 1

probe.cancel()
}

"restart on completion" taggedAs TimingTest in {
val created = new AtomicInteger

val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept {
probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
probe.requestNext("a" -> 2)
probe.requestNext("b" -> 3)
probe.requestNext("a" -> 4)
}

created.get() shouldBe 3

probe.cancel()
}

"restart on failure" taggedAs TimingTest in {
val created = new AtomicInteger

val sourceFactory = { () =>
SourceWithContext
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
(offset + 1) -> (elem -> offset)
}, _ => None))
.map { (elem: String) =>
if (elem == "c") throw TE("failed")
else elem
}
}

val probe =
RestartSourceWithContext.withBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())

EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept {
probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
// offset 2 is "c" which blew up, triggering a restart
probe.requestNext("a" -> 3)
probe.requestNext("b" -> 4)
// offset 5 is "c", dropped in the restarting
probe.requestNext("a" -> 6)
}

created.get() shouldBe 3

probe.cancel()
}

"backoff before restart" taggedAs TimingTest in {
val created = new AtomicInteger

val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)

val deadline = (minBackoff - 1.millis).fromNow
probe.request(1)

probe.expectNext("a" -> 2)
deadline.isOverdue() shouldBe true

created.get() shouldBe 2

probe.cancel()
}

"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" taggedAs TimingTest in {
val created = new AtomicInteger
val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)

val deadline = (minBackoff - 1.millis).fromNow
probe.request(1)
probe.expectNext("a" -> 2)
deadline.isOverdue() shouldBe true
probe.requestNext("b" -> 3)

probe.request(1)
// The probe should now back off again with increased backoff

// Wait for the delay, then subsequent backoff, to pass, so the restart count is reset
Thread.sleep(((minBackoff * 3) + 500.millis).toMillis)

probe.expectNext("a" -> 4)
probe.requestNext("b" -> 5)

probe.requestNext(2 * minBackoff - 1.milli) should be("a" -> 6)

created.get() shouldBe 4

probe.cancel()
}

"cancel the currently running SourceWithContext when canceled" taggedAs TimingTest in {
val created = new AtomicInteger()
val promise = Promise[Done]()
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
SourceWithContext.fromTuples(Source.repeat("a").map { _ -> created.getAndIncrement() }.watchTermination() {
(_, term) =>
promise.completeWith(term)
})
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.cancel()

promise.future.futureValue shouldBe Done

// wait to ensure that it isn't restarted
Thread.sleep(200)
created.get() shouldBe 1
}

"not restart the SourceWithContext when cancelled while backing off" taggedAs TimingTest in {
val created = new AtomicInteger()
val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
created.getAndIncrement()
SourceWithContext.fromTuples(Source.single("a" -> 1))
}
.runWith(TestSink())

probe.requestNext("a" -> 1)
probe.request(1)
// back-off delays the restart (racy...)
probe.cancel()

// wait to ensure it isn't restarted
Thread.sleep((minBackoff + 100.millis).toMillis)
created.get() shouldBe 1
}

"stop on completion if it should only be restarted on failures" taggedAs TimingTest in {
val created = new AtomicInteger()

val cgai = { () =>
created.getAndIncrement()
}

val probe = RestartSourceWithContext
.onFailuresWithBackoff(shortRestartSettings) { () =>
cgai()
SourceWithContext.fromTuples(Source(Seq("a" -> cgai(), "b" -> cgai(), "c" -> cgai()))).map {
case "c" => if (created.get() <= 4) throw new TE("failed") else "c"
case other => other
}
}
.runWith(TestSink())

probe.requestNext("a" -> 1)
probe.requestNext("b" -> 2)
// fails and restarts
probe.requestNext("a" -> 5)
probe.requestNext("b" -> 6)
probe.requestNext("c" -> 7)
probe.expectComplete()

created.get() shouldBe 8

probe.cancel()
}

"restart on failure when only due to failures should be restarted" taggedAs TimingTest in {
val created = new AtomicInteger()

val sourceFactory = { () =>
SourceWithContext
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
(offset + 1) -> (elem -> offset)
}, _ => None))
.map { elem =>
if (elem == "c") throw TE("failed")
else elem
}
}

val probe =
RestartSourceWithContext.onFailuresWithBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
// offset 2 is "c" which blew up, triggering a restart
probe.requestNext("a" -> 3)
probe.requestNext("b" -> 4)
// offset 5 is "c", dropped in the restarting
probe.requestNext("a" -> 6)

created.get() shouldBe 3

probe.cancel()
}

"not restart when maxRestarts is reached" taggedAs TimingTest in {
val created = new AtomicInteger()
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
SourceWithContext.fromTuples(Source.single("a").map(_ -> created.getAndIncrement()))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("a" -> 1)
probe.expectComplete()

created.get() shouldBe 2

probe.cancel()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.javadsl

import akka.NotUsed
import akka.japi.function.Creator
import akka.stream.RestartSettings
import akka.stream.scaladsl

/**
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the SourceWithContext restarts.
*/
object RestartSourceWithContext {

/**
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails or completes using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining the restart configuration
* @param sourceFactory A factory for producing the SourceWithContext to wrap
*/
def withBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The akka.japi.function interfaces are parallel with the JDK stdlib ones but add the capability to throw checked, for example for usage in actor message handling. That's not important here, so let's go with java.util.function.Supplier instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used Creator to mirror the other Restart* factories... suppose there should be an issue to move those to Supplier (which would be binary incompat, but source compat) in 2.10?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, let's keep it aligned then 👍

val underlyingFactory = () => sourceFactory.create().asScala
new SourceWithContext(scaladsl.RestartSourceWithContext.withBackoff(settings)(underlyingFactory))
}

/**
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining the restart configuration
* @param sourceFactory A factory for producing the SourceWithContext to wrap
*/
def onFailuresWithBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
val underlyingFactory = () => sourceFactory.create().asScala
new SourceWithContext(scaladsl.RestartSourceWithContext.onFailuresWithBackoff(settings)(underlyingFactory))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.scaladsl

import akka.NotUsed
import akka.stream.RestartSettings

/**
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the [[SourceWithContext]] restarts.
*/
object RestartSourceWithContext {

/**
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails or completes using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
*/
def withBackoff[T, C](settings: RestartSettings)(
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
val underlyingFactory = () => sourceFactory().asSource
SourceWithContext.fromTuples(
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false)))
}

/**
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
*/
def onFailuresWithBackoff[T, C](settings: RestartSettings)(
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
val underlyingFactory = () => sourceFactory().asSource
SourceWithContext.fromTuples(
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = true)))
}
}