diff --git a/src/Pipes.Util.purs b/src/Pipes.Util.purs index 6d780fd..a211ea9 100644 --- a/src/Pipes.Util.purs +++ b/src/Pipes.Util.purs @@ -15,9 +15,12 @@ import Data.Either (hush) import Data.HashSet as HashSet import Data.Hashable (class Hashable, hash) import Data.List.NonEmpty (NonEmptyList) -import Data.Maybe (Maybe(..), fromMaybe) +import Data.Maybe (Maybe(..), fromMaybe, maybe) +import Data.Traversable (traverse_) import Data.Tuple.Nested (type (/\), (/\)) import Effect.Class (class MonadEffect, liftEffect) +import Node.Buffer (Buffer) +import Node.Buffer as Buffer import Pipes (await, yield) import Pipes as Pipes import Pipes.Core (Pipe, Producer) @@ -96,6 +99,30 @@ chunked size = do yield Nothing +-- | Buffers input to the given size before passing to subsequent pipes +buffered :: forall m. MonadEffect m => Int -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit +buffered size = do + chunkST :: STRef _ (Maybe Buffer) <- liftEffect $ liftST $ STRef.new Nothing + + let + chunkClear = liftEffect $ liftST $ STRef.write Nothing chunkST + chunkPeek = liftEffect $ liftST $ STRef.read chunkST + chunkLen = maybe (pure 0) (liftEffect <<< Buffer.size) =<< chunkPeek + chunkPut b = liftEffect do + new <- liftST (STRef.read chunkST) >>= maybe (pure b) (\a -> Buffer.concat [a, b]) + void $ liftST $ STRef.write (Just new) chunkST + pure new + + Rec.whileJust $ runMaybeT do + a <- MaybeT await + buf <- chunkPut a + len <- lift chunkLen + when (len > size) $ chunkClear *> lift (yield $ Just buf) + + len <- chunkLen + chunkPeek >>= traverse_ (when (len > 0) <<< yield <<< Just) + yield Nothing + -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it. -- | -- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe @@ -149,5 +176,3 @@ invoke m a = Pure _ -> pure Exited in go (IDidNotAwait m) - -