Compare commits
3 Commits
3b4ec77414
...
22a4e7a88c
Author | SHA1 | Date | |
---|---|---|---|
22a4e7a88c | |||
52af3cadae | |||
9c86da438c |
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "purescript-csv-stream",
|
"name": "purescript-csv-stream",
|
||||||
"version": "v2.0.2",
|
"version": "v2.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"csv-parse": "^5.5.5",
|
"csv-parse": "^5.5.5",
|
||||||
|
16
spago.lock
16
spago.lock
@ -18,19 +18,18 @@ workspace:
|
|||||||
- newtype: ">=5.0.0 <6.0.0"
|
- newtype: ">=5.0.0 <6.0.0"
|
||||||
- node-buffer: ">=9.0.0 <10.0.0"
|
- node-buffer: ">=9.0.0 <10.0.0"
|
||||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||||
- node-stream-pipes: ">=1.6.0 <2.0.0"
|
- node-stream-pipes: ">=2.0.2 <3.0.0"
|
||||||
- node-streams: ">=9.0.0 <10.0.0"
|
- node-streams: ">=9.0.0 <10.0.0"
|
||||||
- nullable: ">=6.0.0 <7.0.0"
|
- nullable: ">=6.0.0 <7.0.0"
|
||||||
- numbers: ">=9.0.1 <10.0.0"
|
- numbers: ">=9.0.1 <10.0.0"
|
||||||
- ordered-collections: ">=3.2.0 <4.0.0"
|
- ordered-collections: ">=3.2.0 <4.0.0"
|
||||||
- pipes: ">=8.0.0 <9.0.0"
|
|
||||||
- precise-datetime: ">=7.0.0 <8.0.0"
|
- precise-datetime: ">=7.0.0 <8.0.0"
|
||||||
- prelude: ">=6.0.1 <7.0.0"
|
- prelude: ">=6.0.1 <7.0.0"
|
||||||
|
- profunctor
|
||||||
- record: ">=4.0.0 <5.0.0"
|
- record: ">=4.0.0 <5.0.0"
|
||||||
- record-extra: ">=5.0.1 <6.0.0"
|
- record-extra: ">=5.0.1 <6.0.0"
|
||||||
- st: ">=6.2.0 <7.0.0"
|
- st: ">=6.2.0 <7.0.0"
|
||||||
- strings: ">=6.0.1 <7.0.0"
|
- strings: ">=6.0.1 <7.0.0"
|
||||||
- tailrec: ">=6.1.0 <7.0.0"
|
|
||||||
- transformers: ">=6.0.0 <7.0.0"
|
- transformers: ">=6.0.0 <7.0.0"
|
||||||
- tuples: ">=7.0.0 <8.0.0"
|
- tuples: ">=7.0.0 <8.0.0"
|
||||||
- typelevel-prelude: ">=7.0.0 <8.0.0"
|
- typelevel-prelude: ">=7.0.0 <8.0.0"
|
||||||
@ -624,29 +623,36 @@ packages:
|
|||||||
- unsafe-coerce
|
- unsafe-coerce
|
||||||
node-stream-pipes:
|
node-stream-pipes:
|
||||||
type: registry
|
type: registry
|
||||||
version: 1.6.0
|
version: 2.0.2
|
||||||
integrity: sha256-aYwtrkJgTzLEaYZNel2HLISaWecyBzyKoGVpZpIRTJI=
|
integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI=
|
||||||
dependencies:
|
dependencies:
|
||||||
- aff
|
- aff
|
||||||
- arrays
|
- arrays
|
||||||
|
- console
|
||||||
|
- control
|
||||||
|
- datetime
|
||||||
- effect
|
- effect
|
||||||
- either
|
- either
|
||||||
- exceptions
|
- exceptions
|
||||||
- foldable-traversable
|
- foldable-traversable
|
||||||
- foreign-object
|
- foreign-object
|
||||||
|
- fork
|
||||||
- lists
|
- lists
|
||||||
- maybe
|
- maybe
|
||||||
- mmorph
|
- mmorph
|
||||||
|
- newtype
|
||||||
- node-buffer
|
- node-buffer
|
||||||
- node-event-emitter
|
- node-event-emitter
|
||||||
- node-fs
|
- node-fs
|
||||||
- node-path
|
- node-path
|
||||||
- node-streams
|
- node-streams
|
||||||
- node-zlib
|
- node-zlib
|
||||||
|
- now
|
||||||
- ordered-collections
|
- ordered-collections
|
||||||
- parallel
|
- parallel
|
||||||
- pipes
|
- pipes
|
||||||
- prelude
|
- prelude
|
||||||
|
- profunctor
|
||||||
- st
|
- st
|
||||||
- strings
|
- strings
|
||||||
- tailrec
|
- tailrec
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package:
|
package:
|
||||||
name: csv-stream
|
name: csv-stream
|
||||||
publish:
|
publish:
|
||||||
version: '2.0.2'
|
version: '2.2.0'
|
||||||
license: 'GPL-3.0-or-later'
|
license: 'GPL-3.0-or-later'
|
||||||
location:
|
location:
|
||||||
githubOwner: 'cakekindel'
|
githubOwner: 'cakekindel'
|
||||||
@ -10,7 +10,7 @@ package:
|
|||||||
strict: true
|
strict: true
|
||||||
pedanticPackages: true
|
pedanticPackages: true
|
||||||
dependencies:
|
dependencies:
|
||||||
- node-stream-pipes: ">=1.6.0 <2.0.0"
|
- profunctor
|
||||||
- aff: ">=7.1.0 <8.0.0"
|
- aff: ">=7.1.0 <8.0.0"
|
||||||
- arrays: ">=7.3.0 <8.0.0"
|
- arrays: ">=7.3.0 <8.0.0"
|
||||||
- bifunctors: ">=6.0.0 <7.0.0"
|
- bifunctors: ">=6.0.0 <7.0.0"
|
||||||
@ -26,18 +26,17 @@ package:
|
|||||||
- newtype: ">=5.0.0 <6.0.0"
|
- newtype: ">=5.0.0 <6.0.0"
|
||||||
- node-buffer: ">=9.0.0 <10.0.0"
|
- node-buffer: ">=9.0.0 <10.0.0"
|
||||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||||
|
- node-stream-pipes: ">=2.0.2 <3.0.0"
|
||||||
- node-streams: ">=9.0.0 <10.0.0"
|
- node-streams: ">=9.0.0 <10.0.0"
|
||||||
- nullable: ">=6.0.0 <7.0.0"
|
- nullable: ">=6.0.0 <7.0.0"
|
||||||
- numbers: ">=9.0.1 <10.0.0"
|
- numbers: ">=9.0.1 <10.0.0"
|
||||||
- ordered-collections: ">=3.2.0 <4.0.0"
|
- ordered-collections: ">=3.2.0 <4.0.0"
|
||||||
- pipes: ">=8.0.0 <9.0.0"
|
|
||||||
- precise-datetime: ">=7.0.0 <8.0.0"
|
- precise-datetime: ">=7.0.0 <8.0.0"
|
||||||
- prelude: ">=6.0.1 <7.0.0"
|
- prelude: ">=6.0.1 <7.0.0"
|
||||||
- record: ">=4.0.0 <5.0.0"
|
- record: ">=4.0.0 <5.0.0"
|
||||||
- record-extra: ">=5.0.1 <6.0.0"
|
- record-extra: ">=5.0.1 <6.0.0"
|
||||||
- st: ">=6.2.0 <7.0.0"
|
- st: ">=6.2.0 <7.0.0"
|
||||||
- strings: ">=6.0.1 <7.0.0"
|
- strings: ">=6.0.1 <7.0.0"
|
||||||
- tailrec: ">=6.1.0 <7.0.0"
|
|
||||||
- transformers: ">=6.0.0 <7.0.0"
|
- transformers: ">=6.0.0 <7.0.0"
|
||||||
- tuples: ">=7.0.0 <8.0.0"
|
- tuples: ">=7.0.0 <8.0.0"
|
||||||
- typelevel-prelude: ">=7.0.0 <8.0.0"
|
- typelevel-prelude: ">=7.0.0 <8.0.0"
|
||||||
|
@ -4,15 +4,18 @@ import Prelude
|
|||||||
|
|
||||||
import Control.Monad.Error.Class (liftEither)
|
import Control.Monad.Error.Class (liftEither)
|
||||||
import Control.Monad.Except (runExcept)
|
import Control.Monad.Except (runExcept)
|
||||||
import Control.Monad.Rec.Class (forever)
|
import Control.Monad.ST.Global (Global)
|
||||||
import Control.Monad.ST.Global as ST
|
import Control.Monad.ST.Global as ST
|
||||||
|
import Control.Monad.ST.Ref (STRef)
|
||||||
import Control.Monad.ST.Ref as STRef
|
import Control.Monad.ST.Ref as STRef
|
||||||
import Data.Array as Array
|
import Data.Array as Array
|
||||||
import Data.Bifunctor (lmap)
|
import Data.Bifunctor (lmap)
|
||||||
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
|
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
|
||||||
import Data.FunctorWithIndex (mapWithIndex)
|
import Data.FunctorWithIndex (mapWithIndex)
|
||||||
|
import Data.Map (Map)
|
||||||
import Data.Map as Map
|
import Data.Map as Map
|
||||||
import Data.Maybe (Maybe(..))
|
import Data.Maybe (Maybe(..))
|
||||||
|
import Data.Profunctor as Pro
|
||||||
import Data.Tuple.Nested ((/\))
|
import Data.Tuple.Nested ((/\))
|
||||||
import Effect.Aff (Aff)
|
import Effect.Aff (Aff)
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
@ -20,79 +23,80 @@ import Effect.Exception (error)
|
|||||||
import Node.Buffer (Buffer)
|
import Node.Buffer (Buffer)
|
||||||
import Node.Stream.CSV.Parse as CSV.Parse
|
import Node.Stream.CSV.Parse as CSV.Parse
|
||||||
import Node.Stream.CSV.Stringify as CSV.Stringify
|
import Node.Stream.CSV.Stringify as CSV.Stringify
|
||||||
import Pipes (await, yield, (>->))
|
import Pipes.Async (AsyncPipe(..), ReadResult(..))
|
||||||
import Pipes.Core (Pipe)
|
import Pipes.Node.Stream (TransformContext)
|
||||||
import Pipes.Node.Stream as Pipes.Stream
|
import Pipes.Node.Stream as Pipes.Stream
|
||||||
import Prim.RowList (class RowToList)
|
import Prim.RowList (class RowToList)
|
||||||
import Record.Extra (class Keys, keys)
|
import Record.Extra (class Keys, keys)
|
||||||
import Type.Prelude (Proxy(..))
|
import Type.Prelude (Proxy(..))
|
||||||
|
|
||||||
|
type ParseContext =
|
||||||
|
{ colsST :: STRef Global (Maybe (Map String Int))
|
||||||
|
, t :: TransformContext Buffer (Array String)
|
||||||
|
}
|
||||||
|
|
||||||
-- | Transforms buffer chunks of a CSV file to parsed
|
-- | Transforms buffer chunks of a CSV file to parsed
|
||||||
-- | records of `r`.
|
-- | records of `r`.
|
||||||
-- |
|
|
||||||
-- | ```
|
|
||||||
-- | -- == my-data.csv.gz ==
|
|
||||||
-- | -- id,foo,is_deleted
|
|
||||||
-- | -- 1,hi,f
|
|
||||||
-- | -- 2,bye,t
|
|
||||||
-- |
|
|
||||||
-- | rows
|
|
||||||
-- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean}
|
|
||||||
-- | <- map Array.fromFoldable
|
|
||||||
-- | $ Pipes.toListM
|
|
||||||
-- | $ Pipes.Node.Stream.unEOS
|
|
||||||
-- | $ Pipes.Node.FS.read "my-data.csv.gz"
|
|
||||||
-- | >-> Pipes.Node.Zlib.gunzip
|
|
||||||
-- | >-> Pipes.CSV.parse
|
|
||||||
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
|
|
||||||
-- | ```
|
|
||||||
parse
|
parse
|
||||||
:: forall @r rl
|
:: forall @r rl
|
||||||
. RowToList r rl
|
. RowToList r rl
|
||||||
=> ReadCSVRecord r rl
|
=> ReadCSVRecord r rl
|
||||||
=> Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit
|
=> AsyncPipe ParseContext Aff (Maybe Buffer) (Maybe { | r })
|
||||||
parse = do
|
parse =
|
||||||
raw <- liftEffect $ CSV.Parse.make {}
|
|
||||||
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
|
|
||||||
|
|
||||||
let
|
let
|
||||||
readCols = liftEffect $ ST.toEffect $ STRef.read colsST
|
readCols st = liftEffect $ ST.toEffect $ STRef.read st
|
||||||
putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST
|
putCols st a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) st
|
||||||
|
|
||||||
parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
|
firstRow st a = putCols st $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
|
||||||
firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
|
row a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
|
||||||
row a cols' = yield =<< parse' a cols'
|
|
||||||
unmarshal = forever do
|
|
||||||
r <- await
|
|
||||||
cols <- readCols
|
|
||||||
case cols of
|
|
||||||
Just cols' -> row r cols'
|
|
||||||
Nothing -> firstRow r
|
|
||||||
|
|
||||||
parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw
|
(AsyncPipe init' write' awaitWrite' read' awaitRead') =
|
||||||
parser >-> Pipes.Stream.inEOS unmarshal
|
Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
|
||||||
|
|
||||||
|
init = do
|
||||||
|
t <- init'
|
||||||
|
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
|
||||||
|
pure {t, colsST}
|
||||||
|
|
||||||
|
write {t} a = write' t a
|
||||||
|
awaitWrite {t} = awaitWrite' t
|
||||||
|
awaitRead {t} = awaitRead' t
|
||||||
|
|
||||||
|
read {colsST, t} = do
|
||||||
|
r <- read' t
|
||||||
|
case r of
|
||||||
|
ReadWouldBlock -> pure ReadWouldBlock
|
||||||
|
ReadOk Nothing -> pure $ ReadOk Nothing
|
||||||
|
ReadOk (Just r') -> do
|
||||||
|
cols <- readCols colsST
|
||||||
|
case cols of
|
||||||
|
Just cols' -> ReadOk <$> Just <$> row r' cols'
|
||||||
|
Nothing -> firstRow colsST r' *> read {colsST, t}
|
||||||
|
in
|
||||||
|
AsyncPipe
|
||||||
|
init
|
||||||
|
write
|
||||||
|
awaitWrite
|
||||||
|
read
|
||||||
|
awaitRead
|
||||||
|
|
||||||
-- | Transforms buffer chunks of a CSV file to parsed
|
-- | Transforms buffer chunks of a CSV file to parsed
|
||||||
-- | arrays of CSV values.
|
-- | arrays of CSV values.
|
||||||
parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit
|
parseRaw :: AsyncPipe (TransformContext Buffer (Array String)) Aff (Maybe Buffer) (Maybe (Array String))
|
||||||
parseRaw = do
|
parseRaw = do
|
||||||
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
|
Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
|
||||||
Pipes.Stream.fromTransform s
|
|
||||||
|
|
||||||
-- | Transforms CSV rows into stringified CSV records
|
-- | Transforms CSV rows into stringified CSV records
|
||||||
-- | using the given ordered array of column names.
|
-- | using the given ordered array of column names.
|
||||||
stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit
|
stringifyRaw :: Array String -> AsyncPipe (TransformContext (Array String) String) Aff (Maybe (Array String)) (Maybe String)
|
||||||
stringifyRaw columns = do
|
stringifyRaw columns =
|
||||||
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
|
Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
|
||||||
Pipes.Stream.fromTransform s
|
|
||||||
|
|
||||||
-- | Transforms purescript records into stringified CSV records.
|
-- | Transforms purescript records into stringified CSV records.
|
||||||
-- |
|
-- |
|
||||||
-- | Columns are inferred from the record's keys, ordered alphabetically.
|
-- | Columns are inferred from the record's keys, ordered alphabetically.
|
||||||
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit
|
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (TransformContext (Array String) String) Aff (Maybe { | r }) (Maybe String)
|
||||||
stringify = do
|
stringify = do
|
||||||
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
|
|
||||||
let
|
let
|
||||||
printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw
|
p = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
|
||||||
marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await)
|
Pro.lcmap (map $ writeCSVRecord @r @rl) p
|
||||||
Pipes.Stream.inEOS marshal >-> printer
|
|
||||||
|
@ -17,6 +17,7 @@ import Effect.Console (log)
|
|||||||
import Node.Encoding (Encoding(..))
|
import Node.Encoding (Encoding(..))
|
||||||
import Partial.Unsafe (unsafePartial)
|
import Partial.Unsafe (unsafePartial)
|
||||||
import Pipes (yield, (>->))
|
import Pipes (yield, (>->))
|
||||||
|
import Pipes.Async ((>-/->))
|
||||||
import Pipes.CSV as Pipes.CSV
|
import Pipes.CSV as Pipes.CSV
|
||||||
import Pipes.Collect as Pipes.Collect
|
import Pipes.Collect as Pipes.Collect
|
||||||
import Pipes.Construct as Pipes.Construct
|
import Pipes.Construct as Pipes.Construct
|
||||||
@ -50,7 +51,7 @@ spec =
|
|||||||
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
|
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
|
||||||
]
|
]
|
||||||
|
|
||||||
csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS
|
csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-/-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS
|
||||||
csv' `shouldEqual` csv
|
csv' `shouldEqual` csv
|
||||||
describe "parse" do
|
describe "parse" do
|
||||||
it "parses csv" do
|
it "parses csv" do
|
||||||
@ -58,7 +59,7 @@ spec =
|
|||||||
$ Pipes.toListM
|
$ Pipes.toListM
|
||||||
$ Pipes.Stream.withEOS (yield csv)
|
$ Pipes.Stream.withEOS (yield csv)
|
||||||
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
|
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
|
||||||
>-> Pipes.CSV.parse
|
>-/-> Pipes.CSV.parse
|
||||||
>-> Pipes.Stream.unEOS
|
>-> Pipes.Stream.unEOS
|
||||||
|
|
||||||
rows `shouldEqual`
|
rows `shouldEqual`
|
||||||
@ -82,7 +83,7 @@ spec =
|
|||||||
rows <-
|
rows <-
|
||||||
Pipes.Collect.toArray
|
Pipes.Collect.toArray
|
||||||
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs)
|
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs)
|
||||||
>-> Pipes.CSV.parse @(id :: Int)
|
>-/-> Pipes.CSV.parse @(id :: Int)
|
||||||
>-> Pipes.Stream.unEOS
|
>-> Pipes.Stream.unEOS
|
||||||
|
|
||||||
rows `shouldEqual` ((\id -> { id }) <$> nums)
|
rows `shouldEqual` ((\id -> { id }) <$> nums)
|
||||||
|
Loading…
Reference in New Issue
Block a user