Skip to content

Instantly share code, notes, and snippets.

@Yotamho
Last active February 15, 2021 14:12
Show Gist options
  • Select an option

  • Save Yotamho/d8b8bff1b929ca7b3b9602e49d417aba to your computer and use it in GitHub Desktop.

Select an option

Save Yotamho/d8b8bff1b929ca7b3b9602e49d417aba to your computer and use it in GitHub Desktop.
import cats.Monoid
import cats.effect.Sync
object Fs2AccumulateWhile {
def mapAccumulateWhile[F[_], S, O, O2](
f: O => (S, O2),
p: S => Boolean,
publishState: S => F[Unit]
)(implicit F: Sync[F], S: Monoid[S]): fs2.Pipe[F, O, O2] = {
def go(s: fs2.Stream[F, O], state: S): fs2.Pull[F, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
val (nextState, elem) = f(hd)
val combinedState = S.combine(state, nextState)
if (p(combinedState))
fs2.Pull.output1(elem) >> go(tl, combinedState)
else
fs2.Pull.eval(publishState(combinedState))
case None =>
fs2.Pull.eval(publishState(state))
}
go(_, S.empty).stream
}
def collectAccumulateWhile[F[_], S, O, O2](
f: O => (S, Option[O2]),
p: S => Boolean,
publishState: S => F[Unit]
)(implicit F: Sync[F], S: Monoid[S]): fs2.Pipe[F, O, O2] = {
def go(s: fs2.Stream[F, O], state: S): fs2.Pull[F, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
val (nextState, elemOpt) = f(hd)
val combinedState = S.combine(state, nextState)
if (p(combinedState))
elemOpt match {
case Some(elem) => fs2.Pull.output1(elem) >> go(tl, combinedState)
case None => go(tl, combinedState)
}
else
fs2.Pull.eval(publishState(combinedState))
case None =>
fs2.Pull.eval(publishState(state))
}
go(_, S.empty).stream
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment