generated from tpl/purs
feat!: rework error handling
This commit is contained in:
parent
c9e5ee9205
commit
83f799ab5d
@ -15,11 +15,13 @@ workspace:
|
|||||||
- foldable-traversable: ">=6.0.0 <7.0.0"
|
- foldable-traversable: ">=6.0.0 <7.0.0"
|
||||||
- foreign: ">=7.0.0 <8.0.0"
|
- foreign: ">=7.0.0 <8.0.0"
|
||||||
- fork: ">=6.0.0 <7.0.0"
|
- fork: ">=6.0.0 <7.0.0"
|
||||||
|
- functors
|
||||||
- integers: ">=6.0.0 <7.0.0"
|
- integers: ">=6.0.0 <7.0.0"
|
||||||
- js-bigints: ">=2.2.1 <3.0.0"
|
- js-bigints: ">=2.2.1 <3.0.0"
|
||||||
- lists: ">=7.0.0 <8.0.0"
|
- lists: ">=7.0.0 <8.0.0"
|
||||||
- maybe: ">=6.0.0 <7.0.0"
|
- maybe: ">=6.0.0 <7.0.0"
|
||||||
- mmorph: ">=7.0.0 <8.0.0"
|
- mmorph: ">=7.0.0 <8.0.0"
|
||||||
|
- monad-control
|
||||||
- newtype: ">=5.0.0 <6.0.0"
|
- newtype: ">=5.0.0 <6.0.0"
|
||||||
- node-buffer: ">=9.0.0 <10.0.0"
|
- node-buffer: ">=9.0.0 <10.0.0"
|
||||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||||
@ -33,6 +35,7 @@ workspace:
|
|||||||
- prelude: ">=6.0.1 <7.0.0"
|
- prelude: ">=6.0.1 <7.0.0"
|
||||||
- profunctor: ">=6.0.1 <7.0.0"
|
- profunctor: ">=6.0.1 <7.0.0"
|
||||||
- record: ">=4.0.0 <5.0.0"
|
- record: ">=4.0.0 <5.0.0"
|
||||||
|
- refs
|
||||||
- simple-json: ">=9.0.0 <10.0.0"
|
- simple-json: ">=9.0.0 <10.0.0"
|
||||||
- strings: ">=6.0.1 <7.0.0"
|
- strings: ">=6.0.1 <7.0.0"
|
||||||
- tailrec: ">=6.1.0 <7.0.0"
|
- tailrec: ">=6.1.0 <7.0.0"
|
||||||
@ -728,8 +731,8 @@ packages:
|
|||||||
- unsafe-coerce
|
- unsafe-coerce
|
||||||
node-stream-pipes:
|
node-stream-pipes:
|
||||||
type: registry
|
type: registry
|
||||||
version: 2.1.0
|
version: 2.1.1
|
||||||
integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8=
|
integrity: sha256-j7ZE+Vtc9gwXlH8s8pyVsbbCMd91AIRk05IOsZAO7x0=
|
||||||
dependencies:
|
dependencies:
|
||||||
- aff
|
- aff
|
||||||
- arrays
|
- arrays
|
||||||
|
@ -11,8 +11,11 @@ package:
|
|||||||
- 'ImplicitQualifiedImport'
|
- 'ImplicitQualifiedImport'
|
||||||
- 'ImplicitQualifiedImportReExport'
|
- 'ImplicitQualifiedImportReExport'
|
||||||
strict: true
|
strict: true
|
||||||
pedanticPackages: true
|
# pedanticPackages: true
|
||||||
dependencies:
|
dependencies:
|
||||||
|
- functors
|
||||||
|
- monad-control
|
||||||
|
- refs
|
||||||
- aff: ">=7.1.0 <8.0.0"
|
- aff: ">=7.1.0 <8.0.0"
|
||||||
- aff-promise: ">=4.0.0 <5.0.0"
|
- aff-promise: ">=4.0.0 <5.0.0"
|
||||||
- arrays: ">=7.3.0 <8.0.0"
|
- arrays: ">=7.3.0 <8.0.0"
|
||||||
|
@ -2,18 +2,13 @@ module Control.Monad.Postgres.Base where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
import Control.Alt (class Alt)
|
import Control.Monad.Error.Class (catchError, throwError)
|
||||||
import Control.Alternative (class Alternative, class Plus)
|
import Control.Monad.Fork.Class (class MonadBracket, bracket)
|
||||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
import Control.Monad.Morph (hoist)
|
||||||
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, class MonadKill, bracket, kill, never, uninterruptible)
|
|
||||||
import Control.Monad.Morph (class MFunctor, class MMonad)
|
|
||||||
import Control.Monad.Postgres.Cursor (class MonadCursor, CursorT)
|
import Control.Monad.Postgres.Cursor (class MonadCursor, CursorT)
|
||||||
import Control.Monad.Postgres.Session (class MonadSession, SessionT, exec, exec_, query, streamIn, streamOut)
|
import Control.Monad.Postgres.Session (class MonadSession, SessionT, endSession, exec, exec_, startSession)
|
||||||
import Control.Monad.Reader (class MonadAsk, class MonadReader, ReaderT, ask, local, runReaderT)
|
import Control.Monad.Reader (ask, runReaderT)
|
||||||
import Control.Monad.Rec.Class (class MonadRec)
|
import Data.Newtype (unwrap)
|
||||||
import Control.Monad.Trans.Class (class MonadTrans, lift)
|
|
||||||
import Control.Parallel (class Parallel, parallel, sequential)
|
|
||||||
import Data.Newtype (class Newtype, unwrap, wrap)
|
|
||||||
import Data.Postgres (RepT)
|
import Data.Postgres (RepT)
|
||||||
import Data.Postgres.Query (class AsQuery, asQuery)
|
import Data.Postgres.Query (class AsQuery, asQuery)
|
||||||
import Data.Postgres.Raw (Raw)
|
import Data.Postgres.Raw (Raw)
|
||||||
@ -22,9 +17,12 @@ import Data.Tuple.Nested ((/\))
|
|||||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||||
import Effect.Aff.Postgres.Pool (Pool)
|
import Effect.Aff.Postgres.Pool (Pool)
|
||||||
import Effect.Aff.Postgres.Pool as Pool
|
import Effect.Aff.Postgres.Pool as Pool
|
||||||
import Effect.Aff.Unlift (class MonadUnliftAff)
|
import Effect.Class (liftEffect)
|
||||||
import Effect.Class (class MonadEffect, liftEffect)
|
import Effect.Exception as Effect
|
||||||
import Effect.Unlift (class MonadUnliftEffect)
|
import Effect.Postgres.Error (RE)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
|
import Effect.Postgres.Error.RE as RE
|
||||||
import Prim.Row (class Union)
|
import Prim.Row (class Union)
|
||||||
|
|
||||||
-- | Monad handling pool resource acquisition & release
|
-- | Monad handling pool resource acquisition & release
|
||||||
@ -51,53 +49,8 @@ import Prim.Row (class Union)
|
|||||||
-- | res <- Client.query "select * from foo" client
|
-- | res <- Client.query "select * from foo" client
|
||||||
-- | pure $ res == 1
|
-- | pure $ res == 1
|
||||||
-- | ```
|
-- | ```
|
||||||
newtype PostgresT :: forall k. (k -> Type) -> k -> Type
|
type PostgresT :: (Type -> Type) -> Type -> Type
|
||||||
newtype PostgresT m a = PostgresT (ReaderT Pool m a)
|
type PostgresT = RE Pool
|
||||||
|
|
||||||
derive instance Newtype (PostgresT m a) _
|
|
||||||
derive newtype instance (Functor m) => Functor (PostgresT m)
|
|
||||||
derive newtype instance (Apply m) => Apply (PostgresT m)
|
|
||||||
derive newtype instance (Alternative m) => Alternative (PostgresT m)
|
|
||||||
derive newtype instance (Applicative m) => Applicative (PostgresT m)
|
|
||||||
derive newtype instance (Plus m) => Plus (PostgresT m)
|
|
||||||
derive newtype instance (Alt m) => Alt (PostgresT m)
|
|
||||||
derive newtype instance (Bind m) => Bind (PostgresT m)
|
|
||||||
derive newtype instance (Monad m) => Monad (PostgresT m)
|
|
||||||
derive newtype instance (MonadEffect m) => MonadEffect (PostgresT m)
|
|
||||||
derive newtype instance (MonadAff m) => MonadAff (PostgresT m)
|
|
||||||
derive newtype instance (MonadUnliftEffect m) => MonadUnliftEffect (PostgresT m)
|
|
||||||
derive newtype instance (MonadUnliftAff m) => MonadUnliftAff (PostgresT m)
|
|
||||||
derive newtype instance MonadRec m => MonadRec (PostgresT m)
|
|
||||||
derive newtype instance MonadTrans (PostgresT)
|
|
||||||
derive newtype instance (MonadThrow e m) => MonadThrow e (PostgresT m)
|
|
||||||
derive newtype instance (MonadError e m) => MonadError e (PostgresT m)
|
|
||||||
derive newtype instance (MonadFork f m) => MonadFork f (PostgresT m)
|
|
||||||
derive newtype instance MFunctor PostgresT
|
|
||||||
derive newtype instance MMonad PostgresT
|
|
||||||
instance (Apply m, Apply p, Parallel p m) => Parallel (PostgresT p) (PostgresT m) where
|
|
||||||
parallel = wrap <<< parallel <<< unwrap
|
|
||||||
sequential = wrap <<< sequential <<< unwrap
|
|
||||||
|
|
||||||
instance (Monad m, MonadKill e f m) => MonadKill e f (PostgresT m) where
|
|
||||||
kill a b = lift $ kill a b
|
|
||||||
|
|
||||||
instance (Monad m, MonadBracket e f (ReaderT Pool m), MonadBracket e f m) => MonadBracket e f (PostgresT m) where
|
|
||||||
bracket acq rel m = wrap $ bracket (unwrap acq) (\a b -> unwrap $ rel a b) (unwrap <<< m)
|
|
||||||
uninterruptible a = wrap $ uninterruptible $ unwrap a
|
|
||||||
never = lift $ never
|
|
||||||
|
|
||||||
instance Monad m => MonadAsk Pool (PostgresT m) where
|
|
||||||
ask = wrap ask
|
|
||||||
|
|
||||||
instance Monad m => MonadReader Pool (PostgresT m) where
|
|
||||||
local f m = wrap $ local f $ unwrap m
|
|
||||||
|
|
||||||
instance (MonadBracket e f m, MonadAff m) => MonadSession (PostgresT m) where
|
|
||||||
query = session <<< query
|
|
||||||
exec = session <<< exec
|
|
||||||
exec_ = session <<< exec_
|
|
||||||
streamIn = session <<< streamIn
|
|
||||||
streamOut = session <<< streamOut
|
|
||||||
|
|
||||||
-- | Typeclass generalizing `PostgresT`. Allows for dependency-injecting different
|
-- | Typeclass generalizing `PostgresT`. Allows for dependency-injecting different
|
||||||
-- | implementations of the idea of a postgres connection.
|
-- | implementations of the idea of a postgres connection.
|
||||||
@ -117,13 +70,23 @@ class (Monad m, MonadSession session, MonadCursor cursor ct) <= MonadPostgres m
|
|||||||
-- | yielded by the cursor
|
-- | yielded by the cursor
|
||||||
cursorWith :: forall q. AsQuery q => (Array Raw -> RepT ct) -> String -> q -> cursor ~> m
|
cursorWith :: forall q. AsQuery q => (Array Raw -> RepT ct) -> String -> q -> cursor ~> m
|
||||||
|
|
||||||
instance (MonadBracket e f m, MonadAff m, MonadSession (SessionT m), MonadCursor (CursorT t (SessionT m)) t) => MonadPostgres (PostgresT m) (SessionT m) (CursorT ct (SessionT m)) ct where
|
instance
|
||||||
|
( MonadBracket Effect.Error f m
|
||||||
|
, MonadAff m
|
||||||
|
, MonadSession (SessionT m)
|
||||||
|
, MonadCursor (CursorT t (SessionT m)) t
|
||||||
|
) => MonadPostgres
|
||||||
|
(PostgresT m)
|
||||||
|
(SessionT m)
|
||||||
|
(CursorT ct (SessionT m))
|
||||||
|
ct
|
||||||
|
where
|
||||||
session m = do
|
session m = do
|
||||||
pool <- ask
|
pool <- ask
|
||||||
let
|
client <- RE.liftExcept $ hoist liftAff $ startSession pool
|
||||||
acq = liftAff $ Pool.connect pool
|
RE.finally
|
||||||
rel _ c = liftEffect $ Pool.release pool c
|
(RE.liftExcept $ hoist liftEffect $ endSession pool client)
|
||||||
lift $ bracket acq rel (runReaderT m)
|
(RE.liftExcept $ RE.toExcept m client)
|
||||||
transaction m =
|
transaction m =
|
||||||
let
|
let
|
||||||
begin = void $ exec "begin;"
|
begin = void $ exec "begin;"
|
||||||
@ -133,27 +96,29 @@ instance (MonadBracket e f m, MonadAff m, MonadSession (SessionT m), MonadCursor
|
|||||||
session $ begin *> catchError commit rollback
|
session $ begin *> catchError commit rollback
|
||||||
cursorWith f cur q m =
|
cursorWith f cur q m =
|
||||||
transaction do
|
transaction do
|
||||||
q' <- liftEffect $ asQuery q
|
q' <- RE.liftExcept $ X.printing $ asQuery q
|
||||||
exec_ $ "declare " <> cur <> " cursor for (" <> (unwrap q').text <> ");"
|
exec_ $ "declare " <> cur <> " cursor for (" <> (unwrap q').text <> ");"
|
||||||
runReaderT (unwrap m) (cur /\ f)
|
runReaderT (unwrap m) (cur /\ f)
|
||||||
|
|
||||||
-- | Create a server-side cursor for a query in a transaction,
|
-- | Create a server-side cursor for a query in a transaction,
|
||||||
-- | and execute a `CursorT` with a view to the new cursor.
|
-- | and execute a `CursorT` with a view to the new cursor.
|
||||||
cursor :: forall @cursort t session cursor q a. MonadPostgres t session cursor cursort => AsQuery q => FromRow cursort => String -> q -> cursor a -> t a
|
cursor :: forall @cursort t session cursor q a. MonadPostgres t session cursor cursort => AsQuery q => FromRow cursort => String -> q -> cursor a -> t a
|
||||||
cursor = cursorWith fromRow
|
cursor = cursorWith (fromRow 0)
|
||||||
|
|
||||||
-- | Execute a `PostgresT` using an existing connection pool.
|
-- | Execute a `PostgresT` using an existing connection pool.
|
||||||
-- |
|
-- |
|
||||||
-- | This will not invoke `Pool.end` after executing.
|
-- | This will not invoke `Pool.end` after executing.
|
||||||
withPool :: forall m a. PostgresT m a -> Pool -> m a
|
withPool :: forall m a. PostgresT m a -> Pool -> E.Except m a
|
||||||
withPool = runReaderT <<< unwrap
|
withPool m p = runReaderT (unwrap m) p
|
||||||
|
|
||||||
-- | Create a new connection pool from the provided config and execute
|
-- | Create a new connection pool from the provided config and execute
|
||||||
-- | the postgres monad, invoking `Effect.Aff.Postgres.Pool.end` afterwards.
|
-- | the postgres monad, invoking `Effect.Aff.Postgres.Pool.end` afterwards.
|
||||||
runPostgres :: forall m a missing trash r e f. MonadBracket e f m => MonadAff m => Union r missing (Pool.Config trash) => Record r -> PostgresT m a -> m a
|
runPostgres :: forall m a missing trash r f. MonadBracket Effect.Error f m => MonadAff m => Union r missing (Pool.Config trash) => Record r -> PostgresT m a -> E.Except m a
|
||||||
runPostgres cfg m =
|
runPostgres cfg m =
|
||||||
let
|
let
|
||||||
|
acq :: RE Unit m Pool
|
||||||
acq = liftEffect $ Pool.make @r @missing @trash cfg
|
acq = liftEffect $ Pool.make @r @missing @trash cfg
|
||||||
rel _ p = liftAff $ Pool.end p
|
rel :: _ -> Pool -> RE Unit m Unit
|
||||||
|
rel _ p = RE.liftExcept $ hoist liftAff $ Pool.end p
|
||||||
in
|
in
|
||||||
bracket acq rel $ withPool m
|
RE.toExcept (bracket acq rel $ RE.liftExcept <<< withPool m) unit
|
||||||
|
@ -22,6 +22,7 @@ import Data.Tuple.Nested (type (/\), (/\))
|
|||||||
import Effect.Aff.Class (class MonadAff)
|
import Effect.Aff.Class (class MonadAff)
|
||||||
import Effect.Aff.Unlift (class MonadUnliftAff)
|
import Effect.Aff.Unlift (class MonadUnliftAff)
|
||||||
import Effect.Class (class MonadEffect, liftEffect)
|
import Effect.Class (class MonadEffect, liftEffect)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
import Effect.Unlift (class MonadUnliftEffect)
|
import Effect.Unlift (class MonadUnliftEffect)
|
||||||
|
|
||||||
data Move
|
data Move
|
||||||
@ -118,7 +119,7 @@ instance (MonadSession m) => MonadCursor (CursorT t m) t where
|
|||||||
RowsAffected n' <- query $ ("move relative $1 from " <> cur) /\ n
|
RowsAffected n' <- query $ ("move relative $1 from " <> cur) /\ n
|
||||||
pure n'
|
pure n'
|
||||||
|
|
||||||
instance (MonadSession m) => MonadSession (CursorT t m) where
|
instance (MonadThrow E.E m, MonadSession m) => MonadSession (CursorT t m) where
|
||||||
query = lift <<< query
|
query = lift <<< query
|
||||||
exec = lift <<< exec
|
exec = lift <<< exec
|
||||||
exec_ = lift <<< exec_
|
exec_ = lift <<< exec_
|
||||||
|
@ -1,21 +1,50 @@
|
|||||||
module Control.Monad.Postgres.Session where
|
module Control.Monad.Postgres.Session where
|
||||||
|
|
||||||
import Prelude
|
import Prelude hiding (join)
|
||||||
|
|
||||||
import Control.Monad.Reader (ReaderT, ask)
|
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
||||||
|
import Control.Monad.Fork.Class (class MonadBracket)
|
||||||
|
import Control.Monad.Morph (hoist)
|
||||||
|
import Control.Monad.Reader (ask)
|
||||||
|
import Control.Monad.Trans.Class (lift)
|
||||||
|
import Data.Newtype (wrap)
|
||||||
import Data.Postgres.Query (class AsQuery)
|
import Data.Postgres.Query (class AsQuery)
|
||||||
import Data.Postgres.Result (class FromRows)
|
import Data.Postgres.Result (class FromRows)
|
||||||
|
import Effect (Effect)
|
||||||
|
import Effect.Aff (Aff)
|
||||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||||
import Effect.Aff.Postgres.Client (Client)
|
import Effect.Aff.Postgres.Client (Client)
|
||||||
import Effect.Aff.Postgres.Client as Client
|
import Effect.Aff.Postgres.Client as Client
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Aff.Postgres.Pool (Pool)
|
||||||
import Node.Stream (Writable, Readable)
|
import Effect.Aff.Postgres.Pool as Pool
|
||||||
|
import Effect.Class (class MonadEffect, liftEffect)
|
||||||
|
import Effect.Exception as Effect
|
||||||
|
import Effect.Postgres.Error (RE)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
|
import Effect.Postgres.Error.RE as RE
|
||||||
|
import Effect.Ref as Ref
|
||||||
|
import Node.EventEmitter as Event
|
||||||
|
import Node.Stream (Readable, Stream, Writable)
|
||||||
|
import Node.Stream as Stream
|
||||||
|
|
||||||
type SessionT :: forall k. (k -> Type) -> k -> Type
|
type SessionT :: (Type -> Type) -> Type -> Type
|
||||||
type SessionT = ReaderT Client
|
type SessionT = RE Client
|
||||||
|
|
||||||
|
class MonadStartSession a where
|
||||||
|
startSession :: a -> E.Except Aff Client
|
||||||
|
endSession :: a -> Client -> E.Except Effect Unit
|
||||||
|
|
||||||
|
instance MonadStartSession Pool where
|
||||||
|
startSession = Pool.connect
|
||||||
|
endSession p c = Pool.release p c
|
||||||
|
|
||||||
|
instance MonadStartSession Client where
|
||||||
|
startSession = pure
|
||||||
|
endSession _ _ = pure unit
|
||||||
|
|
||||||
-- | A monad representing a connected session to a database
|
-- | A monad representing a connected session to a database
|
||||||
class MonadAff m <= MonadSession m where
|
class (MonadThrow E.E m, MonadAff m) <= MonadSession m where
|
||||||
-- | Executes a query and unmarshals the result into `r`
|
-- | Executes a query and unmarshals the result into `r`
|
||||||
query :: forall q r. AsQuery q => FromRows r => q -> m r
|
query :: forall q r. AsQuery q => FromRows r => q -> m r
|
||||||
-- | Executes a query and returns the number of rows affected
|
-- | Executes a query and returns the number of rows affected
|
||||||
@ -41,17 +70,44 @@ class MonadAff m <= MonadSession m where
|
|||||||
-- | ```
|
-- | ```
|
||||||
streamOut :: String -> m (Readable ())
|
streamOut :: String -> m (Readable ())
|
||||||
|
|
||||||
instance MonadAff m => MonadSession (SessionT m) where
|
instance (MonadStartSession s, MonadBracket Effect.Error f m, MonadAff m) => MonadSession (RE s m) where
|
||||||
query q = do
|
query q = do
|
||||||
client <- ask
|
pool <- ask
|
||||||
liftAff $ Client.query q client
|
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||||
|
RE.finally
|
||||||
|
(hoist liftEffect $ RE.liftExcept $ endSession pool client)
|
||||||
|
(wrap $ lift $ hoist liftAff $ Client.query q client)
|
||||||
exec q = do
|
exec q = do
|
||||||
client <- ask
|
pool <- ask
|
||||||
liftAff $ Client.exec q client
|
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||||
|
RE.finally
|
||||||
|
(hoist liftEffect $ RE.liftExcept $ endSession pool client)
|
||||||
|
(wrap $ lift $ hoist liftAff $ Client.exec q client)
|
||||||
exec_ = void <<< exec
|
exec_ = void <<< exec
|
||||||
streamIn q = do
|
streamIn q = do
|
||||||
client <- ask
|
pool <- ask
|
||||||
liftEffect $ Client.execWithStdin q client
|
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||||
|
handleStream (X.run $ endSession pool client) (RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client)
|
||||||
streamOut q = do
|
streamOut q = do
|
||||||
client <- ask
|
pool <- ask
|
||||||
liftEffect $ Client.queryWithStdout q client
|
client <- hoist liftAff $ RE.liftExcept $ startSession pool
|
||||||
|
handleStream (X.run $ endSession pool client) (RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client)
|
||||||
|
|
||||||
|
handleStream :: forall e m r. MonadEffect m => MonadError e m => Effect Unit -> m (Stream r) -> m (Stream r)
|
||||||
|
handleStream onError getStream =
|
||||||
|
flip catchError
|
||||||
|
(\e -> liftEffect onError *> throwError e)
|
||||||
|
do
|
||||||
|
stream <- getStream
|
||||||
|
liftEffect $ onErrorOrClose stream onError
|
||||||
|
pure stream
|
||||||
|
|
||||||
|
onErrorOrClose :: forall r. Stream r -> Effect Unit -> Effect Unit
|
||||||
|
onErrorOrClose stream eff = do
|
||||||
|
did <- Ref.new false
|
||||||
|
let
|
||||||
|
onE = do
|
||||||
|
did' <- Ref.read did
|
||||||
|
when (not did') (Ref.write true did *> eff)
|
||||||
|
Event.once_ Stream.errorH (const onE) stream
|
||||||
|
Event.once_ Stream.closeH onE stream
|
||||||
|
@ -2,10 +2,11 @@ module Data.Postgres.Query where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
|
import Control.Monad.Cont (lift)
|
||||||
import Data.Maybe (Maybe(..))
|
import Data.Maybe (Maybe(..))
|
||||||
import Data.Newtype (class Newtype)
|
import Data.Newtype (class Newtype)
|
||||||
import Data.Nullable (Nullable, toNullable)
|
import Data.Nullable (Nullable, toNullable)
|
||||||
import Data.Postgres (class Rep, serialize, smash)
|
import Data.Postgres (class Rep, RepT, serialize, smash)
|
||||||
import Data.Postgres.Raw (Raw)
|
import Data.Postgres.Raw (Raw)
|
||||||
import Data.Tuple.Nested (type (/\), (/\))
|
import Data.Tuple.Nested (type (/\), (/\))
|
||||||
import Effect (Effect)
|
import Effect (Effect)
|
||||||
@ -30,26 +31,29 @@ derive newtype instance Show Query
|
|||||||
emptyQuery :: Query
|
emptyQuery :: Query
|
||||||
emptyQuery = Query { text: "", values: [], name: Nothing }
|
emptyQuery = Query { text: "", values: [], name: Nothing }
|
||||||
|
|
||||||
|
stringQuery :: String -> Query
|
||||||
|
stringQuery s = Query { text: s, values: [], name: Nothing }
|
||||||
|
|
||||||
-- | Any value that can be converted to an array of query parameters
|
-- | Any value that can be converted to an array of query parameters
|
||||||
class AsQueryParams a where
|
class AsQueryParams a where
|
||||||
asQueryParams :: a -> Effect (Array Raw)
|
asQueryParams :: a -> RepT (Array Raw)
|
||||||
|
|
||||||
instance AsQueryParams (Array Raw) where
|
instance AsQueryParams (Array Raw) where
|
||||||
asQueryParams = pure
|
asQueryParams = pure
|
||||||
else instance (Rep a, AsQueryParams b) => AsQueryParams (a /\ b) where
|
else instance (Rep a, AsQueryParams b) => AsQueryParams (a /\ b) where
|
||||||
asQueryParams (a /\ tail) = do
|
asQueryParams (a /\ tail) = do
|
||||||
a' <- map pure $ smash $ serialize a
|
a' <- map pure $ serialize a
|
||||||
tail' <- asQueryParams tail
|
tail' <- asQueryParams tail
|
||||||
pure $ a' <> tail'
|
pure $ a' <> tail'
|
||||||
else instance (Rep a) => AsQueryParams a where
|
else instance (Rep a) => AsQueryParams a where
|
||||||
asQueryParams = map pure <<< smash <<< serialize
|
asQueryParams = map pure <<< serialize
|
||||||
|
|
||||||
-- | Values that can be rendered as a SQL query
|
-- | Values that can be rendered as a SQL query
|
||||||
class AsQuery a where
|
class AsQuery a where
|
||||||
asQuery :: a -> Effect Query
|
asQuery :: a -> RepT Query
|
||||||
|
|
||||||
instance AsQuery a => AsQuery (Effect a) where
|
instance AsQuery a => AsQuery (Effect a) where
|
||||||
asQuery a = asQuery =<< a
|
asQuery a = asQuery =<< lift a
|
||||||
|
|
||||||
instance AsQuery Query where
|
instance AsQuery Query where
|
||||||
asQuery = pure
|
asQuery = pure
|
||||||
|
@ -2,7 +2,7 @@ module Data.Postgres.Result where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
import Control.Monad.Error.Class (liftMaybe, throwError)
|
import Control.Monad.Error.Class (catchError, liftMaybe, throwError)
|
||||||
import Data.Array as Array
|
import Data.Array as Array
|
||||||
import Data.Generic.Rep (class Generic)
|
import Data.Generic.Rep (class Generic)
|
||||||
import Data.Int as Int
|
import Data.Int as Int
|
||||||
@ -43,9 +43,9 @@ class FromRows a where
|
|||||||
instance FromRows RowsAffected where
|
instance FromRows RowsAffected where
|
||||||
fromRows a _ = pure a
|
fromRows a _ = pure a
|
||||||
else instance (FromRow a) => FromRows (Array a) where
|
else instance (FromRow a) => FromRows (Array a) where
|
||||||
fromRows _ = traverse fromRow
|
fromRows _ = traverse (fromRow 0)
|
||||||
else instance (FromRow a) => FromRows (Maybe a) where
|
else instance (FromRow a) => FromRows (Maybe a) where
|
||||||
fromRows _ = map Array.head <<< traverse fromRow
|
fromRows _ = map Array.head <<< traverse (fromRow 0)
|
||||||
else instance (FromRow a) => FromRows a where
|
else instance (FromRow a) => FromRows a where
|
||||||
fromRows a =
|
fromRows a =
|
||||||
let
|
let
|
||||||
@ -89,29 +89,34 @@ class FromRow (a :: Type) where
|
|||||||
-- | Minimum length of row for type `a`
|
-- | Minimum length of row for type `a`
|
||||||
minColumnCount :: forall g. g a -> Int
|
minColumnCount :: forall g. g a -> Int
|
||||||
-- | Performs the conversion
|
-- | Performs the conversion
|
||||||
fromRow :: Array Raw -> RepT a
|
fromRow :: Int -> Array Raw -> RepT a
|
||||||
|
|
||||||
instance (Deserialize a, FromRow b) => FromRow (a /\ b) where
|
instance (Deserialize a, FromRow b) => FromRow (a /\ b) where
|
||||||
minColumnCount _ = minColumnCount (Proxy @b) + 1
|
minColumnCount _ = minColumnCount (Proxy @b) + 1
|
||||||
fromRow r =
|
fromRow n r =
|
||||||
let
|
let
|
||||||
minLen = minColumnCount (Proxy @(Tuple a b))
|
minLen = minColumnCount (Proxy @(Tuple a b))
|
||||||
lengthMismatch = pure $ TypeMismatch ("Expected row to have at least " <> show minLen <> " columns") ("Found row of length " <> show (Array.length r))
|
lengthMismatch = pure $ TypeMismatch ("Expected row to have at least " <> show minLen <> " columns") ("Found row of length " <> show (Array.length r))
|
||||||
in
|
in
|
||||||
do
|
do
|
||||||
|
let
|
||||||
|
de a =
|
||||||
|
catchError
|
||||||
|
(deserialize @a a)
|
||||||
|
(\e -> throwError $ ErrorAtIndex n <$> e)
|
||||||
when (Array.length r < minLen) (throwError lengthMismatch)
|
when (Array.length r < minLen) (throwError lengthMismatch)
|
||||||
a <- deserialize =<< liftMaybe lengthMismatch (Array.head r)
|
a <- de =<< liftMaybe lengthMismatch (Array.head r)
|
||||||
b <- fromRow =<< liftMaybe lengthMismatch (Array.tail r)
|
b <- fromRow (n + 1) =<< liftMaybe lengthMismatch (Array.tail r)
|
||||||
pure $ a /\ b
|
pure $ a /\ b
|
||||||
else instance FromRow (Array Raw) where
|
else instance FromRow (Array Raw) where
|
||||||
minColumnCount _ = 0
|
minColumnCount _ = 0
|
||||||
fromRow = pure
|
fromRow _ = pure
|
||||||
else instance FromRow Unit where
|
else instance FromRow Unit where
|
||||||
minColumnCount _ = 0
|
minColumnCount _ = 0
|
||||||
fromRow _ = pure unit
|
fromRow _ _ = pure unit
|
||||||
else instance Deserialize a => FromRow a where
|
else instance Deserialize a => FromRow a where
|
||||||
minColumnCount _ = 1
|
minColumnCount _ = 1
|
||||||
fromRow r =
|
fromRow _ r =
|
||||||
let
|
let
|
||||||
err = pure $ TypeMismatch "Expected row of length >= 1" "Empty row"
|
err = pure $ TypeMismatch "Expected row of length >= 1" "Empty row"
|
||||||
in
|
in
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
module Effect.Aff.Postgres.Client (connected, connect, end, exec, execWithStdin, queryWithStdout, query, queryRaw, __connect, __end, __query, __execStreamStdin, __execStreamStdout, module X) where
|
module Effect.Aff.Postgres.Client (connected, connect, end, exec, execWithStdin, queryWithStdout, query, queryRaw, __connect, __end, __query, __execStreamStdin, __execStreamStdout, module Reexport) where
|
||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
@ -7,14 +7,15 @@ import Control.Promise as Promise
|
|||||||
import Data.Functor (voidRight)
|
import Data.Functor (voidRight)
|
||||||
import Data.Maybe (fromMaybe)
|
import Data.Maybe (fromMaybe)
|
||||||
import Data.Newtype (wrap)
|
import Data.Newtype (wrap)
|
||||||
import Data.Postgres (smash)
|
import Data.Postgres.Query (class AsQuery, QueryRaw, __queryToRaw, asQuery, stringQuery)
|
||||||
import Data.Postgres.Query (class AsQuery, QueryRaw, asQuery, __queryToRaw)
|
|
||||||
import Data.Postgres.Result (class FromRows, Result, fromRows, rows, rowsAffected)
|
import Data.Postgres.Result (class FromRows, Result, fromRows, rows, rowsAffected)
|
||||||
import Effect (Effect)
|
import Effect (Effect)
|
||||||
import Effect.Aff (Aff)
|
import Effect.Aff (Aff)
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
import Effect.Postgres.Client (Client, ClientConfigRaw, Config, Notification, NotificationRaw, __make, __uncfg, endE, errorE, make, noticeE, notificationE) as X
|
import Effect.Postgres.Client (Client, ClientConfigRaw, Config, Notification, NotificationRaw, __make, __uncfg, endE, errorE, make, noticeE, notificationE) as Reexport
|
||||||
import Effect.Postgres.Client (Client, Config, make)
|
import Effect.Postgres.Client (Client, Config, make)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
import Node.Stream (Readable, Writable)
|
import Node.Stream (Readable, Writable)
|
||||||
import Prim.Row (class Union)
|
import Prim.Row (class Union)
|
||||||
|
|
||||||
@ -23,8 +24,8 @@ import Prim.Row (class Union)
|
|||||||
-- | The config parameter `r` is `Config` with all keys optional.
|
-- | The config parameter `r` is `Config` with all keys optional.
|
||||||
-- |
|
-- |
|
||||||
-- | This is a shorthand for `(voidRight <*> connect) =<< liftEffect (make cfg)`
|
-- | This is a shorthand for `(voidRight <*> connect) =<< liftEffect (make cfg)`
|
||||||
connected :: forall r missing trash. Union r missing (Config trash) => Record r -> Aff Client
|
connected :: forall r missing trash. Union r missing (Config trash) => Record r -> E.Except Aff Client
|
||||||
connected cfg = (voidRight <*> connect) =<< liftEffect (make @r @missing @trash cfg)
|
connected cfg = X.with E.Connecting $ (voidRight <*> connect) =<< liftEffect (make @r @missing @trash cfg)
|
||||||
|
|
||||||
-- | Connects the client to the database
|
-- | Connects the client to the database
|
||||||
-- |
|
-- |
|
||||||
@ -35,41 +36,43 @@ connect = Promise.toAffE <<< __connect
|
|||||||
-- | Disconnects the client from the database
|
-- | Disconnects the client from the database
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/client#clientend>
|
-- | <https://node-postgres.com/apis/client#clientend>
|
||||||
end :: Client -> Aff Unit
|
end :: Client -> E.Except Aff Unit
|
||||||
end = Promise.toAffE <<< __end
|
end = X.with E.Disconnecting <<< Promise.toAffE <<< __end
|
||||||
|
|
||||||
-- | Performs a query, returning the raw `Result` object
|
-- | Performs a query, returning the raw `Result` object
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||||
queryRaw :: forall q. AsQuery q => q -> Client -> Aff Result
|
queryRaw :: forall q. AsQuery q => q -> Client -> E.Except Aff Result
|
||||||
queryRaw q c = do
|
queryRaw q c = do
|
||||||
q' <- __queryToRaw <$> liftEffect (asQuery q)
|
q' <- X.printing $ asQuery q
|
||||||
Promise.toAffE $ __query q' c
|
let q'' = __queryToRaw q'
|
||||||
|
X.executing q' $ Promise.toAffE $ __query q'' c
|
||||||
|
|
||||||
-- | Performs a query that we expect to not yield any rows,
|
-- | Performs a query that we expect to not yield any rows,
|
||||||
-- | returning the number of rows affected by the statement.
|
-- | returning the number of rows affected by the statement.
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||||
exec :: forall q. AsQuery q => q -> Client -> Aff Int
|
exec :: forall q. AsQuery q => q -> Client -> E.Except Aff Int
|
||||||
exec q = map (fromMaybe 0 <<< rowsAffected) <<< queryRaw q
|
exec q = map (fromMaybe 0 <<< rowsAffected) <<< queryRaw q
|
||||||
|
|
||||||
-- | Performs a query that we expect to yield rows,
|
-- | Performs a query that we expect to yield rows,
|
||||||
-- | returning them unmarshalled into destination type `r`.
|
-- | returning them unmarshalled into destination type `r`.
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/client#clientquery>
|
-- | <https://node-postgres.com/apis/client#clientquery>
|
||||||
query :: forall q r. AsQuery q => FromRows r => q -> Client -> Aff r
|
query :: forall q r. AsQuery q => FromRows r => q -> Client -> E.Except Aff r
|
||||||
query q c = do
|
query q c = do
|
||||||
|
q' <- X.printing $ asQuery q
|
||||||
raw <- queryRaw q c
|
raw <- queryRaw q c
|
||||||
let
|
let
|
||||||
affected = rowsAffected raw
|
affected = rowsAffected raw
|
||||||
rows' = rows raw
|
rows' = rows raw
|
||||||
liftEffect $ smash $ fromRows (wrap $ fromMaybe 0 affected) rows'
|
X.parsing q' $ fromRows (wrap $ fromMaybe 0 affected) rows'
|
||||||
|
|
||||||
execWithStdin :: String -> Client -> Effect (Writable ())
|
execWithStdin :: String -> Client -> E.Except Effect (Writable ())
|
||||||
execWithStdin q c = __execStreamStdin q c
|
execWithStdin q c = X.executing (stringQuery q) $ __execStreamStdin q c
|
||||||
|
|
||||||
queryWithStdout :: String -> Client -> Effect (Readable ())
|
queryWithStdout :: String -> Client -> E.Except Effect (Readable ())
|
||||||
queryWithStdout q c = __execStreamStdout q c
|
queryWithStdout q c = X.executing (stringQuery q) $ __execStreamStdout q c
|
||||||
|
|
||||||
-- | FFI binding to `Client#connect`
|
-- | FFI binding to `Client#connect`
|
||||||
foreign import __connect :: Client -> Effect (Promise Unit)
|
foreign import __connect :: Client -> Effect (Promise Unit)
|
||||||
|
@ -7,8 +7,25 @@ import Control.Promise as Promise
|
|||||||
import Effect (Effect)
|
import Effect (Effect)
|
||||||
import Effect.Aff (Aff)
|
import Effect.Aff (Aff)
|
||||||
import Effect.Postgres.Client (Client)
|
import Effect.Postgres.Client (Client)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as E.Except
|
||||||
import Effect.Postgres.Pool (Pool)
|
import Effect.Postgres.Pool (Pool)
|
||||||
import Effect.Postgres.Pool as X
|
import Effect.Postgres.Pool
|
||||||
|
(Config
|
||||||
|
, Pool
|
||||||
|
, PoolConfigRaw
|
||||||
|
, acquireE
|
||||||
|
, clientCount
|
||||||
|
, clientIdleCount
|
||||||
|
, clientWaitingCount
|
||||||
|
, connectE
|
||||||
|
, destroy
|
||||||
|
, errorE
|
||||||
|
, make
|
||||||
|
, release
|
||||||
|
, releaseE
|
||||||
|
, removeE
|
||||||
|
) as X
|
||||||
|
|
||||||
-- | Acquires a client from the pool.
|
-- | Acquires a client from the pool.
|
||||||
-- |
|
-- |
|
||||||
@ -17,8 +34,8 @@ import Effect.Postgres.Pool as X
|
|||||||
-- | * If the pool is 'full' and all clients are currently checked out will wait in a FIFO queue until a client becomes available by it being released back to the pool.
|
-- | * If the pool is 'full' and all clients are currently checked out will wait in a FIFO queue until a client becomes available by it being released back to the pool.
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/pool#poolconnect>
|
-- | <https://node-postgres.com/apis/pool#poolconnect>
|
||||||
connect :: Pool -> Aff Client
|
connect :: Pool -> E.Except Aff Client
|
||||||
connect = Promise.toAffE <<< __connect
|
connect = E.Except.with E.Connecting <<< Promise.toAffE <<< __connect
|
||||||
|
|
||||||
-- | Drain the pool of all active clients, disconnect them,
|
-- | Drain the pool of all active clients, disconnect them,
|
||||||
-- | and shut down any internal timers in the pool.
|
-- | and shut down any internal timers in the pool.
|
||||||
@ -27,8 +44,8 @@ connect = Promise.toAffE <<< __connect
|
|||||||
-- | your process is attempting to shut down cleanly.
|
-- | your process is attempting to shut down cleanly.
|
||||||
-- |
|
-- |
|
||||||
-- | <https://node-postgres.com/apis/pool#poolend>
|
-- | <https://node-postgres.com/apis/pool#poolend>
|
||||||
end :: Pool -> Aff Unit
|
end :: Pool -> E.Except Aff Unit
|
||||||
end = Promise.toAffE <<< __end
|
end = E.Except.with E.Disconnecting <<< Promise.toAffE <<< __end
|
||||||
|
|
||||||
-- | FFI binding to `Pool#end`
|
-- | FFI binding to `Pool#end`
|
||||||
foreign import __end :: Pool -> Effect (Promise Unit)
|
foreign import __end :: Pool -> Effect (Promise Unit)
|
||||||
|
30
src/Effect.Postgres.Error.Common.purs
Normal file
30
src/Effect.Postgres.Error.Common.purs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
module Effect.Postgres.Error.Common where
|
||||||
|
|
||||||
|
import Prelude
|
||||||
|
|
||||||
|
import Data.Array.NonEmpty (NonEmptyArray)
|
||||||
|
import Data.Generic.Rep (class Generic)
|
||||||
|
import Data.Postgres.Query (Query)
|
||||||
|
import Data.Show.Generic (genericShow)
|
||||||
|
import Effect.Exception as Effect
|
||||||
|
import Foreign (MultipleErrors)
|
||||||
|
|
||||||
|
type E = NonEmptyArray Error
|
||||||
|
|
||||||
|
data Error
|
||||||
|
= Deserializing Query MultipleErrors
|
||||||
|
| Serializing MultipleErrors
|
||||||
|
| Executing Query Effect.Error
|
||||||
|
| Connecting Effect.Error
|
||||||
|
| Disconnecting Effect.Error
|
||||||
|
| Other Effect.Error
|
||||||
|
|
||||||
|
derive instance Generic Error _
|
||||||
|
instance Show Error where
|
||||||
|
show = genericShow
|
||||||
|
|
||||||
|
toException' :: Error -> Effect.Error
|
||||||
|
toException' = Effect.error <<< show
|
||||||
|
|
||||||
|
toException :: E -> Effect.Error
|
||||||
|
toException = Effect.error <<< show
|
39
src/Effect.Postgres.Error.Except.purs
Normal file
39
src/Effect.Postgres.Error.Except.purs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
module Effect.Postgres.Error.Except where
|
||||||
|
|
||||||
|
import Prelude
|
||||||
|
|
||||||
|
import Control.Monad.Error.Class (class MonadError, class MonadThrow, liftEither, try)
|
||||||
|
import Control.Monad.Except (ExceptT(..), runExceptT)
|
||||||
|
import Data.Bifunctor (lmap)
|
||||||
|
import Data.Either (Either)
|
||||||
|
import Data.Postgres (RepT)
|
||||||
|
import Data.Postgres.Query (Query)
|
||||||
|
import Effect.Class (class MonadEffect, liftEffect)
|
||||||
|
import Effect.Exception as Effect
|
||||||
|
import Effect.Postgres.Error.Common (E, Error(..), toException)
|
||||||
|
|
||||||
|
type Except m = ExceptT E m
|
||||||
|
|
||||||
|
run :: forall m a. MonadThrow Effect.Error m => Except m a -> m a
|
||||||
|
run m = liftEither =<< lmap toException <$> runExceptT m
|
||||||
|
|
||||||
|
toEither :: forall m a. Except m a -> m (Either E a)
|
||||||
|
toEither = runExceptT
|
||||||
|
|
||||||
|
with :: forall e m a. MonadError e m => (e -> Error) -> m a -> Except m a
|
||||||
|
with e m = ExceptT $ map (lmap $ pure <<< e) $ try m
|
||||||
|
|
||||||
|
withEither :: forall e m a. Monad m => (e -> Error) -> m (Either e a) -> Except m a
|
||||||
|
withEither e m = ExceptT $ map (lmap $ pure <<< e) $ m
|
||||||
|
|
||||||
|
exception :: forall m a. MonadError Effect.Error m => m a -> Except m a
|
||||||
|
exception = with Other
|
||||||
|
|
||||||
|
parsing :: forall m a. MonadEffect m => Query -> RepT a -> Except m a
|
||||||
|
parsing q = withEither (Deserializing q) <<< liftEffect <<< runExceptT
|
||||||
|
|
||||||
|
printing :: forall m a. MonadEffect m => RepT a -> Except m a
|
||||||
|
printing = withEither Serializing <<< liftEffect <<< runExceptT
|
||||||
|
|
||||||
|
executing :: forall m a. MonadError Effect.Error m => Query -> m a -> Except m a
|
||||||
|
executing q = with (Executing q)
|
116
src/Effect.Postgres.Error.RE.purs
Normal file
116
src/Effect.Postgres.Error.RE.purs
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
module Effect.Postgres.Error.RE where
|
||||||
|
|
||||||
|
import Prelude hiding (join)
|
||||||
|
|
||||||
|
import Control.Alt (class Alt)
|
||||||
|
import Control.Monad.Base (class MonadBase)
|
||||||
|
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, liftEither, throwError)
|
||||||
|
import Control.Monad.Except (ExceptT, runExceptT)
|
||||||
|
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, class MonadKill, BracketCondition, bracket, fork, join, kill, never, suspend, uninterruptible)
|
||||||
|
import Control.Monad.Fork.Class as Bracket
|
||||||
|
import Control.Monad.Morph (class MFunctor, class MMonad, embed, hoist)
|
||||||
|
import Control.Monad.Reader (class MonadAsk, class MonadReader, ReaderT(..), runReaderT)
|
||||||
|
import Control.Monad.Rec.Class (class MonadRec)
|
||||||
|
import Control.Monad.Trans.Class (class MonadTrans, lift)
|
||||||
|
import Control.Monad.Unlift (class MonadUnlift, withRunInBase)
|
||||||
|
import Control.Parallel (class Parallel, parallel, sequential)
|
||||||
|
import Data.Bifunctor (lmap)
|
||||||
|
import Data.Either (Either)
|
||||||
|
import Data.Functor.Compose (Compose)
|
||||||
|
import Data.Newtype (class Newtype, unwrap, wrap)
|
||||||
|
import Effect.Aff.Class (class MonadAff)
|
||||||
|
import Effect.Aff.Unlift (class MonadUnliftAff, withRunInAff)
|
||||||
|
import Effect.Class (class MonadEffect)
|
||||||
|
import Effect.Exception as Effect
|
||||||
|
import Effect.Postgres.Error.Common (E, Error(..), toException)
|
||||||
|
import Effect.Postgres.Error.Except (Except)
|
||||||
|
import Effect.Unlift (class MonadUnliftEffect, withRunInEffect)
|
||||||
|
|
||||||
|
-- | `ReaderT` with `ExceptT E`
|
||||||
|
-- |
|
||||||
|
-- | `ReaderT r (ExceptT (NonEmptyList Effect.Postgres.Error.Error) m) a`
|
||||||
|
newtype RE r m a = RE (ReaderT r (ExceptT E m) a)
|
||||||
|
|
||||||
|
newtype ParRE r f a = ParRE (ReaderT r (Compose f (Either E)) a)
|
||||||
|
|
||||||
|
finally :: forall r f m a. MonadBracket Effect.Error f m => RE r m Unit -> RE r m a -> RE r m a
|
||||||
|
finally after m = (m <* after) `catchError` \e -> after *> throwError e
|
||||||
|
|
||||||
|
run :: forall m r a. MonadThrow Effect.Error m => RE r m a -> r -> m a
|
||||||
|
run m r = liftEither =<< lmap toException <$> runExceptT (runReaderT (unwrap m) r)
|
||||||
|
|
||||||
|
toExcept :: forall m r a. RE r m a -> r -> Except m a
|
||||||
|
toExcept m r = runReaderT (unwrap m) r
|
||||||
|
|
||||||
|
toEither :: forall m r a. RE r m a -> r -> m (Either E a)
|
||||||
|
toEither m r = runExceptT $ runReaderT (unwrap m) r
|
||||||
|
|
||||||
|
liftExcept :: forall m r a. Monad m => Except m a -> RE r m a
|
||||||
|
liftExcept = wrap <<< lift
|
||||||
|
|
||||||
|
derive instance Newtype (ParRE r m a) _
|
||||||
|
derive newtype instance Functor m => Functor (ParRE r m)
|
||||||
|
derive newtype instance Apply m => Apply (ParRE r m)
|
||||||
|
derive newtype instance Applicative m => Applicative (ParRE r m)
|
||||||
|
|
||||||
|
derive instance Newtype (RE r m a) _
|
||||||
|
derive newtype instance Monad m => MonadAsk r (RE r m)
|
||||||
|
derive newtype instance Monad m => MonadReader r (RE r m)
|
||||||
|
derive newtype instance Monad m => Functor (RE r m)
|
||||||
|
derive newtype instance Monad m => Apply (RE r m)
|
||||||
|
derive newtype instance Monad m => Applicative (RE r m)
|
||||||
|
derive newtype instance Monad m => Bind (RE r m)
|
||||||
|
derive newtype instance Monad m => Monad (RE r m)
|
||||||
|
derive newtype instance Monad m => MonadError E (RE r m)
|
||||||
|
derive newtype instance Monad m => MonadThrow E (RE r m)
|
||||||
|
derive newtype instance MonadEffect m => MonadEffect (RE r m)
|
||||||
|
derive newtype instance MonadAff m => MonadAff (RE r m)
|
||||||
|
derive newtype instance MonadRec m => MonadRec (RE r m)
|
||||||
|
|
||||||
|
instance Parallel p m => Parallel (ParRE r p) (RE r m) where
|
||||||
|
parallel = wrap <<< parallel <<< unwrap
|
||||||
|
sequential = wrap <<< sequential <<< unwrap
|
||||||
|
|
||||||
|
instance MonadTrans (RE r) where
|
||||||
|
lift = wrap <<< lift <<< lift
|
||||||
|
|
||||||
|
instance MFunctor (RE r) where
|
||||||
|
hoist mn (RE m) = RE $ hoist (hoist mn) m
|
||||||
|
|
||||||
|
instance MMonad (RE r) where
|
||||||
|
embed :: forall n m b. Monad n => (forall a. m a -> RE r n a) -> RE r m b -> RE r n b
|
||||||
|
embed mtn (RE m) =
|
||||||
|
RE $ ReaderT $ \r -> embed (flip runReaderT r <<< unwrap <<< mtn) (runReaderT m r)
|
||||||
|
|
||||||
|
instance Monad m => Alt (RE r m) where
|
||||||
|
alt a b = catchError b (\e -> catchError a (\e' -> throwError $ e <> e'))
|
||||||
|
|
||||||
|
instance (MonadThrow Effect.Error m, MonadUnliftEffect m) => MonadUnliftEffect (RE r m) where
|
||||||
|
withRunInEffect f = RE $ ReaderT $ \r -> lift $ withRunInEffect @m $ \a -> f (a <<< flip run r)
|
||||||
|
|
||||||
|
instance (MonadThrow Effect.Error m, MonadUnliftAff m) => MonadUnliftAff (RE r m) where
|
||||||
|
withRunInAff f = RE $ ReaderT $ \r -> lift $ withRunInAff @m $ \a -> f (a <<< flip run r)
|
||||||
|
|
||||||
|
instance (MonadBase m (RE r m), MonadThrow Effect.Error m) => MonadUnlift m (RE r m) where
|
||||||
|
withRunInBase f = RE $ ReaderT $ \r -> lift $ f (flip run r)
|
||||||
|
|
||||||
|
instance Monad m => MonadBase m (RE r m) where
|
||||||
|
liftBase = lift
|
||||||
|
|
||||||
|
instance (MonadThrow Effect.Error m, MonadFork f m) => MonadFork f (RE r m) where
|
||||||
|
fork m = withRunInBase \f -> fork $ f m
|
||||||
|
suspend m = withRunInBase \f -> suspend $ f m
|
||||||
|
join f = lift $ join f
|
||||||
|
|
||||||
|
instance (MonadKill Effect.Error f m) => MonadKill E f (RE r m) where
|
||||||
|
kill e f = lift $ kill (toException e) f
|
||||||
|
|
||||||
|
instance (MonadBracket Effect.Error f m) => MonadBracket E f (RE r m) where
|
||||||
|
bracket acq rel m = withRunInBase \f -> bracket (f acq) (\c r -> f $ rel ((bracketCondError (pure <<< Other)) c) r) (f <<< m)
|
||||||
|
uninterruptible = hoist uninterruptible
|
||||||
|
never = lift never
|
||||||
|
|
||||||
|
bracketCondError :: forall ea eb a. (ea -> eb) -> BracketCondition ea a -> BracketCondition eb a
|
||||||
|
bracketCondError _ (Bracket.Completed a) = Bracket.Completed a
|
||||||
|
bracketCondError f (Bracket.Failed a) = Bracket.Failed $ f a
|
||||||
|
bracketCondError f (Bracket.Killed a) = Bracket.Killed $ f a
|
5
src/Effect.Postgres.Error.purs
Normal file
5
src/Effect.Postgres.Error.purs
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
module Effect.Postgres.Error (module X) where
|
||||||
|
|
||||||
|
import Effect.Postgres.Error.Common as X
|
||||||
|
import Effect.Postgres.Error.RE (RE(..), ParRE(..)) as X
|
||||||
|
import Effect.Postgres.Error.Except (Except) as X
|
@ -9,6 +9,8 @@ import Effect (Effect)
|
|||||||
import Effect.Exception (Error)
|
import Effect.Exception (Error)
|
||||||
import Effect.Postgres.Client (Client)
|
import Effect.Postgres.Client (Client)
|
||||||
import Effect.Postgres.Client as Client
|
import Effect.Postgres.Client as Client
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as E.Except
|
||||||
import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2)
|
import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2)
|
||||||
import Foreign (Foreign, unsafeToForeign)
|
import Foreign (Foreign, unsafeToForeign)
|
||||||
import Node.EventEmitter (EventHandle(..))
|
import Node.EventEmitter (EventHandle(..))
|
||||||
@ -46,12 +48,12 @@ make r = do
|
|||||||
__make $ __uncfg { unwrapMillis: unwrap } $ unsafeToForeign asClientConfig
|
__make $ __uncfg { unwrapMillis: unwrap } $ unsafeToForeign asClientConfig
|
||||||
|
|
||||||
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
||||||
release :: Pool -> Client -> Effect Unit
|
release :: Pool -> Client -> E.Except Effect Unit
|
||||||
release p c = __release p c false
|
release p c = E.Except.with E.Disconnecting $ __release p c false
|
||||||
|
|
||||||
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
-- | <https://node-postgres.com/apis/pool#releasing-clients>
|
||||||
destroy :: Pool -> Client -> Effect Unit
|
destroy :: Pool -> Client -> E.Except Effect Unit
|
||||||
destroy p c = __release p c true
|
destroy p c = E.Except.with E.Disconnecting $ __release p c true
|
||||||
|
|
||||||
-- | <https://node-postgres.com/apis/pool#connect>
|
-- | <https://node-postgres.com/apis/pool#connect>
|
||||||
connectE :: EventHandle1 Pool Client
|
connectE :: EventHandle1 Pool Client
|
||||||
|
@ -2,8 +2,10 @@ module Pipes.Postgres where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
|
import Control.Monad.Cont (lift)
|
||||||
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
|
||||||
import Control.Monad.Postgres (class MonadPostgres)
|
import Control.Monad.Morph (hoist)
|
||||||
|
import Control.Monad.Postgres (PostgresT)
|
||||||
import Control.Monad.Reader (class MonadAsk, ask)
|
import Control.Monad.Reader (class MonadAsk, ask)
|
||||||
import Data.Maybe (Maybe(..))
|
import Data.Maybe (Maybe(..))
|
||||||
import Effect.Aff.Class (class MonadAff, liftAff)
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||||
@ -12,6 +14,7 @@ import Effect.Aff.Postgres.Pool (Pool)
|
|||||||
import Effect.Aff.Postgres.Pool as Pool
|
import Effect.Aff.Postgres.Pool as Pool
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
import Effect.Exception (Error)
|
import Effect.Exception (Error)
|
||||||
|
import Effect.Postgres.Error.RE as RE
|
||||||
import Node.Buffer (Buffer)
|
import Node.Buffer (Buffer)
|
||||||
import Node.Stream.Object as O
|
import Node.Stream.Object as O
|
||||||
import Pipes ((>->))
|
import Pipes ((>->))
|
||||||
@ -20,46 +23,45 @@ import Pipes.Node.Stream (fromReadable, fromWritable)
|
|||||||
import Pipes.Prelude as Pipes
|
import Pipes.Prelude as Pipes
|
||||||
|
|
||||||
stdin
|
stdin
|
||||||
:: forall m s c ct
|
:: forall m
|
||||||
. MonadAff m
|
. MonadAff m
|
||||||
=> MonadError Error m
|
=> MonadError Error m
|
||||||
=> MonadAsk Pool m
|
=> MonadAsk Pool m
|
||||||
=> MonadPostgres m s c ct
|
|
||||||
=> String
|
=> String
|
||||||
-> Consumer (Maybe Buffer) m Unit
|
-> Consumer (Maybe Buffer) (PostgresT m) Unit
|
||||||
stdin q = do
|
stdin q = do
|
||||||
pool <- ask
|
pool <- lift ask
|
||||||
client <- liftAff $ Pool.connect pool
|
client <- lift $ RE.liftExcept $ hoist liftAff $ Pool.connect pool
|
||||||
stream <- liftEffect $ Client.execWithStdin q client
|
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.execWithStdin q client
|
||||||
liftAff $ void $ Client.exec "begin" client
|
lift $ RE.liftExcept $ hoist liftAff $ void $ Client.exec "begin" client
|
||||||
let
|
let
|
||||||
releaseOnEOS Nothing = do
|
releaseOnEOS Nothing = do
|
||||||
liftAff $ void $ Client.exec "commit" client
|
RE.liftExcept $ hoist liftAff $ void $ Client.exec "commit" client
|
||||||
liftEffect $ Pool.release pool client
|
RE.liftExcept $ hoist liftEffect $ Pool.release pool client
|
||||||
pure Nothing
|
pure Nothing
|
||||||
releaseOnEOS (Just a) = pure (Just a)
|
releaseOnEOS (Just a) = pure (Just a)
|
||||||
|
|
||||||
pipe = Pipes.mapM releaseOnEOS >-> fromWritable (O.unsafeFromBufferWritable stream)
|
pipe = Pipes.mapM releaseOnEOS >-> hoist lift (fromWritable $ O.unsafeFromBufferWritable stream)
|
||||||
err e = do
|
err e = lift do
|
||||||
liftAff $ void $ Client.exec "rollback" client
|
RE.liftExcept $ hoist liftAff $ void $ Client.exec "rollback" client
|
||||||
liftEffect $ Pool.release pool client
|
RE.liftExcept $ hoist liftEffect $ Pool.release pool client
|
||||||
throwError e
|
throwError e
|
||||||
|
|
||||||
catchError pipe err
|
catchError pipe err
|
||||||
|
|
||||||
stdout
|
stdout
|
||||||
:: forall m s c ct
|
:: forall m
|
||||||
. MonadAff m
|
. MonadAff m
|
||||||
=> MonadThrow Error m
|
=> MonadThrow Error m
|
||||||
=> MonadAsk Pool m
|
|
||||||
=> MonadPostgres m s c ct
|
|
||||||
=> String
|
=> String
|
||||||
-> Producer (Maybe Buffer) m Unit
|
-> Producer (Maybe Buffer) (PostgresT m) Unit
|
||||||
stdout q = do
|
stdout q = do
|
||||||
pool <- ask
|
pool <- lift ask
|
||||||
client <- liftAff $ Pool.connect pool
|
client <- lift $ RE.liftExcept $ hoist liftAff $ Pool.connect pool
|
||||||
stream <- liftEffect $ Client.queryWithStdout q client
|
stream <- lift $ RE.liftExcept $ hoist liftEffect $ Client.queryWithStdout q client
|
||||||
let
|
let
|
||||||
releaseOnEOS Nothing = liftEffect $ Pool.release pool client $> Nothing
|
releaseOnEOS :: Maybe Buffer -> PostgresT m (Maybe Buffer)
|
||||||
|
releaseOnEOS Nothing = RE.liftExcept $ hoist liftEffect $ Pool.release pool client $> Nothing
|
||||||
releaseOnEOS (Just a) = pure (Just a)
|
releaseOnEOS (Just a) = pure (Just a)
|
||||||
fromReadable (O.unsafeFromBufferReadable stream) >-> Pipes.mapM releaseOnEOS
|
hoist lift (fromReadable (O.unsafeFromBufferReadable stream))
|
||||||
|
>-> Pipes.mapM releaseOnEOS
|
||||||
|
@ -5,7 +5,7 @@ import Prelude
|
|||||||
import Data.Either (Either(..))
|
import Data.Either (Either(..))
|
||||||
import Data.String.Regex (Regex)
|
import Data.String.Regex (Regex)
|
||||||
import Data.String.Regex as Regex
|
import Data.String.Regex as Regex
|
||||||
import Data.String.Regex.Flags (RegexFlags(..))
|
import Data.String.Regex.Flags (RegexFlags)
|
||||||
import Data.Tuple.Nested (type (/\), (/\))
|
import Data.Tuple.Nested (type (/\), (/\))
|
||||||
import Effect (Effect)
|
import Effect (Effect)
|
||||||
import Effect.Aff (Aff, bracket, makeAff)
|
import Effect.Aff (Aff, bracket, makeAff)
|
||||||
@ -14,14 +14,13 @@ import Effect.Aff.Postgres.Client as Client
|
|||||||
import Effect.Aff.Postgres.Pool (Pool)
|
import Effect.Aff.Postgres.Pool (Pool)
|
||||||
import Effect.Aff.Postgres.Pool as Pool
|
import Effect.Aff.Postgres.Pool as Pool
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
import Effect.Unsafe (unsafePerformEffect)
|
import Effect.Unsafe (unsafePerformEffect)
|
||||||
import Node.EventEmitter (EventHandle(..))
|
import Node.EventEmitter (EventHandle)
|
||||||
import Node.EventEmitter as EE
|
import Node.EventEmitter as EE
|
||||||
import Node.Path as Path
|
import Node.Path as Path
|
||||||
import Node.Process (cwd)
|
import Node.Process (cwd)
|
||||||
import Partial.Unsafe (unsafePartial)
|
import Partial.Unsafe (unsafePartial)
|
||||||
import Record (insert)
|
|
||||||
import Type.Prelude (Proxy(..))
|
|
||||||
|
|
||||||
type Config =
|
type Config =
|
||||||
{ database :: String
|
{ database :: String
|
||||||
@ -41,16 +40,16 @@ withConfig :: (Config -> Aff Unit) -> Aff Unit
|
|||||||
withConfig f = f =<< liftEffect config
|
withConfig f = f =<< liftEffect config
|
||||||
|
|
||||||
withClient :: (Client -> Aff Unit) -> Aff Unit
|
withClient :: (Client -> Aff Unit) -> Aff Unit
|
||||||
withClient = bracket (Client.connected =<< liftEffect config) Client.end
|
withClient = bracket (X.run $ Client.connected =<< liftEffect config) (X.run <<< Client.end)
|
||||||
|
|
||||||
pool :: Pool
|
pool :: Pool
|
||||||
pool = unsafePerformEffect $ Pool.make =<< liftEffect config
|
pool = unsafePerformEffect $ Pool.make =<< liftEffect config
|
||||||
|
|
||||||
withPool :: (Pool -> Aff Unit) -> Aff Unit
|
withPool :: (Pool -> Aff Unit) -> Aff Unit
|
||||||
withPool = bracket (liftEffect $ Pool.make =<< config) Pool.end
|
withPool = bracket (liftEffect $ Pool.make =<< config) (X.run <<< Pool.end)
|
||||||
|
|
||||||
withPoolClient :: (Client -> Aff Unit) -> Aff Unit
|
withPoolClient :: (Client -> Aff Unit) -> Aff Unit
|
||||||
withPoolClient = bracket (Pool.connect pool) (liftEffect <<< Pool.release pool)
|
withPoolClient = bracket (X.run $ Pool.connect pool) (liftEffect <<< X.run <<< Pool.release pool)
|
||||||
|
|
||||||
unsafeFromRight :: forall a b. Either a b -> b
|
unsafeFromRight :: forall a b. Either a b -> b
|
||||||
unsafeFromRight e = unsafePartial $ case e of Right b -> b
|
unsafeFromRight e = unsafePartial $ case e of Right b -> b
|
||||||
|
@ -2,23 +2,32 @@ module Test.Control.Monad.Postgres where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
import Control.Monad.Error.Class (throwError)
|
import Control.Monad.Cont (lift)
|
||||||
|
import Control.Monad.Error.Class (catchError, throwError, try)
|
||||||
import Control.Monad.Fork.Class (class MonadBracket, bracket)
|
import Control.Monad.Fork.Class (class MonadBracket, bracket)
|
||||||
import Control.Monad.Postgres (PostgresT, exec_, query, runPostgres, session, transaction, cursor, fetch, fetchAll, fetchOne)
|
import Control.Monad.Postgres (PostgresT, cursor, exec_, fetch, fetchAll, fetchOne, query, runPostgres, transaction)
|
||||||
import Control.Parallel (parTraverse_)
|
import Control.Monad.Reader (ask)
|
||||||
|
import Control.Parallel (parTraverse)
|
||||||
import Data.Array as Array
|
import Data.Array as Array
|
||||||
import Data.Array.NonEmpty as Array.NonEmpty
|
import Data.Array.NonEmpty as Array.NonEmpty
|
||||||
import Data.Maybe (Maybe(..), fromJust, maybe)
|
import Data.Maybe (Maybe(..), fromJust, maybe)
|
||||||
import Data.String.Regex as Regex
|
import Data.String.Regex as Regex
|
||||||
import Data.String.Regex.Flags as Regex.Flag
|
import Data.String.Regex.Flags as Regex.Flag
|
||||||
import Data.Traversable (for_)
|
import Data.Traversable (for_)
|
||||||
|
import Data.Tuple.Nested ((/\))
|
||||||
import Effect.Aff (Fiber)
|
import Effect.Aff (Fiber)
|
||||||
import Effect.Aff.Class (class MonadAff)
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
||||||
|
import Effect.Aff.Unlift (UnliftAff(..), askUnliftAff)
|
||||||
|
import Effect.Class (liftEffect)
|
||||||
|
import Effect.Console (log)
|
||||||
import Effect.Exception (Error, error)
|
import Effect.Exception (Error, error)
|
||||||
|
import Effect.Postgres.Error as E
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
|
import Effect.Postgres.Error.RE as RE
|
||||||
import Partial.Unsafe (unsafePartial)
|
import Partial.Unsafe (unsafePartial)
|
||||||
import Test.Common (re, withConfig)
|
import Test.Common (re, withConfig)
|
||||||
import Test.Spec (Spec, around, describe, it)
|
import Test.Spec (Spec, around, describe, it)
|
||||||
import Test.Spec.Assertions (expectError, shouldEqual)
|
import Test.Spec.Assertions (shouldEqual)
|
||||||
|
|
||||||
withTable :: forall a m. MonadBracket Error Fiber m => MonadAff m => String -> PostgresT m a -> PostgresT m a
|
withTable :: forall a m. MonadBracket Error Fiber m => MonadAff m => String -> PostgresT m a -> PostgresT m a
|
||||||
withTable s m =
|
withTable s m =
|
||||||
@ -30,30 +39,41 @@ withTable s m =
|
|||||||
spec :: Spec Unit
|
spec :: Spec Unit
|
||||||
spec =
|
spec =
|
||||||
around withConfig $ describe "Control.Monad.Postgres" do
|
around withConfig $ describe "Control.Monad.Postgres" do
|
||||||
it "empty works" \cfg -> runPostgres cfg $ pure unit
|
it "empty works" \cfg -> X.run $ runPostgres cfg $ pure unit
|
||||||
it "connects" \cfg -> runPostgres cfg $ shouldEqual 1 =<< query "select 1"
|
it "connects" \cfg -> shouldEqual 1 =<< X.run (runPostgres cfg $ query "select 1")
|
||||||
it "multiple sessions serially" \cfg -> runPostgres cfg do
|
it "multiple sessions serially" \cfg -> do
|
||||||
shouldEqual 1 =<< query "select 1"
|
a /\ b <- X.run $ runPostgres cfg do
|
||||||
shouldEqual 2 =<< query "select 2"
|
a <- query "select 1"
|
||||||
it "multiple sessions concurrently" \cfg -> runPostgres cfg do
|
b <- query "select 2"
|
||||||
flip parTraverse_ [ 1, 2, 3 ] \_ -> shouldEqual 1 =<< query "select 1"
|
pure $ a /\ b
|
||||||
it "transaction commits" \cfg -> runPostgres cfg do
|
a `shouldEqual` 1
|
||||||
exec_ "create temporary table test_txn_commits (id int);"
|
b `shouldEqual` 2
|
||||||
transaction $ exec_ "insert into test_txn_commits values (1);"
|
it "multiple sessions concurrently" \cfg -> do
|
||||||
shouldEqual [ 1 ] =<< query "select * from test_txn_commits"
|
nums <- X.run $ runPostgres cfg $ parTraverse (\n -> query $ "select $1 :: int" /\ n) (Array.range 1 3)
|
||||||
it "transaction rolls back" \cfg -> runPostgres cfg do
|
Array.sort nums `shouldEqual` [1, 2, 3]
|
||||||
exec_ "create temporary table test_txn_rolls_back (id int);"
|
it "transaction commits" \cfg -> do
|
||||||
exec_ "insert into test_txn_rolls_back values (1);"
|
a <- X.run $ runPostgres cfg do
|
||||||
expectError $ transaction do
|
exec_ "create temporary table test_txn_commits (id int);"
|
||||||
exec_ "insert into test_txn_rolls_back values (2);"
|
transaction $ exec_ "insert into test_txn_commits values (1);"
|
||||||
throwError $ error "foo"
|
query "select * from test_txn_commits"
|
||||||
shouldEqual [ 1 ] =<< query "select * from test_txn_rolls_back"
|
a `shouldEqual` [ 1 ]
|
||||||
it "cursor" \cfg -> runPostgres cfg do
|
it "transaction rolls back" \cfg -> do
|
||||||
exec_ $ "create temporary table test_cursor_data (id int primary key generated always as identity)"
|
a <- X.run $ runPostgres cfg do
|
||||||
for_ (Array.range 1 50) $ const $ exec_ "insert into test_cursor_data (id) values (default);"
|
exec_ "create temporary table test_txn_rolls_back (id int);"
|
||||||
cursor @Int "test_cursor" "select id from test_cursor_data" do
|
exec_ "insert into test_txn_rolls_back values (1);"
|
||||||
shouldEqual (Just 1) =<< fetchOne
|
void $ try $ transaction do
|
||||||
shouldEqual (Just 2) =<< fetchOne
|
exec_ "insert into test_txn_rolls_back values (2);"
|
||||||
shouldEqual (Just 3) =<< fetchOne
|
throwError $ pure $ E.Other $ error "foo"
|
||||||
shouldEqual [ 4, 5, 6, 7, 8 ] =<< fetch 5
|
query "select * from test_txn_rolls_back"
|
||||||
shouldEqual (Array.range 9 50) =<< fetchAll
|
a `shouldEqual` [1]
|
||||||
|
it "cursor" \cfg ->
|
||||||
|
X.run $ runPostgres cfg do
|
||||||
|
exec_ $ "create temporary table test_cursor_data (id int primary key generated always as identity)"
|
||||||
|
for_ (Array.range 1 50) $ const $ exec_ "insert into test_cursor_data (id) values (default);"
|
||||||
|
cursor @Int "test_cursor" "select id from test_cursor_data" do
|
||||||
|
UnliftAff unliftAff <- askUnliftAff
|
||||||
|
liftAff $ shouldEqual (Just 1) =<< unliftAff fetchOne
|
||||||
|
liftAff $ shouldEqual (Just 2) =<< unliftAff fetchOne
|
||||||
|
liftAff $ shouldEqual (Just 3) =<< unliftAff fetchOne
|
||||||
|
liftAff $ shouldEqual [ 4, 5, 6, 7, 8 ] =<< unliftAff (fetch 5)
|
||||||
|
liftAff $ shouldEqual (Array.range 9 50) =<< unliftAff fetchAll
|
||||||
|
@ -31,6 +31,7 @@ import Effect.Aff (Aff)
|
|||||||
import Effect.Aff.Postgres.Client (exec, query)
|
import Effect.Aff.Postgres.Client (exec, query)
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
import Effect.Postgres.Client (Client)
|
import Effect.Postgres.Client (Client)
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
import Effect.Unsafe (unsafePerformEffect)
|
import Effect.Unsafe (unsafePerformEffect)
|
||||||
import Foreign (Foreign, unsafeToForeign)
|
import Foreign (Foreign, unsafeToForeign)
|
||||||
import Foreign.Object as Object
|
import Foreign.Object as Object
|
||||||
@ -206,13 +207,13 @@ spec =
|
|||||||
x' <- Q.param $ fromArb x
|
x' <- Q.param $ fromArb x
|
||||||
let val = x' <> " :: " <> sql
|
let val = x' <> " :: " <> sql
|
||||||
pure $ "select " <> val
|
pure $ "select " <> val
|
||||||
void $ exec createtab c
|
void $ X.run $ exec createtab c
|
||||||
seed <- liftEffect randomSeed
|
seed <- liftEffect randomSeed
|
||||||
let xs = sample seed 10 (arbitrary @x)
|
let xs = sample seed 10 (arbitrary @x)
|
||||||
flip parTraverse_ xs
|
flip parTraverse_ xs
|
||||||
\x -> do
|
\x -> do
|
||||||
void $ exec (ser x) c
|
void $ X.run $ exec (ser x) c
|
||||||
res <- query (de x) c
|
res <- X.run $ query (de x) c
|
||||||
let
|
let
|
||||||
exp = fromArb x
|
exp = fromArb x
|
||||||
act = unsafePartial fromJust $ Array.head res
|
act = unsafePartial fromJust $ Array.head res
|
||||||
|
@ -12,6 +12,7 @@ import Effect.Aff (forkAff, joinFiber)
|
|||||||
import Effect.Aff.Postgres.Client (query)
|
import Effect.Aff.Postgres.Client (query)
|
||||||
import Effect.Aff.Postgres.Client as Client
|
import Effect.Aff.Postgres.Client as Client
|
||||||
import Effect.Exception as Error
|
import Effect.Exception as Error
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
import Test.Common (onceAff, withClient)
|
import Test.Common (onceAff, withClient)
|
||||||
import Test.Spec (Spec, around, describe, it)
|
import Test.Spec (Spec, around, describe, it)
|
||||||
import Test.Spec.Assertions (shouldEqual)
|
import Test.Spec.Assertions (shouldEqual)
|
||||||
@ -23,40 +24,40 @@ spec =
|
|||||||
describe "events" do
|
describe "events" do
|
||||||
it "end" \c -> do
|
it "end" \c -> do
|
||||||
expect <- forkAff $ onceAff Client.endE c
|
expect <- forkAff $ onceAff Client.endE c
|
||||||
Client.end c
|
X.run $ Client.end c
|
||||||
joinFiber expect
|
joinFiber expect
|
||||||
it "notice" \c -> do
|
it "notice" \c -> do
|
||||||
expect <- forkAff do
|
expect <- forkAff do
|
||||||
e <- onceAff Client.noticeE c
|
e <- onceAff Client.noticeE c
|
||||||
Error.message e `shouldEqual` "hello"
|
Error.message e `shouldEqual` "hello"
|
||||||
void $ Client.exec "do language plpgsql $$ begin raise notice 'hello'; end; $$;" c
|
X.run $ void $ Client.exec "do language plpgsql $$ begin raise notice 'hello'; end; $$;" c
|
||||||
joinFiber expect
|
joinFiber expect
|
||||||
it "notification" \c -> do
|
it "notification" \c -> do
|
||||||
void $ Client.exec "listen hello;" c
|
X.run $ void $ Client.exec "listen hello;" c
|
||||||
expect <- forkAff do
|
expect <- forkAff do
|
||||||
n <- onceAff Client.notificationE c
|
n <- onceAff Client.notificationE c
|
||||||
n.payload `shouldEqual` (Just "world")
|
n.payload `shouldEqual` (Just "world")
|
||||||
void $ Client.exec "notify hello, 'world';" c
|
X.run $ void $ Client.exec "notify hello, 'world';" c
|
||||||
joinFiber expect
|
joinFiber expect
|
||||||
it "connect & end do not throw" $ const $ pure unit
|
it "connect & end do not throw" $ const $ pure unit
|
||||||
describe "query" do
|
describe "query" do
|
||||||
it "ok if connected" \c -> shouldEqual [ 1, 2, 3 ] =<< query "select unnest(array[1, 2, 3])" c
|
it "ok if connected" \c -> shouldEqual [ 1, 2, 3 ] =<< X.run (query "select unnest(array[1, 2, 3])" c)
|
||||||
it "throws if ended" \c -> do
|
it "throws if ended" \c -> do
|
||||||
Client.end c
|
X.run $ Client.end c
|
||||||
res :: Either _ (Array Int) <- try $ query "select 1" c
|
res :: Either _ (Array Int) <- try $ X.run $ query "select 1" c
|
||||||
isLeft res `shouldEqual` true
|
isLeft res `shouldEqual` true
|
||||||
it "rowsAffected is correct" \c -> do
|
it "rowsAffected is correct" \c -> do
|
||||||
void $ Client.exec "create temp table foo (bar int);" c
|
X.run $ void $ Client.exec "create temp table foo (bar int);" c
|
||||||
shouldEqual 1 =<< Client.exec "insert into foo values (1);" c
|
shouldEqual 1 =<< X.run (Client.exec "insert into foo values (1);" c)
|
||||||
shouldEqual 3 =<< Client.exec "insert into foo values (1), (2), (3);" c
|
shouldEqual 3 =<< X.run (Client.exec "insert into foo values (1), (2), (3);" c)
|
||||||
shouldEqual 4 =<< Client.exec "update foo set bar = 10;" c
|
shouldEqual 4 =<< X.run (Client.exec "update foo set bar = 10;" c)
|
||||||
describe "timestamp" do
|
describe "timestamp" do
|
||||||
it "unmarshals" \c -> do
|
it "unmarshals" \c -> do
|
||||||
let exp = toDateTimeLossy <$> fromRFC3339String (wrap "2020-01-01T00:00:00Z")
|
let exp = toDateTimeLossy <$> fromRFC3339String (wrap "2020-01-01T00:00:00Z")
|
||||||
shouldEqual exp =<< query "select '2020-01-01T00:00:00Z' :: timestamptz" c
|
shouldEqual exp =<< X.run (query "select '2020-01-01T00:00:00Z' :: timestamptz" c)
|
||||||
it "is string" \c -> shouldEqual "2020-01-01 00:00:00+00" =<< query "select '2020-01-01T00:00:00Z' :: timestamptz" c
|
it "is string" \c -> shouldEqual "2020-01-01 00:00:00+00" =<< X.run (query "select '2020-01-01T00:00:00Z' :: timestamptz" c)
|
||||||
it "array is string" \c -> shouldEqual [ [ "2020-01-01 00:00:00+00" ] ] =<< query "select array['2020-01-01T00:00:00Z' :: timestamptz]" c
|
it "array is string" \c -> shouldEqual [ [ "2020-01-01 00:00:00+00" ] ] =<< X.run (query "select array['2020-01-01T00:00:00Z' :: timestamptz]" c)
|
||||||
describe "json" do
|
describe "json" do
|
||||||
it "unmarshals" \c -> shouldEqual (JSON { foo: "bar" }) =<< query "select '{\"foo\": \"bar\"}' :: json" c
|
it "unmarshals" \c -> shouldEqual (JSON { foo: "bar" }) =<< X.run (query "select '{\"foo\": \"bar\"}' :: json" c)
|
||||||
it "is string" \c -> shouldEqual "{\"foo\": \"bar\"}" =<< query "select '{\"foo\": \"bar\"}' :: json" c
|
it "is string" \c -> shouldEqual "{\"foo\": \"bar\"}" =<< X.run (query "select '{\"foo\": \"bar\"}' :: json" c)
|
||||||
it "array is string" \c -> shouldEqual [ [ "{\"foo\": \"bar\"}" ] ] =<< query "select array['{\"foo\": \"bar\"}' :: json]" c
|
it "array is string" \c -> shouldEqual [ [ "{\"foo\": \"bar\"}" ] ] =<< X.run (query "select array['{\"foo\": \"bar\"}' :: json]" c)
|
||||||
|
@ -7,6 +7,7 @@ import Effect.Aff (finally, forkAff, joinFiber)
|
|||||||
import Effect.Aff.Postgres.Client as Client
|
import Effect.Aff.Postgres.Client as Client
|
||||||
import Effect.Aff.Postgres.Pool as Pool
|
import Effect.Aff.Postgres.Pool as Pool
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
|
import Effect.Postgres.Error.Except as X
|
||||||
import Test.Common (config, onceAff, withPool)
|
import Test.Common (config, onceAff, withPool)
|
||||||
import Test.Spec (Spec, around, describe, it)
|
import Test.Spec (Spec, around, describe, it)
|
||||||
import Test.Spec.Assertions (expectError, shouldEqual)
|
import Test.Spec.Assertions (expectError, shouldEqual)
|
||||||
@ -20,78 +21,78 @@ spec = describe "Pool" do
|
|||||||
void $ liftEffect $ Pool.make cfg
|
void $ liftEffect $ Pool.make cfg
|
||||||
around withPool do
|
around withPool do
|
||||||
it "idleCount, totalCount" \p -> do
|
it "idleCount, totalCount" \p -> do
|
||||||
a <- Pool.connect p
|
a <- X.run $ Pool.connect p
|
||||||
b <- Pool.connect p
|
b <- X.run $ Pool.connect p
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.release p a
|
liftEffect $ X.run $ Pool.release p a
|
||||||
liftEffect $ Pool.release p b
|
liftEffect $ X.run $ Pool.release p b
|
||||||
finally (liftEffect $ Pool.release p c) do
|
finally (liftEffect $ X.run $ Pool.release p c) do
|
||||||
Pool.clientIdleCount p `shouldEqual` 2
|
Pool.clientIdleCount p `shouldEqual` 2
|
||||||
Pool.clientCount p `shouldEqual` 3
|
Pool.clientCount p `shouldEqual` 3
|
||||||
Pool.clientIdleCount p `shouldEqual` 3
|
Pool.clientIdleCount p `shouldEqual` 3
|
||||||
Pool.clientCount p `shouldEqual` 3
|
Pool.clientCount p `shouldEqual` 3
|
||||||
it "waitingCount" \p -> do
|
it "waitingCount" \p -> do
|
||||||
a <- Pool.connect p
|
a <- X.run $ Pool.connect p
|
||||||
b <- Pool.connect p
|
b <- X.run $ Pool.connect p
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
dFiber <- forkAff $ Pool.connect p
|
dFiber <- forkAff $ X.run $ Pool.connect p
|
||||||
let
|
let
|
||||||
rel =
|
rel =
|
||||||
do
|
do
|
||||||
void $ liftEffect $ traverse (Pool.release p) [ a, b, c ]
|
void $ liftEffect $ X.run $ traverse (Pool.release p) [ a, b, c ]
|
||||||
d <- joinFiber dFiber
|
d <- joinFiber dFiber
|
||||||
liftEffect $ Pool.release p d
|
liftEffect $ X.run $ Pool.release p d
|
||||||
finally rel $ Pool.clientWaitingCount p `shouldEqual` 1
|
finally rel $ Pool.clientWaitingCount p `shouldEqual` 1
|
||||||
describe "events" do
|
describe "events" do
|
||||||
it "connect" \p -> do
|
it "connect" \p -> do
|
||||||
expect <- forkAff $ void $ onceAff Pool.connectE p
|
expect <- forkAff $ void $ onceAff Pool.connectE p
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
finally (liftEffect $ Pool.release p c) $ joinFiber expect
|
finally (liftEffect $ X.run $ Pool.release p c) $ joinFiber expect
|
||||||
it "acquire" \p -> do
|
it "acquire" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run$ Pool.connect p
|
||||||
liftEffect $ Pool.release p c
|
liftEffect $ X.run$ Pool.release p c
|
||||||
expect <- forkAff do
|
expect <- forkAff do
|
||||||
c'' <- onceAff Pool.acquireE p
|
c'' <- onceAff Pool.acquireE p
|
||||||
refEq c c'' `shouldEqual` true
|
refEq c c'' `shouldEqual` true
|
||||||
c' <- Pool.connect p
|
c' <- X.run $ Pool.connect p
|
||||||
finally (liftEffect $ Pool.release p c') $ joinFiber expect
|
finally (liftEffect $ X.run$ Pool.release p c') $ joinFiber expect
|
||||||
it "release" \p -> do
|
it "release" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
expect <- forkAff do
|
expect <- forkAff do
|
||||||
c' <- onceAff Pool.releaseE p
|
c' <- onceAff Pool.releaseE p
|
||||||
refEq c c' `shouldEqual` true
|
refEq c c' `shouldEqual` true
|
||||||
liftEffect $ Pool.release p c
|
liftEffect $ X.run $ Pool.release p c
|
||||||
joinFiber expect
|
joinFiber expect
|
||||||
it "remove" \p -> do
|
it "remove" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
expect <- forkAff do
|
expect <- forkAff do
|
||||||
c' <- onceAff Pool.removeE p
|
c' <- onceAff Pool.removeE p
|
||||||
refEq c c' `shouldEqual` true
|
refEq c c' `shouldEqual` true
|
||||||
liftEffect $ Pool.destroy p c
|
liftEffect $ X.run $ Pool.destroy p c
|
||||||
joinFiber expect
|
joinFiber expect
|
||||||
it "connect" \p -> do
|
it "connect" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
let rel = liftEffect $ Pool.release p c
|
let rel = liftEffect $ X.run $ Pool.release p c
|
||||||
finally rel $ shouldEqual 1 =<< Client.query "select 1" c
|
finally rel $ shouldEqual 1 =<< X.run (Client.query "select 1" c)
|
||||||
describe "destroy" do
|
describe "destroy" do
|
||||||
it "throws on query after destroy" \p -> do
|
it "throws on query after destroy" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.destroy p c
|
liftEffect $ X.run $ Pool.destroy p c
|
||||||
expectError $ Client.exec "select 1" c
|
expectError $ X.run $ Client.exec "select 1" c
|
||||||
it "different client yielded after destroy" \p -> do
|
it "different client yielded after destroy" \p -> do
|
||||||
a <- Pool.connect p
|
a <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.destroy p a
|
liftEffect $ X.run $ Pool.destroy p a
|
||||||
b <- Pool.connect p
|
b <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.destroy p b
|
liftEffect $ X.run $ Pool.destroy p b
|
||||||
refEq a b `shouldEqual` false
|
refEq a b `shouldEqual` false
|
||||||
describe "release" do
|
describe "release" do
|
||||||
it "allows reuse" \p -> do
|
it "allows reuse" \p -> do
|
||||||
a <- Pool.connect p
|
a <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.release p a
|
liftEffect $ X.run $ Pool.release p a
|
||||||
b <- Pool.connect p
|
b <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.release p b
|
liftEffect $ X.run $ Pool.release p b
|
||||||
refEq a b `shouldEqual` true
|
refEq a b `shouldEqual` true
|
||||||
it "throws when invoked twice" \p -> do
|
it "throws when invoked twice" \p -> do
|
||||||
c <- Pool.connect p
|
c <- X.run $ Pool.connect p
|
||||||
liftEffect $ Pool.release p c
|
liftEffect $ X.run $ Pool.release p c
|
||||||
expectError $ liftEffect $ Pool.release p c
|
expectError $ liftEffect $ X.run $ Pool.release p c
|
||||||
|
Loading…
Reference in New Issue
Block a user