generated from tpl/purs
feat!: rework error handling
This commit is contained in:
parent
c9e5ee9205
commit
11c1645173
@ -15,11 +15,13 @@ workspace:
|
||||
- foldable-traversable: ">=6.0.0 <7.0.0"
|
||||
- foreign: ">=7.0.0 <8.0.0"
|
||||
- fork: ">=6.0.0 <7.0.0"
|
||||
- functors
|
||||
- integers: ">=6.0.0 <7.0.0"
|
||||
- js-bigints: ">=2.2.1 <3.0.0"
|
||||
- lists: ">=7.0.0 <8.0.0"
|
||||
- maybe: ">=6.0.0 <7.0.0"
|
||||
- mmorph: ">=7.0.0 <8.0.0"
|
||||
- monad-control
|
||||
- newtype: ">=5.0.0 <6.0.0"
|
||||
- node-buffer: ">=9.0.0 <10.0.0"
|
||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||
@ -33,6 +35,7 @@ workspace:
|
||||
- prelude: ">=6.0.1 <7.0.0"
|
||||
- profunctor: ">=6.0.1 <7.0.0"
|
||||
- record: ">=4.0.0 <5.0.0"
|
||||
- refs
|
||||
- simple-json: ">=9.0.0 <10.0.0"
|
||||
- strings: ">=6.0.1 <7.0.0"
|
||||
- tailrec: ">=6.1.0 <7.0.0"
|
||||
@ -728,8 +731,8 @@ packages:
|
||||
- unsafe-coerce
|
||||
node-stream-pipes:
|
||||
type: registry
|
||||
version: 2.1.0
|
||||
integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8=
|
||||
version: 2.1.1
|
||||
integrity: sha256-j7ZE+Vtc9gwXlH8s8pyVsbbCMd91AIRk05IOsZAO7x0=
|
||||
dependencies:
|
||||
- aff
|
||||
- arrays
|
||||
|
@ -13,6 +13,9 @@ package:
|
||||
strict: true
|
||||
pedanticPackages: true
|
||||
dependencies:
|
||||
- functors
|
||||
- monad-control
|
||||
- refs
|
||||
- aff: ">=7.1.0 <8.0.0"
|
||||
- aff-promise: ">=4.0.0 <5.0.0"
|
||||
- arrays: ">=7.3.0 <8.0.0"
|
||||
|
@ -2,18 +2,13 @@ module Control.Monad.Postgres.Base where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Alt (class Alt)
|
||||
import Control.Alternative (class Alternative, class Plus)
|
||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
||||
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, class MonadKill, bracket, kill, never, uninterruptible)
|
||||
import Control.Monad.Morph (class MFunctor, class MMonad)
|
||||
import Control.Monad.Error.Class (catchError, throwError)
|
||||
import Control.Monad.Fork.Class (class MonadBracket, bracket)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Postgres.Cursor (class MonadCursor, CursorT)
|
||||
import Control.Monad.Postgres.Session (class MonadSession, SessionT, exec, exec_, query, streamIn, streamOut)
|
||||
import Control.Monad.Reader (class MonadAsk, class MonadReader, ReaderT, ask, local, runReaderT)
|
||||
import Control.Monad.Rec.Class (class MonadRec)
|
||||
import Control.Monad.Trans.Class (class MonadTrans, lift)
|
||||
import Control.Parallel (class Parallel, parallel, sequential)
|
||||
import Data.Newtype (class Newtype, unwrap, wrap)
|
||||
import Control.Monad.Postgres.Session (class MonadSession, SessionT, endSession, exec, exec_, startSession)
|
||||
import Control.Monad.Reader (ask, runReaderT)
|
||||
import Data.Newtype (unwrap)
|
||||
import Data.Postgres (RepT)
|
||||
import Data.Postgres.Query (class AsQuery, asQuery)
|
||||
import Data.Postgres.Raw (Raw)
|
||||
@ -22,9 +17,12 @@ import Data.Tuple.Nested ((/\))
|
||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||
import Effect.Aff.Postgres.Pool (Pool)
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Aff.Unlift (class MonadUnliftAff)
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Effect.Unlift (class MonadUnliftEffect)
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Exception as Effect
|
||||
import Effect.Postgres.Error (RE)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Effect.Postgres.Error.RE as RE
|
||||
import Prim.Row (class Union)
|
||||
|
||||
-- | Monad handling pool resource acquisition & release
|
||||
@ -51,53 +49,8 @@ import Prim.Row (class Union)
|
||||
-- | res <- Client.query "select * from foo" client
|
||||
-- | pure $ res == 1
|
||||
-- | ```
|
||||
newtype PostgresT :: forall k. (k -> Type) -> k -> Type
|
||||
newtype PostgresT m a = PostgresT (ReaderT Pool m a)
|
||||
|
||||
derive instance Newtype (PostgresT m a) _
|
||||
derive newtype instance (Functor m) => Functor (PostgresT m)
|
||||
derive newtype instance (Apply m) => Apply (PostgresT m)
|
||||
derive newtype instance (Alternative m) => Alternative (PostgresT m)
|
||||
derive newtype instance (Applicative m) => Applicative (PostgresT m)
|
||||
derive newtype instance (Plus m) => Plus (PostgresT m)
|
||||
derive newtype instance (Alt m) => Alt (PostgresT m)
|
||||
derive newtype instance (Bind m) => Bind (PostgresT m)
|
||||
derive newtype instance (Monad m) => Monad (PostgresT m)
|
||||
derive newtype instance (MonadEffect m) => MonadEffect (PostgresT m)
|
||||
derive newtype instance (MonadAff m) => MonadAff (PostgresT m)
|
||||
derive newtype instance (MonadUnliftEffect m) => MonadUnliftEffect (PostgresT m)
|
||||
derive newtype instance (MonadUnliftAff m) => MonadUnliftAff (PostgresT m)
|
||||
derive newtype instance MonadRec m => MonadRec (PostgresT m)
|
||||
derive newtype instance MonadTrans (PostgresT)
|
||||
derive newtype instance (MonadThrow e m) => MonadThrow e (PostgresT m)
|
||||
derive newtype instance (MonadError e m) => MonadError e (PostgresT m)
|
||||
derive newtype instance (MonadFork f m) => MonadFork f (PostgresT m)
|
||||
derive newtype instance MFunctor PostgresT
|
||||
derive newtype instance MMonad PostgresT
|
||||
instance (Apply m, Apply p, Parallel p m) => Parallel (PostgresT p) (PostgresT m) where
|
||||
parallel = wrap <<< parallel <<< unwrap
|
||||
sequential = wrap <<< sequential <<< unwrap
|
||||
|
||||
instance (Monad m, MonadKill e f m) => MonadKill e f (PostgresT m) where
|
||||
kill a b = lift $ kill a b
|
||||
|
||||
instance (Monad m, MonadBracket e f (ReaderT Pool m), MonadBracket e f m) => MonadBracket e f (PostgresT m) where
|
||||
bracket acq rel m = wrap $ bracket (unwrap acq) (\a b -> unwrap $ rel a b) (unwrap <<< m)
|
||||
uninterruptible a = wrap $ uninterruptible $ unwrap a
|
||||
never = lift $ never
|
||||
|
||||
instance Monad m => MonadAsk Pool (PostgresT m) where
|
||||
ask = wrap ask
|
||||
|
||||
instance Monad m => MonadReader Pool (PostgresT m) where
|
||||
local f m = wrap $ local f $ unwrap m
|
||||
|
||||
instance (MonadBracket e f m, MonadAff m) => MonadSession (PostgresT m) where
|
||||
query = session <<< query
|
||||
exec = session <<< exec
|
||||
exec_ = session <<< exec_
|
||||
streamIn = session <<< streamIn
|
||||
streamOut = session <<< streamOut
|
||||
type PostgresT :: (Type -> Type) -> Type -> Type
|
||||
type PostgresT = RE Pool
|
||||
|
||||
-- | Typeclass generalizing `PostgresT`. Allows for dependency-injecting different
|
||||
-- | implementations of the idea of a postgres connection.
|
||||
@ -117,13 +70,23 @@ class (Monad m, MonadSession session, MonadCursor cursor ct) <= MonadPostgres m
|
||||
-- | yielded by the cursor
|
||||
cursorWith :: forall q. AsQuery q => (Array Raw -> RepT ct) -> String -> q -> cursor ~> m
|
||||
|
||||
instance (MonadBracket e f m, MonadAff m, MonadSession (SessionT m), MonadCursor (CursorT t (SessionT m)) t) => MonadPostgres (PostgresT m) (SessionT m) (CursorT ct (SessionT m)) ct where
|
||||
instance
|
||||
( MonadBracket Effect.Error f m
|
||||
, MonadAff m
|
||||
, MonadSession (SessionT m)
|
||||
, MonadCursor (CursorT t (SessionT m)) t
|
||||
) => MonadPostgres
|
||||
(PostgresT m)
|
||||
(SessionT m)
|
||||
(CursorT ct (SessionT m))
|
||||
ct
|
||||
where
|
||||
session m = do
|
||||
pool <- ask
|
||||
let
|
||||
acq = liftAff $ Pool.connect pool
|
||||
rel _ c = liftEffect $ Pool.release pool c
|
||||
lift $ bracket acq rel (runReaderT m)
|
||||
client <- RE.liftExcept $ hoist liftAff $ startSession pool
|
||||
RE.finally
|
||||
(RE.liftExcept $ hoist liftEffect $ endSession pool client)
|
||||
(RE.liftExcept $ RE.toExcept m client)
|
||||
transaction m =
|
||||
let
|
||||
begin = void $ exec "begin;"
|
||||
@ -133,27 +96,29 @@ instance (MonadBracket e f m, MonadAff m, MonadSession (SessionT m), MonadCursor
|
||||
session $ begin *> catchError commit rollback
|
||||
cursorWith f cur q m =
|
||||
transaction do
|
||||
q' <- liftEffect $ asQuery q
|
||||
q' <- RE.liftExcept $ X.printing $ asQuery q
|
||||
exec_ $ "declare " <> cur <> " cursor for (" <> (unwrap q').text <> ");"
|
||||
runReaderT (unwrap m) (cur /\ f)
|
||||
|
||||
-- | Create a server-side cursor for a query in a transaction,
|
||||
-- | and execute a `CursorT` with a view to the new cursor.
|
||||
cursor :: forall @cursort t session cursor q a. MonadPostgres t session cursor cursort => AsQuery q => FromRow cursort => String -> q -> cursor a -> t a
|
||||
cursor = cursorWith fromRow
|
||||
cursor = cursorWith (fromRow 0)
|
||||
|
||||
-- | Execute a `PostgresT` using an existing connection pool.
|
||||
-- |
|
||||
-- | This will not invoke `Pool.end` after executing.
|
||||
withPool :: forall m a. PostgresT m a -> Pool -> m a
|
||||
withPool = runReaderT <<< unwrap
|
||||
withPool :: forall m a. PostgresT m a -> Pool -> E.Except m a
|
||||
withPool m p = runReaderT (unwrap m) p
|
||||
|
||||
-- | Create a new connection pool from the provided config and execute
|
||||
-- | the postgres monad, invoking `Effect.Aff.Postgres.Pool.end` afterwards.
|
||||
runPostgres :: forall m a missing trash r e f. MonadBracket e f m => MonadAff m => Union r missing (Pool.Config trash) => Record r -> PostgresT m a -> m a
|
||||
runPostgres :: forall m a missing trash r f. MonadBracket Effect.Error f m => MonadAff m => Union r missing (Pool.Config trash) => Record r -> PostgresT m a -> E.Except m a
|
||||
runPostgres cfg m =
|
||||
let
|
||||
acq :: RE Unit m Pool
|
||||
acq = liftEffect $ Pool.make @r @missing @trash cfg
|
||||
rel _ p = liftAff $ Pool.end p
|
||||
rel :: _ -> Pool -> RE Unit m Unit
|
||||
rel _ p = RE.liftExcept $ hoist liftAff $ Pool.end p
|
||||
in
|
||||
bracket acq rel $ withPool m
|
||||
RE.toExcept (bracket acq rel $ RE.liftExcept <<< withPool m) unit
|
||||
|
@ -22,6 +22,7 @@ import Data.Tuple.Nested (type (/\), (/\))
|
||||
import Effect.Aff.Class (class MonadAff)
|
||||
import Effect.Aff.Unlift (class MonadUnliftAff)
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Unlift (class MonadUnliftEffect)
|
||||
|
||||
data Move
|
||||
@ -118,7 +119,7 @@ instance (MonadSession m) => MonadCursor (CursorT t m) t where
|
||||
RowsAffected n' <- query $ ("move relative $1 from " <> cur) /\ n
|
||||
pure n'
|
||||
|
||||
instance (MonadSession m) => MonadSession (CursorT t m) where
|
||||
instance (MonadThrow E.E m, MonadSession m) => MonadSession (CursorT t m) where
|
||||
query = lift <<< query
|
||||
exec = lift <<< exec
|
||||
exec_ = lift <<< exec_
|
||||
|
@ -1,21 +1,50 @@
|
||||
module Control.Monad.Postgres.Session where
|
||||
|
||||
import Prelude
|
||||
import Prelude hiding (join)
|
||||
|
||||
import Control.Monad.Reader (ReaderT, ask)
|
||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
||||
import Control.Monad.Fork.Class (class MonadBracket)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Reader (ask)
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Data.Newtype (wrap)
|
||||
import Data.Postgres.Query (class AsQuery)
|
||||
import Data.Postgres.Result (class FromRows)
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff)
|
||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||
import Effect.Aff.Postgres.Client (Client)
|
||||
import Effect.Aff.Postgres.Client as Client
|
||||
import Effect.Class (liftEffect)
|
||||
import Node.Stream (Writable, Readable)
|
||||
import Effect.Aff.Postgres.Pool (Pool)
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Effect.Exception as Effect
|
||||
import Effect.Postgres.Error (RE)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Effect.Postgres.Error.RE as RE
|
||||
import Effect.Ref as Ref
|
||||
import Node.EventEmitter as Event
|
||||
import Node.Stream (Readable, Stream, Writable)
|
||||
import Node.Stream as Stream
|
||||
|
||||
type SessionT :: forall k. (k -> Type) -> k -> Type
|
||||
type SessionT = ReaderT Client
|
||||
type SessionT :: (Type -> Type) -> Type -> Type
|
||||
type SessionT = RE Client
|
||||
|
||||
class MonadStartSession a where
|
||||
startSession :: a -> E.Except Aff Client
|
||||
endSession :: a -> Client -> E.Except Effect Unit
|
||||
|
||||
instance MonadStartSession Pool where
|
||||
startSession = Pool.connect
|
||||
endSession p c = Pool.release p c
|
||||
|
||||
instance MonadStartSession Client where
|
||||
startSession = pure
|
||||
endSession _ _ = pure unit
|
||||
|
||||
-- | A monad representing a connected session to a database
|
||||
class MonadAff m <= MonadSession m where
|
||||
class (MonadThrow E.E m, MonadAff m) <= MonadSession m where
|
||||
-- | Executes a query and unmarshals the result into `r`
|
||||
query :: forall q r. AsQuery q => FromRows r => q -> m r
|
||||
-- | Executes a query and returns the number of rows affected
|
||||
@ -41,17 +70,44 @@ class MonadAff m <= MonadSession m where
|
||||
-- | ```
|
||||
streamOut :: String -> m (Readable ())
|
||||
|
||||
instance MonadAff m => MonadSession (SessionT m) where
|
||||
instance (MonadStartSession s, MonadBracket Effect.Error f m, MonadAff m) => MonadSession (RE s m) where
|
||||
query q = do
|
||||
client <- ask
|
||||
liftAff $ Client.query q client
|
||||
pool <- ask
|
||||
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||
RE.finally
|
||||
(hoist liftEffect $ RE.liftExcept $ endSession pool client)
|
||||
(wrap $ lift $ hoist liftAff $ Client.query q client)
|
||||
exec q = do
|
||||
client <- ask
|
||||
liftAff $ Client.exec q client
|
||||
pool <- ask
|
||||
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||
RE.finally
|
||||
(hoist liftEffect $ RE.liftExcept $ endSession pool client)
|
||||
(wrap $ lift $ hoist liftAff $ Client.exec q client)
|
||||
exec_ = void <<< exec
|
||||
streamIn q = do
|
||||
client <- ask
|
||||
liftEffect $ Client.execWithStdin q client
|
||||
pool <- ask
|
||||
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||
handleStream (X.run $ endSession pool client) (RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client)
|
||||
streamOut q = do
|
||||
client <- ask
|
||||
liftEffect $ Client.queryWithStdout q client
|
||||
pool <- ask
|
||||
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||
handleStream (X.run $ endSession pool client) (RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client)
|
||||
|
||||
handleStream :: forall e m r. MonadEffect m => MonadError e m => Effect Unit -> m (Stream r) -> m (Stream r)
|
||||
handleStream onError getStream =
|
||||
flip catchError
|
||||
(\e -> liftEffect onError *> throwError e)
|
||||
do
|
||||
stream <- getStream
|
||||
liftEffect $ onErrorOrClose stream onError
|
||||
pure stream
|
||||
|
||||
onErrorOrClose :: forall r. Stream r -> Effect Unit -> Effect Unit
|
||||
onErrorOrClose stream eff = do
|
||||
did <- Ref.new false
|
||||
let
|
||||
onE = do
|
||||
did' <- Ref.read did
|
||||
when (not did') (Ref.write true did *> eff)
|
||||
Event.once_ Stream.errorH (const onE) stream
|
||||
Event.once_ Stream.closeH onE stream
|
||||
|
@ -2,10 +2,11 @@ module Data.Postgres.Query where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Cont (lift)
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Data.Newtype (class Newtype)
|
||||
import Data.Nullable (Nullable, toNullable)
|
||||
import Data.Postgres (class Rep, serialize, smash)
|
||||
import Data.Postgres (class Rep, RepT, serialize, smash)
|
||||
import Data.Postgres.Raw (Raw)
|
||||
import Data.Tuple.Nested (type (/\), (/\))
|
||||
import Effect (Effect)
|
||||
@ -30,26 +31,29 @@ derive newtype instance Show Query
|
||||
emptyQuery :: Query
|
||||
emptyQuery = Query { text: "", values: [], name: Nothing }
|
||||
|
||||
stringQuery :: String -> Query
|
||||
stringQuery s = Query { text: s, values: [], name: Nothing }
|
||||
|
||||
-- | Any value that can be converted to an array of query parameters
|
||||
class AsQueryParams a where
|
||||
asQueryParams :: a -> Effect (Array Raw)
|
||||
asQueryParams :: a -> RepT (Array Raw)
|
||||
|
||||
instance AsQueryParams (Array Raw) where
|
||||
asQueryParams = pure
|
||||
else instance (Rep a, AsQueryParams b) => AsQueryParams (a /\ b) where
|
||||
asQueryParams (a /\ tail) = do
|
||||
a' <- map pure $ smash $ serialize a
|
||||
a' <- map pure $ serialize a
|
||||
tail' <- asQueryParams tail
|
||||
pure $ a' <> tail'
|
||||
else instance (Rep a) => AsQueryParams a where
|
||||
asQueryParams = map pure <<< smash <<< serialize
|
||||
asQueryParams = map pure <<< serialize
|
||||
|
||||
-- | Values that can be rendered as a SQL query
|
||||
class AsQuery a where
|
||||
asQuery :: a -> Effect Query
|
||||
asQuery :: a -> RepT Query
|
||||
|
||||
instance AsQuery a => AsQuery (Effect a) where
|
||||
asQuery a = asQuery =<< a
|
||||
asQuery a = asQuery =<< lift a
|
||||
|
||||
instance AsQuery Query where
|
||||
asQuery = pure
|
||||
|
@ -2,7 +2,7 @@ module Data.Postgres.Result where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Error.Class (liftMaybe, throwError)
|
||||
import Control.Monad.Error.Class (catchError, liftMaybe, throwError)
|
||||
import Data.Array as Array
|
||||
import Data.Generic.Rep (class Generic)
|
||||
import Data.Int as Int
|
||||
@ -43,9 +43,9 @@ class FromRows a where
|
||||
instance FromRows RowsAffected where
|
||||
fromRows a _ = pure a
|
||||
else instance (FromRow a) => FromRows (Array a) where
|
||||
fromRows _ = traverse fromRow
|
||||
fromRows _ = traverse (fromRow 0)
|
||||
else instance (FromRow a) => FromRows (Maybe a) where
|
||||
fromRows _ = map Array.head <<< traverse fromRow
|
||||
fromRows _ = map Array.head <<< traverse (fromRow 0)
|
||||
else instance (FromRow a) => FromRows a where
|
||||
fromRows a =
|
||||
let
|
||||
@ -89,29 +89,34 @@ class FromRow (a :: Type) where
|
||||
-- | Minimum length of row for type `a`
|
||||
minColumnCount :: forall g. g a -> Int
|
||||
-- | Performs the conversion
|
||||
fromRow :: Array Raw -> RepT a
|
||||
fromRow :: Int -> Array Raw -> RepT a
|
||||
|
||||
instance (Deserialize a, FromRow b) => FromRow (a /\ b) where
|
||||
minColumnCount _ = minColumnCount (Proxy @b) + 1
|
||||
fromRow r =
|
||||
fromRow n r =
|
||||
let
|
||||
minLen = minColumnCount (Proxy @(Tuple a b))
|
||||
lengthMismatch = pure $ TypeMismatch ("Expected row to have at least " <> show minLen <> " columns") ("Found row of length " <> show (Array.length r))
|
||||
in
|
||||
do
|
||||
let
|
||||
de a =
|
||||
catchError
|
||||
(deserialize @a a)
|
||||
(\e -> throwError $ ErrorAtIndex n <$> e)
|
||||
when (Array.length r < minLen) (throwError lengthMismatch)
|
||||
a <- deserialize =<< liftMaybe lengthMismatch (Array.head r)
|
||||
b <- fromRow =<< liftMaybe lengthMismatch (Array.tail r)
|
||||
a <- de =<< liftMaybe lengthMismatch (Array.head r)
|
||||
b <- fromRow (n + 1) =<< liftMaybe lengthMismatch (Array.tail r)
|
||||
pure $ a /\ b
|
||||
else instance FromRow (Array Raw) where
|
||||
minColumnCount _ = 0
|
||||
fromRow = pure
|
||||
fromRow _ = pure
|
||||
else instance FromRow Unit where
|
||||
minColumnCount _ = 0
|
||||
fromRow _ = pure unit
|
||||
fromRow _ _ = pure unit
|
||||
else instance Deserialize a => FromRow a where
|
||||
minColumnCount _ = 1
|
||||
fromRow r =
|
||||
fromRow _ r =
|
||||
let
|
||||
err = pure $ TypeMismatch "Expected row of length >= 1" "Empty row"
|
||||
in
|
||||
|
@ -1,4 +1,4 @@
|
||||
module Effect.Aff.Postgres.Client (connected, connect, end, exec, execWithStdin, queryWithStdout, query, queryRaw, __connect, __end, __query, __execStreamStdin, __execStreamStdout, module X) where
|
||||
module Effect.Aff.Postgres.Client (connected, connect, end, exec, execWithStdin, queryWithStdout, query, queryRaw, __connect, __end, __query, __execStreamStdin, __execStreamStdout, module Reexport) where
|
||||
|
||||
import Prelude
|
||||
|
||||
@ -7,14 +7,15 @@ import Control.Promise as Promise
|
||||
import Data.Functor (voidRight)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Newtype (wrap)
|
||||
import Data.Postgres (smash)
|
||||
import Data.Postgres.Query (class AsQuery, QueryRaw, asQuery, __queryToRaw)
|
||||
import Data.Postgres.Query (class AsQuery, QueryRaw, __queryToRaw, asQuery, stringQuery)
|
||||
import Data.Postgres.Result (class FromRows, Result, fromRows, rows, rowsAffected)
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff)
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Postgres.Client (Client, ClientConfigRaw, Config, Notification, NotificationRaw, __make, __uncfg, endE, errorE, make, noticeE, notificationE) as X
|
||||
import Effect.Postgres.Client (Client, ClientConfigRaw, Config, Notification, NotificationRaw, __make, __uncfg, endE, errorE, make, noticeE, notificationE) as Reexport
|
||||
import Effect.Postgres.Client (Client, Config, make)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Node.Stream (Readable, Writable)
|
||||
import Prim.Row (class Union)
|
||||
|
||||
@ -23,8 +24,8 @@ import Prim.Row (class Union)
|
||||
-- | The config parameter `r` is `Config` with all keys optional.
|
||||
-- |
|
||||
-- | This is a shorthand for `(voidRight <*> connect) =<< liftEffect (make cfg)`
|
||||
connected :: forall r missing trash. Union r missing (Config trash) => Record r -> Aff Client
|
||||
connected cfg = (voidRight <*> connect) =<< liftEffect (make @r @missing @trash cfg)
|
||||
connected :: forall r missing trash. Union r missing (Config trash) => Record r -> E.Except Aff Client
|
||||
connected cfg = X.with E.Connecting $ (voidRight <*> connect) =<< liftEffect (make @r @missing @trash cfg)
|
||||
|
||||
-- | Connects the client to the database
|
||||
-- |
|
||||
@ -35,41 +36,43 @@ connect = Promise.toAffE <<< __connect
|
||||
-- | Disconnects the client from the database
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/client#clientend>
|
||||
end :: Client -> Aff Unit
|
||||
end = Promise.toAffE <<< __end
|
||||
end :: Client -> E.Except Aff Unit
|
||||
end = X.with E.Disconnecting <<< Promise.toAffE <<< __end
|
||||
|
||||
-- | Performs a query, returning the raw `Result` object
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||
queryRaw :: forall q. AsQuery q => q -> Client -> Aff Result
|
||||
queryRaw :: forall q. AsQuery q => q -> Client -> E.Except Aff Result
|
||||
queryRaw q c = do
|
||||
q' <- __queryToRaw <$> liftEffect (asQuery q)
|
||||
Promise.toAffE $ __query q' c
|
||||
q' <- X.printing $ asQuery q
|
||||
let q'' = __queryToRaw q'
|
||||
X.executing q' $ Promise.toAffE $ __query q'' c
|
||||
|
||||
-- | Performs a query that we expect to not yield any rows,
|
||||
-- | returning the number of rows affected by the statement.
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||
exec :: forall q. AsQuery q => q -> Client -> Aff Int
|
||||
exec :: forall q. AsQuery q => q -> Client -> E.Except Aff Int
|
||||
exec q = map (fromMaybe 0 <<< rowsAffected) <<< queryRaw q
|
||||
|
||||
-- | Performs a query that we expect to yield rows,
|
||||
-- | returning them unmarshalled into destination type `r`.
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||
query :: forall q r. AsQuery q => FromRows r => q -> Client -> Aff r
|
||||
query :: forall q r. AsQuery q => FromRows r => q -> Client -> E.Except Aff r
|
||||
query q c = do
|
||||
q' <- X.printing $ asQuery q
|
||||
raw <- queryRaw q c
|
||||
let
|
||||
affected = rowsAffected raw
|
||||
rows' = rows raw
|
||||
liftEffect $ smash $ fromRows (wrap $ fromMaybe 0 affected) rows'
|
||||
X.parsing q' $ fromRows (wrap $ fromMaybe 0 affected) rows'
|
||||
|
||||
execWithStdin :: String -> Client -> Effect (Writable ())
|
||||
execWithStdin q c = __execStreamStdin q c
|
||||
execWithStdin :: String -> Client -> E.Except Effect (Writable ())
|
||||
execWithStdin q c = X.executing (stringQuery q) $ __execStreamStdin q c
|
||||
|
||||
queryWithStdout :: String -> Client -> Effect (Readable ())
|
||||
queryWithStdout q c = __execStreamStdout q c
|
||||
queryWithStdout :: String -> Client -> E.Except Effect (Readable ())
|
||||
queryWithStdout q c = X.executing (stringQuery q) $ __execStreamStdout q c
|
||||
|
||||
-- | FFI binding to `Client#connect`
|
||||
foreign import __connect :: Client -> Effect (Promise Unit)
|
||||
|
@ -7,8 +7,25 @@ import Control.Promise as Promise
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff)
|
||||
import Effect.Postgres.Client (Client)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as E.Except
|
||||
import Effect.Postgres.Pool (Pool)
|
||||
import Effect.Postgres.Pool as X
|
||||
import Effect.Postgres.Pool
|
||||
(Config
|
||||
, Pool
|
||||
, PoolConfigRaw
|
||||
, acquireE
|
||||
, clientCount
|
||||
, clientIdleCount
|
||||
, clientWaitingCount
|
||||
, connectE
|
||||
, destroy
|
||||
, errorE
|
||||
, make
|
||||
, release
|
||||
, releaseE
|
||||
, removeE
|
||||
) as X
|
||||
|
||||
-- | Acquires a client from the pool.
|
||||
-- |
|
||||
@ -17,8 +34,8 @@ import Effect.Postgres.Pool as X
|
||||
-- | * If the pool is 'full' and all clients are currently checked out will wait in a FIFO queue until a client becomes available by it being released back to the pool.
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/pool#poolconnect>
|
||||
connect :: Pool -> Aff Client
|
||||
connect = Promise.toAffE <<< __connect
|
||||
connect :: Pool -> E.Except Aff Client
|
||||
connect = E.Except.with E.Connecting <<< Promise.toAffE <<< __connect
|
||||
|
||||
-- | Drain the pool of all active clients, disconnect them,
|
||||
-- | and shut down any internal timers in the pool.
|
||||
@ -27,8 +44,8 @@ connect = Promise.toAffE <<< __connect
|
||||
-- | your process is attempting to shut down cleanly.
|
||||
-- |
|
||||
-- | <https://node-postgres.com/apis/pool#poolend>
|
||||
end :: Pool -> Aff Unit
|
||||
end = Promise.toAffE <<< __end
|
||||
end :: Pool -> E.Except Aff Unit
|
||||
end = E.Except.with E.Disconnecting <<< Promise.toAffE <<< __end
|
||||
|
||||
-- | FFI binding to `Pool#end`
|
||||
foreign import __end :: Pool -> Effect (Promise Unit)
|
||||
|
30
src/Effect.Postgres.Error.Common.purs
Normal file
30
src/Effect.Postgres.Error.Common.purs
Normal file
@ -0,0 +1,30 @@
|
||||
module Effect.Postgres.Error.Common where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Data.Array.NonEmpty (NonEmptyArray)
|
||||
import Data.Generic.Rep (class Generic)
|
||||
import Data.Postgres.Query (Query)
|
||||
import Data.Show.Generic (genericShow)
|
||||
import Effect.Exception as Effect
|
||||
import Foreign (MultipleErrors)
|
||||
|
||||
type E = NonEmptyArray Error
|
||||
|
||||
data Error
|
||||
= Deserializing Query MultipleErrors
|
||||
| Serializing MultipleErrors
|
||||
| Executing Query Effect.Error
|
||||
| Connecting Effect.Error
|
||||
| Disconnecting Effect.Error
|
||||
| Other Effect.Error
|
||||
|
||||
derive instance Generic Error _
|
||||
instance Show Error where
|
||||
show = genericShow
|
||||
|
||||
toException' :: Error -> Effect.Error
|
||||
toException' = Effect.error <<< show
|
||||
|
||||
toException :: E -> Effect.Error
|
||||
toException = Effect.error <<< show
|
39
src/Effect.Postgres.Error.Except.purs
Normal file
39
src/Effect.Postgres.Error.Except.purs
Normal file
@ -0,0 +1,39 @@
|
||||
module Effect.Postgres.Error.Except where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, liftEither, try)
|
||||
import Control.Monad.Except (ExceptT(..), runExceptT)
|
||||
import Data.Bifunctor (lmap)
|
||||
import Data.Either (Either)
|
||||
import Data.Postgres (RepT)
|
||||
import Data.Postgres.Query (Query)
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Effect.Exception as Effect
|
||||
import Effect.Postgres.Error.Common (E, Error(..), toException)
|
||||
|
||||
type Except m = ExceptT E m
|
||||
|
||||
run :: forall m a. MonadThrow Effect.Error m => Except m a -> m a
|
||||
run m = liftEither =<< lmap toException <$> runExceptT m
|
||||
|
||||
toEither :: forall m a. Except m a -> m (Either E a)
|
||||
toEither = runExceptT
|
||||
|
||||
with :: forall e m a. MonadError e m => (e -> Error) -> m a -> Except m a
|
||||
with e m = ExceptT $ map (lmap $ pure <<< e) $ try m
|
||||
|
||||
withEither :: forall e m a. Monad m => (e -> Error) -> m (Either e a) -> Except m a
|
||||
withEither e m = ExceptT $ map (lmap $ pure <<< e) $ m
|
||||
|
||||
exception :: forall m a. MonadError Effect.Error m => m a -> Except m a
|
||||
exception = with Other
|
||||
|
||||
parsing :: forall m a. MonadEffect m => Query -> RepT a -> Except m a
|
||||
parsing q = withEither (Deserializing q) <<< liftEffect <<< runExceptT
|
||||
|
||||
printing :: forall m a. MonadEffect m => RepT a -> Except m a
|
||||
printing = withEither Serializing <<< liftEffect <<< runExceptT
|
||||
|
||||
executing :: forall m a. MonadError Effect.Error m => Query -> m a -> Except m a
|
||||
executing q = with (Executing q)
|
116
src/Effect.Postgres.Error.RE.purs
Normal file
116
src/Effect.Postgres.Error.RE.purs
Normal file
@ -0,0 +1,116 @@
|
||||
module Effect.Postgres.Error.RE where
|
||||
|
||||
import Prelude hiding (join)
|
||||
|
||||
import Control.Alt (class Alt)
|
||||
import Control.Monad.Base (class MonadBase)
|
||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, liftEither, throwError)
|
||||
import Control.Monad.Except (ExceptT, runExceptT)
|
||||
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, class MonadKill, BracketCondition, bracket, fork, join, kill, never, suspend, uninterruptible)
|
||||
import Control.Monad.Fork.Class as Bracket
|
||||
import Control.Monad.Morph (class MFunctor, class MMonad, embed, hoist)
|
||||
import Control.Monad.Reader (class MonadAsk, class MonadReader, ReaderT(..), runReaderT)
|
||||
import Control.Monad.Rec.Class (class MonadRec)
|
||||
import Control.Monad.Trans.Class (class MonadTrans, lift)
|
||||
import Control.Monad.Unlift (class MonadUnlift, withRunInBase)
|
||||
import Control.Parallel (class Parallel, parallel, sequential)
|
||||
import Data.Bifunctor (lmap)
|
||||
import Data.Either (Either)
|
||||
import Data.Functor.Compose (Compose)
|
||||
import Data.Newtype (class Newtype, unwrap, wrap)
|
||||
import Effect.Aff.Class (class MonadAff)
|
||||
import Effect.Aff.Unlift (class MonadUnliftAff, withRunInAff)
|
||||
import Effect.Class (class MonadEffect)
|
||||
import Effect.Exception as Effect
|
||||
import Effect.Postgres.Error.Common (E, Error(..), toException)
|
||||
import Effect.Postgres.Error.Except (Except)
|
||||
import Effect.Unlift (class MonadUnliftEffect, withRunInEffect)
|
||||
|
||||
-- | `ReaderT` with `ExceptT E`
|
||||
-- |
|
||||
-- | `ReaderT r (ExceptT (NonEmptyList Effect.Postgres.Error.Error) m) a`
|
||||
newtype RE r m a = RE (ReaderT r (ExceptT E m) a)
|
||||
|
||||
newtype ParRE r f a = ParRE (ReaderT r (Compose f (Either E)) a)
|
||||
|
||||
finally :: forall r f m a. MonadBracket Effect.Error f m => RE r m Unit -> RE r m a -> RE r m a
|
||||
finally after m = (m <* after) `catchError` \e -> after *> throwError e
|
||||
|
||||
run :: forall m r a. MonadThrow Effect.Error m => RE r m a -> r -> m a
|
||||
run m r = liftEither =<< lmap toException <$> runExceptT (runReaderT (unwrap m) r)
|
||||
|
||||
toExcept :: forall m r a. RE r m a -> r -> Except m a
|
||||
toExcept m r = runReaderT (unwrap m) r
|
||||
|
||||
toEither :: forall m r a. RE r m a -> r -> m (Either E a)
|
||||
toEither m r = runExceptT $ runReaderT (unwrap m) r
|
||||
|
||||
liftExcept :: forall m r a. Monad m => Except m a -> RE r m a
|
||||
liftExcept = wrap <<< lift
|
||||
|
||||
derive instance Newtype (ParRE r m a) _
|
||||
derive newtype instance Functor m => Functor (ParRE r m)
|
||||
derive newtype instance Apply m => Apply (ParRE r m)
|
||||
derive newtype instance Applicative m => Applicative (ParRE r m)
|
||||
|
||||
derive instance Newtype (RE r m a) _
|
||||
derive newtype instance Monad m => MonadAsk r (RE r m)
|
||||
derive newtype instance Monad m => MonadReader r (RE r m)
|
||||
derive newtype instance Monad m => Functor (RE r m)
|
||||
derive newtype instance Monad m => Apply (RE r m)
|
||||
derive newtype instance Monad m => Applicative (RE r m)
|
||||
derive newtype instance Monad m => Bind (RE r m)
|
||||
derive newtype instance Monad m => Monad (RE r m)
|
||||
derive newtype instance Monad m => MonadError E (RE r m)
|
||||
derive newtype instance Monad m => MonadThrow E (RE r m)
|
||||
derive newtype instance MonadEffect m => MonadEffect (RE r m)
|
||||
derive newtype instance MonadAff m => MonadAff (RE r m)
|
||||
derive newtype instance MonadRec m => MonadRec (RE r m)
|
||||
|
||||
instance Parallel p m => Parallel (ParRE r p) (RE r m) where
|
||||
parallel = wrap <<< parallel <<< unwrap
|
||||
sequential = wrap <<< sequential <<< unwrap
|
||||
|
||||
instance MonadTrans (RE r) where
|
||||
lift = wrap <<< lift <<< lift
|
||||
|
||||
instance MFunctor (RE r) where
|
||||
hoist mn (RE m) = RE $ hoist (hoist mn) m
|
||||
|
||||
instance MMonad (RE r) where
|
||||
embed :: forall n m b. Monad n => (forall a. m a -> RE r n a) -> RE r m b -> RE r n b
|
||||
embed mtn (RE m) =
|
||||
RE $ ReaderT $ \r -> embed (flip runReaderT r <<< unwrap <<< mtn) (runReaderT m r)
|
||||
|
||||
instance Monad m => Alt (RE r m) where
|
||||
alt a b = catchError b (\e -> catchError a (\e' -> throwError $ e <> e'))
|
||||
|
||||
instance (MonadThrow Effect.Error m, MonadUnliftEffect m) => MonadUnliftEffect (RE r m) where
|
||||
withRunInEffect f = RE $ ReaderT $ \r -> lift $ withRunInEffect @m $ \a -> f (a <<< flip run r)
|
||||
|
||||
instance (MonadThrow Effect.Error m, MonadUnliftAff m) => MonadUnliftAff (RE r m) where
|
||||
withRunInAff f = RE $ ReaderT $ \r -> lift $ withRunInAff @m $ \a -> f (a <<< flip run r)
|
||||
|
||||
instance (MonadBase m (RE r m), MonadThrow Effect.Error m) => MonadUnlift m (RE r m) where
|
||||
withRunInBase f = RE $ ReaderT $ \r -> lift $ f (flip run r)
|
||||
|
||||
instance Monad m => MonadBase m (RE r m) where
|
||||
liftBase = lift
|
||||
|
||||
instance (MonadThrow Effect.Error m, MonadFork f m) => MonadFork f (RE r m) where
|
||||
fork m = withRunInBase \f -> fork $ f m
|
||||
suspend m = withRunInBase \f -> suspend $ f m
|
||||
join f = lift $ join f
|
||||
|
||||
instance (MonadKill Effect.Error f m) => MonadKill E f (RE r m) where
|
||||
kill e f = lift $ kill (toException e) f
|
||||
|
||||
instance (MonadBracket Effect.Error f m) => MonadBracket E f (RE r m) where
|
||||
bracket acq rel m = withRunInBase \f -> bracket (f acq) (\c r -> f $ rel ((bracketCondError (pure <<< Other)) c) r) (f <<< m)
|
||||
uninterruptible = hoist uninterruptible
|
||||
never = lift never
|
||||
|
||||
bracketCondError :: forall ea eb a. (ea -> eb) -> BracketCondition ea a -> BracketCondition eb a
|
||||
bracketCondError _ (Bracket.Completed a) = Bracket.Completed a
|
||||
bracketCondError f (Bracket.Failed a) = Bracket.Failed $ f a
|
||||
bracketCondError f (Bracket.Killed a) = Bracket.Killed $ f a
|
5
src/Effect.Postgres.Error.purs
Normal file
5
src/Effect.Postgres.Error.purs
Normal file
@ -0,0 +1,5 @@
|
||||
module Effect.Postgres.Error (module X) where
|
||||
|
||||
import Effect.Postgres.Error.Common as X
|
||||
import Effect.Postgres.Error.RE (RE(..), ParRE(..)) as X
|
||||
import Effect.Postgres.Error.Except (Except) as X
|
@ -9,6 +9,8 @@ import Effect (Effect)
|
||||
import Effect.Exception (Error)
|
||||
import Effect.Postgres.Client (Client)
|
||||
import Effect.Postgres.Client as Client
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as E.Except
|
||||
import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2)
|
||||
import Foreign (Foreign, unsafeToForeign)
|
||||
import Node.EventEmitter (EventHandle(..))
|
||||
@ -46,12 +48,12 @@ make r = do
|
||||
__make $ __uncfg { unwrapMillis: unwrap } $ unsafeToForeign asClientConfig
|
||||
|
||||
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
||||
release :: Pool -> Client -> Effect Unit
|
||||
release p c = __release p c false
|
||||
release :: Pool -> Client -> E.Except Effect Unit
|
||||
release p c = E.Except.with E.Disconnecting $ __release p c false
|
||||
|
||||
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
||||
destroy :: Pool -> Client -> Effect Unit
|
||||
destroy p c = __release p c true
|
||||
destroy :: Pool -> Client -> E.Except Effect Unit
|
||||
destroy p c = E.Except.with E.Disconnecting $ __release p c true
|
||||
|
||||
-- | <https://node-postgres.com/apis/pool#connect>
|
||||
connectE :: EventHandle1 Pool Client
|
||||
|
@ -2,8 +2,10 @@ module Pipes.Postgres where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Cont (lift)
|
||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
||||
import Control.Monad.Postgres (class MonadPostgres)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Postgres (PostgresT)
|
||||
import Control.Monad.Reader (class MonadAsk, ask)
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||
@ -12,6 +14,7 @@ import Effect.Aff.Postgres.Pool (Pool)
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Exception (Error)
|
||||
import Effect.Postgres.Error.RE as RE
|
||||
import Node.Buffer (Buffer)
|
||||
import Node.Stream.Object as O
|
||||
import Pipes ((>->))
|
||||
@ -20,46 +23,45 @@ import Pipes.Node.Stream (fromReadable, fromWritable)
|
||||
import Pipes.Prelude as Pipes
|
||||
|
||||
stdin
|
||||
:: forall m s c ct
|
||||
:: forall m
|
||||
. MonadAff m
|
||||
=> MonadError Error m
|
||||
=> MonadAsk Pool m
|
||||
=> MonadPostgres m s c ct
|
||||
=> String
|
||||
-> Consumer (Maybe Buffer) m Unit
|
||||
-> Consumer (Maybe Buffer) (PostgresT m) Unit
|
||||
stdin q = do
|
||||
pool <- ask
|
||||
client <- liftAff $ Pool.connect pool
|
||||
stream <- liftEffect $ Client.execWithStdin q client
|
||||
liftAff $ void $ Client.exec "begin" client
|
||||
pool <- lift ask
|
||||
client <- lift $ RE.liftExcept $ hoist liftAff $ Pool.connect pool
|
||||
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client
|
||||
lift $ RE.liftExcept $ hoist liftAff $ void $ Client.exec "begin" client
|
||||
let
|
||||
releaseOnEOS Nothing = do
|
||||
liftAff $ void $ Client.exec "commit" client
|
||||
liftEffect $ Pool.release pool client
|
||||
RE.liftExcept $ hoist liftAff $ void $ Client.exec "commit" client
|
||||
RE.liftExcept $ hoist liftEffect $ Pool.release pool client
|
||||
pure Nothing
|
||||
releaseOnEOS (Just a) = pure (Just a)
|
||||
|
||||
pipe = Pipes.mapM releaseOnEOS >-> fromWritable (O.unsafeFromBufferWritable stream)
|
||||
err e = do
|
||||
liftAff $ void $ Client.exec "rollback" client
|
||||
liftEffect $ Pool.release pool client
|
||||
pipe = Pipes.mapM releaseOnEOS >-> hoist lift (fromWritable $ O.unsafeFromBufferWritable stream)
|
||||
err e = lift do
|
||||
RE.liftExcept $ hoist liftAff $ void $ Client.exec "rollback" client
|
||||
RE.liftExcept $ hoist liftEffect $ Pool.release pool client
|
||||
throwError e
|
||||
|
||||
catchError pipe err
|
||||
|
||||
stdout
|
||||
:: forall m s c ct
|
||||
:: forall m
|
||||
. MonadAff m
|
||||
=> MonadThrow Error m
|
||||
=> MonadAsk Pool m
|
||||
=> MonadPostgres m s c ct
|
||||
=> String
|
||||
-> Producer (Maybe Buffer) m Unit
|
||||
-> Producer (Maybe Buffer) (PostgresT m) Unit
|
||||
stdout q = do
|
||||
pool <- ask
|
||||
client <- liftAff $ Pool.connect pool
|
||||
stream <- liftEffect $ Client.queryWithStdout q client
|
||||
pool <- lift ask
|
||||
client <- lift $ RE.liftExcept $ hoist liftAff $ Pool.connect pool
|
||||
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client
|
||||
let
|
||||
releaseOnEOS Nothing = liftEffect $ Pool.release pool client $> Nothing
|
||||
releaseOnEOS :: Maybe Buffer -> PostgresT m (Maybe Buffer)
|
||||
releaseOnEOS Nothing = RE.liftExcept $ hoist liftEffect $ Pool.release pool client $> Nothing
|
||||
releaseOnEOS (Just a) = pure (Just a)
|
||||
fromReadable (O.unsafeFromBufferReadable stream) >-> Pipes.mapM releaseOnEOS
|
||||
hoist lift (fromReadable (O.unsafeFromBufferReadable stream))
|
||||
>-> Pipes.mapM releaseOnEOS
|
||||
|
@ -5,7 +5,7 @@ import Prelude
|
||||
import Data.Either (Either(..))
|
||||
import Data.String.Regex (Regex)
|
||||
import Data.String.Regex as Regex
|
||||
import Data.String.Regex.Flags (RegexFlags(..))
|
||||
import Data.String.Regex.Flags (RegexFlags)
|
||||
import Data.Tuple.Nested (type (/\), (/\))
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff, bracket, makeAff)
|
||||
@ -14,14 +14,13 @@ import Effect.Aff.Postgres.Client as Client
|
||||
import Effect.Aff.Postgres.Pool (Pool)
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Effect.Unsafe (unsafePerformEffect)
|
||||
import Node.EventEmitter (EventHandle(..))
|
||||
import Node.EventEmitter (EventHandle)
|
||||
import Node.EventEmitter as EE
|
||||
import Node.Path as Path
|
||||
import Node.Process (cwd)
|
||||
import Partial.Unsafe (unsafePartial)
|
||||
import Record (insert)
|
||||
import Type.Prelude (Proxy(..))
|
||||
|
||||
type Config =
|
||||
{ database :: String
|
||||
@ -41,16 +40,16 @@ withConfig :: (Config -> Aff Unit) -> Aff Unit
|
||||
withConfig f = f =<< liftEffect config
|
||||
|
||||
withClient :: (Client -> Aff Unit) -> Aff Unit
|
||||
withClient = bracket (Client.connected =<< liftEffect config) Client.end
|
||||
withClient = bracket (X.run $ Client.connected =<< liftEffect config) (X.run <<< Client.end)
|
||||
|
||||
pool :: Pool
|
||||
pool = unsafePerformEffect $ Pool.make =<< liftEffect config
|
||||
|
||||
withPool :: (Pool -> Aff Unit) -> Aff Unit
|
||||
withPool = bracket (liftEffect $ Pool.make =<< config) Pool.end
|
||||
withPool = bracket (liftEffect $ Pool.make =<< config) (X.run <<< Pool.end)
|
||||
|
||||
withPoolClient :: (Client -> Aff Unit) -> Aff Unit
|
||||
withPoolClient = bracket (Pool.connect pool) (liftEffect <<< Pool.release pool)
|
||||
withPoolClient = bracket (X.run $ Pool.connect pool) (liftEffect <<< X.run <<< Pool.release pool)
|
||||
|
||||
unsafeFromRight :: forall a b. Either a b -> b
|
||||
unsafeFromRight e = unsafePartial $ case e of Right b -> b
|
||||
|
@ -2,23 +2,32 @@ module Test.Control.Monad.Postgres where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Error.Class (throwError)
|
||||
import Control.Monad.Cont (lift)
|
||||
import Control.Monad.Error.Class (catchError, throwError, try)
|
||||
import Control.Monad.Fork.Class (class MonadBracket, bracket)
|
||||
import Control.Monad.Postgres (PostgresT, exec_, query, runPostgres, session, transaction, cursor, fetch, fetchAll, fetchOne)
|
||||
import Control.Parallel (parTraverse_)
|
||||
import Control.Monad.Postgres (PostgresT, cursor, exec_, fetch, fetchAll, fetchOne, query, runPostgres, transaction)
|
||||
import Control.Monad.Reader (ask)
|
||||
import Control.Parallel (parTraverse)
|
||||
import Data.Array as Array
|
||||
import Data.Array.NonEmpty as Array.NonEmpty
|
||||
import Data.Maybe (Maybe(..), fromJust, maybe)
|
||||
import Data.String.Regex as Regex
|
||||
import Data.String.Regex.Flags as Regex.Flag
|
||||
import Data.Traversable (for_)
|
||||
import Data.Tuple.Nested ((/\))
|
||||
import Effect.Aff (Fiber)
|
||||
import Effect.Aff.Class (class MonadAff)
|
||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||
import Effect.Aff.Unlift (UnliftAff(..), askUnliftAff)
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Console (log)
|
||||
import Effect.Exception (Error, error)
|
||||
import Effect.Postgres.Error as E
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Effect.Postgres.Error.RE as RE
|
||||
import Partial.Unsafe (unsafePartial)
|
||||
import Test.Common (re, withConfig)
|
||||
import Test.Spec (Spec, around, describe, it)
|
||||
import Test.Spec.Assertions (expectError, shouldEqual)
|
||||
import Test.Spec.Assertions (shouldEqual)
|
||||
|
||||
withTable :: forall a m. MonadBracket Error Fiber m => MonadAff m => String -> PostgresT m a -> PostgresT m a
|
||||
withTable s m =
|
||||
@ -30,30 +39,41 @@ withTable s m =
|
||||
spec :: Spec Unit
|
||||
spec =
|
||||
around withConfig $ describe "Control.Monad.Postgres" do
|
||||
it "empty works" \cfg -> runPostgres cfg $ pure unit
|
||||
it "connects" \cfg -> runPostgres cfg $ shouldEqual 1 =<< query "select 1"
|
||||
it "multiple sessions serially" \cfg -> runPostgres cfg do
|
||||
shouldEqual 1 =<< query "select 1"
|
||||
shouldEqual 2 =<< query "select 2"
|
||||
it "multiple sessions concurrently" \cfg -> runPostgres cfg do
|
||||
flip parTraverse_ [ 1, 2, 3 ] \_ -> shouldEqual 1 =<< query "select 1"
|
||||
it "transaction commits" \cfg -> runPostgres cfg do
|
||||
it "empty works" \cfg -> X.run $ runPostgres cfg $ pure unit
|
||||
it "connects" \cfg -> shouldEqual 1 =<< X.run (runPostgres cfg $ query "select 1")
|
||||
it "multiple sessions serially" \cfg -> do
|
||||
a /\ b <- X.run $ runPostgres cfg do
|
||||
a <- query "select 1"
|
||||
b <- query "select 2"
|
||||
pure $ a /\ b
|
||||
a `shouldEqual` 1
|
||||
b `shouldEqual` 2
|
||||
it "multiple sessions concurrently" \cfg -> do
|
||||
nums <- X.run $ runPostgres cfg $ parTraverse (\n -> query $ "select $1 :: int" /\ n) (Array.range 1 3)
|
||||
Array.sort nums `shouldEqual` [1, 2, 3]
|
||||
it "transaction commits" \cfg -> do
|
||||
a <- X.run $ runPostgres cfg do
|
||||
exec_ "create temporary table test_txn_commits (id int);"
|
||||
transaction $ exec_ "insert into test_txn_commits values (1);"
|
||||
shouldEqual [ 1 ] =<< query "select * from test_txn_commits"
|
||||
it "transaction rolls back" \cfg -> runPostgres cfg do
|
||||
query "select * from test_txn_commits"
|
||||
a `shouldEqual` [ 1 ]
|
||||
it "transaction rolls back" \cfg -> do
|
||||
a <- X.run $ runPostgres cfg do
|
||||
exec_ "create temporary table test_txn_rolls_back (id int);"
|
||||
exec_ "insert into test_txn_rolls_back values (1);"
|
||||
expectError $ transaction do
|
||||
void $ try $ transaction do
|
||||
exec_ "insert into test_txn_rolls_back values (2);"
|
||||
throwError $ error "foo"
|
||||
shouldEqual [ 1 ] =<< query "select * from test_txn_rolls_back"
|
||||
it "cursor" \cfg -> runPostgres cfg do
|
||||
throwError $ pure $ E.Other $ error "foo"
|
||||
query "select * from test_txn_rolls_back"
|
||||
a `shouldEqual` [1]
|
||||
it "cursor" \cfg ->
|
||||
X.run $ runPostgres cfg do
|
||||
exec_ $ "create temporary table test_cursor_data (id int primary key generated always as identity)"
|
||||
for_ (Array.range 1 50) $ const $ exec_ "insert into test_cursor_data (id) values (default);"
|
||||
cursor @Int "test_cursor" "select id from test_cursor_data" do
|
||||
shouldEqual (Just 1) =<< fetchOne
|
||||
shouldEqual (Just 2) =<< fetchOne
|
||||
shouldEqual (Just 3) =<< fetchOne
|
||||
shouldEqual [ 4, 5, 6, 7, 8 ] =<< fetch 5
|
||||
shouldEqual (Array.range 9 50) =<< fetchAll
|
||||
UnliftAff unliftAff <- askUnliftAff
|
||||
liftAff $ shouldEqual (Just 1) =<< unliftAff fetchOne
|
||||
liftAff $ shouldEqual (Just 2) =<< unliftAff fetchOne
|
||||
liftAff $ shouldEqual (Just 3) =<< unliftAff fetchOne
|
||||
liftAff $ shouldEqual [ 4, 5, 6, 7, 8 ] =<< unliftAff (fetch 5)
|
||||
liftAff $ shouldEqual (Array.range 9 50) =<< unliftAff fetchAll
|
||||
|
@ -31,6 +31,7 @@ import Effect.Aff (Aff)
|
||||
import Effect.Aff.Postgres.Client (exec, query)
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Postgres.Client (Client)
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Effect.Unsafe (unsafePerformEffect)
|
||||
import Foreign (Foreign, unsafeToForeign)
|
||||
import Foreign.Object as Object
|
||||
@ -206,13 +207,13 @@ spec =
|
||||
x' <- Q.param $ fromArb x
|
||||
let val = x' <> " :: " <> sql
|
||||
pure $ "select " <> val
|
||||
void $ exec createtab c
|
||||
void $ X.run $ exec createtab c
|
||||
seed <- liftEffect randomSeed
|
||||
let xs = sample seed 10 (arbitrary @x)
|
||||
flip parTraverse_ xs
|
||||
\x -> do
|
||||
void $ exec (ser x) c
|
||||
res <- query (de x) c
|
||||
void $ X.run $ exec (ser x) c
|
||||
res <- X.run $ query (de x) c
|
||||
let
|
||||
exp = fromArb x
|
||||
act = unsafePartial fromJust $ Array.head res
|
||||
|
@ -12,6 +12,7 @@ import Effect.Aff (forkAff, joinFiber)
|
||||
import Effect.Aff.Postgres.Client (query)
|
||||
import Effect.Aff.Postgres.Client as Client
|
||||
import Effect.Exception as Error
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Test.Common (onceAff, withClient)
|
||||
import Test.Spec (Spec, around, describe, it)
|
||||
import Test.Spec.Assertions (shouldEqual)
|
||||
@ -23,40 +24,40 @@ spec =
|
||||
describe "events" do
|
||||
it "end" \c -> do
|
||||
expect <- forkAff $ onceAff Client.endE c
|
||||
Client.end c
|
||||
X.run $ Client.end c
|
||||
joinFiber expect
|
||||
it "notice" \c -> do
|
||||
expect <- forkAff do
|
||||
e <- onceAff Client.noticeE c
|
||||
Error.message e `shouldEqual` "hello"
|
||||
void $ Client.exec "do language plpgsql $$ begin raise notice 'hello'; end; $$;" c
|
||||
X.run $ void $ Client.exec "do language plpgsql $$ begin raise notice 'hello'; end; $$;" c
|
||||
joinFiber expect
|
||||
it "notification" \c -> do
|
||||
void $ Client.exec "listen hello;" c
|
||||
X.run $ void $ Client.exec "listen hello;" c
|
||||
expect <- forkAff do
|
||||
n <- onceAff Client.notificationE c
|
||||
n.payload `shouldEqual` (Just "world")
|
||||
void $ Client.exec "notify hello, 'world';" c
|
||||
X.run $ void $ Client.exec "notify hello, 'world';" c
|
||||
joinFiber expect
|
||||
it "connect & end do not throw" $ const $ pure unit
|
||||
describe "query" do
|
||||
it "ok if connected" \c -> shouldEqual [ 1, 2, 3 ] =<< query "select unnest(array[1, 2, 3])" c
|
||||
it "ok if connected" \c -> shouldEqual [ 1, 2, 3 ] =<< X.run (query "select unnest(array[1, 2, 3])" c)
|
||||
it "throws if ended" \c -> do
|
||||
Client.end c
|
||||
res :: Either _ (Array Int) <- try $ query "select 1" c
|
||||
X.run $ Client.end c
|
||||
res :: Either _ (Array Int) <- try $ X.run $ query "select 1" c
|
||||
isLeft res `shouldEqual` true
|
||||
it "rowsAffected is correct" \c -> do
|
||||
void $ Client.exec "create temp table foo (bar int);" c
|
||||
shouldEqual 1 =<< Client.exec "insert into foo values (1);" c
|
||||
shouldEqual 3 =<< Client.exec "insert into foo values (1), (2), (3);" c
|
||||
shouldEqual 4 =<< Client.exec "update foo set bar = 10;" c
|
||||
X.run $ void $ Client.exec "create temp table foo (bar int);" c
|
||||
shouldEqual 1 =<< X.run (Client.exec "insert into foo values (1);" c)
|
||||
shouldEqual 3 =<< X.run (Client.exec "insert into foo values (1), (2), (3);" c)
|
||||
shouldEqual 4 =<< X.run (Client.exec "update foo set bar = 10;" c)
|
||||
describe "timestamp" do
|
||||
it "unmarshals" \c -> do
|
||||
let exp = toDateTimeLossy <$> fromRFC3339String (wrap "2020-01-01T00:00:00Z")
|
||||
shouldEqual exp =<< query "select '2020-01-01T00:00:00Z' :: timestamptz" c
|
||||
it "is string" \c -> shouldEqual "2020-01-01 00:00:00+00" =<< query "select '2020-01-01T00:00:00Z' :: timestamptz" c
|
||||
it "array is string" \c -> shouldEqual [ [ "2020-01-01 00:00:00+00" ] ] =<< query "select array['2020-01-01T00:00:00Z' :: timestamptz]" c
|
||||
shouldEqual exp =<< X.run (query "select '2020-01-01T00:00:00Z' :: timestamptz" c)
|
||||
it "is string" \c -> shouldEqual "2020-01-01 00:00:00+00" =<< X.run (query "select '2020-01-01T00:00:00Z' :: timestamptz" c)
|
||||
it "array is string" \c -> shouldEqual [ [ "2020-01-01 00:00:00+00" ] ] =<< X.run (query "select array['2020-01-01T00:00:00Z' :: timestamptz]" c)
|
||||
describe "json" do
|
||||
it "unmarshals" \c -> shouldEqual (JSON { foo: "bar" }) =<< query "select '{\"foo\": \"bar\"}' :: json" c
|
||||
it "is string" \c -> shouldEqual "{\"foo\": \"bar\"}" =<< query "select '{\"foo\": \"bar\"}' :: json" c
|
||||
it "array is string" \c -> shouldEqual [ [ "{\"foo\": \"bar\"}" ] ] =<< query "select array['{\"foo\": \"bar\"}' :: json]" c
|
||||
it "unmarshals" \c -> shouldEqual (JSON { foo: "bar" }) =<< X.run (query "select '{\"foo\": \"bar\"}' :: json" c)
|
||||
it "is string" \c -> shouldEqual "{\"foo\": \"bar\"}" =<< X.run (query "select '{\"foo\": \"bar\"}' :: json" c)
|
||||
it "array is string" \c -> shouldEqual [ [ "{\"foo\": \"bar\"}" ] ] =<< X.run (query "select array['{\"foo\": \"bar\"}' :: json]" c)
|
||||
|
@ -7,6 +7,7 @@ import Effect.Aff (finally, forkAff, joinFiber)
|
||||
import Effect.Aff.Postgres.Client as Client
|
||||
import Effect.Aff.Postgres.Pool as Pool
|
||||
import Effect.Class (liftEffect)
|
||||
import Effect.Postgres.Error.Except as X
|
||||
import Test.Common (config, onceAff, withPool)
|
||||
import Test.Spec (Spec, around, describe, it)
|
||||
import Test.Spec.Assertions (expectError, shouldEqual)
|
||||
@ -20,78 +21,78 @@ spec = describe "Pool" do
|
||||
void $ liftEffect $ Pool.make cfg
|
||||
around withPool do
|
||||
it "idleCount, totalCount" \p -> do
|
||||
a <- Pool.connect p
|
||||
b <- Pool.connect p
|
||||
c <- Pool.connect p
|
||||
liftEffect $ Pool.release p a
|
||||
liftEffect $ Pool.release p b
|
||||
finally (liftEffect $ Pool.release p c) do
|
||||
a <- X.run $ Pool.connect p
|
||||
b <- X.run $ Pool.connect p
|
||||
c <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.release p a
|
||||
liftEffect $ X.run $ Pool.release p b
|
||||
finally (liftEffect $ X.run $ Pool.release p c) do
|
||||
Pool.clientIdleCount p `shouldEqual` 2
|
||||
Pool.clientCount p `shouldEqual` 3
|
||||
Pool.clientIdleCount p `shouldEqual` 3
|
||||
Pool.clientCount p `shouldEqual` 3
|
||||
it "waitingCount" \p -> do
|
||||
a <- Pool.connect p
|
||||
b <- Pool.connect p
|
||||
c <- Pool.connect p
|
||||
dFiber <- forkAff $ Pool.connect p
|
||||
a <- X.run $ Pool.connect p
|
||||
b <- X.run $ Pool.connect p
|
||||
c <- X.run $ Pool.connect p
|
||||
dFiber <- forkAff $ X.run $ Pool.connect p
|
||||
let
|
||||
rel =
|
||||
do
|
||||
void $ liftEffect $ traverse (Pool.release p) [ a, b, c ]
|
||||
void $ liftEffect $ X.run $ traverse (Pool.release p) [ a, b, c ]
|
||||
d <- joinFiber dFiber
|
||||
liftEffect $ Pool.release p d
|
||||
liftEffect $ X.run $ Pool.release p d
|
||||
finally rel $ Pool.clientWaitingCount p `shouldEqual` 1
|
||||
describe "events" do
|
||||
it "connect" \p -> do
|
||||
expect <- forkAff $ void $ onceAff Pool.connectE p
|
||||
c <- Pool.connect p
|
||||
finally (liftEffect $ Pool.release p c) $ joinFiber expect
|
||||
c <- X.run $ Pool.connect p
|
||||
finally (liftEffect $ X.run $ Pool.release p c) $ joinFiber expect
|
||||
it "acquire" \p -> do
|
||||
c <- Pool.connect p
|
||||
liftEffect $ Pool.release p c
|
||||
c <- X.run$ Pool.connect p
|
||||
liftEffect $ X.run$ Pool.release p c
|
||||
expect <- forkAff do
|
||||
c'' <- onceAff Pool.acquireE p
|
||||
refEq c c'' `shouldEqual` true
|
||||
c' <- Pool.connect p
|
||||
finally (liftEffect $ Pool.release p c') $ joinFiber expect
|
||||
c' <- X.run $ Pool.connect p
|
||||
finally (liftEffect $ X.run$ Pool.release p c') $ joinFiber expect
|
||||
it "release" \p -> do
|
||||
c <- Pool.connect p
|
||||
c <- X.run $ Pool.connect p
|
||||
expect <- forkAff do
|
||||
c' <- onceAff Pool.releaseE p
|
||||
refEq c c' `shouldEqual` true
|
||||
liftEffect $ Pool.release p c
|
||||
liftEffect $ X.run $ Pool.release p c
|
||||
joinFiber expect
|
||||
it "remove" \p -> do
|
||||
c <- Pool.connect p
|
||||
c <- X.run $ Pool.connect p
|
||||
expect <- forkAff do
|
||||
c' <- onceAff Pool.removeE p
|
||||
refEq c c' `shouldEqual` true
|
||||
liftEffect $ Pool.destroy p c
|
||||
liftEffect $ X.run $ Pool.destroy p c
|
||||
joinFiber expect
|
||||
it "connect" \p -> do
|
||||
c <- Pool.connect p
|
||||
let rel = liftEffect $ Pool.release p c
|
||||
finally rel $ shouldEqual 1 =<< Client.query "select 1" c
|
||||
c <- X.run $ Pool.connect p
|
||||
let rel = liftEffect $ X.run $ Pool.release p c
|
||||
finally rel $ shouldEqual 1 =<< X.run (Client.query "select 1" c)
|
||||
describe "destroy" do
|
||||
it "throws on query after destroy" \p -> do
|
||||
c <- Pool.connect p
|
||||
liftEffect $ Pool.destroy p c
|
||||
expectError $ Client.exec "select 1" c
|
||||
c <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.destroy p c
|
||||
expectError $ X.run $ Client.exec "select 1" c
|
||||
it "different client yielded after destroy" \p -> do
|
||||
a <- Pool.connect p
|
||||
liftEffect $ Pool.destroy p a
|
||||
b <- Pool.connect p
|
||||
liftEffect $ Pool.destroy p b
|
||||
a <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.destroy p a
|
||||
b <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.destroy p b
|
||||
refEq a b `shouldEqual` false
|
||||
describe "release" do
|
||||
it "allows reuse" \p -> do
|
||||
a <- Pool.connect p
|
||||
liftEffect $ Pool.release p a
|
||||
b <- Pool.connect p
|
||||
liftEffect $ Pool.release p b
|
||||
a <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.release p a
|
||||
b <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.release p b
|
||||
refEq a b `shouldEqual` true
|
||||
it "throws when invoked twice" \p -> do
|
||||
c <- Pool.connect p
|
||||
liftEffect $ Pool.release p c
|
||||
expectError $ liftEffect $ Pool.release p c
|
||||
c <- X.run $ Pool.connect p
|
||||
liftEffect $ X.run $ Pool.release p c
|
||||
expectError $ liftEffect $ X.run $ Pool.release p c
|
||||
|
Loading…
Reference in New Issue
Block a user