Skip to content

Commit 542f9bc

Browse files
leviramseyHe-Pin
authored andcommitted
feat: RestartSourceWithContext (akka#32178)
1 parent 4ac5562 commit 542f9bc

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed

akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,4 +1014,258 @@ class RestartSpec
10141014
created.get() shouldEqual 1
10151015
}
10161016
}
1017+
1018+
"A restart with backoff source with context" should {
1019+
"run normally" taggedAs TimingTest in {
1020+
val created = new AtomicInteger
1021+
val probe = RestartSourceWithContext
1022+
.withBackoff(shortRestartSettings) { () =>
1023+
created.incrementAndGet()
1024+
SourceWithContext.fromTuples(Source.unfold(0) { state =>
1025+
Some((state + 1) -> ("a" -> state))
1026+
})
1027+
}
1028+
.runWith(TestSink())
1029+
1030+
probe.requestNext("a" -> 0)
1031+
probe.requestNext("a" -> 1)
1032+
probe.requestNext("a" -> 2)
1033+
probe.requestNext("a" -> 3)
1034+
1035+
created.get shouldBe 1
1036+
1037+
probe.cancel()
1038+
}
1039+
1040+
"restart on completion" taggedAs TimingTest in {
1041+
val created = new AtomicInteger
1042+
1043+
val probe = RestartSourceWithContext
1044+
.withBackoff(shortRestartSettings) { () =>
1045+
val count = created.getAndIncrement() * 2
1046+
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
1047+
}
1048+
.runWith(TestSink())
1049+
1050+
EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept {
1051+
probe.requestNext("a" -> 0)
1052+
probe.requestNext("b" -> 1)
1053+
probe.requestNext("a" -> 2)
1054+
probe.requestNext("b" -> 3)
1055+
probe.requestNext("a" -> 4)
1056+
}
1057+
1058+
created.get() shouldBe 3
1059+
1060+
probe.cancel()
1061+
}
1062+
1063+
"restart on failure" taggedAs TimingTest in {
1064+
val created = new AtomicInteger
1065+
1066+
val sourceFactory = { () =>
1067+
SourceWithContext
1068+
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
1069+
(offset + 1) -> (elem -> offset)
1070+
}, _ => None))
1071+
.map { (elem: String) =>
1072+
if (elem == "c") throw TE("failed")
1073+
else elem
1074+
}
1075+
}
1076+
1077+
val probe =
1078+
RestartSourceWithContext.withBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())
1079+
1080+
EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept {
1081+
probe.requestNext("a" -> 0)
1082+
probe.requestNext("b" -> 1)
1083+
// offset 2 is "c" which blew up, triggering a restart
1084+
probe.requestNext("a" -> 3)
1085+
probe.requestNext("b" -> 4)
1086+
// offset 5 is "c", dropped in the restarting
1087+
probe.requestNext("a" -> 6)
1088+
}
1089+
1090+
created.get() shouldBe 3
1091+
1092+
probe.cancel()
1093+
}
1094+
1095+
"backoff before restart" taggedAs TimingTest in {
1096+
val created = new AtomicInteger
1097+
1098+
val probe = RestartSourceWithContext
1099+
.withBackoff(restartSettings) { () =>
1100+
val count = created.getAndIncrement() * 2
1101+
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
1102+
}
1103+
.runWith(TestSink())
1104+
1105+
probe.requestNext("a" -> 0)
1106+
probe.requestNext("b" -> 1)
1107+
1108+
val deadline = (minBackoff - 1.millis).fromNow
1109+
probe.request(1)
1110+
1111+
probe.expectNext("a" -> 2)
1112+
deadline.isOverdue() shouldBe true
1113+
1114+
created.get() shouldBe 2
1115+
1116+
probe.cancel()
1117+
}
1118+
1119+
"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" taggedAs TimingTest in {
1120+
val created = new AtomicInteger
1121+
val probe = RestartSourceWithContext
1122+
.withBackoff(restartSettings) { () =>
1123+
val count = created.getAndIncrement() * 2
1124+
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
1125+
}
1126+
.runWith(TestSink())
1127+
1128+
probe.requestNext("a" -> 0)
1129+
probe.requestNext("b" -> 1)
1130+
1131+
val deadline = (minBackoff - 1.millis).fromNow
1132+
probe.request(1)
1133+
probe.expectNext("a" -> 2)
1134+
deadline.isOverdue() shouldBe true
1135+
probe.requestNext("b" -> 3)
1136+
1137+
probe.request(1)
1138+
// The probe should now back off again with increased backoff
1139+
1140+
// Wait for the delay, then subsequent backoff, to pass, so the restart count is reset
1141+
Thread.sleep(((minBackoff * 3) + 500.millis).toMillis)
1142+
1143+
probe.expectNext("a" -> 4)
1144+
probe.requestNext("b" -> 5)
1145+
1146+
probe.requestNext(2 * minBackoff - 1.milli) should be("a" -> 6)
1147+
1148+
created.get() shouldBe 4
1149+
1150+
probe.cancel()
1151+
}
1152+
1153+
"cancel the currently running SourceWithContext when canceled" taggedAs TimingTest in {
1154+
val created = new AtomicInteger()
1155+
val promise = Promise[Done]()
1156+
val probe = RestartSourceWithContext
1157+
.withBackoff(shortRestartSettings) { () =>
1158+
SourceWithContext.fromTuples(Source.repeat("a").map { _ -> created.getAndIncrement() }.watchTermination() {
1159+
(_, term) =>
1160+
promise.completeWith(term)
1161+
})
1162+
}
1163+
.runWith(TestSink())
1164+
1165+
probe.requestNext("a" -> 0)
1166+
probe.cancel()
1167+
1168+
promise.future.futureValue shouldBe Done
1169+
1170+
// wait to ensure that it isn't restarted
1171+
Thread.sleep(200)
1172+
created.get() shouldBe 1
1173+
}
1174+
1175+
"not restart the SourceWithContext when cancelled while backing off" taggedAs TimingTest in {
1176+
val created = new AtomicInteger()
1177+
val probe = RestartSourceWithContext
1178+
.withBackoff(restartSettings) { () =>
1179+
created.getAndIncrement()
1180+
SourceWithContext.fromTuples(Source.single("a" -> 1))
1181+
}
1182+
.runWith(TestSink())
1183+
1184+
probe.requestNext("a" -> 1)
1185+
probe.request(1)
1186+
// back-off delays the restart (racy...)
1187+
probe.cancel()
1188+
1189+
// wait to ensure it isn't restarted
1190+
Thread.sleep((minBackoff + 100.millis).toMillis)
1191+
created.get() shouldBe 1
1192+
}
1193+
1194+
"stop on completion if it should only be restarted on failures" taggedAs TimingTest in {
1195+
val created = new AtomicInteger()
1196+
1197+
val cgai = { () =>
1198+
created.getAndIncrement()
1199+
}
1200+
1201+
val probe = RestartSourceWithContext
1202+
.onFailuresWithBackoff(shortRestartSettings) { () =>
1203+
cgai()
1204+
SourceWithContext.fromTuples(Source(Seq("a" -> cgai(), "b" -> cgai(), "c" -> cgai()))).map {
1205+
case "c" => if (created.get() <= 4) throw new TE("failed") else "c"
1206+
case other => other
1207+
}
1208+
}
1209+
.runWith(TestSink())
1210+
1211+
probe.requestNext("a" -> 1)
1212+
probe.requestNext("b" -> 2)
1213+
// fails and restarts
1214+
probe.requestNext("a" -> 5)
1215+
probe.requestNext("b" -> 6)
1216+
probe.requestNext("c" -> 7)
1217+
probe.expectComplete()
1218+
1219+
created.get() shouldBe 8
1220+
1221+
probe.cancel()
1222+
}
1223+
1224+
"restart on failure when only due to failures should be restarted" taggedAs TimingTest in {
1225+
val created = new AtomicInteger()
1226+
1227+
val sourceFactory = { () =>
1228+
SourceWithContext
1229+
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
1230+
(offset + 1) -> (elem -> offset)
1231+
}, _ => None))
1232+
.map { elem =>
1233+
if (elem == "c") throw TE("failed")
1234+
else elem
1235+
}
1236+
}
1237+
1238+
val probe =
1239+
RestartSourceWithContext.onFailuresWithBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())
1240+
1241+
probe.requestNext("a" -> 0)
1242+
probe.requestNext("b" -> 1)
1243+
// offset 2 is "c" which blew up, triggering a restart
1244+
probe.requestNext("a" -> 3)
1245+
probe.requestNext("b" -> 4)
1246+
// offset 5 is "c", dropped in the restarting
1247+
probe.requestNext("a" -> 6)
1248+
1249+
created.get() shouldBe 3
1250+
1251+
probe.cancel()
1252+
}
1253+
1254+
"not restart when maxRestarts is reached" taggedAs TimingTest in {
1255+
val created = new AtomicInteger()
1256+
val probe = RestartSourceWithContext
1257+
.withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
1258+
SourceWithContext.fromTuples(Source.single("a").map(_ -> created.getAndIncrement()))
1259+
}
1260+
.runWith(TestSink())
1261+
1262+
probe.requestNext("a" -> 0)
1263+
probe.requestNext("a" -> 1)
1264+
probe.expectComplete()
1265+
1266+
created.get() shouldBe 2
1267+
1268+
probe.cancel()
1269+
}
1270+
}
10171271
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.stream.javadsl
6+
7+
import akka.NotUsed
8+
import akka.japi.function.Creator
9+
import akka.stream.RestartSettings
10+
import akka.stream.scaladsl
11+
12+
/**
13+
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
14+
*
15+
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
16+
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the SourceWithContext restarts.
17+
*/
18+
object RestartSourceWithContext {
19+
20+
/**
21+
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails or completes using an exponential backoff.
22+
*
23+
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext will be canceled and will not be restarted.
24+
*
25+
* @param settings [[RestartSettings]] defining the restart configuration
26+
* @param sourceFactory A factory for producing the SourceWithContext to wrap
27+
*/
28+
def withBackoff[T, C](
29+
settings: RestartSettings,
30+
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
31+
val underlyingFactory = () => sourceFactory.create().asScala
32+
new SourceWithContext(scaladsl.RestartSourceWithContext.withBackoff(settings)(underlyingFactory))
33+
}
34+
35+
/**
36+
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails using an exponential backoff.
37+
*
38+
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext if currently running will be canceled and will not be restarted.
39+
*
40+
* @param settings [[RestartSettings]] defining the restart configuration
41+
* @param sourceFactory A factory for producing the SourceWithContext to wrap
42+
*/
43+
def onFailuresWithBackoff[T, C](
44+
settings: RestartSettings,
45+
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
46+
val underlyingFactory = () => sourceFactory.create().asScala
47+
new SourceWithContext(scaladsl.RestartSourceWithContext.onFailuresWithBackoff(settings)(underlyingFactory))
48+
}
49+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.stream.scaladsl
6+
7+
import akka.NotUsed
8+
import akka.stream.RestartSettings
9+
10+
/**
11+
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
12+
*
13+
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
14+
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the [[SourceWithContext]] restarts.
15+
*/
16+
object RestartSourceWithContext {
17+
18+
/**
19+
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails or completes using an exponential backoff.
20+
*
21+
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
22+
*
23+
* @param settings [[RestartSettings]] defining restart configuration
24+
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
25+
*/
26+
def withBackoff[T, C](settings: RestartSettings)(
27+
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
28+
val underlyingFactory = () => sourceFactory().asSource
29+
SourceWithContext.fromTuples(
30+
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false)))
31+
}
32+
33+
/**
34+
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails using an exponential backoff.
35+
*
36+
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
37+
*
38+
* @param settings [[RestartSettings]] defining restart configuration
39+
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
40+
*/
41+
def onFailuresWithBackoff[T, C](settings: RestartSettings)(
42+
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
43+
val underlyingFactory = () => sourceFactory().asSource
44+
SourceWithContext.fromTuples(
45+
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = true)))
46+
}
47+
}

0 commit comments

Comments
 (0)