fix: bump node-stream-pipes

This commit is contained in:
orion 2024-06-23 20:35:07 -05:00
parent 39cf106ac1
commit 5fbf2f9fdb
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 32 additions and 55 deletions

View File

@ -18,14 +18,13 @@ workspace:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=2.0.2 <3.0.0"
- node-stream-pipes: ">=2.1.0 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0"
@ -623,8 +622,8 @@ packages:
- unsafe-coerce
node-stream-pipes:
type: registry
version: 2.0.2
integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI=
version: 2.1.0
integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8=
dependencies:
- aff
- arrays
@ -652,7 +651,6 @@ packages:
- parallel
- pipes
- prelude
- profunctor
- st
- strings
- tailrec

View File

@ -25,14 +25,13 @@ package:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=2.0.2 <3.0.0"
- node-stream-pipes: ">=2.1.0 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0"

View File

@ -2,20 +2,17 @@ module Pipes.CSV where
import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Profunctor as Pro
import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
@ -23,25 +20,19 @@ import Effect.Exception (error)
import Node.Buffer (Buffer)
import Node.Stream.CSV.Parse as CSV.Parse
import Node.Stream.CSV.Stringify as CSV.Stringify
import Pipes.Async (AsyncPipe(..), ReadResult(..))
import Pipes.Node.Stream (TransformContext)
import Pipes.Async (AsyncPipe(..), ReadResult(..), getAsyncIO, mapIO, stripIO)
import Pipes.Node.Stream as Pipes.Stream
import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
type ParseContext =
{ colsST :: STRef Global (Maybe (Map String Int))
, t :: TransformContext Buffer (Array String)
}
-- | Transforms buffer chunks of a CSV file to parsed
-- | records of `r`.
parse
:: forall @r rl
. RowToList r rl
=> ReadCSVRecord r rl
=> AsyncPipe ParseContext Aff (Maybe Buffer) (Maybe { | r })
=> AsyncPipe (Maybe Buffer) (Maybe { | r }) Aff Unit
parse =
let
readCols st = liftEffect $ ST.toEffect $ STRef.read st
@ -49,54 +40,43 @@ parse =
firstRow st a = putCols st $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
row a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
in do
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
(AsyncPipe init' write' awaitWrite' read' awaitRead') =
Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
let
decoder = Pipes.Stream.fromTransformEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
init = do
t <- init'
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
pure {t, colsST}
{write, read: read', awaitRead, awaitWrite} /\ done <- liftMaybe (error "unreachable") =<< lift (getAsyncIO decoder)
let
read =
read' >>= case _ of
ReadWouldBlock -> pure ReadWouldBlock
ReadOk Nothing -> pure $ ReadOk Nothing
ReadOk (Just r') -> do
cols <- readCols colsST
case cols of
Just cols' -> ReadOk <$> Just <$> row r' cols'
Nothing -> firstRow colsST r' *> read
write {t} a = write' t a
awaitWrite {t} = awaitWrite' t
awaitRead {t} = awaitRead' t
read {colsST, t} = do
r <- read' t
case r of
ReadWouldBlock -> pure ReadWouldBlock
ReadOk Nothing -> pure $ ReadOk Nothing
ReadOk (Just r') -> do
cols <- readCols colsST
case cols of
Just cols' -> ReadOk <$> Just <$> row r' cols'
Nothing -> firstRow colsST r' *> read {colsST, t}
in
AsyncPipe
init
write
awaitWrite
read
awaitRead
AsyncIO $ {write, read, awaitWrite, awaitRead} /\ lift (stripIO done)
-- | Transforms buffer chunks of a CSV file to parsed
-- | arrays of CSV values.
parseRaw :: AsyncPipe (TransformContext Buffer (Array String)) Aff (Maybe Buffer) (Maybe (Array String))
parseRaw = do
Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
parseRaw :: AsyncPipe (Maybe Buffer) (Maybe (Array String)) Aff Unit
parseRaw =
Pipes.Stream.fromTransformEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
-- | Transforms CSV rows into stringified CSV records
-- | using the given ordered array of column names.
stringifyRaw :: Array String -> AsyncPipe (TransformContext (Array String) String) Aff (Maybe (Array String)) (Maybe String)
stringifyRaw :: Array String -> AsyncPipe (Maybe (Array String)) (Maybe String) Aff Unit
stringifyRaw columns =
Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
Pipes.Stream.fromTransformEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
-- | Transforms purescript records into stringified CSV records.
-- |
-- | Columns are inferred from the record's keys, ordered alphabetically.
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (TransformContext (Array String) String) Aff (Maybe { | r }) (Maybe String)
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (Maybe { | r }) (Maybe String) Aff Unit
stringify = do
let
p = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
Pro.lcmap (map $ writeCSVRecord @r @rl) p
p = Pipes.Stream.fromTransformEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
mapIO (map $ writeCSVRecord @r @rl) identity p