Compare commits
2 Commits
a8702f4849
...
f3d31ac236
Author | SHA1 | Date | |
---|---|---|---|
f3d31ac236 | |||
4b91ab7d5c |
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "purescript-node-stream-pipes",
|
||||
"version": "v1.5.0",
|
||||
"version": "v1.6.0",
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"csv-parse": "^5.5.5",
|
||||
|
@ -1,7 +1,7 @@
|
||||
package:
|
||||
name: node-stream-pipes
|
||||
publish:
|
||||
version: '1.5.0'
|
||||
version: '1.6.0'
|
||||
license: 'GPL-3.0-or-later'
|
||||
location:
|
||||
githubOwner: 'cakekindel'
|
||||
|
@ -2,18 +2,19 @@ module Node.Stream.Object where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Alt ((<|>))
|
||||
import Control.Monad.Error.Class (liftEither)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Control.Monad.ST.Global (Global)
|
||||
import Control.Monad.ST.Ref (STRef)
|
||||
import Control.Monad.ST.Ref as STRef
|
||||
import Control.Parallel (parOneOf)
|
||||
import Control.Parallel (parOneOf, parallel, sequential)
|
||||
import Data.Either (Either(..))
|
||||
import Data.Generic.Rep (class Generic)
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Data.Show.Generic (genericShow)
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff, effectCanceler, makeAff)
|
||||
import Effect.Aff (Aff, ParAff, effectCanceler, makeAff)
|
||||
import Effect.Aff as Aff
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Exception (Error, error)
|
||||
@ -172,13 +173,13 @@ unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
|
||||
unsafeFromStringWritable = unsafeCoerce
|
||||
|
||||
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
|
||||
awaitReadableOrClosed s = do
|
||||
awaitReadableOrClosed s = Aff.supervise do
|
||||
fiber <-
|
||||
Aff.forkAff $ parOneOf
|
||||
[ onceAff0 readableH s $> Right unit
|
||||
, onceAff0 closeH s $> Right unit
|
||||
, Left <$> onceAff1 errorH s
|
||||
]
|
||||
Aff.forkAff
|
||||
$ sequential
|
||||
$ parallel (onceAff0 readableH s $> Right unit)
|
||||
<|> parallel (onceAff0 closeH s $> Right unit)
|
||||
<|> parallel (Left <$> onceAff1 errorH s)
|
||||
closed <- liftEffect $ isClosed s
|
||||
readEnded <- liftEffect $ isReadableEnded s
|
||||
readable <- liftEffect $ isReadable s
|
||||
@ -189,14 +190,19 @@ awaitReadableOrClosed s = do
|
||||
Aff.killFiber (error "") fiber
|
||||
|
||||
awaitFinished :: forall s a. Write s a => s -> Aff Unit
|
||||
awaitFinished s = do
|
||||
awaitFinished s = Aff.supervise do
|
||||
fiber <- Aff.forkAff $ onceAff0 finishH s
|
||||
finished <- liftEffect $ isWritableFinished s
|
||||
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
|
||||
|
||||
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
|
||||
awaitWritableOrClosed s = do
|
||||
fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
|
||||
awaitWritableOrClosed s = Aff.supervise do
|
||||
fiber <-
|
||||
Aff.forkAff
|
||||
$ sequential
|
||||
$ parallel (onceAff0 drainH s $> Right unit)
|
||||
<|> parallel (onceAff0 closeH s $> Right unit)
|
||||
<|> parallel (Left <$> onceAff1 errorH s)
|
||||
closed <- liftEffect $ isClosed s
|
||||
writeEnded <- liftEffect $ isWritableEnded s
|
||||
writable <- liftEffect $ isWritable s
|
||||
|
@ -7,6 +7,7 @@ import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Control.Monad.ST.Ref as STRef
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Control.Parallel (parOneOf)
|
||||
import Data.Maybe (Maybe(..), maybe)
|
||||
import Data.Traversable (for_, traverse, traverse_)
|
||||
import Data.Tuple.Nested ((/\))
|
||||
@ -118,7 +119,7 @@ fromTransform t = do
|
||||
ended <- liftEffect $ O.isWritableEnded t
|
||||
if needsDrain then do
|
||||
yieldWhileReadable
|
||||
liftAff $ O.awaitWritableOrClosed t
|
||||
liftAff $ parOneOf [O.awaitWritableOrClosed t, O.awaitReadableOrClosed t]
|
||||
pure $ Loop unit
|
||||
else if ended then
|
||||
cleanup $> Done unit
|
||||
|
Loading…
Reference in New Issue
Block a user