From a8702f48490b46a77685169e3fd5583a384b549b Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Thu, 20 Jun 2024 15:40:17 -0500 Subject: [PATCH] fix: `finish` may not emit until all chunks are read --- spago.lock | 1 - spago.yaml | 1 - src/Node.Stream.Object.js | 3 +++ src/Node.Stream.Object.purs | 41 +++++++++++++++++++++++++++---------- src/Pipes.Node.Stream.purs | 25 +++++++++++----------- src/Pipes.Util.purs | 9 ++++++-- 6 files changed, 53 insertions(+), 27 deletions(-) diff --git a/spago.lock b/spago.lock index 768b597..1b8e2df 100644 --- a/spago.lock +++ b/spago.lock @@ -13,7 +13,6 @@ workspace: - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" - mmorph: ">=7.0.0 <8.0.0" - - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - node-fs: ">=9.1.0 <10.0.0" diff --git a/spago.yaml b/spago.yaml index a055e69..772d409 100644 --- a/spago.yaml +++ b/spago.yaml @@ -20,7 +20,6 @@ package: - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" - mmorph: ">=7.0.0 <8.0.0" - - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - node-fs: ">=9.1.0 <10.0.0" diff --git a/src/Node.Stream.Object.js b/src/Node.Stream.Object.js index 37b3c99..4c32b27 100644 --- a/src/Node.Stream.Object.js +++ b/src/Node.Stream.Object.js @@ -21,6 +21,9 @@ export const isReadableEndedImpl = (s) => () => s.readableEnded; /** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */ export const isWritableEndedImpl = (s) => () => s.writableEnded; +/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */ +export const isWritableFinishedImpl = (s) => () => s.writableFinished; + /** @type {(s: Stream.Writable | Stream.Transform) => () => void} */ export const endImpl = (s) => () => s.end(); diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 578d4df..3b73383 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..)) import Data.Show.Generic (genericShow) import Effect (Effect) import Effect.Aff (Aff, effectCanceler, makeAff) +import Effect.Aff as Aff import Effect.Class (liftEffect) -import Effect.Exception (Error) +import Effect.Exception (Error, error) import Effect.Uncurried (mkEffectFn1) import Node.Buffer (Buffer) import Node.EventEmitter (EventHandle(..)) @@ -61,6 +62,7 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean foreign import isWritableImpl :: forall s. s -> Effect Boolean foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean +foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean foreign import isClosedImpl :: forall s. s -> Effect Boolean foreign import needsDrainImpl :: forall s. s -> Effect Boolean foreign import readableLengthImpl :: forall s. s -> Effect Int @@ -94,6 +96,7 @@ class Stream s <= Write s a | s -> a where isWritable :: s -> Effect Boolean needsDrain :: s -> Effect Boolean isWritableEnded :: s -> Effect Boolean + isWritableFinished :: s -> Effect Boolean write :: s -> a -> Effect WriteResult end :: s -> Effect Unit @@ -116,18 +119,21 @@ else instance (Read s a) => Read s a where instance Write (Writable a) a where isWritable = isWritableImpl isWritableEnded = isWritableEndedImpl + isWritableFinished = isWritableFinishedImpl write s = writeImpl writeResultFFI s end = endImpl needsDrain = needsDrainImpl else instance Write (Transform a b) a where isWritable = isWritableImpl isWritableEnded = isWritableEndedImpl + isWritableFinished = isWritableFinishedImpl write s = writeImpl writeResultFFI s end = endImpl needsDrain = needsDrainImpl else instance (Write s a) => Write s a where isWritable = isWritableImpl isWritableEnded = isWritableEndedImpl + isWritableFinished = isWritableFinishedImpl write s a = write s a end s = end s needsDrain = needsDrainImpl @@ -167,25 +173,38 @@ unsafeFromStringWritable = unsafeCoerce awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit awaitReadableOrClosed s = do + fiber <- + Aff.forkAff $ parOneOf + [ onceAff0 readableH s $> Right unit + , onceAff0 closeH s $> Right unit + , Left <$> onceAff1 errorH s + ] + closed <- liftEffect $ isClosed s + readEnded <- liftEffect $ isReadableEnded s readable <- liftEffect $ isReadable s length <- liftEffect $ readableLength s - when (readable && length == 0) - $ liftEither - =<< parOneOf - [ onceAff0 readableH s $> Right unit - , onceAff0 closeH s $> Right unit - , Left <$> onceAff1 errorH s - ] + if (not closed && not readEnded && readable && length == 0) then + liftEither =<< Aff.joinFiber fiber + else + Aff.killFiber (error "") fiber awaitFinished :: forall s a. Write s a => s -> Aff Unit -awaitFinished s = onceAff0 finishH s +awaitFinished s = do + fiber <- Aff.forkAff $ onceAff0 finishH s + finished <- liftEffect $ isWritableFinished s + if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed s = do + fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] + closed <- liftEffect $ isClosed s + writeEnded <- liftEffect $ isWritableEnded s writable <- liftEffect $ isWritable s needsDrain <- liftEffect $ needsDrain s - when (writable && needsDrain) - $ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] + if not closed && not writeEnded && writable && needsDrain then + liftEither =<< Aff.joinFiber fiber + else + Aff.killFiber (error "") fiber onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit onceAff0 h emitter = makeAff \res -> do diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 4d089c3..eb16461 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -8,10 +8,8 @@ import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) import Data.Maybe (Maybe(..), maybe) -import Data.Newtype (wrap) import Data.Traversable (for_, traverse, traverse_) import Data.Tuple.Nested ((/\)) -import Effect.Aff (delay) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (Error) @@ -19,7 +17,6 @@ import Node.Stream.Object as O import Pipes (await, yield) import Pipes (for) as P import Pipes.Core (Consumer, Pipe, Producer, Producer_) -import Pipes.Prelude (mapFoldable) as P import Pipes.Util (InvokeResult(..), invoke) -- | Convert a `Readable` stream to a `Pipe`. @@ -97,28 +94,32 @@ fromTransform t = do maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST) cleanup = do + flip tailRecM unit $ const do + liftAff $ O.awaitReadableOrClosed t + readEnded <- liftEffect $ O.isReadableEnded t + yieldWhileReadable + pure $ (if readEnded then Done else Loop) unit + liftAff $ O.awaitFinished t - fromReadable t maybeThrow liftEffect $ removeErrorListener + yield Nothing yieldWhileReadable = void $ whileJust $ maybeYield1 maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t) onEOS = liftEffect (O.end t) *> cleanup $> Done unit - onChunk a = - liftEffect (O.write t a) - >>= case _ of - O.WriteOk -> maybeYield1 $> Loop unit - O.WriteWouldBlock -> yieldWhileReadable $> Loop unit + onChunk a = liftEffect (O.write t a) $> Loop unit go _ = do maybeThrow needsDrain <- liftEffect $ O.needsDrain t ended <- liftEffect $ O.isWritableEnded t - if needsDrain then - liftAff (delay $ wrap 0.0) *> yieldWhileReadable $> Loop unit + if needsDrain then do + yieldWhileReadable + liftAff $ O.awaitWritableOrClosed t + pure $ Loop unit else if ended then cleanup $> Done unit else @@ -136,7 +137,7 @@ withEOS a = do -- | Strip a pipeline of the EOS signal unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit -unEOS = P.mapFoldable identity +unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit -- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`. -- | diff --git a/src/Pipes.Util.purs b/src/Pipes.Util.purs index 563bca4..d45a8ef 100644 --- a/src/Pipes.Util.purs +++ b/src/Pipes.Util.purs @@ -86,9 +86,14 @@ chunked size = do a <- MaybeT await chunkPut a len <- lift chunkLength - when (len >= size) $ lift $ yield =<< Just <$> chunkTake + when (len >= size) do + chunk <- lift chunkTake + lift $ yield $ Just chunk len <- chunkLength - when (len > 0) $ yield =<< Just <$> chunkTake + when (len > 0) do + chunk <- chunkTake + yield $ Just chunk + yield Nothing -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.