fix: loop bug

This commit is contained in:
orion 2024-06-22 19:33:45 -05:00
parent 0ba315ede0
commit 2e0be4ac62
Signed by: orion
GPG Key ID: 6D4165AE4C928719
2 changed files with 7 additions and 8 deletions

View File

@ -8,7 +8,7 @@ import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork) import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (runMaybeT) import Control.Monad.Maybe.Trans (runMaybeT)
import Control.Monad.Morph (hoist) import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as ST.Ref
@ -116,7 +116,7 @@ instance Monad m => Profunctor (AsyncPipe x m) where
-- | Wraps all fields of an `AsyncPipe` with logging to debug -- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing. -- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m a b -> AsyncPipe x m a b debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b)
debug c (AsyncPipe init write awaitWrite read awaitRead) = debug c (AsyncPipe init write awaitWrite read awaitRead) =
let let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m logL m = liftEffect $ log $ "[" <> c <> "] " <> m
@ -144,7 +144,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
read' x = do read' x = do
logR "read >" logR "read >"
elapsed /\ r <- time $ read x elapsed /\ r <- time $ read x
logR $ "< read " <> show (r $> unit) <> " (" <> show (unwrap elapsed) <> "ms)" logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r pure r
awaitWrite' x = do awaitWrite' x = do
@ -284,12 +284,12 @@ pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
x <- lift init x <- lift init
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod) _thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
forever do flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError getThreadError >>= traverse_ throwError
rb <- lift $ read x rb <- lift $ read x
case rb of case rb of
ReadOk (Just b) -> yield $ Just b ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void $ lift (awaitRead x) ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit
infixl 7 pipeAsync as >-/-> infixl 7 pipeAsync as >-/->

View File

@ -147,7 +147,6 @@ fromTransform t =
let s = transformStream x let s = transformStream x
readEnded <- liftEffect $ O.isReadableEnded s readEnded <- liftEffect $ O.isReadableEnded s
if readEnded then do if readEnded then do
transformCleanup x
pure $ AsyncPipe.ReadOk Nothing pure $ AsyncPipe.ReadOk Nothing
else else
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s) maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)