feat: Pipes.Util.chunked

This commit is contained in:
orion kindel 2024-05-10 18:30:08 -05:00
parent 821a47229c
commit 76958b63ef
Signed by: orion
GPG Key ID: 6D4165AE4C928719
2 changed files with 44 additions and 13 deletions

View File

@ -25,7 +25,6 @@ workspace:
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies: test_dependencies:
- console - console

View File

@ -2,9 +2,14 @@ module Pipes.Util where
import Prelude import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust) import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield) import Pipes (await, yield)
@ -18,15 +23,42 @@ import Pipes.Core (Pipe)
-- | ``` -- | ```
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
intersperse sep = do intersperse sep = do
isFirst <- liftEffect $ liftST $ STRef.new true isFirstST <- liftEffect $ liftST $ STRef.new true
whileJust do let
ma <- await getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
isFirst' <- liftEffect $ liftST $ STRef.read isFirst markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
case ma of
Just a whileJust $ runMaybeT do
| isFirst' -> do a <- MaybeT await
void $ liftEffect $ liftST $ STRef.write false isFirst isFirst <- getIsFirst
yield $ Just a if isFirst then markNotFirst else lift $ yield $ Just sep
| otherwise -> yield (Just sep) *> yield (Just a) lift $ yield $ Just a
Nothing -> yield Nothing
pure $ void ma yield Nothing
-- | Accumulate values in chunks of a given size.
-- |
-- | If the pipe closes without yielding a multiple of `size` elements,
-- | the remaining elements are yielded at the end.
chunked :: forall m a. MonadEffect m => Int -> Pipe (Maybe a) (Maybe (Array a)) m Unit
chunked size = do
chunkST :: STRef _ (STArray _ a) <- liftEffect $ liftST $ STRef.new =<< Array.ST.new
let
chunkPut a = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ Array.ST.push a chunkArray
chunkLength = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
Array.ST.length chunkArray
chunkTake = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray
whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
yield Nothing