diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 177c087..ac55465 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -160,11 +160,11 @@ awaitReadableOrClosed s = do length <- liftEffect $ readableLength s when (readable && length == 0) $ liftEither - =<< parOneOf - [ onceAff0 readableH s $> Right unit - , onceAff0 closeH s $> Right unit - , Left <$> onceAff1 errorH s - ] + =<< parOneOf + [ onceAff0 readableH s $> Right unit + , onceAff0 closeH s $> Right unit + , Left <$> onceAff1 errorH s + ] awaitFinished :: forall s a. Write s a => s -> Aff Unit awaitFinished s = onceAff0 finishH s diff --git a/src/Pipes.Collect.purs b/src/Pipes.Collect.purs index 2dd1852..b45ec93 100644 --- a/src/Pipes.Collect.purs +++ b/src/Pipes.Collect.purs @@ -12,11 +12,14 @@ import Data.List (List) import Data.List as List import Data.Map (Map) import Data.Map as Map +import Data.Maybe (Maybe(..), maybe) import Data.Tuple.Nested (type (/\), (/\)) import Effect.Class (class MonadEffect, liftEffect) import Foreign.Object (Object) import Foreign.Object.ST as Object.ST import Foreign.Object.ST.Unsafe as Object.ST.Unsafe +import Node.Buffer (Buffer) +import Node.Buffer as Buffer import Pipes.Core (Producer) import Pipes.Internal (Proxy(..)) @@ -48,6 +51,21 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0 foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit foreach f p0 = traverse (\_ a -> f a) unit p0 +-- | Concatenate all produced buffers +toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer +toBuffer p = + (liftEffect <<< maybe (Buffer.alloc 0) pure) + =<< traverse + ( flip \b -> + case _ of + Just acc -> do + new <- liftEffect $ Buffer.concat [ acc, b ] + pure $ Just new + _ -> pure $ Just b + ) + Nothing + p + -- | Collect all values from a `Producer` into an array. toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a) toArray p = do diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 39b9db1..afb400f 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -113,7 +113,7 @@ fromTransform t = if needsDrain then do liftAff $ delay $ wrap 0.0 yieldWhileReadable - pure $ Loop {error, cancel} + pure $ Loop { error, cancel } else do ma <- await case ma of