From a5c535fb1e5d7f3974ef72e64a4bb3ae170910ea Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 13 May 2024 11:21:06 -0500 Subject: [PATCH] feat: Pipes.Construct --- spago.yaml | 2 ++ src/Node.Stream.Object.purs | 6 ++++ src/Pipes.Collect.purs | 41 ++++++++-------------- src/Pipes.Construct.purs | 63 ++++++++++++++++++++++++++++++++++ src/Pipes.Node.Stream.purs | 1 + src/Pipes.Util.purs | 32 ++++++++++++++--- test/Test/Main.purs | 4 ++- test/Test/Pipes.Construct.purs | 58 +++++++++++++++++++++++++++++++ test/Test/Pipes.Node.FS.purs | 4 +-- 9 files changed, 177 insertions(+), 34 deletions(-) create mode 100644 test/Test/Pipes.Construct.purs diff --git a/spago.yaml b/spago.yaml index 31c593a..eaafb00 100644 --- a/spago.yaml +++ b/spago.yaml @@ -40,6 +40,8 @@ package: - unsafe-coerce: ">=6.0.0 <7.0.0" test: main: Test.Main + build: + strict: true dependencies: - console - gen diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 5d57aea..1cbe20d 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -152,6 +152,9 @@ awaitReadableOrClosed s = do when (not ended && not closed && not readable) $ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] +awaitFinished :: forall s a. Write s a => s -> Aff Unit +awaitFinished s = onceAff0 finishH s + awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed s = do closed <- liftEffect $ isClosed s @@ -184,3 +187,6 @@ errorH = EventHandle "error" mkEffectFn1 endH :: forall s a. Write s a => EventHandle0 s endH = EventHandle "end" identity + +finishH :: forall s a. Write s a => EventHandle0 s +finishH = EventHandle "finish" identity diff --git a/src/Pipes.Collect.purs b/src/Pipes.Collect.purs index b2d72c8..06f9dbf 100644 --- a/src/Pipes.Collect.purs +++ b/src/Pipes.Collect.purs @@ -2,12 +2,9 @@ module Pipes.Collect where import Prelude -import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.ST.Class (liftST) -import Control.Monad.Trans.Class (lift) import Data.Array.ST as Array.ST -import Data.Either (hush) import Data.HashMap (HashMap) import Data.HashMap as HashMap import Data.Hashable (class Hashable) @@ -15,45 +12,37 @@ import Data.List (List) import Data.List as List import Data.Map (Map) import Data.Map as Map -import Data.Maybe (fromMaybe) import Data.Tuple.Nested (type (/\), (/\)) import Effect.Class (class MonadEffect, liftEffect) import Foreign.Object (Object) import Foreign.Object.ST as Object.ST import Foreign.Object.ST.Unsafe as Object.ST.Unsafe -import Pipes (next) as Pipes import Pipes.Core (Producer) - --- | Fold every value produced --- | --- | Uses `MonadRec`, supporting producers of arbitrary length. -fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b -fold f b p = - let - insertNext b' p' = runMaybeT do - a /\ p'' <- MaybeT $ hush <$> Pipes.next p' - pure $ Loop $ f b' a /\ p'' - in - flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p' +import Pipes.Internal (Proxy(..)) -- | Fold every value produced with a monadic action -- | -- | Uses `MonadRec`, supporting producers of arbitrary length. traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b -traverse f b p = - let - insertNext b' p' = runMaybeT do - a /\ p'' <- MaybeT $ hush <$> Pipes.next p' - b'' <- lift $ f b' a - pure $ Loop $ b'' /\ p'' - in - flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p' +traverse f b0 p0 = + flip tailRecM (p0 /\ b0) \(p /\ b) -> + case p of + Respond a m -> Loop <$> (m unit /\ _) <$> f b a + M m -> Loop <$> (_ /\ b) <$> m + Request _ _ -> pure $ Done b + Pure _ -> pure $ Done b + +-- | Fold every value produced +-- | +-- | Uses `MonadRec`, supporting producers of arbitrary length. +fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b +fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0 -- | Execute a monadic action on every item in a producer. -- | -- | Uses `MonadRec`, supporting producers of arbitrary length. foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit -foreach f = traverse (const f) unit +foreach f p0 = traverse (\_ a -> f a) unit p0 -- | Collect all values from a `Producer` into an array. toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a) diff --git a/src/Pipes.Construct.purs b/src/Pipes.Construct.purs index 88e6c43..254eeb5 100644 --- a/src/Pipes.Construct.purs +++ b/src/Pipes.Construct.purs @@ -1 +1,64 @@ module Pipes.Construct where + +import Prelude + +import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) +import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) +import Control.Monad.ST.Class (liftST) +import Control.Monad.Trans.Class (lift) +import Data.Array as Array +import Data.Array.ST as Array.ST +import Data.List (List) +import Data.List as List +import Data.Map (Map) +import Data.Map.Internal as Map.Internal +import Data.Maybe (fromMaybe) +import Data.Tuple.Nested (type (/\), (/\)) +import Effect.Class (class MonadEffect, liftEffect) +import Pipes (yield, (>->)) +import Pipes.Core (Producer) +import Pipes.Prelude as Pipe +import Pipes.Util as Pipe.Util + +-- Producer that will emit monotonically increasing integers +-- ex `monotonic 0 -> 0 1 2 3 4 5 6 7 ..` +monotonic :: forall m. MonadRec m => Int -> Producer Int m Unit +monotonic start = flip tailRecM start \n -> yield n $> Loop (n + 1) + +-- Producer that will emit integers from `start` (inclusive) to `end` (exclusive) +range :: forall m. MonadRec m => Int -> Int -> Producer Int m Unit +range start end = monotonic start >-> Pipe.take end + +-- | Stack-safe producer that yields every value in an Array +eachArray :: forall a m. MonadRec m => Array a -> Producer a m Unit +eachArray as = monotonic 0 >-> Pipe.map (Array.index as) >-> Pipe.Util.whileJust + +-- | Stack-safe producer that yields every value in a List +eachList :: forall a m. MonadRec m => List a -> Producer a m Unit +eachList init = + flip tailRecM init \as -> fromMaybe (Done unit) <$> runMaybeT do + head <- MaybeT $ pure $ List.head as + tail <- MaybeT $ pure $ List.tail as + lift $ yield head + pure $ Loop tail + +-- | Stack-safe producer that yields every value in a Map +eachMap :: forall k v m. MonadEffect m => MonadRec m => Map k v -> Producer (k /\ v) m Unit +eachMap init = do + stack <- liftEffect $ liftST $ Array.ST.new + let + push a = void $ liftEffect $ liftST $ Array.ST.push a stack + pop = liftEffect $ liftST $ Array.ST.pop stack + flip tailRecM init case _ of + Map.Internal.Leaf -> fromMaybe (Done unit) <$> runMaybeT do + a <- MaybeT pop + pure $ Loop a + Map.Internal.Node _ _ k v Map.Internal.Leaf Map.Internal.Leaf -> do + yield $ k /\ v + pure $ Loop Map.Internal.Leaf + Map.Internal.Node _ _ k v Map.Internal.Leaf r -> do + yield $ k /\ v + pure $ Loop r + Map.Internal.Node a b k v l r -> do + push $ Map.Internal.Node a b k v Map.Internal.Leaf r + pure $ Loop l diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 780d83d..245f1f9 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -58,6 +58,7 @@ fromWritable w = cleanup rmErrorListener = do liftEffect rmErrorListener liftEffect $ O.end w + liftAff $ O.awaitFinished w pure $ Done unit go { error, cancel } = do diff --git a/src/Pipes.Util.purs b/src/Pipes.Util.purs index c1ec3e5..635196d 100644 --- a/src/Pipes.Util.purs +++ b/src/Pipes.Util.purs @@ -3,23 +3,35 @@ module Pipes.Util where import Prelude import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) -import Control.Monad.Rec.Class (class MonadRec, forever, whileJust) +import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM) +import Control.Monad.Rec.Class as Rec import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) import Data.Array.ST (STArray) import Data.Array.ST as Array.ST +import Data.Either (hush) import Data.HashSet as HashSet import Data.Hashable (class Hashable, hash) import Data.List.NonEmpty (NonEmptyList) -import Data.Maybe (Maybe(..)) +import Data.Maybe (Maybe(..), fromMaybe) import Data.Tuple.Nested (type (/\), (/\)) import Effect.Class (class MonadEffect, liftEffect) import Pipes (await, yield) -import Pipes.Core (Pipe) +import Pipes as Pipes +import Pipes.Core (Pipe, Producer) import Pipes.Internal (Proxy(..)) +-- | Re-yield all `Just`s, and close when `Nothing` is encountered +whileJust :: forall m a. MonadRec m => Pipe (Maybe a) a m Unit +whileJust = do + first <- await + flip tailRecM first $ \ma -> fromMaybe (Done unit) <$> runMaybeT do + a <- MaybeT $ pure ma + lift $ yield a + lift $ Loop <$> await + -- | Yields a separator value `sep` between received values -- | -- | ```purescript @@ -33,7 +45,7 @@ intersperse sep = do getIsFirst = liftEffect $ liftST $ STRef.read isFirstST markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST - whileJust $ runMaybeT do + Rec.whileJust $ runMaybeT do a <- MaybeT await isFirst <- getIsFirst if isFirst then markNotFirst else lift $ yield $ Just sep @@ -41,6 +53,16 @@ intersperse sep = do yield Nothing +-- Pair every emitted value from 2 producers together, exiting when either exits. +zip :: forall a b m. MonadRec m => Producer a m Unit -> Producer b m Unit -> Producer (a /\ b) m Unit +zip as bs = + flip tailRecM (as /\ bs) \(as' /\ bs') -> + fromMaybe (Done unit) <$> runMaybeT do + a /\ as'' <- MaybeT $ lift $ hush <$> Pipes.next as' + b /\ bs'' <- MaybeT $ lift $ hush <$> Pipes.next bs' + lift $ yield $ a /\ b + pure $ Loop $ as'' /\ bs'' + -- | Accumulate values in chunks of a given size. -- | -- | If the pipe closes without yielding a multiple of `size` elements, @@ -60,7 +82,7 @@ chunked size = do void $ flip STRef.write chunkST =<< Array.ST.new Array.ST.unsafeFreeze chunkArray - whileJust $ runMaybeT do + Rec.whileJust $ runMaybeT do a <- MaybeT await chunkPut a len <- chunkLength diff --git a/test/Test/Main.purs b/test/Test/Main.purs index 0315a5a..13079e3 100644 --- a/test/Test/Main.purs +++ b/test/Test/Main.purs @@ -9,12 +9,14 @@ import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer import Test.Pipes.Node.FS as Test.Pipes.Node.FS import Test.Pipes.Collect as Test.Pipes.Collect +import Test.Pipes.Construct as Test.Pipes.Construct import Test.Spec.Reporter (specReporter) import Test.Spec.Runner (defaultConfig, runSpec') main :: Effect Unit -main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do +main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do Test.Pipes.Node.Stream.spec Test.Pipes.Node.Buffer.spec Test.Pipes.Node.FS.spec Test.Pipes.Collect.spec + Test.Pipes.Construct.spec diff --git a/test/Test/Pipes.Construct.purs b/test/Test/Pipes.Construct.purs new file mode 100644 index 0000000..6f1bf55 --- /dev/null +++ b/test/Test/Pipes.Construct.purs @@ -0,0 +1,58 @@ +module Test.Pipes.Construct where + +import Prelude + +import Data.Array as Array +import Data.List as List +import Data.Map as Map +import Data.Tuple.Nested (type (/\), (/\)) +import Effect.Class (liftEffect) +import Pipes.Collect as Pipes.Collect +import Pipes.Construct as Pipes.Construct +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (shouldEqual) + +spec :: Spec Unit +spec = + describe "Test.Pipes.Construct" do + describe "eachMap" do + it "empty map" do + kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachMap Map.empty + kvs `shouldEqual` ([] :: Array (Int /\ Int)) + it "nonempty map" do + let + exp = (\n -> n /\ n) <$> Array.range 0 99999 + map = Map.fromFoldable exp + kvs <- + liftEffect + $ Pipes.Collect.toArray + $ Pipes.Construct.eachMap + $ map + kvs `shouldEqual` exp + describe "eachArray" do + it "empty array" do + kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachArray [] + kvs `shouldEqual` ([] :: Array Int) + it "nonempty array" do + let + inp = (\n -> n /\ n) <$> Array.range 0 99999 + kvs <- + liftEffect + $ Pipes.Collect.toArray + $ Pipes.Construct.eachArray + $ inp + kvs `shouldEqual` inp + describe "eachList" do + it "empty list" do + kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachList List.Nil + kvs `shouldEqual` ([] :: Array Int) + it "nonempty list" do + let + inp = (\n -> n /\ n) <$> Array.range 0 99999 + kvs <- + liftEffect + $ Pipes.Collect.toArray + $ Pipes.Construct.eachList + $ List.fromFoldable + $ inp + kvs `shouldEqual` inp diff --git a/test/Test/Pipes.Node.FS.purs b/test/Test/Pipes.Node.FS.purs index 6196e0e..998df76 100644 --- a/test/Test/Pipes.Node.FS.purs +++ b/test/Test/Pipes.Node.FS.purs @@ -34,7 +34,7 @@ spec = describe "Pipes.Node.FS" do s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8) s `shouldEqual` "foo" around tmpFile $ it "fails if the file already exists" \p -> do - liftEffect $ FS.writeTextFile UTF8 "foo" p + liftEffect $ FS.writeTextFile UTF8 p "foo" flip catchError (const $ pure unit) do Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p fail "should have thrown" @@ -44,7 +44,7 @@ spec = describe "Pipes.Node.FS" do contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "foo" around tmpFile $ it "fails if the file already exists" \p -> do - liftEffect $ FS.writeTextFile UTF8 "foo" p + liftEffect $ FS.writeTextFile UTF8 p "foo" flip catchError (const $ pure unit) do Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p fail "should have thrown"