Compare commits

...

2 Commits

Author SHA1 Message Date
f3d31ac236
fix: introduced transform bug 2024-06-21 13:18:32 -05:00
4b91ab7d5c
chore: prepare v1.6.0 2024-06-20 15:40:32 -05:00
4 changed files with 21 additions and 14 deletions

View File

@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.5.0",
"version": "v1.6.0",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",

View File

@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.5.0'
version: '1.6.0'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@ -2,18 +2,19 @@ module Node.Stream.Object where
import Prelude
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Parallel (parOneOf)
import Control.Parallel (parOneOf, parallel, sequential)
import Data.Either (Either(..))
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff (Aff, ParAff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (Error, error)
@ -172,13 +173,13 @@ unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
awaitReadableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff $ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
Aff.forkAff
$ sequential
$ parallel (onceAff0 readableH s $> Right unit)
<|> parallel (onceAff0 closeH s $> Right unit)
<|> parallel (Left <$> onceAff1 errorH s)
closed <- liftEffect $ isClosed s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
@ -189,14 +190,19 @@ awaitReadableOrClosed s = do
Aff.killFiber (error "") fiber
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = do
awaitFinished s = Aff.supervise do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
awaitWritableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ sequential
$ parallel (onceAff0 drainH s $> Right unit)
<|> parallel (onceAff0 closeH s $> Right unit)
<|> parallel (Left <$> onceAff1 errorH s)
closed <- liftEffect $ isClosed s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s

View File

@ -7,6 +7,7 @@ import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Control.Parallel (parOneOf)
import Data.Maybe (Maybe(..), maybe)
import Data.Traversable (for_, traverse, traverse_)
import Data.Tuple.Nested ((/\))
@ -118,7 +119,7 @@ fromTransform t = do
ended <- liftEffect $ O.isWritableEnded t
if needsDrain then do
yieldWhileReadable
liftAff $ O.awaitWritableOrClosed t
liftAff $ parOneOf [O.awaitWritableOrClosed t, O.awaitReadableOrClosed t]
pure $ Loop unit
else if ended then
cleanup $> Done unit