2024-05-09 22:21:15 +00:00
|
|
|
module Test.Pipes.Node.Stream where
|
|
|
|
|
|
|
|
import Prelude
|
|
|
|
|
2024-06-22 23:42:22 +00:00
|
|
|
import Control.Monad.Cont (lift)
|
|
|
|
import Control.Monad.Error.Class (liftEither)
|
|
|
|
import Control.Monad.Except (runExcept)
|
2024-05-09 22:21:15 +00:00
|
|
|
import Data.Array as Array
|
2024-06-22 23:42:22 +00:00
|
|
|
import Data.Bifunctor (lmap)
|
|
|
|
import Data.Either (Either(..))
|
|
|
|
import Data.Foldable (fold, intercalate)
|
|
|
|
import Data.FoldableWithIndex (forWithIndex_)
|
|
|
|
import Data.FunctorWithIndex (mapWithIndex)
|
|
|
|
import Data.Int as Int
|
2024-05-09 22:21:15 +00:00
|
|
|
import Data.List ((:))
|
|
|
|
import Data.List as List
|
2024-05-10 20:04:09 +00:00
|
|
|
import Data.Maybe (Maybe)
|
2024-05-09 22:21:15 +00:00
|
|
|
import Data.Newtype (wrap)
|
2024-06-22 23:42:22 +00:00
|
|
|
import Data.Profunctor.Strong (first)
|
|
|
|
import Data.String as String
|
2024-05-09 22:21:15 +00:00
|
|
|
import Data.String.Gen (genAlphaString)
|
2024-06-22 23:42:22 +00:00
|
|
|
import Data.Traversable (for_, traverse)
|
2024-05-09 22:21:15 +00:00
|
|
|
import Data.Tuple.Nested (type (/\), (/\))
|
|
|
|
import Effect (Effect)
|
2024-05-10 20:04:09 +00:00
|
|
|
import Effect.Aff (Aff, delay)
|
|
|
|
import Effect.Class (class MonadEffect, liftEffect)
|
2024-06-22 23:42:22 +00:00
|
|
|
import Effect.Exception (error)
|
|
|
|
import Effect.Unsafe (unsafePerformEffect)
|
|
|
|
import Foreign (Foreign)
|
2024-05-09 22:21:15 +00:00
|
|
|
import Node.Buffer (Buffer)
|
|
|
|
import Node.Buffer as Buffer
|
|
|
|
import Node.Encoding (Encoding(..))
|
|
|
|
import Node.FS.Stream as FS.Stream
|
|
|
|
import Node.FS.Sync as FS
|
|
|
|
import Node.Stream.Object as O
|
2024-06-22 23:42:22 +00:00
|
|
|
import Pipes (each) as Pipe
|
2024-05-09 22:21:15 +00:00
|
|
|
import Pipes (yield, (>->))
|
2024-06-22 23:42:22 +00:00
|
|
|
import Pipes.Async (sync, (>-/->))
|
|
|
|
import Pipes.Collect as Pipe.Collect
|
2024-05-10 20:04:09 +00:00
|
|
|
import Pipes.Core (Consumer, Producer, runEffect)
|
2024-06-22 23:42:22 +00:00
|
|
|
import Pipes.Node.Buffer as Pipe.Buffer
|
|
|
|
import Pipes.Node.FS as Pipe.FS
|
|
|
|
import Pipes.Node.Stream as Pipe.Node
|
|
|
|
import Pipes.Node.Zlib as Pipe.Zlib
|
|
|
|
import Pipes.Prelude (toListM) as Pipe
|
|
|
|
import Simple.JSON (readImpl, readJSON, writeJSON)
|
|
|
|
import Test.Common (jsonStringify, tmpFile, tmpFiles)
|
2024-05-09 22:21:15 +00:00
|
|
|
import Test.QuickCheck.Arbitrary (arbitrary)
|
2024-05-10 20:04:09 +00:00
|
|
|
import Test.QuickCheck.Gen (randomSample')
|
2024-05-09 22:21:15 +00:00
|
|
|
import Test.Spec (Spec, around, describe, it)
|
2024-06-22 23:42:22 +00:00
|
|
|
import Test.Spec.Assertions (fail, shouldEqual)
|
2024-05-09 22:21:15 +00:00
|
|
|
|
|
|
|
foreign import readableFromArray :: forall @a. Array a -> O.Readable a
|
2024-05-10 20:04:09 +00:00
|
|
|
foreign import discardTransform :: forall a b. Effect (O.Transform a b)
|
2024-06-22 23:42:22 +00:00
|
|
|
foreign import slowTransform :: forall a b. Effect (O.Transform a b)
|
2024-05-10 20:04:09 +00:00
|
|
|
foreign import charsTransform :: Effect (O.Transform String String)
|
2024-06-22 23:42:22 +00:00
|
|
|
foreign import cborEncodeSync :: forall a. a -> Effect Buffer
|
|
|
|
foreign import cborDecodeSync :: forall a. Buffer -> Effect a
|
|
|
|
foreign import cborEncode :: forall a. Effect (O.Transform a Buffer)
|
|
|
|
foreign import cborDecode :: forall a. Effect (O.Transform Buffer a)
|
|
|
|
foreign import csvEncode :: forall a. Effect (O.Transform a String)
|
|
|
|
foreign import csvDecode :: forall a. Effect (O.Transform String a)
|
2024-05-09 22:21:15 +00:00
|
|
|
|
2024-05-10 20:04:09 +00:00
|
|
|
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
|
|
|
|
writer a = do
|
2024-05-15 18:29:58 +00:00
|
|
|
stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
|
2024-06-22 23:42:22 +00:00
|
|
|
pure $ stream /\ Pipe.Node.fromWritable stream
|
2024-05-09 22:21:15 +00:00
|
|
|
|
2024-05-10 20:04:09 +00:00
|
|
|
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
|
2024-06-22 23:42:22 +00:00
|
|
|
reader a = liftEffect $ Pipe.Node.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
|
2024-05-09 22:21:15 +00:00
|
|
|
|
|
|
|
spec :: Spec Unit
|
|
|
|
spec =
|
|
|
|
describe "Test.Pipes.Node.Stream" do
|
|
|
|
describe "Readable" do
|
|
|
|
describe "Readable.from(<Iterable>)" do
|
|
|
|
it "empty" do
|
2024-06-22 23:42:22 +00:00
|
|
|
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } []) >-> Pipe.Node.unEOS
|
2024-05-09 22:21:15 +00:00
|
|
|
vals `shouldEqual` List.Nil
|
|
|
|
it "singleton" do
|
2024-06-22 23:42:22 +00:00
|
|
|
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> Pipe.Node.unEOS
|
2024-05-09 22:21:15 +00:00
|
|
|
vals `shouldEqual` ({ foo: "1" } : List.Nil)
|
|
|
|
it "many elements" do
|
|
|
|
let exp = (\n -> { foo: show n }) <$> Array.range 0 100
|
2024-06-22 23:42:22 +00:00
|
|
|
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray exp) >-> Pipe.Node.unEOS
|
2024-05-09 22:21:15 +00:00
|
|
|
vals `shouldEqual` (List.fromFoldable exp)
|
|
|
|
describe "Writable" $ around tmpFile do
|
|
|
|
describe "fs.WriteStream" do
|
|
|
|
it "pipe to file" \p -> do
|
2024-05-15 18:29:58 +00:00
|
|
|
stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
|
2024-05-09 22:21:15 +00:00
|
|
|
let
|
2024-06-22 23:42:22 +00:00
|
|
|
w = Pipe.Node.fromWritable stream
|
2024-05-09 22:21:15 +00:00
|
|
|
source = do
|
|
|
|
buf <- liftEffect $ Buffer.fromString "hello" UTF8
|
2024-05-10 20:04:09 +00:00
|
|
|
yield buf
|
2024-06-22 23:42:22 +00:00
|
|
|
runEffect $ Pipe.Node.withEOS source >-> w
|
2024-05-09 22:21:15 +00:00
|
|
|
contents <- liftEffect $ FS.readTextFile UTF8 p
|
|
|
|
contents `shouldEqual` "hello"
|
2024-05-10 20:04:09 +00:00
|
|
|
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
|
2024-05-09 22:21:15 +00:00
|
|
|
it "async pipe to file" \p -> do
|
2024-06-22 23:42:22 +00:00
|
|
|
w <- Pipe.Node.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
|
2024-05-09 22:21:15 +00:00
|
|
|
let
|
|
|
|
source = do
|
2024-05-10 20:04:09 +00:00
|
|
|
yield "hello, "
|
2024-05-09 22:21:15 +00:00
|
|
|
lift $ delay $ wrap 5.0
|
2024-05-10 20:04:09 +00:00
|
|
|
yield "world!"
|
2024-05-09 22:21:15 +00:00
|
|
|
lift $ delay $ wrap 5.0
|
2024-05-10 20:04:09 +00:00
|
|
|
yield " "
|
2024-05-09 22:21:15 +00:00
|
|
|
lift $ delay $ wrap 5.0
|
2024-05-10 20:04:09 +00:00
|
|
|
yield "this is a "
|
2024-05-09 22:21:15 +00:00
|
|
|
lift $ delay $ wrap 5.0
|
2024-05-10 20:04:09 +00:00
|
|
|
yield "test."
|
2024-06-22 23:42:22 +00:00
|
|
|
runEffect $ Pipe.Node.withEOS (source >-> Pipe.Buffer.fromString UTF8) >-> w
|
2024-05-09 22:21:15 +00:00
|
|
|
contents <- liftEffect $ FS.readTextFile UTF8 p
|
|
|
|
contents `shouldEqual` "hello, world! this is a test."
|
|
|
|
it "chained pipes" \p -> do
|
|
|
|
let
|
|
|
|
obj = do
|
|
|
|
str :: String <- genAlphaString
|
|
|
|
num :: Int <- arbitrary
|
|
|
|
stuff :: Array String <- arbitrary
|
2024-05-11 23:01:34 +00:00
|
|
|
pure { str, num, stuff }
|
2024-05-10 20:04:09 +00:00
|
|
|
objs <- liftEffect (randomSample' 1 obj)
|
2024-05-09 22:21:15 +00:00
|
|
|
let
|
|
|
|
exp = fold (writeJSON <$> objs)
|
2024-05-10 20:04:09 +00:00
|
|
|
stream /\ w <- liftEffect $ writer p
|
2024-06-22 23:42:22 +00:00
|
|
|
runEffect $ Pipe.Node.withEOS (Pipe.each objs >-> jsonStringify >-> Pipe.Buffer.fromString UTF8) >-> w
|
2024-05-09 22:21:15 +00:00
|
|
|
contents <- liftEffect $ FS.readTextFile UTF8 p
|
|
|
|
contents `shouldEqual` exp
|
2024-05-10 20:04:09 +00:00
|
|
|
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
|
2024-05-09 22:21:15 +00:00
|
|
|
describe "Transform" do
|
2024-06-22 23:42:22 +00:00
|
|
|
let
|
|
|
|
bignums = Array.range 1 1000
|
|
|
|
firstNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/first_names.txt")
|
|
|
|
lastNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/last_names.txt")
|
|
|
|
names n = do
|
|
|
|
first <- firstNames
|
|
|
|
last <- Array.take (Int.round $ Int.toNumber n / Int.toNumber (Array.length firstNames)) lastNames
|
|
|
|
pure $ first <> " " <> last
|
|
|
|
people n = mapWithIndex (\ix name -> {id: show $ ix + 1, name}) (names n)
|
|
|
|
peopleCSV n = "id,name\n" <> intercalate "\n" ((\{id, name} -> id <> "," <> name) <$> people n)
|
|
|
|
|
|
|
|
for_ [4000, 8000, 32000, 64000, 200000] \n -> do
|
2024-05-09 22:21:15 +00:00
|
|
|
let
|
2024-06-22 23:42:22 +00:00
|
|
|
csv = peopleCSV n
|
|
|
|
people' = people n
|
|
|
|
around tmpFiles
|
|
|
|
$ it (show n <> " row csv >-/-> csv-parse >-/-> cborEncode") \(a /\ _) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 a csv
|
|
|
|
cbor :: Buffer <- Pipe.Collect.toBuffer
|
|
|
|
$ Pipe.FS.read a
|
|
|
|
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
|
|
|
|
>-/-> Pipe.Node.fromTransform csvDecode
|
|
|
|
>-/-> Pipe.Node.fromTransform cborEncode
|
|
|
|
>-> Pipe.Node.unEOS
|
|
|
|
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
|
|
|
|
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
|
|
|
|
ppl `shouldEqual` people'
|
|
|
|
|
|
|
|
around tmpFiles
|
|
|
|
$ it (show n <> " row csv >-> sync csv-parse >-> sync cborEncode") \(a /\ _) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 a csv
|
|
|
|
cbor :: Buffer <- Pipe.Collect.toBuffer
|
|
|
|
$ Pipe.FS.read a
|
|
|
|
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
|
|
|
|
>-> sync (Pipe.Node.fromTransform csvDecode)
|
|
|
|
>-> sync (Pipe.Node.fromTransform cborEncode)
|
|
|
|
>-> Pipe.Node.unEOS
|
|
|
|
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
|
|
|
|
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
|
|
|
|
ppl `shouldEqual` people'
|
|
|
|
|
|
|
|
around tmpFiles
|
|
|
|
$ it "file >-> sync gzip >-> sync gunzip" \(a /\ _) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
|
|
|
|
json <- Pipe.Collect.toMonoid
|
|
|
|
$ Pipe.FS.read a
|
|
|
|
>-> sync Pipe.Zlib.gzip
|
|
|
|
>-> sync Pipe.Zlib.gunzip
|
|
|
|
>-> Pipe.Node.unEOS
|
|
|
|
>-> Pipe.Buffer.toString UTF8
|
|
|
|
readJSON json `shouldEqual` (Right bignums)
|
|
|
|
|
|
|
|
around tmpFiles
|
|
|
|
$ it "file >-/-> gzip >-/-> slow >-/-> gunzip" \(a /\ _) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
|
|
|
|
json <-
|
|
|
|
Pipe.Collect.toMonoid
|
|
|
|
$ Pipe.FS.read a
|
|
|
|
>-/-> Pipe.Zlib.gzip
|
|
|
|
>-/-> Pipe.Node.fromTransform slowTransform
|
|
|
|
>-/-> Pipe.Zlib.gunzip
|
|
|
|
>-> Pipe.Node.unEOS
|
|
|
|
>-> Pipe.Buffer.toString UTF8
|
|
|
|
|
|
|
|
readJSON json `shouldEqual` (Right bignums)
|
2024-05-09 22:21:15 +00:00
|
|
|
around tmpFiles
|
2024-06-22 23:42:22 +00:00
|
|
|
$ it "file >-> sync gzip >-> sync slow >-> sync gunzip" \(a /\ _) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
|
|
|
|
json <-
|
|
|
|
Pipe.Collect.toMonoid
|
|
|
|
$ Pipe.FS.read a
|
|
|
|
>-> sync Pipe.Zlib.gzip
|
|
|
|
>-> sync (Pipe.Node.fromTransform slowTransform)
|
|
|
|
>-> sync Pipe.Zlib.gunzip
|
|
|
|
>-> Pipe.Node.unEOS
|
|
|
|
>-> Pipe.Buffer.toString UTF8
|
2024-05-09 22:21:15 +00:00
|
|
|
|
2024-06-22 23:42:22 +00:00
|
|
|
readJSON json `shouldEqual` (Right bignums)
|
2024-05-10 20:04:09 +00:00
|
|
|
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 p "foo"
|
|
|
|
r <- reader p
|
2024-06-22 23:42:22 +00:00
|
|
|
out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS
|
2024-05-10 20:04:09 +00:00
|
|
|
out `shouldEqual` List.Nil
|
|
|
|
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
|
|
|
|
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
|
|
|
|
r <- reader p
|
2024-06-22 23:42:22 +00:00
|
|
|
out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS
|
2024-05-11 23:01:34 +00:00
|
|
|
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]
|