Compare commits

..

No commits in common. "cda67508d4a6cd0f053528d523df0915448c826b" and "7f11c303fb17779dbdea92710caf80d9388b02de" have entirely different histories.

7 changed files with 230 additions and 278 deletions

View File

@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v2.1.0",
"version": "v2.0.2",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.6",

View File

@ -29,6 +29,7 @@ workspace:
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"

View File

@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '2.1.0'
version: '2.0.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@ -36,6 +36,7 @@ package:
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"

View File

@ -3,11 +3,10 @@ module Pipes.Async where
import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Error.Class (class MonadError, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Maybe.Trans (runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
@ -15,30 +14,30 @@ import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf)
import Data.Array (fold)
import Data.Array as Array
import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (unwrap)
import Data.Profunctor (class Profunctor)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Traversable (traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Exception (Error, error)
import Effect.Exception (Error)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
import Pipes.Core (Pipe, Proxy, Producer)
import Pipes.Core (Pipe, Producer, Proxy)
data WriteSignal
= WriteSignalOk
| WriteSignalEnded
derive instance Generic WriteSignal _
derive instance Eq WriteSignal
derive instance Ord WriteSignal
@ -63,7 +62,6 @@ data WriteResult
= WriteAgain
| WriteNeedsDrain
| WriteEnded
derive instance Generic WriteResult _
derive instance Eq WriteResult
derive instance Ord WriteResult
@ -72,148 +70,96 @@ instance Show WriteResult where show = genericShow
data ReadResult a
= ReadOk a
| ReadWouldBlock
derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`.
data AsyncPipe a b m r
-- | A pure return value
= Pure r
-- | An `AsyncPipe` behind a computation
| M (m (AsyncPipe a b m r))
-- | Interface to write & read from the backing resource
| AsyncIO (AsyncIO a b m r)
-- |
-- | ## Fields
-- | - `m x`
-- | - Initializer
-- | - `x -> a -> m WriteResult`
-- | - Write a value `a` to the underlying resource
-- | - `x -> m WriteSignal`
-- | - Block until the pipe is writable again (or writing must stop)
-- | - `x -> m (ReadResult b)`
-- | - Attempt to read a chunk
-- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x m a b =
AsyncPipe
(m x)
(x -> a -> m WriteResult)
(x -> m WriteSignal)
(x -> m (ReadResult b))
(x -> m ReadSignal)
-- | Modify request / response types
mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
instance Monad m => Functor (AsyncPipe x m a) where
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar
-- | Modify request / response types
bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
bindIO _ _ (Pure a) = Pure a
bindIO a b (M m) = M $ bindIO a b <$> m
bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
-- | Remove the `AsyncPipe` wrapper by discarding the IO
stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
instance Monad m => Profunctor (AsyncPipe x m) where
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d
dimap ab cd (AsyncPipe init w aw r ar) =
AsyncPipe
init
(\x -> w x <<< ab)
aw
(map (map cd) <<< r)
ar
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c m =
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b)
debug c (AsyncPipe init write awaitWrite read awaitRead) =
let
logL :: forall m'. MonadEffect m' => _ -> m' Unit
logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
logL m = liftEffect $ log $ "[" <> c <> "] " <> m
logR m = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> m
time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time ma = do
time :: forall a'. m a' -> m (Milliseconds /\ a')
time m = do
start <- liftEffect Now.now
a <- ma
a <- m
end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a
init' = do
logL "init >"
elapsed /\ x <- time init
logL $ "< init " <> "(" <> show (unwrap elapsed) <> "ms)"
pure x
write' x a = do
logL "write >"
elapsed /\ w <- time $ write x a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read' x = do
logR "read >"
elapsed /\ r <- time $ read x
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite' x = do
logL "awaitWrite >"
elapsed /\ w <- time $ awaitWrite x
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead' x = do
logR "awaitRead >"
elapsed /\ r <- time $ awaitRead x
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
in
flip bind (fromMaybe m)
$ runMaybeT do
(io /\ done') <- MaybeT $ lift $ getAsyncIO m
let
write a = do
logL "write >"
elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read = do
logR "read >"
elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite = do
logL "awaitWrite >"
elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead = do
logR "awaitRead >"
elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
AsyncPipe init' write' awaitWrite' read' awaitRead'
-- | Convert an `AsyncPipe` to a regular `Pipe`.
-- |
@ -256,47 +202,43 @@ debug c m =
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync m =
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit
sync (AsyncPipe init write awaitWrite read awaitRead) =
let
liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
liftPipe = lift
liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM :: forall r. m r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
liftM = liftPipe <<< lift
in
lift (getAsyncIO m) >>=
case _ of
Nothing -> lift $ stripIO m
Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
continue a = throwError (Loop a)
break = throwError (Done unit)
tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do
flip tailRecEarly WriteAgain \writable -> do
rb <- liftM read
case rb of
ReadWouldBlock
| writable == WriteEnded -> liftM awaitRead *> continue writable
| writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break
awaitRW x = parOneOf [Right <$> awaitWrite x, Left <$> awaitRead x]
when (writable /= WriteAgain) $ continue writable
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
in do
x <- lift init
flip tailRecM WriteAgain
\w ->
map (either identity identity)
$ runExceptT do
rb <- liftM $ read x
case rb of
ReadWouldBlock
| w == WriteEnded -> liftM (awaitRead x) *> continue w
| w == WriteNeedsDrain -> liftM (awaitRW x) >>= either (const $ continue w) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue w
ReadOk Nothing -> liftPipe (yield Nothing) *> break
a <- liftPipe await
writable' <- liftM $ write a
when (isNothing a) $ continue WriteEnded
pure $ Loop writable'
when (w /= WriteAgain) $ continue w
a <- liftPipe await
w' <- liftM $ write x a
when (isNothing a) $ continue WriteEnded
pure $ Loop w'
-- | Implementation of `(>-/->)`
-- |
@ -310,48 +252,44 @@ sync m =
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall f m a b
:: forall f m x a b
. MonadRec m
=> MonadAff m
=> MonadBracket Error f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> AsyncPipe x m (Maybe a) (Maybe b)
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write a
case w of
WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty
WriteAgain -> pure unit
rx x a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write x a
case w of
WriteNeedsDrain -> lift $ void $ awaitWrite x
WriteEnded -> empty
WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError
spawn = lift <<< fork <<< flip catchError putThreadError
_thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
x <- lift init
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift read
case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift $ read x
case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit
infixl 7 pipeAsync as >-/->

View File

@ -5,6 +5,8 @@ import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
@ -13,9 +15,9 @@ import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\))
import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object (ReadResult(..), WriteResult(..))
import Node.Stream.Object (WriteResult(..), maybeReadResult)
import Node.Stream.Object as O
import Pipes (await, yield)
import Pipes (for) as P
@ -88,60 +90,77 @@ fromWritable w = do
tailRecM go unit
fromTransformEffect
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> Effect (O.Transform a b)
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransformEffect = fromTransform <=< liftEffect
newtype TransformContext a b =
TransformContext
{ stream :: O.Transform a b
, removeErrorListener :: Effect Unit
, errorST :: STRef Global (Maybe Error)
}
transformCleanup :: forall m a b. MonadEffect m => TransformContext a b -> m Unit
transformCleanup (TransformContext {removeErrorListener}) = do
liftEffect removeErrorListener
transformStream :: forall a b. TransformContext a b -> O.Transform a b
transformStream (TransformContext {stream}) = stream
transformRethrow :: forall m a b. MonadThrow Error m => MonadEffect m => TransformContext a b -> m Unit
transformRethrow (TransformContext {errorST}) = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
-- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> O.Transform a b
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransform stream = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
let
rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
cleanup = liftEffect removeErrorListener
writeSignal =
liftEffect (O.isWritableEnded stream)
<#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
readSignal =
liftEffect (O.isReadableEnded stream)
<#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
writeResult WriteOk = AsyncPipe.WriteAgain
writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
awaitWritable = liftAff $ O.awaitWritableOrClosed stream
awaitReadable = liftAff $ O.awaitReadableOrClosed stream
awaitWrite = rethrow *> awaitWritable *> writeSignal
awaitRead = rethrow *> awaitReadable *> readSignal
whenReadNotEnded m =
liftEffect (O.isReadableEnded stream)
>>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
readNow = readResult <$> liftEffect (O.read stream)
writeNow a = writeResult <$> liftEffect (O.write stream a)
read = rethrow *> whenReadNotEnded readNow
write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
write (Just a) = rethrow *> writeNow a
AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)
=> Effect (O.Transform a b)
-> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b)
fromTransform t =
let
init = do
stream <- liftEffect t
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
pure $ TransformContext {errorST, removeErrorListener, stream}
write x Nothing = do
let s = transformStream x
liftEffect $ O.end s
pure AsyncPipe.WriteEnded
write x (Just a) = do
transformRethrow x
let s = transformStream x
w <- liftEffect $ O.write s a
pure $ case w of
WriteOk -> AsyncPipe.WriteAgain
WriteWouldBlock -> AsyncPipe.WriteNeedsDrain
awaitWrite x = do
transformRethrow x
let s = transformStream x
liftAff $ O.awaitWritableOrClosed s
ended <- liftEffect $ O.isWritableEnded s
if ended then
pure $ AsyncPipe.WriteSignalEnded
else do
liftAff $ O.awaitWritableOrClosed s
pure $ AsyncPipe.WriteSignalOk
read x =
do
transformRethrow x
let s = transformStream x
readEnded <- liftEffect $ O.isReadableEnded s
if readEnded then do
pure $ AsyncPipe.ReadOk Nothing
else
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)
awaitRead x = do
transformRethrow x
let s = transformStream x
ended <- liftEffect $ O.isReadableEnded s
if ended then
pure $ AsyncPipe.ReadSignalEnded
else do
liftAff $ O.awaitReadableOrClosed s
pure $ AsyncPipe.ReadSignalOk
in
AsyncPipe init write awaitWrite read awaitRead
-- | Given a `Producer` of values, wrap them in `Just`.
-- |

View File

@ -13,31 +13,33 @@ import Node.Stream.Object as O
import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream)
import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (fromTransform)
import Pipes.Node.Stream (TransformContext, fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
type X = TransformContext Buffer Buffer
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
fromZlib z =
do
fromTransform do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.unsafeCoerceTransform raw
pure $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@ -150,8 +150,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect csvDecode
>-/-> Pipe.Node.fromTransformEffect cborEncode
>-/-> Pipe.Node.fromTransform csvDecode
>-/-> Pipe.Node.fromTransform cborEncode
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -163,8 +163,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransformEffect csvDecode)
>-> sync (Pipe.Node.fromTransformEffect cborEncode)
>-> sync (Pipe.Node.fromTransform csvDecode)
>-> sync (Pipe.Node.fromTransform cborEncode)
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -188,7 +188,7 @@ spec =
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransformEffect slowTransform
>-/-> Pipe.Node.fromTransform slowTransform
>-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
@ -201,7 +201,7 @@ spec =
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransformEffect slowTransform)
>-> sync (Pipe.Node.fromTransform slowTransform)
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
@ -210,19 +210,10 @@ spec =
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
out :: List.List Int <-
Pipe.toListM
$ r
>-/-> Pipe.Node.fromTransformEffect discardTransform
>-> Pipe.Node.unEOS
out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS
out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p
out :: List.List String <-
Pipe.toListM $
r
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect charsTransform
>-> Pipe.Node.unEOS
out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]