From 5fbf2f9fdbded366b44e1c13a6d3a012ff0ed849 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Sun, 23 Jun 2024 20:35:07 -0500 Subject: [PATCH] fix: bump node-stream-pipes --- spago.lock | 8 ++--- spago.yaml | 3 +- src/Pipes.CSV.purs | 76 +++++++++++++++++----------------------------- 3 files changed, 32 insertions(+), 55 deletions(-) diff --git a/spago.lock b/spago.lock index 4aafaaf..25803f6 100644 --- a/spago.lock +++ b/spago.lock @@ -18,14 +18,13 @@ workspace: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=2.0.2 <3.0.0" + - node-stream-pipes: ">=2.1.0 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - - profunctor: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - st: ">=6.2.0 <7.0.0" @@ -623,8 +622,8 @@ packages: - unsafe-coerce node-stream-pipes: type: registry - version: 2.0.2 - integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI= + version: 2.1.0 + integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8= dependencies: - aff - arrays @@ -652,7 +651,6 @@ packages: - parallel - pipes - prelude - - profunctor - st - strings - tailrec diff --git a/spago.yaml b/spago.yaml index 0bdb5b0..a0e4a7a 100644 --- a/spago.yaml +++ b/spago.yaml @@ -25,14 +25,13 @@ package: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=2.0.2 <3.0.0" + - node-stream-pipes: ">=2.1.0 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - - profunctor: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - st: ">=6.2.0 <7.0.0" diff --git a/src/Pipes.CSV.purs b/src/Pipes.CSV.purs index b1345e7..577dc6c 100644 --- a/src/Pipes.CSV.purs +++ b/src/Pipes.CSV.purs @@ -2,20 +2,17 @@ module Pipes.CSV where import Prelude -import Control.Monad.Error.Class (liftEither) +import Control.Monad.Cont (lift) +import Control.Monad.Error.Class (liftEither, liftMaybe) import Control.Monad.Except (runExcept) -import Control.Monad.ST.Global (Global) import Control.Monad.ST.Global as ST -import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as STRef import Data.Array as Array import Data.Bifunctor (lmap) import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord) import Data.FunctorWithIndex (mapWithIndex) -import Data.Map (Map) import Data.Map as Map import Data.Maybe (Maybe(..)) -import Data.Profunctor as Pro import Data.Tuple.Nested ((/\)) import Effect.Aff (Aff) import Effect.Class (liftEffect) @@ -23,25 +20,19 @@ import Effect.Exception (error) import Node.Buffer (Buffer) import Node.Stream.CSV.Parse as CSV.Parse import Node.Stream.CSV.Stringify as CSV.Stringify -import Pipes.Async (AsyncPipe(..), ReadResult(..)) -import Pipes.Node.Stream (TransformContext) +import Pipes.Async (AsyncPipe(..), ReadResult(..), getAsyncIO, mapIO, stripIO) import Pipes.Node.Stream as Pipes.Stream import Prim.RowList (class RowToList) import Record.Extra (class Keys, keys) import Type.Prelude (Proxy(..)) -type ParseContext = - { colsST :: STRef Global (Maybe (Map String Int)) - , t :: TransformContext Buffer (Array String) - } - -- | Transforms buffer chunks of a CSV file to parsed -- | records of `r`. parse :: forall @r rl . RowToList r rl => ReadCSVRecord r rl - => AsyncPipe ParseContext Aff (Maybe Buffer) (Maybe { | r }) + => AsyncPipe (Maybe Buffer) (Maybe { | r }) Aff Unit parse = let readCols st = liftEffect $ ST.toEffect $ STRef.read st @@ -49,54 +40,43 @@ parse = firstRow st a = putCols st $ Map.fromFoldable $ mapWithIndex (flip (/\)) a row a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a + in do + colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing - (AsyncPipe init' write' awaitWrite' read' awaitRead') = - Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} + let + decoder = Pipes.Stream.fromTransformEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} - init = do - t <- init' - colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing - pure {t, colsST} + {write, read: read', awaitRead, awaitWrite} /\ done <- liftMaybe (error "unreachable") =<< lift (getAsyncIO decoder) + let + read = + read' >>= case _ of + ReadWouldBlock -> pure ReadWouldBlock + ReadOk Nothing -> pure $ ReadOk Nothing + ReadOk (Just r') -> do + cols <- readCols colsST + case cols of + Just cols' -> ReadOk <$> Just <$> row r' cols' + Nothing -> firstRow colsST r' *> read - write {t} a = write' t a - awaitWrite {t} = awaitWrite' t - awaitRead {t} = awaitRead' t - - read {colsST, t} = do - r <- read' t - case r of - ReadWouldBlock -> pure ReadWouldBlock - ReadOk Nothing -> pure $ ReadOk Nothing - ReadOk (Just r') -> do - cols <- readCols colsST - case cols of - Just cols' -> ReadOk <$> Just <$> row r' cols' - Nothing -> firstRow colsST r' *> read {colsST, t} - in - AsyncPipe - init - write - awaitWrite - read - awaitRead + AsyncIO $ {write, read, awaitWrite, awaitRead} /\ lift (stripIO done) -- | Transforms buffer chunks of a CSV file to parsed -- | arrays of CSV values. -parseRaw :: AsyncPipe (TransformContext Buffer (Array String)) Aff (Maybe Buffer) (Maybe (Array String)) -parseRaw = do - Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} +parseRaw :: AsyncPipe (Maybe Buffer) (Maybe (Array String)) Aff Unit +parseRaw = + Pipes.Stream.fromTransformEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} -- | Transforms CSV rows into stringified CSV records -- | using the given ordered array of column names. -stringifyRaw :: Array String -> AsyncPipe (TransformContext (Array String) String) Aff (Maybe (Array String)) (Maybe String) +stringifyRaw :: Array String -> AsyncPipe (Maybe (Array String)) (Maybe String) Aff Unit stringifyRaw columns = - Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} + Pipes.Stream.fromTransformEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} -- | Transforms purescript records into stringified CSV records. -- | -- | Columns are inferred from the record's keys, ordered alphabetically. -stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (TransformContext (Array String) String) Aff (Maybe { | r }) (Maybe String) +stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (Maybe { | r }) (Maybe String) Aff Unit stringify = do let - p = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {} - Pro.lcmap (map $ writeCSVRecord @r @rl) p + p = Pipes.Stream.fromTransformEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {} + mapIO (map $ writeCSVRecord @r @rl) identity p