At least once semantics #259

Open
reibitto opened this issue Apr 26, 2020 · 4 comments

Comments

@reibitto

I'm looking to migrate to zio-sqs, but right now as far as I can tell there are 2 main modes of operation:

  • autoDelete=true: This is basically at-most-once semantics. If your handler dies or there is a hard crash, the message you were in the middle of handling is lost forever.
  • autoDelete=false: Manual mode. You have to make sure you call deleteMessage (and related operations) correctly yourself.
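
To make the difference between the two modes concrete, here is a minimal sketch using a hypothetical in-memory queue. `FakeQueue`, `consumeAutoDelete`, and `consumeManualAck` are made-up names for illustration and are not part of zio-sqs or the AWS SDK. It assumes, as described above, that auto-delete removes the message before the handler has finished.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical in-memory stand-in for a queue; not the zio-sqs or AWS API.
object DeliveryModes {
  final class FakeQueue(initial: Seq[String]) {
    private val messages = mutable.ListBuffer.empty[String] ++= initial
    def receive(): Option[String] = messages.headOption
    def delete(body: String): Unit = messages -= body
    def remaining: List[String] = messages.toList
  }

  // Mimics autoDelete=true: the message is deleted as soon as it is received,
  // so a failing handler cannot bring it back (at most once).
  def consumeAutoDelete(queue: FakeQueue)(handler: String => Unit): Unit =
    queue.receive().foreach { msg =>
      queue.delete(msg)
      Try(handler(msg))
    }

  // Mimics autoDelete=false with a manual delete: the message is removed only
  // after the handler succeeds, so a failure leaves it for redelivery (at least once).
  def consumeManualAck(queue: FakeQueue)(handler: String => Unit): Unit =
    queue.receive().foreach { msg =>
      if (Try(handler(msg)).isSuccess) queue.delete(msg)
    }
}
```

With a handler that throws, the first variant leaves the queue empty while the second keeps the message available for another attempt.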

I think it would be nice if there was something similar to Alpakka's SqsAckSink:
https://doc.akka.io/docs/alpakka/current/sqs.html#updating-message-statuses

Basically, the idea is to use the type system to ensure that you decide what happens to a message after your handler has run. This gives you at-least-once semantics (or, if you ensure your handler is idempotent, effectively exactly-once semantics).

A sealed trait with the following cases would cover everything I believe:

MessageAction

  • Done / Delete
  • Skip / Ignore
  • RetryLater(visibilityTimeout) / ChangeMessageVisibility(visibilityTimeout)

Not sure on the specific naming to use, but that's the basic idea.
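
A rough sketch of how the three cases above might look in Scala, assuming the handler returns a `MessageAction` and something downstream translates it into a queue operation. `QueueCommand`, `DeleteMessage`, `SetVisibility`, and `toCommand` are hypothetical names, not existing zio-sqs or Alpakka types.

```scala
import scala.concurrent.duration._

object AckSketch {
  // What the handler decides to do with a message once it has run.
  sealed trait MessageAction
  object MessageAction {
    case object Delete extends MessageAction
    case object Ignore extends MessageAction
    final case class ChangeMessageVisibility(visibilityTimeout: FiniteDuration) extends MessageAction
  }

  // Hypothetical description of the queue call to issue for an action.
  sealed trait QueueCommand
  final case class DeleteMessage(receiptHandle: String) extends QueueCommand
  final case class SetVisibility(receiptHandle: String, timeoutSeconds: Int) extends QueueCommand

  // Because MessageAction is sealed, the compiler warns if a case is left unhandled.
  def toCommand(receiptHandle: String, action: MessageAction): Option[QueueCommand] =
    action match {
      case MessageAction.Delete =>
        Some(DeleteMessage(receiptHandle))
      case MessageAction.Ignore =>
        None // do nothing; the message reappears when its visibility timeout expires
      case MessageAction.ChangeMessageVisibility(timeout) =>
        Some(SetVisibility(receiptHandle, timeout.toSeconds.toInt))
    }
}
```

The point of the design is that a handler cannot finish without producing one of these values, so forgetting to acknowledge a message becomes a type error rather than a runtime surprise.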

Is this something that makes sense for zio-sqs?

@reibitto
Author

I see there's retryDelay and retryMaxCount right now. I'm not sure if there's a different design to reconcile that with what I'm proposing.

The sealed trait I mentioned could maybe encode the concepts of recoverable errors, unrecoverable errors, and different retry policies (like Schedule.exponential). If we go with that kind of design, retryDelay and retryMaxCount no longer really make sense, because they are too "global" a setting.

This might require some more thought.

@reibitto
Author

To give a better idea of the alternate design I was thinking of:

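// Imports assume the ZIO 1.x package layout.
import zio.Schedule
import zio.clock.Clock
import zio.duration.Duration

sealed trait MessageAction
object MessageAction {
  case object Success extends MessageAction // handled; the message can be deleted
  case object Ignore extends MessageAction // leave the message untouched
  final case class Delay(delay: Duration) extends MessageAction // make it visible again after `delay`
  final case class RecoverableError[E](error: E, retryPolicy: Schedule[Clock, Any, Duration]) extends MessageAction
  final case class UnrecoverableError[E](error: E) extends MessageAction
}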

For the RecoverableError case, you decide whether to retry, and after how long, based on a ZIO Schedule and the message's ApproximateReceiveCount attribute. That means you can put something like Schedule.exponential(1.second) && Schedule.recurs(10) in RecoverableError to say "retry 10 times using exponential backoff", or anything else you can express with Schedule.
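
To illustrate the arithmetic only, here is a sketch in plain Scala that does not use ZIO Schedule. `nextDelay` and its parameters are hypothetical; it assumes `receiveCount` is the ApproximateReceiveCount value (1 on first delivery) and caps the delay at 12 hours, which is the maximum SQS visibility timeout.

```scala
import scala.concurrent.duration._

object ReceiveCountBackoff {
  // Returns the delay before the next attempt, or None when retries are exhausted.
  // The delay doubles with each delivery: base, 2 * base, 4 * base, ...
  def nextDelay(
      receiveCount: Int,
      base: FiniteDuration,
      maxRetries: Int,
      maxDelay: FiniteDuration = 12.hours
  ): Option[FiniteDuration] =
    if (receiveCount < 1 || receiveCount > maxRetries) None
    else {
      // Cap the exponent so the multiplication cannot overflow for sane bases.
      val exponent = math.min(receiveCount - 1, 16)
      Some((base * (1L << exponent)).min(maxDelay))
    }
}
```

Since the delay depends only on a counter carried by the message, the result is the same whichever process computes it and it survives restarts. A `None` result would correspond to giving up, for example by treating the failure as unrecoverable.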

This design feels more "ZIO" to me than my original proposal.

And maybe you could make retryPolicy a default param, with Schedule.spaced(settings.retryDelay) && Schedule.recurs(settings.retryMaxCount) if you want to keep those settings and not make a breaking change.

@ghostdogpr
Member

The retry stuff only exists on the Publisher right now. I understand the need for Delay as an equivalent to Alpakka's ChangeMessageVisibility, but for retrying, can't you just use the combinators on ZStream?

@reibitto
Author

Oops, missed the fact that retry was on ProducerSettings.

As for relying on ZStream retries for the consumer, that would work mostly fine in the single-consumer case (with the exception of app restarts/crashes, where you'll start retrying from 0 again). The issue is mostly with multiple consumers.

If you have multiple consumers in a distributed system pulling entries from the same queue, you can't share a ZIO retry policy between them. This is why I use ApproximateReceiveCount to drive retries and exponential backoff.

Another case for Delay(0) or Ignore: Let's say I'm doing a rolling deployment with my consumer nodes. They will be running different versions for a while. In this kind of case you may want to say, "I don't know how to handle this version of the message. Let some other consumer pick it up."
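
A small sketch of that rolling-deployment case, assuming each message carries a schema version. `Envelope`, `schemaVersion`, and `handle` are hypothetical names, and the `MessageAction` here is a trimmed-down local copy for illustration.

```scala
object VersionGate {
  sealed trait MessageAction
  case object Success extends MessageAction
  case object Ignore extends MessageAction

  // Hypothetical message wrapper that carries a schema version.
  final case class Envelope(schemaVersion: Int, payload: String)

  // Process messages this node understands; leave the rest for another consumer.
  def handle(supported: Set[Int])(msg: Envelope)(process: String => Unit): MessageAction =
    if (supported.contains(msg.schemaVersion)) {
      process(msg.payload)
      Success
    } else Ignore
}
```

An older node with `supported = Set(1)` would return `Ignore` for a version 2 message, leaving it on the queue for a newer node.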

By the way, I created a separate issue in ZIO that's kind of a prerequisite for the retries I had in mind:
zio/zio#3468


2 participants