From 23496f71f3adf7378a1f595de86fc8eac4268e8e Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Tue, 25 Jun 2024 17:57:53 -0500 Subject: [PATCH] fix: pipes should use streamin/out --- src/Pipes.Postgres.purs | 34 ++++++++-------------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/src/Pipes.Postgres.purs b/src/Pipes.Postgres.purs index b4063f3..9d894f8 100644 --- a/src/Pipes.Postgres.purs +++ b/src/Pipes.Postgres.purs @@ -3,39 +3,21 @@ module Pipes.Postgres where import Prelude import Control.Monad.Cont (lift) -import Control.Monad.Error.Class (class MonadError, class MonadThrow) import Control.Monad.Morph (hoist) -import Control.Monad.Postgres (SessionT) -import Control.Monad.Reader (ask) +import Control.Monad.Postgres (class MonadSession, streamIn, streamOut) import Data.Maybe (Maybe) -import Effect.Aff.Class (class MonadAff) -import Effect.Aff.Postgres.Client as Client -import Effect.Class (liftEffect) -import Effect.Exception (Error) -import Effect.Postgres.Error.RE as RE +import Effect.Aff.Class (liftAff) import Node.Buffer (Buffer) import Node.Stream.Object as O import Pipes.Core (Consumer, Producer) import Pipes.Node.Stream (fromReadable, fromWritable) -stdin - :: forall m - . MonadAff m - => MonadError Error m - => String - -> Consumer (Maybe Buffer) (SessionT m) Unit +stdin :: forall m. MonadSession m => String -> Consumer (Maybe Buffer) m Unit stdin q = do - client <- lift ask - stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client - hoist lift (fromWritable $ O.unsafeFromBufferWritable stream) + stream <- lift $ streamIn q + hoist liftAff $ fromWritable $ O.unsafeFromBufferWritable stream -stdout - :: forall m - . MonadAff m - => MonadThrow Error m - => String - -> Producer (Maybe Buffer) (SessionT m) Unit +stdout :: forall m. MonadSession m => String -> Producer (Maybe Buffer) m Unit stdout q = do - client <- lift ask - stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client - hoist lift (fromReadable (O.unsafeFromBufferReadable stream)) + stream <- lift $ streamOut q + hoist liftAff $ fromReadable $ O.unsafeFromBufferReadable stream