generated from tpl/purs
fix: pipes monad stuff
This commit is contained in:
parent
ca7ebb4337
commit
188403681a
@ -105,7 +105,7 @@ instance (MonadBracket e f m, MonadAff m) => MonadSession (PostgresT m) where
|
||||
-- | - `session` - Session monad (for `PostgresT` this is `SessionT`)
|
||||
-- | - `cursor` - Cursor session monad (for `PostgresT` this is `CursorT`)
|
||||
-- | - `ct` - Open type parameter for cursor type. Don't pin this to a concrete type.
|
||||
class (MonadSession session, MonadCursor cursor ct) <= MonadPostgres m session cursor ct | m -> ct cursor session where
|
||||
class (Monad m, MonadSession session, MonadCursor cursor ct) <= MonadPostgres m session cursor ct | m -> ct cursor session where
|
||||
-- | Run a session in `m`.
|
||||
session :: session ~> m
|
||||
-- | Run a session in `m`, wrapped in a transaction.
|
||||
|
@ -3,21 +3,53 @@ module Pipes.Postgres where
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Postgres.Session (class MonadSession, streamIn, streamOut)
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Data.Maybe (Maybe)
|
||||
import Effect.Aff.Class (liftAff)
|
||||
import Control.Monad.Postgres (class MonadPostgres)
|
||||
import Control.Monad.Reader (class MonadAsk, ask)
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||
import Effect.Aff.Postgres.Client as Client
|
||||
import Effect.Aff.Postgres.Pool (Pool)
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Class (liftEffect)
|
||||
import Node.Buffer (Buffer)
|
||||
import Node.Stream.Object as O
|
||||
import Pipes ((>->))
|
||||
import Pipes.Core (Consumer, Producer)
|
||||
import Pipes.Node.Stream (fromReadable, fromWritable)
|
||||
import Pipes.Prelude as Pipes
|
||||
|
||||
stdin :: forall m. MonadSession m => String -> Consumer (Maybe Buffer) m Unit
|
||||
stdin
|
||||
:: forall m s c ct
|
||||
. MonadAff m
|
||||
=> MonadAsk Pool m
|
||||
=> MonadPostgres m s c ct
|
||||
=> String
|
||||
-> Consumer (Maybe Buffer) m Unit
|
||||
stdin q = do
|
||||
stream <- lift $ streamIn q
|
||||
hoist liftAff $ fromWritable $ O.fromBufferWritable stream
|
||||
pool <- ask
|
||||
client <- liftAff $ Pool.connect pool
|
||||
stream <- liftEffect $ Client.execWithStdin q client
|
||||
let
|
||||
releaseOnEOS Nothing = Pool.release pool client $> Nothing
|
||||
releaseOnEOS (Just a) = pure (Just a)
|
||||
hoist liftAff
|
||||
$ hoist liftEffect (Pipes.mapM releaseOnEOS)
|
||||
>-> fromWritable (O.fromBufferWritable stream)
|
||||
|
||||
stdout :: forall m. MonadSession m => String -> Producer (Maybe Buffer) m Unit
|
||||
stdout
|
||||
:: forall m s c ct
|
||||
. MonadAff m
|
||||
=> MonadAsk Pool m
|
||||
=> MonadPostgres m s c ct
|
||||
=> String
|
||||
-> Producer (Maybe Buffer) m Unit
|
||||
stdout q = do
|
||||
stream <- lift $ streamOut q
|
||||
hoist liftAff $ fromReadable $ O.fromBufferReadable stream
|
||||
pool <- ask
|
||||
client <- liftAff $ Pool.connect pool
|
||||
stream <- liftEffect $ Client.queryWithStdout q client
|
||||
let
|
||||
releaseOnEOS Nothing = Pool.release pool client $> Nothing
|
||||
releaseOnEOS (Just a) = pure (Just a)
|
||||
hoist liftAff
|
||||
$ fromReadable (O.fromBufferReadable stream)
|
||||
>-> hoist liftEffect (Pipes.mapM releaseOnEOS)
|
||||
|
Loading…
Reference in New Issue
Block a user