diff --git a/src/Control.Monad.Postgres.Base.purs b/src/Control.Monad.Postgres.Base.purs index 0b8328f..2e8e744 100644 --- a/src/Control.Monad.Postgres.Base.purs +++ b/src/Control.Monad.Postgres.Base.purs @@ -105,7 +105,7 @@ instance (MonadBracket e f m, MonadAff m) => MonadSession (PostgresT m) where -- | - `session` - Session monad (for `PostgresT` this is `SessionT`) -- | - `cursor` - Cursor session monad (for `PostgresT` this is `CursorT`) -- | - `ct` - Open type parameter for cursor type. Don't pin this to a concrete type. -class (MonadSession session, MonadCursor cursor ct) <= MonadPostgres m session cursor ct | m -> ct cursor session where +class (Monad m, MonadSession session, MonadCursor cursor ct) <= MonadPostgres m session cursor ct | m -> ct cursor session where -- | Run a session in `m`. session :: session ~> m -- | Run a session in `m`, wrapped in a transaction. diff --git a/src/Pipes.Postgres.purs b/src/Pipes.Postgres.purs index 3b4e248..5e0dad2 100644 --- a/src/Pipes.Postgres.purs +++ b/src/Pipes.Postgres.purs @@ -3,21 +3,53 @@ module Pipes.Postgres where import Prelude import Control.Monad.Morph (hoist) -import Control.Monad.Postgres.Session (class MonadSession, streamIn, streamOut) -import Control.Monad.Trans.Class (lift) -import Data.Maybe (Maybe) -import Effect.Aff.Class (liftAff) +import Control.Monad.Postgres (class MonadPostgres) +import Control.Monad.Reader (class MonadAsk, ask) +import Data.Maybe (Maybe(..)) +import Effect.Aff.Class (class MonadAff, liftAff) +import Effect.Aff.Postgres.Client as Client +import Effect.Aff.Postgres.Pool (Pool) +import Effect.Aff.Postgres.Pool as Pool +import Effect.Class (liftEffect) import Node.Buffer (Buffer) import Node.Stream.Object as O +import Pipes ((>->)) import Pipes.Core (Consumer, Producer) import Pipes.Node.Stream (fromReadable, fromWritable) +import Pipes.Prelude as Pipes -stdin :: forall m. MonadSession m => String -> Consumer (Maybe Buffer) m Unit +stdin + :: forall m s c ct + . MonadAff m + => MonadAsk Pool m + => MonadPostgres m s c ct + => String + -> Consumer (Maybe Buffer) m Unit stdin q = do - stream <- lift $ streamIn q - hoist liftAff $ fromWritable $ O.fromBufferWritable stream + pool <- ask + client <- liftAff $ Pool.connect pool + stream <- liftEffect $ Client.execWithStdin q client + let + releaseOnEOS Nothing = Pool.release pool client $> Nothing + releaseOnEOS (Just a) = pure (Just a) + hoist liftAff + $ hoist liftEffect (Pipes.mapM releaseOnEOS) + >-> fromWritable (O.fromBufferWritable stream) -stdout :: forall m. MonadSession m => String -> Producer (Maybe Buffer) m Unit +stdout + :: forall m s c ct + . MonadAff m + => MonadAsk Pool m + => MonadPostgres m s c ct + => String + -> Producer (Maybe Buffer) m Unit stdout q = do - stream <- lift $ streamOut q - hoist liftAff $ fromReadable $ O.fromBufferReadable stream + pool <- ask + client <- liftAff $ Pool.connect pool + stream <- liftEffect $ Client.queryWithStdout q client + let + releaseOnEOS Nothing = Pool.release pool client $> Nothing + releaseOnEOS (Just a) = pure (Just a) + hoist liftAff + $ fromReadable (O.fromBufferReadable stream) + >-> hoist liftEffect (Pipes.mapM releaseOnEOS)