diff --git a/src/Node.Stream.Object.js b/src/Node.Stream.Object.js index 5f85aa1..2bdaaa7 100644 --- a/src/Node.Stream.Object.js +++ b/src/Node.Stream.Object.js @@ -3,6 +3,9 @@ import Stream from "stream"; /** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */ export const isReadableImpl = (s) => () => s.readable; +/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */ +export const readableLengthImpl = (s) => () => s.readableLength; + /** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */ export const isClosedImpl = (s) => () => s.closed; diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 1cbe20d..27c566c 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -60,6 +60,7 @@ foreign import isWritableImpl :: forall s. s -> Effect Boolean foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean foreign import isClosedImpl :: forall s. s -> Effect Boolean +foreign import readableLengthImpl :: forall s. s -> Effect Int readResultFFI :: forall a. ReadResultFFI a readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust } @@ -81,6 +82,7 @@ else instance Stream s => Stream s where isClosed s = isClosed s class Stream s <= Read s a | s -> a where + readableLength :: s -> Effect Int isReadable :: s -> Effect Boolean isReadableEnded :: s -> Effect Boolean read :: s -> Effect (ReadResult a) @@ -92,14 +94,17 @@ class Stream s <= Write s a | s -> a where end :: s -> Effect Unit instance Read (Readable a) a where + readableLength = readableLengthImpl isReadable = isReadableImpl isReadableEnded = isReadableEndedImpl read = readImpl readResultFFI else instance Read (Transform a b) b where + readableLength = readableLengthImpl isReadable = isReadableImpl isReadableEnded = isReadableEndedImpl read = readImpl readResultFFI else instance (Read s a) => Read s a where + readableLength = readableLengthImpl isReadable = isReadableImpl isReadableEnded = isReadableEndedImpl read s = read s @@ -149,8 +154,14 @@ awaitReadableOrClosed s = do closed <- liftEffect $ isClosed s ended <- liftEffect $ isReadableEnded s readable <- liftEffect $ isReadable s - when (not ended && not closed && not readable) - $ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] + length <- liftEffect $ readableLength s + when (not ended && not closed && not readable && length > 0) + $ liftEither + =<< parOneOf + [ onceAff0 readableH s $> Right unit + , onceAff0 closeH s $> Right unit + , Left <$> onceAff1 errorH s + ] awaitFinished :: forall s a. Write s a => s -> Aff Unit awaitFinished s = onceAff0 finishH s