Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.javadsl

import akka.NotUsed
import akka.japi.function.Creator
import akka.stream.RestartSettings
import akka.stream.scaladsl

/**
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the SourceWithContext restarts.
*/
object RestartSourceWithContext {

/**
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails or completes using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining the restart configuration
* @param sourceFactory A factory for producing the SourceWithContext to wrap
*/
def withBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The akka.japi.function interfaces are parallel with the JDK stdlib ones but add the capability to throw checked, for example for usage in actor message handling. That's not important here, so let's go with java.util.function.Supplier instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used Creator to mirror the other Restart* factories... suppose there should be an issue to move those to Supplier (which would be binary incompat, but source compat) in 2.10?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, let's keep it aligned then 👍

val underlyingFactory = () => sourceFactory.create().asScala
new SourceWithContext(scaladsl.RestartSourceWithContext.withBackoff(settings)(underlyingFactory))
}

/**
* Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining the restart configuration
* @param sourceFactory A factory for producing the SourceWithContext to wrap
*/
def onFailuresWithBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
val underlyingFactory = () => sourceFactory.create().asScala
new SourceWithContext(scaladsl.RestartSourceWithContext.onFailuresWithBackoff(settings)(underlyingFactory))
}
}