feat: Pipe.Util.buffered

This commit is contained in:
orion 2024-07-08 12:40:25 -05:00
parent 43ff92a4ad
commit dc7a2d3387
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -15,9 +15,12 @@ import Data.Either (hush)
import Data.HashSet as HashSet import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash) import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList) import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), fromMaybe) import Data.Maybe (Maybe(..), fromMaybe, maybe)
import Data.Traversable (traverse_)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes (await, yield) import Pipes (await, yield)
import Pipes as Pipes import Pipes as Pipes
import Pipes.Core (Pipe, Producer) import Pipes.Core (Pipe, Producer)
@ -96,6 +99,30 @@ chunked size = do
yield Nothing yield Nothing
-- | Buffers input to the given size before passing to subsequent pipes
buffered :: forall m. MonadEffect m => Int -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
buffered size = do
chunkST :: STRef _ (Maybe Buffer) <- liftEffect $ liftST $ STRef.new Nothing
let
chunkClear = liftEffect $ liftST $ STRef.write Nothing chunkST
chunkPeek = liftEffect $ liftST $ STRef.read chunkST
chunkLen = maybe (pure 0) (liftEffect <<< Buffer.size) =<< chunkPeek
chunkPut b = liftEffect do
new <- liftST (STRef.read chunkST) >>= maybe (pure b) (\a -> Buffer.concat [a, b])
void $ liftST $ STRef.write (Just new) chunkST
pure new
Rec.whileJust $ runMaybeT do
a <- MaybeT await
buf <- chunkPut a
len <- lift chunkLen
when (len > size) $ chunkClear *> lift (yield $ Just buf)
len <- chunkLen
chunkPeek >>= traverse_ (when (len > 0) <<< yield <<< Just)
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it. -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- | -- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe -- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
@ -149,5 +176,3 @@ invoke m a =
Pure _ -> pure Exited Pure _ -> pure Exited
in in
go (IDidNotAwait m) go (IDidNotAwait m)