fix: pipes should use streamin/out

This commit is contained in:
orion 2024-06-25 17:57:53 -05:00
parent 2e7015b19d
commit 23496f71f3
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -3,39 +3,21 @@ module Pipes.Postgres where
import Prelude import Prelude
import Control.Monad.Cont (lift) import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (class MonadError, class MonadThrow)
import Control.Monad.Morph (hoist) import Control.Monad.Morph (hoist)
import Control.Monad.Postgres (SessionT) import Control.Monad.Postgres (class MonadSession, streamIn, streamOut)
import Control.Monad.Reader (ask)
import Data.Maybe (Maybe) import Data.Maybe (Maybe)
import Effect.Aff.Class (class MonadAff) import Effect.Aff.Class (liftAff)
import Effect.Aff.Postgres.Client as Client
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Effect.Postgres.Error.RE as RE
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes.Core (Consumer, Producer) import Pipes.Core (Consumer, Producer)
import Pipes.Node.Stream (fromReadable, fromWritable) import Pipes.Node.Stream (fromReadable, fromWritable)
stdin stdin :: forall m. MonadSession m => String -> Consumer (Maybe Buffer) m Unit
:: forall m
. MonadAff m
=> MonadError Error m
=> String
-> Consumer (Maybe Buffer) (SessionT m) Unit
stdin q = do stdin q = do
client <- lift ask stream <- lift $ streamIn q
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client hoist liftAff $ fromWritable $ O.unsafeFromBufferWritable stream
hoist lift (fromWritable $ O.unsafeFromBufferWritable stream)
stdout stdout :: forall m. MonadSession m => String -> Producer (Maybe Buffer) m Unit
:: forall m
. MonadAff m
=> MonadThrow Error m
=> String
-> Producer (Maybe Buffer) (SessionT m) Unit
stdout q = do stdout q = do
client <- lift ask stream <- lift $ streamOut q
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client hoist liftAff $ fromReadable $ O.unsafeFromBufferReadable stream
hoist lift (fromReadable (O.unsafeFromBufferReadable stream))