From 27a7abb32952b447aa52e7bfade7a74a8df8200b Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Sat, 11 May 2024 23:00:05 -0500 Subject: [PATCH] fix: stdin txn --- spago.lock | 27 ++++++++++++++++++++++++--- spago.yaml | 2 +- src/Pipes.Postgres.purs | 28 +++++++++++++++++++--------- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/spago.lock b/spago.lock index 15aee99..6db3e56 100644 --- a/spago.lock +++ b/spago.lock @@ -23,7 +23,7 @@ workspace: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=1.0.2 <2.0.0" + - node-stream-pipes: ">=1.2.3 <2.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0" @@ -137,6 +137,7 @@ workspace: - unfoldable - unicode - unlift + - unordered-collections - unsafe-coerce - variant extra_packages: {} @@ -727,8 +728,8 @@ packages: - unsafe-coerce node-stream-pipes: type: registry - version: 1.0.5 - integrity: sha256-1icxdi6ocY4zs7iUYLmRzGiqNj/gDe1rgg2Qar9PkEU= + version: 1.2.3 + integrity: sha256-lXD3x6+p72uBrRHGHrob2jrrBDakhhZE9O9EYE4aFiE= dependencies: - aff - arrays @@ -736,6 +737,8 @@ packages: - either - exceptions - foldable-traversable + - foreign-object + - lists - maybe - mmorph - newtype @@ -745,6 +748,7 @@ packages: - node-path - node-streams - node-zlib + - ordered-collections - parallel - pipes - prelude @@ -752,6 +756,8 @@ packages: - strings - tailrec - transformers + - tuples + - unordered-collections - unsafe-coerce node-streams: type: registry @@ -1178,6 +1184,21 @@ packages: - st - transformers - tuples + unordered-collections: + type: registry + version: 3.1.0 + integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q= + dependencies: + - arrays + - enums + - functions + - integers + - lists + - prelude + - record + - tuples + - typelevel-prelude + - unfoldable unsafe-coerce: type: registry version: 6.0.0 diff --git a/spago.yaml b/spago.yaml index 75d719e..229a758 100644 --- a/spago.yaml +++ b/spago.yaml @@ -33,7 +33,7 @@ package: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=1.0.2 <2.0.0" + - node-stream-pipes: ">=1.2.3 <2.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0" diff --git a/src/Pipes.Postgres.purs b/src/Pipes.Postgres.purs index 5e0dad2..f329d50 100644 --- a/src/Pipes.Postgres.purs +++ b/src/Pipes.Postgres.purs @@ -2,7 +2,7 @@ module Pipes.Postgres where import Prelude -import Control.Monad.Morph (hoist) +import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError) import Control.Monad.Postgres (class MonadPostgres) import Control.Monad.Reader (class MonadAsk, ask) import Data.Maybe (Maybe(..)) @@ -11,6 +11,7 @@ import Effect.Aff.Postgres.Client as Client import Effect.Aff.Postgres.Pool (Pool) import Effect.Aff.Postgres.Pool as Pool import Effect.Class (liftEffect) +import Effect.Exception (Error) import Node.Buffer (Buffer) import Node.Stream.Object as O import Pipes ((>->)) @@ -21,6 +22,7 @@ import Pipes.Prelude as Pipes stdin :: forall m s c ct . MonadAff m + => MonadError Error m => MonadAsk Pool m => MonadPostgres m s c ct => String @@ -29,16 +31,26 @@ stdin q = do pool <- ask client <- liftAff $ Pool.connect pool stream <- liftEffect $ Client.execWithStdin q client + liftAff $ void $ Client.exec "begin" client let - releaseOnEOS Nothing = Pool.release pool client $> Nothing + releaseOnEOS Nothing = do + liftAff $ void $ Client.exec "commit" client + liftEffect $ Pool.release pool client + pure Nothing releaseOnEOS (Just a) = pure (Just a) - hoist liftAff - $ hoist liftEffect (Pipes.mapM releaseOnEOS) - >-> fromWritable (O.fromBufferWritable stream) + + pipe = Pipes.mapM releaseOnEOS >-> fromWritable (O.fromBufferWritable stream) + err e = do + liftAff $ void $ Client.exec "rollback" client + liftEffect $ Pool.release pool client + throwError e + + catchError pipe err stdout :: forall m s c ct . MonadAff m + => MonadThrow Error m => MonadAsk Pool m => MonadPostgres m s c ct => String @@ -48,8 +60,6 @@ stdout q = do client <- liftAff $ Pool.connect pool stream <- liftEffect $ Client.queryWithStdout q client let - releaseOnEOS Nothing = Pool.release pool client $> Nothing + releaseOnEOS Nothing = liftEffect $ Pool.release pool client $> Nothing releaseOnEOS (Just a) = pure (Just a) - hoist liftAff - $ fromReadable (O.fromBufferReadable stream) - >-> hoist liftEffect (Pipes.mapM releaseOnEOS) + fromReadable (O.fromBufferReadable stream) >-> Pipes.mapM releaseOnEOS