diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index 8829a3c..8b162a9 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -22,7 +22,7 @@ import Data.Nullable (Nullable) import Data.Nullable as Nullable import Data.Traversable (for_) import Effect (Effect) -import Effect.Aff (makeAff) +import Effect.Aff (Canceler(..), makeAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (error) @@ -98,7 +98,9 @@ parse config csv = do foreach :: forall @r rl x m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit foreach stream cb = whileJust do isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit + liftAff $ when (not isReadable) $ makeAff \res -> do + stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit + pure $ Canceler $ const $ liftEffect stop whileJust do r <- liftEffect $ read @r stream for_ r cb diff --git a/src/Node.Stream.CSV.Stringify.purs b/src/Node.Stream.CSV.Stringify.purs index f5d0695..06dc4eb 100644 --- a/src/Node.Stream.CSV.Stringify.purs +++ b/src/Node.Stream.CSV.Stringify.purs @@ -13,7 +13,7 @@ import Data.Maybe (Maybe(..)) import Data.String.Regex (Regex) import Data.Traversable (for_) import Effect (Effect) -import Effect.Aff (makeAff) +import Effect.Aff (Canceler(..), makeAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Foreign (Foreign, unsafeToForeign) @@ -86,7 +86,9 @@ write s = writeImpl s <<< writeCSVRecord @r @rl foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit foreach stream cb = whileJust do isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit + liftAff $ when (not isReadable) $ makeAff \res -> do + stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit + pure $ Canceler $ const $ liftEffect stop whileJust do s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream for_ s cb