From 33d42034fc7f103bf6520e8f3fa35cd8dbc01254 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Tue, 14 May 2024 13:51:57 -0500 Subject: [PATCH] fix: continue to read after readableEnded --- src/Node.Stream.Object.js | 16 ++++------------ src/Node.Stream.Object.purs | 10 ++++------ src/Pipes.Node.Stream.purs | 36 ++++++++++++++++++++++-------------- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/Node.Stream.Object.js b/src/Node.Stream.Object.js index db8ee5e..37b3c99 100644 --- a/src/Node.Stream.Object.js +++ b/src/Node.Stream.Object.js @@ -24,16 +24,12 @@ export const isWritableEndedImpl = (s) => () => s.writableEnded; /** @type {(s: Stream.Writable | Stream.Transform) => () => void} */ export const endImpl = (s) => () => s.end(); -/** @type {(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */ +/** @type {(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */ export const writeImpl = - ({ ok, wouldBlock, closed }) => + ({ ok, wouldBlock }) => (s) => (a) => () => { - if (s.writableEnded) { - return closed; - } - if (s.write(a)) { return ok; } else { @@ -41,15 +37,11 @@ export const writeImpl = } }; -/** @type {(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */ +/** @type {(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */ export const readImpl = - ({ just, closed, wouldBlock }) => + ({ just, wouldBlock }) => (s) => () => { - if (s.readableEnded) { - return closed; - } - const a = s.read(); if (a === null) { return wouldBlock; diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index ac55465..55fed8b 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -26,7 +26,6 @@ import Unsafe.Coerce (unsafeCoerce) data ReadResult a = ReadWouldBlock - | ReadClosed | ReadJust a derive instance Generic (ReadResult a) _ @@ -37,7 +36,6 @@ instance Show (ReadResult a) where data WriteResult = WriteWouldBlock - | WriteClosed | WriteOk derive instance Generic WriteResult _ @@ -45,8 +43,8 @@ derive instance Eq WriteResult instance Show WriteResult where show = genericShow -type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a } -type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult } +type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a } +type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult } foreign import data Writable :: Type -> Type foreign import data Readable :: Type -> Type @@ -64,10 +62,10 @@ foreign import needsDrainImpl :: forall s. s -> Effect Boolean foreign import readableLengthImpl :: forall s. s -> Effect Int readResultFFI :: forall a. ReadResultFFI a -readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust } +readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust } writeResultFFI :: WriteResultFFI -writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk } +writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk } class Stream :: Type -> Constraint class Stream s where diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index f7be32f..39d1af3 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -40,8 +40,13 @@ fromReadable r = res <- liftEffect $ O.read r case res of O.ReadJust a -> yield (Just a) $> Loop { error, cancel } - O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel } - O.ReadClosed -> yield Nothing *> cleanup cancel + O.ReadWouldBlock -> do + ended <- liftEffect $ O.isReadableEnded r + if ended then do + yield Nothing + cleanup cancel + else + liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel } in do e <- liftEffect $ O.withErrorST r @@ -66,14 +71,16 @@ fromWritable w = needsDrain <- liftEffect $ O.needsDrain w when needsDrain $ liftAff $ O.awaitWritableOrClosed w - ma <- await - case ma of - Nothing -> cleanup cancel - Just a -> do - res <- liftEffect $ O.write w a - case res of - O.WriteClosed -> cleanup cancel - _ -> pure $ Loop { error, cancel } + + ended <- liftEffect $ O.isWritableEnded w + if ended then + cleanup cancel + else + await >>= case _ of + Nothing -> cleanup cancel + Just a -> do + void $ liftEffect $ O.write w a + pure $ Loop { error, cancel } in do r <- liftEffect $ O.withErrorST w @@ -111,18 +118,19 @@ fromTransform t = for_ err throwError needsDrain <- liftEffect $ O.needsDrain t + ended <- liftEffect $ O.isWritableEnded t if needsDrain then do liftAff $ delay $ wrap 0.0 yieldWhileReadable pure $ Loop { error, cancel } - else do - ma <- await - case ma of + else if ended then + cleanup cancel + else + await >>= case _ of Nothing -> cleanup cancel Just a' -> do res <- liftEffect $ O.write t a' case res of - O.WriteClosed -> cleanup cancel O.WriteOk -> do maybeYield1 pure $ Loop { error, cancel }