fix: more Pipes.Collect utils, stack-safety

This commit is contained in:
bingus 2024-05-11 18:01:34 -05:00
parent f2f18c3c13
commit 634e52fe39
Signed by: orion
GPG Key ID: 6D4165AE4C928719
14 changed files with 354 additions and 136 deletions

View File

@ -1,6 +1,7 @@
"name": "purescript-csv-stream",
"name": "purescript-node-stream-pipes",
"version": "v1.0.5",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@ -9,6 +9,8 @@ workspace:
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object
- lists
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@ -18,6 +20,7 @@ workspace:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@ -25,6 +28,8 @@ workspace:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples
- unordered-collections
- unsafe-coerce: ">=6.0.0 <7.0.0"
- console
@ -105,6 +110,7 @@ workspace:
- type-equality
- typelevel-prelude
- unfoldable
- unordered-collections
- unsafe-coerce
- variant
extra_packages: {}
@ -903,6 +909,21 @@ packages:
- partial
- prelude
- tuples
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
type: registry
version: 6.0.0

View File

@ -10,6 +10,11 @@ package:
strict: true
pedanticPackages: true
- foreign-object
- lists
- ordered-collections
- tuples
- unordered-collections
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0"

View File

@ -1,35 +1,39 @@
import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = s => () => s.readable
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = s => () => s.closed
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = s => () => s.writable
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = s => () => s.readableEnded
export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = s => () => s.writableEnded
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl = ({ok, wouldBlock, closed}) => (s) => (a) => () => {
export const writeImpl =
({ ok, wouldBlock, closed }) =>
(s) =>
(a) =>
() => {
if (s.closed || s.writableEnded) {
return closed
return closed;
if (s.write(a)) {
return ok
return ok;
} else {
return wouldBlock
return wouldBlock;
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =

View File

@ -28,6 +28,7 @@ data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
derive instance Functor ReadResult
derive instance Eq a => Eq (ReadResult a)
@ -38,9 +39,11 @@ data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
derive instance Eq WriteResult
instance Show WriteResult where show = genericShow
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }

View File

@ -2,17 +2,81 @@ module Pipes.Collect where
import Prelude
import Control.Monad.Rec.Class (class MonadRec)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.Hashable (class Hashable)
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (for) as Pipes
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Pipes (next) as Pipes
import Pipes.Core (Producer)
import Pipes.Core (runEffect) as Pipes
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
collectArray p = do
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b p =
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> p'
pure $ Loop $ f b' a /\ p''
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Fold every value produced with a monadic action
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b p =
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> p'
b'' <- lift $ f b' a
pure $ Loop $ b'' /\ p''
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Execute a monadic action on every item in a producer.
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f = traverse (const f) unit
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do
st <- liftEffect $ liftST $
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
foreach (void <<< liftEffect <<< liftST <<< flip Array.ST.push st) p
liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do
st <- liftEffect $ liftST $
foreach (\(k /\ v) -> void $ liftEffect $ liftST $ Object.ST.poke k v st) p
liftEffect $ liftST $ Object.ST.Unsafe.unsafeFreeze st
-- | Collect all values from a `Producer` into a `HashMap`
toHashMap :: forall k v m. Hashable k => MonadRec m => Producer (k /\ v) m Unit -> m (HashMap k v)
toHashMap = fold (\map (k /\ v) -> HashMap.insert k v map) HashMap.empty
-- | Collect all values from a `Producer` into a `Map`
toMap :: forall k v m. Ord k => MonadRec m => Producer (k /\ v) m Unit -> m (Map k v)
toMap = fold (\map (k /\ v) -> Map.insert k v map) Map.empty

View File

@ -24,7 +24,9 @@ import Prim.Row (class Union)
:: forall r trash
. Union r trash WriteStreamOptions
=> Record r -> FilePath -> Consumer (Maybe Buffer) Aff Unit
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) Aff Unit
write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w

View File

@ -40,7 +40,8 @@ fromReadable r =
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
tailRecM go e
@ -72,7 +73,8 @@ fromWritable w =
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST w
tailRecM go r
@ -111,7 +113,8 @@ fromTransform t =
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
pure $ Loop { error, cancel }
in do
r <- liftEffect $ O.withErrorST t
tailRecM go r

View File

@ -8,6 +8,7 @@ import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
@ -16,3 +17,4 @@ main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing

View File

@ -0,0 +1,111 @@
module Test.Pipes.Collect where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Control.Monad.ST as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Traversable (traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Foreign.Object (Object)
import Foreign.Object as Object
import Pipes (yield)
import Pipes.Collect as Pipes.Collect
import Pipes.Core (Producer)
import Test.QuickCheck.Gen (randomSampleOne)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
:: { array :: Array (Int /\ Int)
, list :: List (Int /\ Int)
, strarray :: Array (String /\ Int)
, object :: Object Int
, map :: Map Int Int
, hashMap :: HashMap Int Int
, stream :: Producer (Int /\ Int) Aff Unit
, streamStr :: Producer (String /\ Int) Aff Unit
testData =
unsafePerformEffect $ do
array <-
flip traverse (Array.range 0 99999) \k -> do
v <- liftEffect $ randomSampleOne $ chooseInt 0 99999
pure $ k /\ v
strarray = lmap show <$> array
object = Object.fromFoldable strarray
map' :: forall m. m -> (Int -> Int -> m -> m) -> m
map' empty insert = do
st <- empty
ST.foreach array \(k /\ v) -> void $ STRef.modify (insert k v) st st
hashMap = map' HashMap.empty HashMap.insert
map = map' Map.empty Map.insert
{ array
, strarray
, list: List.fromFoldable array
, object
, hashMap
, map
, stream: flip tailRecM 0 \ix -> case Array.index array ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
, streamStr: flip tailRecM 0 \ix -> case Array.index strarray ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
spec :: Spec Unit
spec =
describe "Test.Pipes.Collect" do
describe "toArray" do
it "collects an array" do
act <- Pipes.Collect.toArray
act `shouldEqual` testData.array
it "empty ok" do
act :: Array Int <- Pipes.Collect.toArray (pure unit)
act `shouldEqual` []
describe "toObject" do
it "collects" do
act <- Pipes.Collect.toObject $ testData.streamStr
act `shouldEqual` testData.object
it "empty ok" do
act :: Object Int <- Pipes.Collect.toObject (pure unit)
act `shouldEqual` Object.empty
describe "toMap" do
it "collects" do
act <- Pipes.Collect.toMap
act `shouldEqual`
it "empty ok" do
act :: Map String Int <- Pipes.Collect.toMap (pure unit)
act `shouldEqual` Map.empty
describe "toHashMap" do
it "collects" do
act <- Pipes.Collect.toHashMap
act `shouldEqual` testData.hashMap
it "empty ok" do
act :: HashMap String Int <- Pipes.Collect.toHashMap (pure unit)
act `shouldEqual` HashMap.empty
describe "toList" do
it "collects" do
act <- Pipes.Collect.toList
act `shouldEqual` testData.list
it "empty ok" do
act :: List (String /\ Int) <- Pipes.Collect.toList (pure unit)
act `shouldEqual` List.Nil

View File

@ -27,6 +27,7 @@ import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
data BufferJunk = BufferJunk Buffer
instance Arbitrary BufferJunk where
arbitrary = sized \s -> do
ns <- vectorOf s (chooseInt 0 7)
@ -36,6 +37,7 @@ instance Arbitrary BufferJunk where
pure $ BufferJunk buf
data BufferUTF8 = BufferUTF8 String Buffer
instance Arbitrary BufferUTF8 where
arbitrary = do
s <- genAsciiString