fix: continue to read after readableEnded

This commit is contained in:
orion 2024-05-14 13:51:57 -05:00
parent c8822aeffe
commit 33d42034fc
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 30 additions and 32 deletions

View File

@ -24,16 +24,12 @@ export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl =
({ ok, wouldBlock, closed }) =>
({ ok, wouldBlock }) =>
(s) =>
(a) =>
() => {
if (s.writableEnded) {
return closed;
}
if (s.write(a)) {
return ok;
} else {
@ -41,15 +37,11 @@ export const writeImpl =
}
};
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =
({ just, closed, wouldBlock }) =>
({ just, wouldBlock }) =>
(s) =>
() => {
if (s.readableEnded) {
return closed;
}
const a = s.read();
if (a === null) {
return wouldBlock;

View File

@ -26,7 +26,6 @@ import Unsafe.Coerce (unsafeCoerce)
data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
@ -37,7 +36,6 @@ instance Show (ReadResult a) where
data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
@ -45,8 +43,8 @@ derive instance Eq WriteResult
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
foreign import data Writable :: Type -> Type
foreign import data Readable :: Type -> Type
@ -64,10 +62,10 @@ foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where

View File

@ -40,8 +40,13 @@ fromReadable r =
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
O.ReadWouldBlock -> do
ended <- liftEffect $ O.isReadableEnded r
if ended then do
yield Nothing
cleanup cancel
else
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
in
do
e <- liftEffect $ O.withErrorST r
@ -66,14 +71,16 @@ fromWritable w =
needsDrain <- liftEffect $ O.needsDrain w
when needsDrain $ liftAff $ O.awaitWritableOrClosed w
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteClosed -> cleanup cancel
_ -> pure $ Loop { error, cancel }
ended <- liftEffect $ O.isWritableEnded w
if ended then
cleanup cancel
else
await >>= case _ of
Nothing -> cleanup cancel
Just a -> do
void $ liftEffect $ O.write w a
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST w
@ -111,18 +118,19 @@ fromTransform t =
for_ err throwError
needsDrain <- liftEffect $ O.needsDrain t
ended <- liftEffect $ O.isWritableEnded t
if needsDrain then do
liftAff $ delay $ wrap 0.0
yieldWhileReadable
pure $ Loop { error, cancel }
else do
ma <- await
case ma of
else if ended then
cleanup cancel
else
await >>= case _ of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> do
maybeYield1
pure $ Loop { error, cancel }