Compare commits

...

2 Commits

Author SHA1 Message Date
cda67508d4
chore: prepare v2.1.0 2024-06-23 18:34:01 -05:00
860ace3990
feat: AsyncPipe really should be a monad 2024-06-23 18:33:45 -05:00
7 changed files with 278 additions and 230 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "purescript-node-stream-pipes", "name": "purescript-node-stream-pipes",
"version": "v2.0.2", "version": "v2.1.0",
"type": "module", "type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.6", "csv-parse": "^5.5.6",

View File

@ -29,7 +29,6 @@ workspace:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -1,7 +1,7 @@
package: package:
name: node-stream-pipes name: node-stream-pipes
publish: publish:
version: '2.0.2' version: '2.1.0'
license: 'GPL-3.0-or-later' license: 'GPL-3.0-or-later'
location: location:
githubOwner: 'cakekindel' githubOwner: 'cakekindel'
@ -36,7 +36,6 @@ package:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -3,10 +3,11 @@ module Pipes.Async where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard) import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Error.Class (class MonadError, catchError, throwError) import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT) import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork) import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (hoist) import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
@ -14,30 +15,30 @@ import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf) import Control.Parallel (class Parallel, parOneOf)
import Data.Array (fold)
import Data.Array as Array import Data.Array as Array
import Data.DateTime.Instant as Instant import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either) import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic) import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), isNothing) import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Newtype (unwrap) import Data.Newtype (unwrap)
import Data.Profunctor (class Profunctor)
import Data.Show.Generic (genericShow) import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds) import Data.Time.Duration (Milliseconds)
import Data.Traversable (traverse_) import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log) import Effect.Console (log)
import Effect.Exception (Error) import Effect.Exception (Error, error)
import Effect.Now as Now import Effect.Now as Now
import Pipes (await, yield) import Pipes (await, yield)
import Pipes.Collect as Collect import Pipes.Collect as Collect
import Pipes.Core (Pipe, Producer, Proxy) import Pipes.Core (Pipe, Proxy, Producer)
data WriteSignal data WriteSignal
= WriteSignalOk = WriteSignalOk
| WriteSignalEnded | WriteSignalEnded
derive instance Generic WriteSignal _ derive instance Generic WriteSignal _
derive instance Eq WriteSignal derive instance Eq WriteSignal
derive instance Ord WriteSignal derive instance Ord WriteSignal
@ -62,6 +63,7 @@ data WriteResult
= WriteAgain = WriteAgain
| WriteNeedsDrain | WriteNeedsDrain
| WriteEnded | WriteEnded
derive instance Generic WriteResult _ derive instance Generic WriteResult _
derive instance Eq WriteResult derive instance Eq WriteResult
derive instance Ord WriteResult derive instance Ord WriteResult
@ -70,96 +72,148 @@ instance Show WriteResult where show = genericShow
data ReadResult a data ReadResult a
= ReadOk a = ReadOk a
| ReadWouldBlock | ReadWouldBlock
derive instance Generic (ReadResult a) _ derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a) derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a) derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows -- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`. -- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- | -- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams -- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`. -- | is provided in `Pipes.Node.Stream`.
-- | data AsyncPipe a b m r
-- | ## Fields -- | A pure return value
-- | - `m x` = Pure r
-- | - Initializer -- | An `AsyncPipe` behind a computation
-- | - `x -> a -> m WriteResult` | M (m (AsyncPipe a b m r))
-- | - Write a value `a` to the underlying resource -- | Interface to write & read from the backing resource
-- | - `x -> m WriteSignal` | AsyncIO (AsyncIO a b m r)
-- | - Block until the pipe is writable again (or writing must stop)
-- | - `x -> m (ReadResult b)`
-- | - Attempt to read a chunk
-- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x m a b =
AsyncPipe
(m x)
(x -> a -> m WriteResult)
(x -> m WriteSignal)
(x -> m (ReadResult b))
(x -> m ReadSignal)
instance Monad m => Functor (AsyncPipe x m a) where -- | Modify request / response types
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
instance Monad m => Profunctor (AsyncPipe x m) where -- | Modify request / response types
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
dimap ab cd (AsyncPipe init w aw r ar) = bindIO _ _ (Pure a) = Pure a
AsyncPipe bindIO a b (M m) = M $ bindIO a b <$> m
init bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
(\x -> w x <<< ab) AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
aw
(map (map cd) <<< r) -- | Remove the `AsyncPipe` wrapper by discarding the IO
ar stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
-- | Wraps all fields of an `AsyncPipe` with logging to debug -- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing. -- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b) debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c (AsyncPipe init write awaitWrite read awaitRead) = debug c m =
let let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m logL :: forall m'. MonadEffect m' => _ -> m' Unit
logR m = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> m logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
time :: forall a'. m a' -> m (Milliseconds /\ a') time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time m = do time ma = do
start <- liftEffect Now.now start <- liftEffect Now.now
a <- m a <- ma
end <- liftEffect Now.now end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a pure $ (end `Instant.diff` start) /\ a
init' = do
logL "init >"
elapsed /\ x <- time init
logL $ "< init " <> "(" <> show (unwrap elapsed) <> "ms)"
pure x
write' x a = do
logL "write >"
elapsed /\ w <- time $ write x a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read' x = do
logR "read >"
elapsed /\ r <- time $ read x
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite' x = do
logL "awaitWrite >"
elapsed /\ w <- time $ awaitWrite x
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead' x = do
logR "awaitRead >"
elapsed /\ r <- time $ awaitRead x
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
in in
AsyncPipe init' write' awaitWrite' read' awaitRead' flip bind (fromMaybe m)
$ runMaybeT do
(io /\ done') <- MaybeT $ lift $ getAsyncIO m
let
write a = do
logL "write >"
elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read = do
logR "read >"
elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite = do
logL "awaitWrite >"
elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead = do
logR "awaitRead >"
elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
-- | Convert an `AsyncPipe` to a regular `Pipe`. -- | Convert an `AsyncPipe` to a regular `Pipe`.
-- | -- |
@ -202,43 +256,47 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
-- | * `read` will pass chunks to `parse` as fast as `parse` allows -- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready -- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready -- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync (AsyncPipe init write awaitWrite read awaitRead) = sync m =
let let
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe = lift liftPipe = lift
liftM :: forall r. m r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM = liftPipe <<< lift liftM = liftPipe <<< lift
in
lift (getAsyncIO m) >>=
case _ of
Nothing -> lift $ stripIO m
Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
continue a = throwError (Loop a) wSignal WriteSignalOk = WriteAgain
break = throwError (Done unit) wSignal WriteSignalEnded = WriteEnded
awaitRW x = parOneOf [Right <$> awaitWrite x, Left <$> awaitRead x] tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do
flip tailRecEarly WriteAgain \writable -> do
rb <- liftM read
case rb of
ReadWouldBlock
| writable == WriteEnded -> liftM awaitRead *> continue writable
| writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break
wSignal WriteSignalOk = WriteAgain when (writable /= WriteAgain) $ continue writable
wSignal WriteSignalEnded = WriteEnded
in do
x <- lift init
flip tailRecM WriteAgain
\w ->
map (either identity identity)
$ runExceptT do
rb <- liftM $ read x
case rb of
ReadWouldBlock
| w == WriteEnded -> liftM (awaitRead x) *> continue w
| w == WriteNeedsDrain -> liftM (awaitRW x) >>= either (const $ continue w) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue w
ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (w /= WriteAgain) $ continue w a <- liftPipe await
writable' <- liftM $ write a
a <- liftPipe await when (isNothing a) $ continue WriteEnded
w' <- liftM $ write x a pure $ Loop writable'
when (isNothing a) $ continue WriteEnded
pure $ Loop w'
-- | Implementation of `(>-/->)` -- | Implementation of `(>-/->)`
-- | -- |
@ -252,44 +310,48 @@ sync (AsyncPipe init write awaitWrite read awaitRead) =
-- | -- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown. -- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync pipeAsync
:: forall f m x a b :: forall f m a b
. MonadRec m . MonadRec m
=> MonadAff m => MonadAff m
=> MonadBracket Error f m => MonadBracket Error f m
=> Producer (Maybe a) m Unit => Producer (Maybe a) m Unit
-> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit -> Producer (Maybe b) m Unit
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = pipeAsync prod m =
do lift (getAsyncIO m)
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing >>= case _ of
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx x a = do rx a = do
killed <- threadKilled killed <- threadKilled
guard $ not killed guard $ not killed
w <- lift $ write x a w <- lift $ write a
case w of case w of
WriteNeedsDrain -> lift $ void $ awaitWrite x WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty WriteEnded -> empty
WriteAgain -> pure unit WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError spawn = lift <<< fork <<< flip catchError putThreadError
x <- lift init _thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
flip tailRecM unit $ const do flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError getThreadError >>= traverse_ throwError
rb <- lift $ read x rb <- lift read
case rb of case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
infixl 7 pipeAsync as >-/-> infixl 7 pipeAsync as >-/->

View File

@ -5,8 +5,6 @@ import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
@ -15,9 +13,9 @@ import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect (Effect) import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
import Node.Stream.Object (WriteResult(..), maybeReadResult) import Node.Stream.Object (ReadResult(..), WriteResult(..))
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
@ -90,77 +88,60 @@ fromWritable w = do
tailRecM go unit tailRecM go unit
newtype TransformContext a b = fromTransformEffect
TransformContext :: forall a b m
{ stream :: O.Transform a b . MonadThrow Error m
, removeErrorListener :: Effect Unit => MonadAff m
, errorST :: STRef Global (Maybe Error) => Effect (O.Transform a b)
} -> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransformEffect = fromTransform <=< liftEffect
transformCleanup :: forall m a b. MonadEffect m => TransformContext a b -> m Unit
transformCleanup (TransformContext {removeErrorListener}) = do
liftEffect removeErrorListener
transformStream :: forall a b. TransformContext a b -> O.Transform a b
transformStream (TransformContext {stream}) = stream
transformRethrow :: forall m a b. MonadThrow Error m => MonadEffect m => TransformContext a b -> m Unit
transformRethrow (TransformContext {errorST}) = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
-- | Convert a `Transform` stream to an `AsyncPipe`. -- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform fromTransform
:: forall a b m :: forall a b m
. MonadThrow Error m . MonadThrow Error m
=> MonadAff m => MonadAff m
=> Effect (O.Transform a b) => O.Transform a b
-> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b) -> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransform t = fromTransform stream = do
let { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
init = do let
stream <- liftEffect t rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream cleanup = liftEffect removeErrorListener
pure $ TransformContext {errorST, removeErrorListener, stream}
write x Nothing = do writeSignal =
let s = transformStream x liftEffect (O.isWritableEnded stream)
liftEffect $ O.end s <#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
pure AsyncPipe.WriteEnded
write x (Just a) = do readSignal =
transformRethrow x liftEffect (O.isReadableEnded stream)
let s = transformStream x <#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
w <- liftEffect $ O.write s a
pure $ case w of writeResult WriteOk = AsyncPipe.WriteAgain
WriteOk -> AsyncPipe.WriteAgain writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
WriteWouldBlock -> AsyncPipe.WriteNeedsDrain
awaitWrite x = do readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
transformRethrow x readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
let s = transformStream x
liftAff $ O.awaitWritableOrClosed s awaitWritable = liftAff $ O.awaitWritableOrClosed stream
ended <- liftEffect $ O.isWritableEnded s awaitReadable = liftAff $ O.awaitReadableOrClosed stream
if ended then
pure $ AsyncPipe.WriteSignalEnded awaitWrite = rethrow *> awaitWritable *> writeSignal
else do awaitRead = rethrow *> awaitReadable *> readSignal
liftAff $ O.awaitWritableOrClosed s
pure $ AsyncPipe.WriteSignalOk whenReadNotEnded m =
read x = liftEffect (O.isReadableEnded stream)
do >>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
transformRethrow x
let s = transformStream x readNow = readResult <$> liftEffect (O.read stream)
readEnded <- liftEffect $ O.isReadableEnded s writeNow a = writeResult <$> liftEffect (O.write stream a)
if readEnded then do
pure $ AsyncPipe.ReadOk Nothing read = rethrow *> whenReadNotEnded readNow
else
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s) write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
awaitRead x = do write (Just a) = rethrow *> writeNow a
transformRethrow x
let s = transformStream x AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)
ended <- liftEffect $ O.isReadableEnded s
if ended then
pure $ AsyncPipe.ReadSignalEnded
else do
liftAff $ O.awaitReadableOrClosed s
pure $ AsyncPipe.ReadSignalOk
in
AsyncPipe init write awaitWrite read awaitRead
-- | Given a `Producer` of values, wrap them in `Just`. -- | Given a `Producer` of values, wrap them in `Just`.
-- | -- |

View File

@ -13,33 +13,31 @@ import Node.Stream.Object as O
import Node.Zlib as Zlib import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream) import Node.Zlib.Types (ZlibStream)
import Pipes.Async (AsyncPipe) import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (TransformContext, fromTransform) import Pipes.Node.Stream (fromTransform)
type X = TransformContext Buffer Buffer fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
fromZlib z = fromZlib z =
fromTransform do do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
pure $ O.unsafeCoerceTransform raw fromTransform $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@ -150,8 +150,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a $ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransform csvDecode >-/-> Pipe.Node.fromTransformEffect csvDecode
>-/-> Pipe.Node.fromTransform cborEncode >-/-> Pipe.Node.fromTransformEffect cborEncode
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -163,8 +163,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a $ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransform csvDecode) >-> sync (Pipe.Node.fromTransformEffect csvDecode)
>-> sync (Pipe.Node.fromTransform cborEncode) >-> sync (Pipe.Node.fromTransformEffect cborEncode)
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -188,7 +188,7 @@ spec =
Pipe.Collect.toMonoid Pipe.Collect.toMonoid
$ Pipe.FS.read a $ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip >-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransform slowTransform >-/-> Pipe.Node.fromTransformEffect slowTransform
>-/-> Pipe.Zlib.gunzip >-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8 >-> Pipe.Buffer.toString UTF8
@ -201,7 +201,7 @@ spec =
Pipe.Collect.toMonoid Pipe.Collect.toMonoid
$ Pipe.FS.read a $ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip >-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransform slowTransform) >-> sync (Pipe.Node.fromTransformEffect slowTransform)
>-> sync Pipe.Zlib.gunzip >-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8 >-> Pipe.Buffer.toString UTF8
@ -210,10 +210,19 @@ spec =
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo" liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p r <- reader p
out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS out :: List.List Int <-
Pipe.toListM
$ r
>-/-> Pipe.Node.fromTransformEffect discardTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.Nil out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar" liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p r <- reader p
out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS out :: List.List String <-
Pipe.toListM $
r
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect charsTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ] out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]