Skip to content

Conversation

@leviramsey
Copy link
Contributor

SourceWithContext cannot be wrapped into a RestartSource.

val sourceWithContext = SourceWithContext.fromTuples(Source(Seq("A", "B", "C").zipWithIndex))

RestartSource.onFailuresWithBackoff(RestartSettings(1.milli, 10.millis, 0.2)) { () => sourceWithContext }

       error: type mismatch;
        found   : akka.stream.scaladsl.SourceWithContext[String,Int,akka.NotUsed]
        required: akka.stream.scaladsl.Source[?, _]

I would assume that similar limitations apply to SinkWithContext/FlowWithContext, but it's not clear how well their respective limitations with respect to dropping on restart mesh with the intended use of the ...WithContext variations for carrying something like a Kafka or Projection offset, which tend to be where an at-least-once guarantee is desired.

@leviramsey leviramsey force-pushed the restart-source-with-context branch from c40f32f to 2d0ef75 Compare October 16, 2023 18:54
@leviramsey
Copy link
Contributor Author

leviramsey commented Oct 16, 2023

Weird, I can't even get Test / compile to work for me with Scala 3.3 due to an akka-(actor|cluster)-typed weirdness (so not brought in by this change)

akka > +~ 3.3 Test/compile
[error] /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala:16: error: ; expected but match found
[error]   type Service[K <: Aux[_]] = K match {
[error]                                 ^: /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala
[error] /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala:18: error: ; expected but match found
[error]   type SubscriptionsKV[K <: Aux[_]] = K match {
[error]                                         ^: /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala
[error] (akka-actor-typed / Compile / scalafmt) /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala:16: error: ; expected but match found
[error]   type Service[K <: Aux[_]] = K match {
[error]                                 ^: /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala
[error] (akka-cluster-typed / Compile / scalafmt) /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala:18: error: ; expected but match found
[error]   type SubscriptionsKV[K <: Aux[_]] = K match {
[error]                                         ^: /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala

EDIT: scalafmt (see #32186 & #32187)

@leviramsey leviramsey force-pushed the restart-source-with-context branch from 2d0ef75 to 565e052 Compare October 24, 2023 18:46
@leviramsey leviramsey closed this Nov 9, 2023
@leviramsey leviramsey reopened this Nov 9, 2023
@leviramsey
Copy link
Contributor Author

Close/reopening for CI

@leviramsey leviramsey force-pushed the restart-source-with-context branch from 565e052 to 35b962f Compare November 28, 2023 14:52
Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, needs Java APIs

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing then I think this is ready to go.

*/
def withBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The akka.japi.function interfaces are parallel with the JDK stdlib ones but add the capability to throw checked, for example for usage in actor message handling. That's not important here, so let's go with java.util.function.Supplier instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used Creator to mirror the other Restart* factories... suppose there should be an issue to move those to Supplier (which would be binary incompat, but source compat) in 2.10?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, let's keep it aligned then 👍

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@johanandren johanandren merged commit 6e080c0 into akka:main Dec 5, 2023
@johanandren johanandren added this to the 2.9.1 milestone Dec 5, 2023
He-Pin pushed a commit to He-Pin/akka that referenced this pull request Jan 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants