feat: fix inEOS, add uniqHash + invoke

This commit is contained in:
orion kindel 2024-05-11 22:00:58 -05:00
parent 4baf317f43
commit eb01962553
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 79 additions and 9 deletions

1
src/Pipes.Construct.purs Normal file
View File

@ -0,0 +1 @@
module Pipes.Construct where

View File

@ -3,21 +3,23 @@ module Pipes.Node.Stream where
import Prelude import Prelude
import Control.Monad.Error.Class (throwError) import Control.Monad.Error.Class (throwError)
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..), maybe) import Data.Maybe (Maybe(..))
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.Traversable (for_) import Data.Traversable (for_)
import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff, delay) import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff) import Effect.Aff.Class (liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield, (>->)) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_) import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable, map) as P import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`. -- | Convert a `Readable` stream to a `Pipe`.
-- | -- |
@ -140,8 +142,16 @@ unEOS = P.mapFoldable identity
-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`. -- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`.
-- | -- |
-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again. -- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again.
inEOS :: forall a b. Pipe a b Aff Unit -> Pipe (Maybe a) (Maybe b) Aff Unit inEOS :: forall a b m. MonadRec m => Pipe a b m Unit -> Pipe (Maybe a) (Maybe b) m Unit
inEOS p = whileJust do inEOS p = flip tailRecM p \p' -> do
ma <- await ma <- await
maybe (yield Nothing) (\a -> yield a >-> p >-> P.map Just) ma case ma of
pure $ void ma Just a -> do
res <- lift $ invoke p' a
case res of
Yielded (as /\ p'') -> do
for_ (Just <$> as) yield
pure $ Loop p''
DidNotYield p'' -> pure $ Loop p''
Exited -> yield Nothing $> Done unit
_ -> yield Nothing $> Done unit

View File

@ -3,17 +3,22 @@ module Pipes.Util where
import Prelude import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust) import Control.Monad.Rec.Class (class MonadRec, forever, whileJust)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray) import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield) import Pipes (await, yield)
import Pipes.Core (Pipe) import Pipes.Core (Pipe)
import Pipes.Internal (Proxy(..))
-- | Yields a separator value `sep` between received values -- | Yields a separator value `sep` between received values
-- | -- |
@ -62,3 +67,57 @@ chunked size = do
when (len >= size) $ lift $ yield =<< Just <$> chunkTake when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake yield =<< Just <$> chunkTake
yield Nothing yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
-- | will occupy O(n) space, and `yield` in O(1) time.
uniqHash :: forall a m. Hashable a => MonadEffect m => MonadRec m => Pipe a a m Unit
uniqHash = do
seenHashesST <- liftEffect $ liftST $ STRef.new HashSet.empty
forever do
a <- await
seenHashes <- liftEffect $ liftST $ STRef.read seenHashesST
when (not $ HashSet.member (hash a) seenHashes) do
void $ liftEffect $ liftST $ STRef.modify (HashSet.insert $ hash a) seenHashesST
yield a
-- | The result of a single step forward of a pipe.
data InvokeResult a b m
-- | The pipe `await`ed the value, but did not `yield` a response.
= DidNotYield (Pipe a b m Unit)
-- | The pipe `await`ed the value, and `yield`ed 1 or more responses.
| Yielded (NonEmptyList b /\ Pipe a b m Unit)
-- | The pipe `await`ed the value, and exited.
| Exited
data IntermediateInvokeResult a b m
= IDidNotYield (Pipe a b m Unit)
| IYielded (NonEmptyList b /\ Pipe a b m Unit)
| IDidNotAwait (Pipe a b m Unit)
-- | Pass a single value to a pipe, returning the result of the pipe's invocation.
invoke :: forall m a b. Monad m => Pipe a b m Unit -> a -> m (InvokeResult a b m)
invoke m a =
let
go :: IntermediateInvokeResult a b m -> m (InvokeResult a b m)
go (IYielded (as /\ n)) =
case n of
Request _ _ -> pure $ Yielded $ as /\ n
Respond rep f -> go (IYielded $ (as <> pure rep) /\ f unit)
M o -> go =<< IYielded <$> (as /\ _) <$> o
Pure _ -> pure Exited
go (IDidNotYield n) =
case n of
Request _ _ -> pure $ DidNotYield n
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotYield <$> o
Pure _ -> pure Exited
go (IDidNotAwait n) =
case n of
Request _ f -> go (IDidNotYield (f a))
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotAwait <$> o
Pure _ -> pure Exited
in
go (IDidNotAwait m)