feat!: add AsyncPipe abstraction, significantly improve throughput of Transform streams

This commit is contained in:
bingus 2024-06-22 18:42:22 -05:00
parent 4cd44367a8
commit 5b3eda707e
Signed by: orion
GPG Key ID: 6D4165AE4C928719
14 changed files with 8737 additions and 114 deletions

BIN
bun.lockb

Binary file not shown.

View File

@ -3,10 +3,11 @@
"version": "v1.6.1", "version": "v1.6.1",
"type": "module", "type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.6",
"csv-stringify": "^6.4.6" "csv-stringify": "^6.5.0"
}, },
"devDependencies": { "devDependencies": {
"cbor-x": "^1.5.9",
"typescript": "^5.4.5" "typescript": "^5.4.5"
} }
} }

View File

@ -5,20 +5,26 @@ workspace:
dependencies: dependencies:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- console
- control: ">=6.0.0 <7.0.0"
- datetime
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0" - node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0" - node-zlib: ">=0.4.0 <0.5.0"
- now
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"

View File

@ -12,20 +12,26 @@ package:
dependencies: dependencies:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- console: ">=6.1.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0" - node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0" - node-zlib: ">=0.4.0 <0.5.0"
- now: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"

281
src/Pipes.Async.purs Normal file
View File

@ -0,0 +1,281 @@
module Pipes.Async where
import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Error.Class (class MonadError, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf)
import Data.Array (fold)
import Data.Array as Array
import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (unwrap)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Exception (Error)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
import Pipes.Core (Pipe, Producer, Proxy)
data WriteSignal
= WriteSignalOk
| WriteSignalEnded
derive instance Generic WriteSignal _
derive instance Eq WriteSignal
derive instance Ord WriteSignal
instance Show WriteSignal where show = genericShow
instance Discard WriteSignal where
discard = bind
data ReadSignal
= ReadSignalOk
| ReadSignalEnded
derive instance Generic ReadSignal _
derive instance Eq ReadSignal
derive instance Ord ReadSignal
instance Show ReadSignal where show = genericShow
instance Discard ReadSignal where
discard = bind
data WriteResult
= WriteAgain
| WriteNeedsDrain
| WriteEnded
derive instance Generic WriteResult _
derive instance Eq WriteResult
derive instance Ord WriteResult
instance Show WriteResult where show = genericShow
data ReadResult a
= ReadOk a
| ReadWouldBlock
derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult
instance Show a => Show (ReadResult a) where show = genericShow
-- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`.
-- |
-- | ## Fields
-- | - `m x`
-- | - Initializer
-- | - `x -> a -> m WriteResult`
-- | - Write a value `a` to the underlying resource
-- | - `x -> m WriteSignal`
-- | - Block until the pipe is writable again (or writing must stop)
-- | - `x -> m (ReadResult b)`
-- | - Attempt to read a chunk
-- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x a b m =
AsyncPipe
(m x)
(x -> a -> m WriteResult)
(x -> m WriteSignal)
(x -> m (ReadResult b))
(x -> m ReadSignal)
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x a b m -> AsyncPipe x a b m
debug c (AsyncPipe init write awaitWrite read awaitRead) =
let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m
logR m = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> m
time :: forall a'. m a' -> m (Milliseconds /\ a')
time m = do
start <- liftEffect Now.now
a <- m
end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a
init' = do
logL "init >"
elapsed /\ x <- time init
logL $ "< init " <> "(" <> show (unwrap elapsed) <> "ms)"
pure x
write' x a = do
logL "write >"
elapsed /\ w <- time $ write x a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read' x = do
logR "read >"
elapsed /\ r <- time $ read x
logR $ "< read " <> show (r $> unit) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite' x = do
logL "awaitWrite >"
elapsed /\ w <- time $ awaitWrite x
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead' x = do
logR "awaitRead >"
elapsed /\ r <- time $ awaitRead x
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
in
AsyncPipe init' write' awaitWrite' read' awaitRead'
-- | Convert an `AsyncPipe` to a regular `Pipe`.
-- |
-- | Rather than two concurrently-running halves (producer & consumer),
-- | this requires the `AsyncPipe` to occasionally stop `await`ing data
-- | written by the upstream `Producer` so that it can `yield` to the downstream `Consumer`.
-- |
-- | This implementation chooses to prioritize `yield`ing data to the `Consumer` over
-- | `await`ing written chunks.
-- |
-- | Note that using this limits the potential parallelism of the entire pipeline, ex:
-- |
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-> sync Pipe.CSV.parse -- parse
-- | >-> sync Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | In the above example, this is what happens when the pipeline
-- | is executed:
-- | 1. `write` asks `encode` "do you have any data yet?" (fast)
-- | 1. `encode` asks `parse` "do you have any data yet?" (fast)
-- | 1. `parse` asks `read` "do you have any data yet?" (fast)
-- | 1. `read` passes 1 chunk to `parse` (fast)
-- | 1. `parse` blocks until the chunk is parsed (slow)
-- | 1. `parse` passes 1 chunk to `encode` (fast)
-- | 1. `encode` blocks until the chunk is encoded (slow)
-- | 1. `write` writes the block (fast)
-- |
-- | For larger workloads, changing this to use `asyncPipe` would be preferable, ex:
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-/-> Pipe.CSV.parse -- parse
-- | >-/-> Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | With this change:
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x (Maybe a) (Maybe b) m -> Pipe (Maybe a) (Maybe b) m Unit
sync (AsyncPipe init write awaitWrite read awaitRead) =
let
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
liftPipe = lift
liftM :: forall r. m r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
liftM = liftPipe <<< lift
continue a = throwError (Loop a)
break = throwError (Done unit)
awaitRW x = parOneOf [Right <$> awaitWrite x, Left <$> awaitRead x]
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
in do
x <- lift init
flip tailRecM WriteAgain
\w ->
map (either identity identity)
$ runExceptT do
rb <- liftM $ read x
case rb of
ReadWouldBlock
| w == WriteEnded -> liftM (awaitRead x) *> continue w
| w == WriteNeedsDrain -> liftM (awaitRW x) >>= either (const $ continue w) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue w
ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (w /= WriteAgain) $ continue w
a <- liftPipe await
w' <- liftM $ write x a
when (isNothing a) $ continue WriteEnded
pure $ Loop w'
-- | Implementation of `(>-/->)`
-- |
-- | In the current `MonadFork` "thread", read data from the `AsyncPipe` as it
-- | is yielded and `yield` to the downstream `Consumer`.
-- |
-- | Concurrently, in a separate thread, read data from the upstream `Producer`
-- | and write to the `AsyncPipe` at max throughput.
-- |
-- | If the producing half fails, the error is caught and rethrown.
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall f m x a b
. MonadRec m
=> MonadAff m
=> MonadBracket Error f m
=> Producer (Maybe a) m Unit
-> AsyncPipe x (Maybe a) (Maybe b) m
-> Producer (Maybe b) m Unit
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx x a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write x a
case w of
WriteNeedsDrain -> lift $ void $ awaitWrite x
WriteEnded -> empty
WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError
x <- lift init
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
forever do
getThreadError >>= traverse_ throwError
rb <- lift $ read x
case rb of
ReadOk (Just b) -> yield $ Just b
ReadOk Nothing -> killThread *> yield Nothing
ReadWouldBlock -> void $ lift (awaitRead x)
infixl 7 pipeAsync as >-/->

View File

@ -31,12 +31,8 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
traverse f b0 p0 = traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) -> flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of case p of
Respond a m -> do Respond a m -> Loop <$> (m unit /\ _) <$> f b a
b' <- f b a M m -> Loop <$> (_ /\ b) <$> m
pure $ Loop $ m unit /\ b'
M m -> do
n <- m
pure $ Loop $ (n /\ b)
Request _ _ -> pure $ Done b Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b Pure _ -> pure $ Done b

View File

@ -3,20 +3,26 @@ module Pipes.Node.Stream where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Control.Parallel (parOneOf)
import Data.Maybe (Maybe(..), maybe) import Data.Maybe (Maybe(..), maybe)
import Data.Traversable (for_, traverse, traverse_) import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
import Node.Stream.Object (WriteResult(..), maybeReadResult)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
import Pipes.Async (AsyncPipe(..))
import Pipes.Async as AsyncPipe
import Pipes.Core (Consumer, Pipe, Producer, Producer_) import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Util (InvokeResult(..), invoke) import Pipes.Util (InvokeResult(..), invoke)
@ -84,49 +90,78 @@ fromWritable w = do
tailRecM go unit tailRecM go unit
-- | Convert a `Transform` stream to a `Pipe`. newtype TransformContext a b =
-- | TransformContext
-- | When `Nothing` is piped to this, the `Transform` stream will { stream :: O.Transform a b
-- | be `end`ed, and the pipe will noop if invoked again. , removeErrorListener :: Effect Unit
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit , errorST :: STRef Global (Maybe Error)
fromTransform t = do }
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t
transformCleanup :: forall m a b. MonadEffect m => TransformContext a b -> m Unit
transformCleanup (TransformContext {removeErrorListener}) = do
liftEffect removeErrorListener
transformStream :: forall a b. TransformContext a b -> O.Transform a b
transformStream (TransformContext {stream}) = stream
transformRethrow :: forall m a b. MonadThrow Error m => MonadEffect m => TransformContext a b -> m Unit
transformRethrow (TransformContext {errorST}) = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
-- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> Effect (O.Transform a b)
-> AsyncPipe (TransformContext a b) (Maybe a) (Maybe b) m
fromTransform t =
let let
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST) init = do
stream <- liftEffect t
cleanup = do { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
flip tailRecM unit $ const do pure $ TransformContext {errorST, removeErrorListener, stream}
liftAff $ O.awaitReadableOrClosed t write x Nothing = do
readEnded <- liftEffect $ O.isReadableEnded t let s = transformStream x
yieldWhileReadable liftEffect $ O.end s
pure $ (if readEnded then Done else Loop) unit pure AsyncPipe.WriteEnded
write x (Just a) = do
liftAff $ O.awaitFinished t transformRethrow x
maybeThrow let s = transformStream x
liftEffect $ removeErrorListener w <- liftEffect $ O.write s a
yield Nothing pure $ case w of
WriteOk -> AsyncPipe.WriteAgain
yieldWhileReadable = void $ whileJust $ maybeYield1 WriteWouldBlock -> AsyncPipe.WriteNeedsDrain
awaitWrite x = do
maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t) transformRethrow x
let s = transformStream x
onEOS = liftEffect (O.end t) *> cleanup $> Done unit liftAff $ O.awaitWritableOrClosed s
onChunk a = liftEffect (O.write t a) $> Loop unit ended <- liftEffect $ O.isWritableEnded s
if ended then
go _ = do pure $ AsyncPipe.WriteSignalEnded
maybeThrow else do
needsDrain <- liftEffect $ O.needsDrain t liftAff $ O.awaitWritableOrClosed s
ended <- liftEffect $ O.isWritableEnded t pure $ AsyncPipe.WriteSignalOk
if needsDrain then do read x =
yieldWhileReadable do
liftAff $ parOneOf [O.awaitWritableOrClosed t, O.awaitReadableOrClosed t] transformRethrow x
pure $ Loop unit let s = transformStream x
else if ended then readEnded <- liftEffect $ O.isReadableEnded s
cleanup $> Done unit if readEnded then do
transformCleanup x
pure $ AsyncPipe.ReadOk Nothing
else else
await >>= maybe onEOS onChunk maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)
awaitRead x = do
tailRecM go unit transformRethrow x
let s = transformStream x
ended <- liftEffect $ O.isReadableEnded s
if ended then
pure $ AsyncPipe.ReadSignalEnded
else do
liftAff $ O.awaitReadableOrClosed s
pure $ AsyncPipe.ReadSignalOk
in
AsyncPipe init write awaitWrite read awaitRead
-- | Given a `Producer` of values, wrap them in `Just`. -- | Given a `Producer` of values, wrap them in `Just`.
-- | -- |

View File

@ -12,31 +12,34 @@ import Node.Buffer (Buffer)
import Node.Stream.Object as O import Node.Stream.Object as O
import Node.Zlib as Zlib import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream) import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe) import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (fromTransform) import Pipes.Node.Stream (TransformContext, fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit type X = TransformContext Buffer Buffer
fromZlib z = do
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
fromZlib z =
fromTransform do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.unsafeCoerceTransform raw pure $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
gunzip = fromZlib Zlib.createGunzip gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
unzip = fromZlib Zlib.createUnzip unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
inflate = fromZlib Zlib.createInflate inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
deflate = fromZlib Zlib.createDeflate deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
brotliCompress = fromZlib Zlib.createBrotliCompress brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
brotliDecompress = fromZlib Zlib.createBrotliDecompress brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@ -149,3 +149,5 @@ invoke m a =
Pure _ -> pure Exited Pure _ -> pure Exited
in in
go (IDidNotAwait m) go (IDidNotAwait m)

View File

@ -3,18 +3,19 @@ module Test.Main where
import Prelude import Prelude
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
import Data.Time.Duration (Milliseconds(..))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (launchAff_) import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Pipes.Construct as Test.Pipes.Construct import Test.Pipes.Construct as Test.Pipes.Construct
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Spec.Reporter (specReporter) import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec') import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do main = launchAff_ $ runSpec' (defaultConfig { slow = Milliseconds 0.0, failFast = true, exit = false, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec Test.Pipes.Node.FS.spec

View File

@ -1,4 +1,16 @@
import Stream from 'stream' import Stream from 'stream'
import * as CBOR from "cbor-x";
import * as CSVDecode from "csv-parse";
import * as CSVEncode from "csv-stringify";
export const cborDecode = () => new CBOR.DecoderStream({useRecords: false, allowHalfOpen: true});
export const cborEncode = () => new CBOR.EncoderStream({useRecords: false, allowHalfOpen: true});
export const cborDecodeSync = a => () => CBOR.decodeMultiple(a);
export const cborEncodeSync = a => () => CBOR.encode(a, {useRecords: false});
export const csvDecode = () => CSVDecode.parse({columns: true, allowHalfOpen: true})
export const csvEncode = () => CSVEncode.stringify({header: true, allowHalfOpen: true})
export const discardTransform = () => new Stream.Transform({ export const discardTransform = () => new Stream.Transform({
transform: function(_ck, _enc, cb) { transform: function(_ck, _enc, cb) {
@ -7,6 +19,16 @@ export const discardTransform = () => new Stream.Transform({
objectMode: true objectMode: true
}) })
export const slowTransform = () => {
return new Stream.Transform({
transform: function(ck, _enc, cb) {
this.push(ck)
setTimeout(() => cb(), 4)
},
objectMode: true
})
}
export const charsTransform = () => new Stream.Transform({ export const charsTransform = () => new Stream.Transform({
transform: function(ck, _enc, cb) { transform: function(ck, _enc, cb) {
ck.split('').filter(s => !!s).forEach(s => this.push(s)) ck.split('').filter(s => !!s).forEach(s => this.push(s))

View File

@ -2,49 +2,72 @@ module Test.Pipes.Node.Stream where
import Prelude import Prelude
import Control.Monad.Trans.Class (lift) import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Data.Array as Array import Data.Array as Array
import Data.Foldable (fold) import Data.Bifunctor (lmap)
import Data.Either (Either(..))
import Data.Foldable (fold, intercalate)
import Data.FoldableWithIndex (forWithIndex_)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Int as Int
import Data.List ((:)) import Data.List ((:))
import Data.List as List import Data.List as List
import Data.Maybe (Maybe) import Data.Maybe (Maybe)
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.Profunctor.Strong (first)
import Data.String as String
import Data.String.Gen (genAlphaString) import Data.String.Gen (genAlphaString)
import Data.Traversable (for_, traverse)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, delay) import Effect.Aff (Aff, delay)
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (error)
import Effect.Unsafe (unsafePerformEffect)
import Foreign (Foreign)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Buffer as Buffer import Node.Buffer as Buffer
import Node.Encoding (Encoding(..)) import Node.Encoding (Encoding(..))
import Node.FS.Stream as FS.Stream import Node.FS.Stream as FS.Stream
import Node.FS.Sync as FS import Node.FS.Sync as FS
import Node.Stream.Object as O import Node.Stream.Object as O
import Node.Zlib as Zlib import Pipes (each) as Pipe
import Pipes (each) as Pipes
import Pipes (yield, (>->)) import Pipes (yield, (>->))
import Pipes.Async (sync, (>-/->))
import Pipes.Collect as Pipe.Collect
import Pipes.Core (Consumer, Producer, runEffect) import Pipes.Core (Consumer, Producer, runEffect)
import Pipes.Node.Buffer as Pipes.Buffer import Pipes.Node.Buffer as Pipe.Buffer
import Pipes.Node.Stream as S import Pipes.Node.FS as Pipe.FS
import Pipes.Prelude (mapFoldable, toListM) as Pipes import Pipes.Node.Stream as Pipe.Node
import Simple.JSON (writeJSON) import Pipes.Node.Zlib as Pipe.Zlib
import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles) import Pipes.Prelude (toListM) as Pipe
import Simple.JSON (readImpl, readJSON, writeJSON)
import Test.Common (jsonStringify, tmpFile, tmpFiles)
import Test.QuickCheck.Arbitrary (arbitrary) import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample') import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, around, describe, it) import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (shouldEqual) import Test.Spec.Assertions (fail, shouldEqual)
foreign import readableFromArray :: forall @a. Array a -> O.Readable a foreign import readableFromArray :: forall @a. Array a -> O.Readable a
foreign import discardTransform :: forall a b. Effect (O.Transform a b) foreign import discardTransform :: forall a b. Effect (O.Transform a b)
foreign import slowTransform :: forall a b. Effect (O.Transform a b)
foreign import charsTransform :: Effect (O.Transform String String) foreign import charsTransform :: Effect (O.Transform String String)
foreign import cborEncodeSync :: forall a. a -> Effect Buffer
foreign import cborDecodeSync :: forall a. Buffer -> Effect a
foreign import cborEncode :: forall a. Effect (O.Transform a Buffer)
foreign import cborDecode :: forall a. Effect (O.Transform Buffer a)
foreign import csvEncode :: forall a. Effect (O.Transform a String)
foreign import csvDecode :: forall a. Effect (O.Transform String a)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit) writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do writer a = do
stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream pure $ stream /\ Pipe.Node.fromWritable stream
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit) reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a reader a = liftEffect $ Pipe.Node.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit spec :: Spec Unit
spec = spec =
@ -52,30 +75,30 @@ spec =
describe "Readable" do describe "Readable" do
describe "Readable.from(<Iterable>)" do describe "Readable.from(<Iterable>)" do
it "empty" do it "empty" do
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } []) >-> Pipe.Node.unEOS
vals `shouldEqual` List.Nil vals `shouldEqual` List.Nil
it "singleton" do it "singleton" do
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> Pipe.Node.unEOS
vals `shouldEqual` ({ foo: "1" } : List.Nil) vals `shouldEqual` ({ foo: "1" } : List.Nil)
it "many elements" do it "many elements" do
let exp = (\n -> { foo: show n }) <$> Array.range 0 100 let exp = (\n -> { foo: show n }) <$> Array.range 0 100
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray exp) >-> Pipe.Node.unEOS
vals `shouldEqual` (List.fromFoldable exp) vals `shouldEqual` (List.fromFoldable exp)
describe "Writable" $ around tmpFile do describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do describe "fs.WriteStream" do
it "pipe to file" \p -> do it "pipe to file" \p -> do
stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p) stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
w = S.fromWritable stream w = Pipe.Node.fromWritable stream
source = do source = do
buf <- liftEffect $ Buffer.fromString "hello" UTF8 buf <- liftEffect $ Buffer.fromString "hello" UTF8
yield buf yield buf
runEffect $ S.withEOS source >-> w runEffect $ Pipe.Node.withEOS source >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello" contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream) shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p) w <- Pipe.Node.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
source = do source = do
yield "hello, " yield "hello, "
@ -87,7 +110,7 @@ spec =
yield "this is a " yield "this is a "
lift $ delay $ wrap 5.0 lift $ delay $ wrap 5.0
yield "test." yield "test."
runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w runEffect $ Pipe.Node.withEOS (source >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello, world! this is a test." contents `shouldEqual` "hello, world! this is a test."
it "chained pipes" \p -> do it "chained pipes" \p -> do
@ -101,40 +124,96 @@ spec =
let let
exp = fold (writeJSON <$> objs) exp = fold (writeJSON <$> objs)
stream /\ w <- liftEffect $ writer p stream /\ w <- liftEffect $ writer p
runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w runEffect $ Pipe.Node.withEOS (Pipe.each objs >-> jsonStringify >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` exp contents `shouldEqual` exp
shouldEqual true =<< liftEffect (O.isWritableEnded stream) shouldEqual true =<< liftEffect (O.isWritableEnded stream)
describe "Transform" do describe "Transform" do
it "gzip" do
let let
json = yield $ writeJSON { foo: "bar" } bignums = Array.range 1 1000
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000" firstNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/first_names.txt")
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) lastNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/last_names.txt")
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex) names n = do
fold outs `shouldEqual` exp first <- firstNames
around tmpFiles last <- Array.take (Int.round $ Int.toNumber n / Int.toNumber (Array.length firstNames)) lastNames
$ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do pure $ first <> " " <> last
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ] people n = mapWithIndex (\ix name -> {id: show $ ix + 1, name}) (names n)
areader <- liftEffect $ reader a peopleCSV n = "id,name\n" <> intercalate "\n" ((\{id, name} -> id <> "," <> name) <$> people n)
bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) for_ [4000, 8000, 32000, 64000, 200000] \n -> do
breader <- liftEffect $ reader b let
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity) csv = peopleCSV n
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ] people' = people n
around tmpFiles
$ it (show n <> " row csv >-/-> csv-parse >-/-> cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransform csvDecode
>-/-> Pipe.Node.fromTransform cborEncode
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it (show n <> " row csv >-> sync csv-parse >-> sync cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransform csvDecode)
>-> sync (Pipe.Node.fromTransform cborEncode)
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it "file >-> sync gzip >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <- Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-/-> gzip >-/-> slow >-/-> gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransform slowTransform
>-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-> sync gzip >-> sync slow >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransform slowTransform)
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo" liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p r <- reader p
discard' <- liftEffect discardTransform out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS
out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS
out `shouldEqual` List.Nil out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar" liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p r <- reader p
chars' <- liftEffect charsTransform out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS
out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ] out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]

4095
test/Test/first_names.txt Normal file

File diff suppressed because it is too large Load Diff

4096
test/Test/last_names.txt Normal file

File diff suppressed because it is too large Load Diff