fix: stdin txn

This commit is contained in:
orion 2024-05-11 23:00:05 -05:00
parent 86263a7521
commit 27a7abb329
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 44 additions and 13 deletions

View File

@ -23,7 +23,7 @@ workspace:
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.0.2 <2.0.0" - node-stream-pipes: ">=1.2.3 <2.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
@ -137,6 +137,7 @@ workspace:
- unfoldable - unfoldable
- unicode - unicode
- unlift - unlift
- unordered-collections
- unsafe-coerce - unsafe-coerce
- variant - variant
extra_packages: {} extra_packages: {}
@ -727,8 +728,8 @@ packages:
- unsafe-coerce - unsafe-coerce
node-stream-pipes: node-stream-pipes:
type: registry type: registry
version: 1.0.5 version: 1.2.3
integrity: sha256-1icxdi6ocY4zs7iUYLmRzGiqNj/gDe1rgg2Qar9PkEU= integrity: sha256-lXD3x6+p72uBrRHGHrob2jrrBDakhhZE9O9EYE4aFiE=
dependencies: dependencies:
- aff - aff
- arrays - arrays
@ -736,6 +737,8 @@ packages:
- either - either
- exceptions - exceptions
- foldable-traversable - foldable-traversable
- foreign-object
- lists
- maybe - maybe
- mmorph - mmorph
- newtype - newtype
@ -745,6 +748,7 @@ packages:
- node-path - node-path
- node-streams - node-streams
- node-zlib - node-zlib
- ordered-collections
- parallel - parallel
- pipes - pipes
- prelude - prelude
@ -752,6 +756,8 @@ packages:
- strings - strings
- tailrec - tailrec
- transformers - transformers
- tuples
- unordered-collections
- unsafe-coerce - unsafe-coerce
node-streams: node-streams:
type: registry type: registry
@ -1178,6 +1184,21 @@ packages:
- st - st
- transformers - transformers
- tuples - tuples
unordered-collections:
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce: unsafe-coerce:
type: registry type: registry
version: 6.0.0 version: 6.0.0

View File

@ -33,7 +33,7 @@ package:
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.0.2 <2.0.0" - node-stream-pipes: ">=1.2.3 <2.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"

View File

@ -2,7 +2,7 @@ module Pipes.Postgres where
import Prelude import Prelude
import Control.Monad.Morph (hoist) import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Postgres (class MonadPostgres) import Control.Monad.Postgres (class MonadPostgres)
import Control.Monad.Reader (class MonadAsk, ask) import Control.Monad.Reader (class MonadAsk, ask)
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
@ -11,6 +11,7 @@ import Effect.Aff.Postgres.Client as Client
import Effect.Aff.Postgres.Pool (Pool) import Effect.Aff.Postgres.Pool (Pool)
import Effect.Aff.Postgres.Pool as Pool import Effect.Aff.Postgres.Pool as Pool
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes ((>->)) import Pipes ((>->))
@ -21,6 +22,7 @@ import Pipes.Prelude as Pipes
stdin stdin
:: forall m s c ct :: forall m s c ct
. MonadAff m . MonadAff m
=> MonadError Error m
=> MonadAsk Pool m => MonadAsk Pool m
=> MonadPostgres m s c ct => MonadPostgres m s c ct
=> String => String
@ -29,16 +31,26 @@ stdin q = do
pool <- ask pool <- ask
client <- liftAff $ Pool.connect pool client <- liftAff $ Pool.connect pool
stream <- liftEffect $ Client.execWithStdin q client stream <- liftEffect $ Client.execWithStdin q client
liftAff $ void $ Client.exec "begin" client
let let
releaseOnEOS Nothing = Pool.release pool client $> Nothing releaseOnEOS Nothing = do
liftAff $ void $ Client.exec "commit" client
liftEffect $ Pool.release pool client
pure Nothing
releaseOnEOS (Just a) = pure (Just a) releaseOnEOS (Just a) = pure (Just a)
hoist liftAff
$ hoist liftEffect (Pipes.mapM releaseOnEOS) pipe = Pipes.mapM releaseOnEOS >-> fromWritable (O.fromBufferWritable stream)
>-> fromWritable (O.fromBufferWritable stream) err e = do
liftAff $ void $ Client.exec "rollback" client
liftEffect $ Pool.release pool client
throwError e
catchError pipe err
stdout stdout
:: forall m s c ct :: forall m s c ct
. MonadAff m . MonadAff m
=> MonadThrow Error m
=> MonadAsk Pool m => MonadAsk Pool m
=> MonadPostgres m s c ct => MonadPostgres m s c ct
=> String => String
@ -48,8 +60,6 @@ stdout q = do
client <- liftAff $ Pool.connect pool client <- liftAff $ Pool.connect pool
stream <- liftEffect $ Client.queryWithStdout q client stream <- liftEffect $ Client.queryWithStdout q client
let let
releaseOnEOS Nothing = Pool.release pool client $> Nothing releaseOnEOS Nothing = liftEffect $ Pool.release pool client $> Nothing
releaseOnEOS (Just a) = pure (Just a) releaseOnEOS (Just a) = pure (Just a)
hoist liftAff fromReadable (O.fromBufferReadable stream) >-> Pipes.mapM releaseOnEOS
$ fromReadable (O.fromBufferReadable stream)
>-> hoist liftEffect (Pipes.mapM releaseOnEOS)