fix: finish may not emit until all chunks are read

This commit is contained in:
orion 2024-06-20 15:40:17 -05:00
parent f3ea830379
commit a8702f4849
Signed by: orion
GPG Key ID: 6D4165AE4C928719
6 changed files with 53 additions and 27 deletions

View File

@ -13,7 +13,6 @@ workspace:
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"

View File

@ -20,7 +20,6 @@ package:
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"

View File

@ -21,6 +21,9 @@ export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */ /** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = (s) => () => s.writableEnded; export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableFinishedImpl = (s) => () => s.writableFinished;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */ /** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end(); export const endImpl = (s) => () => s.end();

View File

@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow) import Data.Show.Generic (genericShow)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff) import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error, error)
import Effect.Uncurried (mkEffectFn1) import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..)) import Node.EventEmitter (EventHandle(..))
@ -61,6 +62,7 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean
foreign import isWritableImpl :: forall s. s -> Effect Boolean foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int foreign import readableLengthImpl :: forall s. s -> Effect Int
@ -94,6 +96,7 @@ class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean isWritableEnded :: s -> Effect Boolean
isWritableFinished :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit end :: s -> Effect Unit
@ -116,18 +119,21 @@ else instance (Read s a) => Read s a where
instance Write (Writable a) a where instance Write (Writable a) a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
else instance Write (Transform a b) a where else instance Write (Transform a b) a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where else instance (Write s a) => Write s a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s a = write s a write s a = write s a
end s = end s end s = end s
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
@ -167,25 +173,38 @@ unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do awaitReadableOrClosed s = do
fiber <-
Aff.forkAff $ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s readable <- liftEffect $ isReadable s
length <- liftEffect $ readableLength s length <- liftEffect $ readableLength s
when (readable && length == 0) if (not closed && not readEnded && readable && length == 0) then
$ liftEither liftEither =<< Aff.joinFiber fiber
=<< parOneOf else
[ onceAff0 readableH s $> Right unit Aff.killFiber (error "") fiber
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
awaitFinished :: forall s a. Write s a => s -> Aff Unit awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s awaitFinished s = do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do awaitWritableOrClosed s = do
fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
closed <- liftEffect $ isClosed s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s writable <- liftEffect $ isWritable s
needsDrain <- liftEffect $ needsDrain s needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain) if not closed && not writeEnded && writable && needsDrain then
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do onceAff0 h emitter = makeAff \res -> do

View File

@ -8,10 +8,8 @@ import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..), maybe) import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (wrap)
import Data.Traversable (for_, traverse, traverse_) import Data.Traversable (for_, traverse, traverse_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
@ -19,7 +17,6 @@ import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_) import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke) import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`. -- | Convert a `Readable` stream to a `Pipe`.
@ -97,28 +94,32 @@ fromTransform t = do
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST) maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
cleanup = do cleanup = do
flip tailRecM unit $ const do
liftAff $ O.awaitReadableOrClosed t
readEnded <- liftEffect $ O.isReadableEnded t
yieldWhileReadable
pure $ (if readEnded then Done else Loop) unit
liftAff $ O.awaitFinished t liftAff $ O.awaitFinished t
fromReadable t
maybeThrow maybeThrow
liftEffect $ removeErrorListener liftEffect $ removeErrorListener
yield Nothing
yieldWhileReadable = void $ whileJust $ maybeYield1 yieldWhileReadable = void $ whileJust $ maybeYield1
maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t) maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t)
onEOS = liftEffect (O.end t) *> cleanup $> Done unit onEOS = liftEffect (O.end t) *> cleanup $> Done unit
onChunk a = onChunk a = liftEffect (O.write t a) $> Loop unit
liftEffect (O.write t a)
>>= case _ of
O.WriteOk -> maybeYield1 $> Loop unit
O.WriteWouldBlock -> yieldWhileReadable $> Loop unit
go _ = do go _ = do
maybeThrow maybeThrow
needsDrain <- liftEffect $ O.needsDrain t needsDrain <- liftEffect $ O.needsDrain t
ended <- liftEffect $ O.isWritableEnded t ended <- liftEffect $ O.isWritableEnded t
if needsDrain then if needsDrain then do
liftAff (delay $ wrap 0.0) *> yieldWhileReadable $> Loop unit yieldWhileReadable
liftAff $ O.awaitWritableOrClosed t
pure $ Loop unit
else if ended then else if ended then
cleanup $> Done unit cleanup $> Done unit
else else
@ -136,7 +137,7 @@ withEOS a = do
-- | Strip a pipeline of the EOS signal -- | Strip a pipeline of the EOS signal
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`. -- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- | -- |

View File

@ -86,9 +86,14 @@ chunked size = do
a <- MaybeT await a <- MaybeT await
chunkPut a chunkPut a
len <- lift chunkLength len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake when (len >= size) do
chunk <- lift chunkTake
lift $ yield $ Just chunk
len <- chunkLength len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake when (len > 0) do
chunk <- chunkTake
yield $ Just chunk
yield Nothing yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it. -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.