-
-
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
| package zio | |
| import java.io.{ InputStream, OutputStream } | |
| import java.nio.charset.StandardCharsets | |
| object streaming { | |
/**
 * One pull from a stream: succeeds with the next element, fails with `Some(e)` on an error,
 * or fails with `None` once the stream has ended.
 */
type Pull[-Env, +Err, +Out] = ZIO[Env, Option[Err], Out]

/** Feeds a single input element into a sink. */
type Push[-Env, +Err, -In] = In => ZIO[Env, Err, Unit]

/** Effectfully maps a single input element to exactly one output element. */
type Step[-Env, +Err, -In, +Out] = In => ZIO[Env, Err, Out]
/**
 * Reads bytes from `is`, decodes them as UTF-8, splits the text on the platform line
 * separator and prints every resulting line through `console.putStrLn`.
 */
def example1(is: InputStream): URIO[ZEnv, Unit] = {
  val source = ZStream.fromInputStream(is)
  // bytes -> text -> chunks of lines -> one putStrLn per line
  val printLines = ZTransducer.utf8Decode >>> ZTransducer.newLines >>> ZSink.putStrLn.chunked
  source.run(printLines)
}
/**
 * A consumer of elements, described by a managed pair of a push function (called once per
 * element) and a completion effect that yields the sink's result.
 */
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) {

  /** Lifts this sink to accept whole chunks, pushing the chunk's elements one by one in order. */
  def chunked: ZSink[R, E, Chunk[I], Z] =
    ZSink(process.map {
      case (push, done) =>
        val pushChunk: Push[R, E, Chunk[I]] = chunk => ZIO.foreach_(chunk)(push)
        (pushChunk, done)
    })
}
/** A producer of elements, described by a managed `Pull` that is invoked once per element. */
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) {

  /**
   * Pipes every element of this stream through `transducer`, emitting one output per input.
   *
   * When this stream ends, the transducer's final flush (`last`) is run: a `Some` result is
   * emitted as one extra element and a `None` ends the resulting stream. A failure from
   * upstream, from the step, or from the flush halts the resulting stream.
   */
  def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
    ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) {
      case ((pull, done), (step, last)) =>
        // `done` is set *before* the flush runs so that `last` executes at most once and the
        // upstream pull is never re-run after it has signalled end-of-stream, whether the flush
        // yields a value, yields nothing, or fails. Setting it only when the flush yields `Some`
        // would let a later pull re-pull upstream and run `last` a second time.
        val flush: Pull[R1, E1, A] =
          done.set(true) *> last.foldCauseM(Pull.halt, _.fold[Pull[Any, E1, A]](Pull.end)(Pull.emit))
        val pipe: Pull[R1, E1, A] =
          pull.foldCauseM(
            Cause.sequenceCauseOption(_).fold(flush)(Pull.halt),
            step(_).foldCauseM(Pull.halt, Pull.emit)
          )
        ZIO.ifM(done.get)(Pull.end, pipe)
    })

  /** Same as `>>>`: pipes this stream through `transducer`. */
  def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
    this >>> transducer

  /** Not implemented yet: throws `NotImplementedError` when called. */
  def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ???

  /**
   * Acquires the managed pull, pulls a single element, and releases the pull.
   * Fails with `None` if the stream is empty, or with `Some(e)` if the pull fails with `e`.
   */
  def head: ZIO[R, Option[E], I] =
    pull.use(identity)

  /**
   * Runs this stream into `sink`: pulls until end-of-stream, pushing every element into the
   * sink, then completes with the sink's result. Upstream and sink failures fail the effect.
   */
  def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] =
    (pull <*> sink.process).use {
      case (pull, (push, done)) =>
        def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go)
        go
    }
}
/**
 * A one-to-one element transformer, described by a managed pair of a step (one output per
 * input) and a final flush that may yield one last output when the input ends.
 */
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) {

  /**
   * Composes this transducer with `transducer`: every output of this step is fed into the
   * other transducer's step.
   *
   * On completion, this flush runs first. A value it yields is pushed through the other step,
   * and then the other flush runs; the other flush's result becomes the composite flush result.
   *
   * NOTE(review): the other step's output for the flushed value is discarded (`*>`), because a
   * flush can only return a single `Option[A]`. A downstream output produced at that point is
   * lost. Confirm that this is acceptable, or widen the flush type.
   */
  def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] =
    ZTransducer(process.zipWith(transducer.process) {
      case ((leftStep, leftLast), (rightStep, rightLast)) =>
        val step: Step[R1, E1, I, A] = (in: I) => leftStep(in).flatMap(rightStep)
        val last: ZIO[R1, E1, Option[A]] = leftLast.flatMap {
          case None      => rightLast
          case Some(out) => rightStep(out) *> rightLast
        }
        (step, last)
    })

  /**
   * Attaches `sink` after this transducer: every output of this step is pushed into the sink.
   * On completion, a value yielded by this flush is pushed before the sink's completion effect runs.
   */
  def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] =
    ZSink(process.zipWith(sink.process) {
      case ((step, last), (push, done)) =>
        val pushAll: Push[R1, E1, I] = (in: I) => step(in).flatMap(push)
        val finish: ZIO[R1, E1, Z] = last.flatMap {
          case None      => done
          case Some(out) => push(out) *> done
        }
        (pushAll, finish)
    })

  /** Lifts this transducer to chunks: each element of an input chunk is stepped in order. */
  def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] =
    ZTransducer(process.map {
      case (step, last) =>
        val stepAll: Step[R, E, Chunk[I], Chunk[O]] = (chunk: Chunk[I]) => ZIO.foreach(chunk)(step)
        (stepAll, last.map(_.map(Chunk.single)))
    })
}
object Pull {

  /** Signals end-of-stream by failing with `None`. */
  val end: IO[Option[Nothing], Nothing] = ZIO.fail(None)

  /** Emits `a` as the next stream element. */
  def emit[A](a: A): UIO[A] = ZIO.succeedNow(a)

  /**
   * Halts the stream with `cause`, wrapping every failure in `Some` so that it stays
   * distinguishable from end-of-stream (`None`). `Some(_)` is used instead of `Option.apply`
   * because `Option.apply(null)` is `None`: a `null` error value would otherwise be
   * misreported as a normal end of stream.
   */
  def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(cause.map[Option[E]](Some(_)))
}
object ZSink {

  /** Passes every pushed string to `console.putStrLn`; completes with `()`. */
  val putStrLn: ZSink[console.Console, Nothing, String, Unit] =
    apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit)))

  /** Wraps a managed (push, completion) pair into a sink. */
  def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] =
    new ZSink(process)

  /** Folds the pushed elements into a state held in a ref starting at `z`; completes with the final state. */
  def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] =
    apply(ZRef.makeManaged(z).map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get)))

  /** Completes with the first pushed element, or `None` if nothing was pushed; later elements are ignored. */
  def head[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(
      ZRef
        .makeManaged(Option.empty[A])
        .zipWith(ZRef.makeManaged(true)) { (ref, empty) =>
          // Store the first element and then clear `empty`, so every subsequent push is a no-op.
          // Without clearing the flag, each push would overwrite the ref and the result would
          // be the last element instead of the first.
          val push: Push[Any, Nothing, A] =
            (a: A) => (ref.set(Some(a)) *> empty.set(false)).whenM(empty.get)
          (push, ref.get)
        }
    )

  /** Completes with the most recently pushed element, or `None` if nothing was pushed. */
  def last[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(ZRef.makeManaged(Option.empty[A]).map(ref => (a => ref.set(Some(a)), ref.get)))

  /** Not implemented yet: throws `NotImplementedError` when called. */
  def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ???
}
object ZStream {

  /** Wraps a managed pull into a stream. */
  def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] =
    new ZStream[R, E, I](pull)

  /** Not implemented yet: throws `NotImplementedError` when called. */
  def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] =
    ???

  /** Not implemented yet: throws `NotImplementedError` when called. */
  def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] =
    ???
}
object ZTransducer {

  /**
   * Splits a stream of text fragments into lines, using `system.lineSeparator` as the delimiter.
   *
   * Each step emits the complete lines found in the buffered remainder plus the new fragment
   * (separators removed) and keeps the trailing partial line for the next step. The final
   * flush emits that remainder as a one-element chunk if it is non-empty.
   */
  val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] =
    apply(
      ZRef
        .makeManaged("")
        .zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) =>
          val sepLength = sep.length
          val step: Step[system.System, Nothing, String, Chunk[String]] = (s: String) =>
            ref.get.flatMap { buffered =>
              ZIO.effectSuspendTotal {
                val text  = buffered ++ s
                val lines = ChunkBuilder.make[String]()
                // Scan with a moving start index. Repeatedly `drop`ping would copy the remaining
                // text once per line, which is quadratic in the number of lines per fragment.
                var start = 0
                var end   = text.indexOf(sep, start)
                while (end != -1) {
                  lines += text.substring(start, end)
                  start = end + sepLength
                  end = text.indexOf(sep, start)
                }
                ref.set(text.substring(start)).as(lines.result())
              }
            }
          val flush: ZIO[system.System, Nothing, Option[Chunk[String]]] =
            ref.getAndSet("").map(rest => if (rest.isEmpty) None else Some(Chunk.single(rest)))
          (step, flush)
        }
    )

  /**
   * Decodes a stream of UTF-8 byte chunks into strings.
   *
   * A multi-byte character may be split across two chunks. The incomplete trailing bytes of
   * each chunk are therefore held back and prepended to the next chunk, instead of being
   * decoded on their own (which would produce U+FFFD replacement characters). Any bytes still
   * held back when the input ends are decoded by the final flush, where a truncated sequence
   * becomes U+FFFD.
   */
  val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] =
    apply(
      ZRef.makeManaged(Chunk.empty: Chunk[Byte]).map { ref =>
        val step: Step[Any, Nothing, Chunk[Byte], String] = (chunk: Chunk[Byte]) =>
          ref.get.flatMap { pending =>
            ZIO.effectSuspendTotal {
              val bytes    = (pending ++ chunk).toArray
              val complete = completeUtf8Length(bytes)
              ref
                .set(Chunk.fromArray(bytes.drop(complete)))
                .as(new String(bytes, 0, complete, StandardCharsets.UTF_8))
            }
          }
        val flush: ZIO[Any, Nothing, Option[String]] =
          ref.getAndSet(Chunk.empty).map { rest =>
            if (rest.isEmpty) None else Some(new String(rest.toArray, StandardCharsets.UTF_8))
          }
        (step, flush)
      }
    )

  /**
   * Returns the length of the longest prefix of `bytes` that does not end in the middle of a
   * UTF-8 multi-byte sequence. Only the last four bytes at most are inspected, because a UTF-8
   * sequence is at most four bytes long. Malformed input is left to the decoder to replace.
   */
  private def completeUtf8Length(bytes: Array[Byte]): Int = {
    val n    = bytes.length
    val stop = math.max(0, n - 4)
    var i    = n - 1
    // Walk back over continuation bytes (10xxxxxx) to the lead byte of the final sequence.
    while (i >= stop && (bytes(i) & 0xC0) == 0x80) i -= 1
    if (i < stop) n
    else {
      val lead = bytes(i) & 0xFF
      val expected =
        if ((lead & 0xE0) == 0xC0) 2
        else if ((lead & 0xF0) == 0xE0) 3
        else if ((lead & 0xF8) == 0xF0) 4
        else 1 // ASCII or an invalid lead byte: nothing to wait for
      if (n - i < expected) i else n
    }
  }

  /** Wraps a managed (step, flush) pair into a transducer. */
  def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] =
    new ZTransducer(process)

  /** Splits each incoming chunk into consecutive sub-chunks of at most `max` elements. */
  def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] =
    succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder

  /**
   * Regroups incoming chunks into chunks of exactly `size` elements. Elements that do not fill
   * a complete group are buffered until the next step. On completion, a non-empty leftover
   * group is passed to `pad`, whose result becomes the final output.
   *
   * `size` is expected to be positive: `grouped` rejects non-positive sizes.
   */
  def chunkN[R, E, A](
    size: Int,
    pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]]
  ): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[A])
        .map { ref =>
          val step: Step[R, E, Chunk[A], Chunk[Chunk[A]]] = (chunk: Chunk[A]) =>
            ref.get.flatMap { rem =>
              ZIO.effectSuspendTotal {
                val groups = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder
                // `groups` is empty when both the buffered remainder and `chunk` are empty, and
                // calling `last` on it would throw. In that case there is nothing to emit and
                // nothing to keep.
                groups.lastOption match {
                  case Some(partial) if partial.length < size => ref.set(partial).as(groups.init)
                  case _                                      => ref.set(Chunk.empty).as(groups)
                }
              }
            }
          val flush: ZIO[R, E, Option[Chunk[Chunk[A]]]] =
            ref.getAndSet(Chunk.empty).flatMap(rem => if (rem.isEmpty) ZIO.none else pad(rem))
          (step, flush)
        }
    )

  /** Emits every input unchanged. */
  def identity[A]: ZTransducer[Any, Nothing, A, A] =
    succeed(ZIO.succeedNow)

  /** Builds a transducer from a plain `step` and flush `last` (by default `last` yields nothing), with no managed resources. */
  def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] =
    apply(ZManaged.succeedNow(step -> last))
}
| } |
Note that ZStream.head is defined as
def head: ZIO[R, Option[E], I] =
  pull.use(identity)
Extracting only the first element is not a stream-processing operation; this use case treats ZStream as a collection.
To run the whole stream and return the head, run the stream with ZSink.head.
Thank you for suggesting this! The previous ZStream encoding did not use chunks internally, and it was a conscious decision to extensively embed chunks so that users can enjoy their performance improvements without using them explicitly.
@iravid
I agree that acceptable performance can only be achieved through chunking.
The part I disagree with is that the library has to force this on every stream instead of chunking only when required.
ZStream[R, E, Byte] looks better than ZStream[R, E, Chunk[Byte]], but what utility does it add?
I understand that you are busy and that this may not be convincing enough.
I also know that the streams encoding was changed recently and this must have been an exhausting effort.
If I can come up with some benchmarks for the encodings, would you be open to further this discussion?
I have created a branch with a basic implementation and a test.
This is what the new test for chunkN looks like:
testM("pads last element to size")(
checkM(Gen.chunkOfBounded(0, 5)(Gen.chunkOf(Gen.anyInt)))(chunk =>
ZStream
.fromChunk(chunk)
.run(ZTransducer.chunkN[Int](3, 0) >>> ZSink.collect[Int].chunked.chunked)
.map(xs => assert(xs)(equalTo(chunk.flatten.padTo(xs.length, 0))))
)
)
Note the double use of the chunked combinator on ZSink.collect[Int]. The best part is that the new encoding ensures that no intermediate chunks are created while collecting!
Or I could open a WIP pull request?
Always happy to see additional approaches! However the use of chunks is non-negotiable at this point. Can be reconsidered for 2.0.
Thanks @iravid.
I will continue working on this and bug you occasionally :)
This re-design addresses 2 separate concerns with the current encoding:
Chunk. […] Option at every step in the pipeline. The concerns are addressed by:
Chunk is treated as a normal value and appears in signatures like […] ZSink and ZTransducer, respectively, from the back-propagation of the final result.