diff --git a/.tool-versions b/.tool-versions index 347e5e8..8a1d6d7 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1,2 @@ purescript 0.15.15 +bun 1.1.18 diff --git a/README.md b/README.md index d0c4906..aefa86f 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,16 @@ -# purescript-cbor-stream +# purescript-threading +Concurrency primitives inspired by python's multithreading and rust, allowing for +predictable concurrency with `Aff` -Type-safe bindings for the streaming API of `cbor-x` +## Use Cases +* Create a background worker thread +* Communicate between threads (`Threading.Channel`) +* Limit access to a resource _(eg. a database connection pool, file handle)_ to 1 concurrent actor (`Threading.RWLock`, `Threading.Mutex`) +* Coordinate concurrent threads, waiting for some common goal to be reached before continuing (`Threading.Barrier`) +* Create a pool of concurrent "threads" that can pull work from a queue, with graceful exiting and error handling +* Remotely kill a thread, or non-blockingly ask if it has exited ## Installing ```bash -spago install cbor-stream -{bun|yarn|npm|pnpm} install cbor-x -``` - -## Examples - -### Convert a cbor-encoded dataset to csv -```purescript -import Pipes.Node.Stream as Pipes.Stream -import Pipes.Node.FS as Pipes.FS -import Pipes.Node.Buffer as Pipes.Buffer -import Pipes.CBOR as Pipes.CBOR -import Pipes.CSV as Pipes.CSV -import Pipes.Prelude ((>->)) -import Pipes.Prelude as Pipes - -Pipes.runEffect - $ Pipes.FS.read "foo.bin" - >-> Pipes.CBOR.decode @{id :: Int, name :: String} - >-> Pipes.CSV.stringify - >-> Pipes.FS.write "foo.csv" +spago install threading ``` diff --git a/bun/prepare.js b/bun/prepare.js index a2f0f7e..124b6d7 100644 --- a/bun/prepare.js +++ b/bun/prepare.js @@ -22,8 +22,8 @@ await writeFile("./spago.yaml", spagonew); const readme = await readFile("./README.md", "utf8"); const readmenew = readme.replace( - /packages\/purescript-cbor-stream\/.+?\//g, - `/packages/purescript-cbor-stream/${ver}/`, + /packages\/purescript-threading\/.+?\//g, + `/packages/purescript-threading/${ver}/`, ); await writeFile("./README.md", readmenew); diff --git a/package.json b/package.json index 4b67d75..af8fa16 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,10 @@ { - "name": "purescript-cbor-stream", - "version": "v1.3.0", + "name": "purescript-threading", + "version": "v0.0.1", "type": "module", - "dependencies": { - "cbor-x": "^1.5.9", - "decimal.js": "^10.4.3" - }, + "dependencies": {}, "devDependencies": { - "typescript": "^5.4.5" + "typescript": "^5.4.5", + "bun": "1.1.18" } } diff --git a/spago.lock b/spago.lock index 0ed7559..9e96abe 100644 --- a/spago.lock +++ b/spago.lock @@ -1,35 +1,25 @@ workspace: packages: - cbor-stream: + threading: path: ./ dependencies: - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - - bifunctors: ">=6.0.0 <7.0.0" - - datetime: ">=6.1.0 <7.0.0" + - catenable-lists + - control - effect: ">=4.0.0 <5.0.0" - either: ">=6.1.0 <7.0.0" - - exceptions: ">=6.0.0 <7.0.0" + - exceptions + - filterable - foldable-traversable: ">=6.0.0 <7.0.0" - - foreign: ">=7.0.0 <8.0.0" - - foreign-object: ">=4.1.0 <5.0.0" - - js-bigints: ">=2.2.1 <3.0.0" - - js-date: ">=8.0.0 <9.0.0" - - js-maps: ">=0.1.2 <0.2.0" - maybe: ">=6.0.0 <7.0.0" - - node-buffer: ">=9.0.0 <10.0.0" - - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=2.1.0 <3.0.0" - - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - prelude: ">=6.0.1 <7.0.0" - - record: ">=4.0.0 <5.0.0" - - simple-json: ">=9.0.0 <10.0.0" - - tailrec: ">=6.1.0 <7.0.0" + - refs - transformers: ">=6.0.0 <7.0.0" + - tuples - typelevel-prelude: ">=7.0.0 <8.0.0" - - unsafe-coerce: ">=6.0.0 <7.0.0" test_dependencies: - console - gen @@ -59,6 +49,7 @@ workspace: - enums - exceptions - exists + - filterable - fixed-points - foldable-traversable - foreign @@ -72,9 +63,7 @@ workspace: - identity - integers - invariant - - js-bigints - js-date - - js-maps - lazy - lcg - lists @@ -85,7 +74,6 @@ workspace: - node-event-emitter - node-fs - node-path - - node-stream-pipes - node-streams - node-zlib - nonempty @@ -101,7 +89,6 @@ workspace: - precise-datetime - prelude - profunctor - - profunctor-lenses - quickcheck - random - record @@ -118,9 +105,508 @@ workspace: - typelevel-prelude - unfoldable - unicode - - unordered-collections - unsafe-coerce - variant + package_set: + address: + registry: 53.3.0 + compiler: ">=0.15.15 <0.16.0" + content: + abc-parser: 2.0.1 + ace: 9.1.0 + address-rfc2821: 0.1.1 + aff: 7.1.0 + aff-bus: 6.0.0 + aff-coroutines: 9.0.0 + aff-promise: 4.0.0 + aff-retry: 2.0.0 + affjax: 13.0.0 + affjax-node: 1.0.0 + affjax-web: 1.0.0 + ansi: 7.0.0 + apexcharts: 0.5.0 + applicative-phases: 1.0.0 + argonaut: 9.0.0 + argonaut-aeson-generic: 0.4.1 + argonaut-codecs: 9.1.0 + argonaut-core: 7.0.0 + argonaut-generic: 8.0.0 + argonaut-traversals: 10.0.0 + argparse-basic: 2.0.0 + array-builder: 0.1.2 + array-search: 0.6.0 + arraybuffer: 13.2.0 + arraybuffer-builder: 3.1.0 + arraybuffer-types: 3.0.2 + arrays: 7.3.0 + arrays-extra: 0.6.1 + arrays-zipper: 2.0.1 + ask: 1.0.0 + assert: 6.0.0 + assert-multiple: 0.4.0 + avar: 5.0.0 + b64: 0.0.8 + barbies: 1.0.1 + barlow-lens: 0.9.0 + bifunctors: 6.0.0 + bigints: 7.0.1 + bolson: 0.3.9 + bookhound: 0.1.7 + bower-json: 3.0.0 + call-by-name: 4.0.1 + canvas: 6.0.0 + canvas-action: 9.0.0 + cartesian: 1.0.6 + catenable-lists: 7.0.0 + cbor-stream: 1.3.0 + chameleon: 1.0.0 + chameleon-halogen: 1.0.3 + chameleon-react-basic: 1.1.0 + chameleon-styled: 2.5.0 + chameleon-transformers: 1.0.0 + channel: 1.0.0 + checked-exceptions: 3.1.1 + choku: 1.0.1 + classless: 0.1.1 + classless-arbitrary: 0.1.1 + classless-decode-json: 0.1.1 + classless-encode-json: 0.1.3 + classnames: 2.0.0 + codec: 6.1.0 + codec-argonaut: 10.0.0 + codec-json: 1.2.0 + colors: 7.0.1 + concur-core: 0.5.0 + concur-react: 0.5.0 + concurrent-queues: 3.0.0 + console: 6.1.0 + const: 6.0.0 + contravariant: 6.0.0 + control: 6.0.0 + convertable-options: 1.0.0 + coroutines: 7.0.0 + css: 6.0.0 + css-frameworks: 1.0.1 + csv-stream: 2.3.0 + data-mvc: 0.0.2 + datetime: 6.1.0 + datetime-parsing: 0.2.0 + debounce: 0.1.0 + debug: 6.0.2 + decimals: 7.1.0 + default-values: 1.0.1 + deku: 0.9.23 + deno: 0.0.5 + dissect: 1.0.0 + distributive: 6.0.0 + dom-filereader: 7.0.0 + dom-indexed: 12.0.0 + dom-simple: 0.4.0 + dotenv: 4.0.3 + droplet: 0.6.0 + dts: 1.0.0 + dual-numbers: 1.0.2 + dynamic-buffer: 3.0.1 + echarts-simple: 0.0.1 + effect: 4.0.0 + either: 6.1.0 + elmish: 0.11.4 + elmish-enzyme: 0.1.1 + elmish-hooks: 0.10.0 + elmish-html: 0.8.3 + elmish-testing-library: 0.3.2 + email-validate: 7.0.0 + encoding: 0.0.9 + enums: 6.0.1 + env-names: 0.4.0 + error: 2.0.0 + eta-conversion: 0.3.2 + exceptions: 6.1.0 + exists: 6.0.0 + exitcodes: 4.0.0 + expect-inferred: 3.0.0 + ezfetch: 1.0.0 + fahrtwind: 2.0.0 + fallback: 0.1.0 + fast-vect: 1.2.0 + fetch: 4.1.0 + fetch-argonaut: 1.0.1 + fetch-core: 5.1.0 + fetch-yoga-json: 1.1.0 + ffi-simple: 0.5.1 + fft-js: 0.1.0 + filterable: 5.0.0 + fix-functor: 0.1.0 + fixed-points: 7.0.0 + fixed-precision: 5.0.0 + flame: 1.3.0 + float32: 2.0.0 + fmt: 0.2.1 + foldable-traversable: 6.0.0 + foldable-traversable-extra: 0.0.6 + foreign: 7.0.0 + foreign-object: 4.1.0 + foreign-readwrite: 3.4.0 + forgetmenot: 0.1.0 + fork: 6.0.0 + form-urlencoded: 7.0.0 + formatters: 7.0.0 + framer-motion: 1.0.1 + free: 7.1.0 + freeap: 7.0.0 + freer-free: 0.0.1 + freet: 7.0.0 + functions: 6.0.0 + functor1: 3.0.0 + functors: 5.0.0 + fuzzy: 0.4.0 + gen: 4.0.0 + generate-values: 1.0.1 + generic-router: 0.0.1 + geojson: 0.0.5 + geometry-plane: 1.0.3 + gojs: 0.1.1 + grain: 3.0.0 + grain-router: 3.0.0 + grain-virtualized: 3.0.0 + graphs: 8.1.0 + group: 4.1.1 + halogen: 7.0.0 + halogen-bootstrap5: 5.3.2 + halogen-canvas: 1.0.0 + halogen-css: 10.0.0 + halogen-echarts-simple: 0.0.4 + halogen-formless: 4.0.3 + halogen-helix: 1.0.0 + halogen-hooks: 0.6.3 + halogen-hooks-extra: 0.9.0 + halogen-infinite-scroll: 1.1.0 + halogen-store: 0.5.4 + halogen-storybook: 2.0.0 + halogen-subscriptions: 2.0.0 + halogen-svg-elems: 8.0.0 + halogen-typewriter: 1.0.4 + halogen-vdom: 8.0.0 + halogen-vdom-string-renderer: 0.5.0 + halogen-xterm: 2.0.0 + heckin: 2.0.1 + heterogeneous: 0.6.0 + homogeneous: 0.4.0 + http-methods: 6.0.0 + httpurple: 4.0.0 + huffman: 0.4.0 + humdrum: 0.0.1 + hyrule: 2.3.8 + identity: 6.0.0 + identy: 4.0.1 + indexed-db: 1.0.0 + indexed-monad: 3.0.0 + int64: 3.0.0 + integers: 6.0.0 + interpolate: 5.0.2 + intersection-observer: 1.0.1 + invariant: 6.0.0 + jarilo: 1.0.1 + jelly: 0.10.0 + jelly-router: 0.3.0 + jelly-signal: 0.4.0 + jest: 1.0.0 + js-abort-controller: 1.0.0 + js-bigints: 2.2.1 + js-date: 8.0.0 + js-fetch: 0.2.1 + js-fileio: 3.0.0 + js-intl: 1.0.4 + js-iterators: 0.1.1 + js-maps: 0.1.2 + js-promise: 1.0.0 + js-promise-aff: 1.0.0 + js-timers: 6.1.0 + js-uri: 3.1.0 + json: 1.1.0 + json-codecs: 5.0.0 + justifill: 0.5.0 + jwt: 0.0.9 + labeled-data: 0.2.0 + language-cst-parser: 0.14.0 + lazy: 6.0.0 + lazy-joe: 1.0.0 + lcg: 4.0.0 + leibniz: 5.0.0 + leveldb: 1.0.1 + liminal: 1.0.1 + linalg: 6.0.0 + lists: 7.0.0 + literals: 1.0.2 + logging: 3.0.0 + logging-journald: 0.4.0 + lumi-components: 18.0.0 + machines: 7.0.0 + maps-eager: 0.5.0 + marionette: 1.0.0 + marionette-react-basic-hooks: 0.1.1 + marked: 0.1.0 + matrices: 5.0.1 + matryoshka: 1.0.0 + maybe: 6.0.0 + media-types: 6.0.0 + meowclient: 1.0.0 + midi: 4.0.0 + milkis: 9.0.0 + minibench: 4.0.1 + mmorph: 7.0.0 + monad-control: 5.0.0 + monad-logger: 1.3.1 + monad-loops: 0.5.0 + monad-unlift: 1.0.1 + monoid-extras: 0.0.1 + monoidal: 0.16.0 + morello: 0.4.0 + mote: 3.0.0 + motsunabe: 2.0.0 + mvc: 0.0.1 + mysql: 6.0.1 + n3: 0.1.0 + nano-id: 1.1.0 + nanoid: 0.1.0 + naturals: 3.0.0 + nested-functor: 0.2.1 + newtype: 5.0.0 + nextjs: 0.1.1 + nextui: 0.2.0 + node-buffer: 9.0.0 + node-child-process: 11.1.0 + node-event-emitter: 3.0.0 + node-execa: 5.0.0 + node-fs: 9.2.0 + node-glob-basic: 1.3.0 + node-http: 9.1.0 + node-http2: 1.1.1 + node-human-signals: 1.0.0 + node-net: 5.1.0 + node-os: 5.1.0 + node-path: 5.0.0 + node-process: 11.2.0 + node-readline: 8.1.1 + node-sqlite3: 8.0.0 + node-stream-pipes: 2.1.5 + node-streams: 9.0.0 + node-tls: 0.3.1 + node-url: 7.0.1 + node-zlib: 0.4.0 + nonempty: 7.0.0 + now: 6.0.0 + npm-package-json: 2.0.0 + nullable: 6.0.0 + numberfield: 0.1.0 + numbers: 9.0.1 + oak: 3.1.1 + oak-debug: 1.2.2 + object-maps: 0.3.0 + ocarina: 1.5.4 + open-folds: 6.3.0 + open-memoize: 6.1.0 + open-pairing: 6.1.0 + options: 7.0.0 + optparse: 5.0.1 + ordered-collections: 3.2.0 + ordered-set: 0.4.0 + orders: 6.0.0 + owoify: 1.2.0 + pairs: 9.0.1 + parallel: 7.0.0 + parsing: 10.2.0 + parsing-dataview: 3.2.4 + partial: 4.0.0 + pathy: 9.0.0 + pha: 0.13.0 + phaser: 0.7.0 + phylio: 1.1.2 + pipes: 8.0.0 + pirates-charm: 0.0.1 + pmock: 0.9.0 + point-free: 1.0.0 + pointed-list: 0.5.1 + polymorphic-vectors: 4.0.0 + posix-types: 6.0.0 + postgresql: 2.0.17 + precise: 6.0.0 + precise-datetime: 7.0.0 + prelude: 6.0.1 + prettier-printer: 3.0.0 + profunctor: 6.0.1 + profunctor-lenses: 8.0.0 + protobuf: 4.3.0 + psa-utils: 8.0.0 + psci-support: 6.0.0 + punycode: 1.0.0 + qualified-do: 2.2.0 + quantities: 12.2.0 + quickcheck: 8.0.1 + quickcheck-combinators: 0.1.3 + quickcheck-laws: 7.0.0 + quickcheck-utf8: 0.0.0 + random: 6.0.0 + rationals: 6.0.0 + rdf: 0.1.0 + react: 11.0.0 + react-aria: 0.2.0 + react-basic: 17.0.0 + react-basic-classic: 3.0.0 + react-basic-dnd: 10.1.0 + react-basic-dom: 6.1.0 + react-basic-emotion: 7.1.0 + react-basic-hooks: 8.2.0 + react-basic-storybook: 2.0.0 + react-dom: 8.0.0 + react-halo: 3.0.0 + react-icons: 1.1.5 + react-markdown: 0.1.0 + react-testing-library: 4.0.1 + react-virtuoso: 1.0.0 + reactix: 0.6.1 + read: 1.0.1 + recharts: 1.1.0 + record: 4.0.0 + record-extra: 5.0.1 + record-ptional-fields: 0.1.2 + record-studio: 1.0.4 + refs: 6.0.0 + remotedata: 5.0.1 + repr: 0.5.0 + resize-observer: 1.0.0 + resource: 2.0.1 + resourcet: 1.0.0 + result: 1.0.3 + return: 0.2.0 + ring-modules: 5.0.1 + rito: 0.3.4 + roman: 0.4.0 + rough-notation: 1.0.2 + routing: 11.0.0 + routing-duplex: 0.7.0 + run: 5.0.0 + safe-coerce: 2.0.0 + safely: 4.0.1 + school-of-music: 1.3.0 + selection-foldable: 0.2.0 + selective-functors: 1.0.1 + semirings: 7.0.0 + signal: 13.0.0 + simple-emitter: 3.0.1 + simple-i18n: 2.0.1 + simple-json: 9.0.0 + simple-json-generics: 0.2.1 + simple-ulid: 3.0.0 + sized-matrices: 1.0.0 + sized-vectors: 5.0.2 + slug: 3.0.8 + small-ffi: 4.0.1 + soundfonts: 4.1.0 + sparse-matrices: 1.3.0 + sparse-polynomials: 2.0.5 + spec: 7.6.0 + spec-discovery: 8.3.0 + spec-mocha: 5.1.0 + spec-quickcheck: 5.0.0 + splitmix: 2.1.0 + ssrs: 1.0.0 + st: 6.2.0 + statistics: 0.3.2 + strictlypositiveint: 1.0.1 + string-parsers: 8.0.0 + strings: 6.0.1 + strings-extra: 4.0.0 + stringutils: 0.0.12 + substitute: 0.2.3 + supply: 0.2.0 + svg-parser: 3.0.0 + systemd-journald: 0.3.0 + tagged: 4.0.2 + tailrec: 6.1.0 + tecton: 0.2.1 + tecton-halogen: 0.2.0 + test-unit: 17.0.0 + thermite: 6.3.1 + thermite-dom: 0.3.1 + these: 6.0.0 + toestand: 0.9.0 + transformation-matrix: 1.0.1 + transformers: 6.1.0 + tree-rose: 4.0.2 + ts-bridge: 4.0.0 + tuples: 7.0.0 + two-or-more: 1.0.0 + type-equality: 4.0.1 + typedenv: 2.0.1 + typelevel: 6.0.0 + typelevel-lists: 2.1.0 + typelevel-peano: 1.0.1 + typelevel-prelude: 7.0.0 + typelevel-regex: 0.0.3 + typelevel-rows: 0.1.0 + typisch: 0.4.0 + uint: 7.0.0 + ulid: 3.0.1 + uncurried-transformers: 1.1.0 + undefined: 2.0.0 + undefined-is-not-a-problem: 1.1.0 + unfoldable: 6.0.0 + unicode: 6.0.0 + unique: 0.6.1 + unlift: 1.0.1 + unordered-collections: 3.1.0 + unsafe-coerce: 6.0.0 + unsafe-reference: 5.0.0 + untagged-to-tagged: 0.1.4 + untagged-union: 1.0.0 + uri: 9.0.0 + url-immutable: 1.0.0 + uuid: 9.0.0 + uuidv4: 1.0.0 + validation: 6.0.0 + variant: 8.0.0 + variant-encodings: 2.0.0 + vectorfield: 1.0.1 + vectors: 2.1.0 + versions: 7.0.0 + visx: 0.0.2 + web-clipboard: 5.0.0 + web-cssom: 2.0.0 + web-cssom-view: 0.1.0 + web-dom: 6.0.0 + web-dom-parser: 8.0.0 + web-dom-xpath: 3.0.0 + web-encoding: 3.0.0 + web-events: 4.0.0 + web-fetch: 4.0.1 + web-file: 4.0.0 + web-geometry: 0.1.0 + web-html: 4.1.0 + web-pointerevents: 2.0.0 + web-proletarian: 1.0.0 + web-promise: 3.2.0 + web-resize-observer: 2.1.0 + web-router: 1.0.0 + web-socket: 4.0.0 + web-storage: 5.0.0 + web-streams: 4.0.0 + web-touchevents: 4.0.0 + web-uievents: 5.0.0 + web-url: 2.0.0 + web-workers: 1.1.0 + web-xhr: 5.0.1 + webextension-polyfill: 0.1.0 + webgpu: 0.0.1 + which: 2.0.0 + xterm: 1.0.0 + yoga-fetch: 1.0.1 + yoga-json: 5.1.0 + yoga-om: 0.1.0 + yoga-postgres: 6.0.0 + yoga-tree: 1.0.0 + z3: 0.0.2 + zipperarray: 2.0.0 extra_packages: {} packages: aff: @@ -311,8 +797,8 @@ packages: - unfoldable exceptions: type: registry - version: 6.0.0 - integrity: sha256-y/xTAEIZIARCE+50/u1di0ncebJ+CIwNOLswyOWzMTw= + version: 6.1.0 + integrity: sha256-K0T89IHtF3vBY7eSAO7eDOqSb2J9kZGAcDN5+IKsF8E= dependencies: - effect - either @@ -324,6 +810,17 @@ packages: integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8= dependencies: - unsafe-coerce + filterable: + type: registry + version: 5.0.0 + integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng= + dependencies: + - arrays + - either + - foldable-traversable + - identity + - lists + - ordered-collections fixed-points: type: registry version: 7.0.0 @@ -480,14 +977,6 @@ packages: dependencies: - control - prelude - js-bigints: - type: registry - version: 2.2.1 - integrity: sha256-hKWZo9NxtsAaHmNXr6B8GY4c0olQbYLXPVGWm4TF2Ss= - dependencies: - - integers - - maybe - - prelude js-date: type: registry version: 8.0.0 @@ -499,24 +988,6 @@ packages: - foreign - integers - now - js-maps: - type: registry - version: 0.1.2 - integrity: sha256-xQDZf88nQEiZNmkCVEi3YQGB19hu6Oju6laEi8Os/oM= - dependencies: - - arrays - - either - - foldable-traversable - - functions - - js-bigints - - maybe - - prelude - - profunctor-lenses - - st - - strings - - tuples - - unfoldable - - unsafe-coerce lazy: type: registry version: 6.0.0 @@ -603,8 +1074,8 @@ packages: - unsafe-coerce node-fs: type: registry - version: 9.1.0 - integrity: sha256-TzhvGdrwcM0bazDvrWSqh+M/H8GKYf1Na6aGm2Qg4+c= + version: 9.2.0 + integrity: sha256-Sg0vkXycEzkEerX6hLccz21Ygd9w1+QSk1thotRZPGI= dependencies: - datetime - effect @@ -629,44 +1100,6 @@ packages: integrity: sha256-pd82nQ+2l5UThzaxPdKttgDt7xlsgIDLpPG0yxDEdyE= dependencies: - effect - node-stream-pipes: - type: registry - version: 2.1.0 - integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8= - dependencies: - - aff - - arrays - - console - - control - - datetime - - effect - - either - - exceptions - - foldable-traversable - - foreign-object - - fork - - lists - - maybe - - mmorph - - newtype - - node-buffer - - node-event-emitter - - node-fs - - node-path - - node-streams - - node-zlib - - now - - ordered-collections - - parallel - - pipes - - prelude - - st - - strings - - tailrec - - transformers - - tuples - - unordered-collections - - unsafe-coerce node-streams: type: registry version: 9.0.0 @@ -751,8 +1184,8 @@ packages: - prelude parallel: type: registry - version: 6.0.0 - integrity: sha256-VJbkGD0rAKX+NUEeBJbYJ78bEKaZbgow+QwQEfPB6ko= + version: 7.0.0 + integrity: sha256-gUC9i4Txnx9K9RcMLsjujbwZz6BB1bnE2MLvw4GIw5o= dependencies: - control - effect @@ -852,31 +1285,6 @@ packages: - newtype - prelude - tuples - profunctor-lenses: - type: registry - version: 8.0.0 - integrity: sha256-K7f29rHRHgVSb2Y/PaSKtfYPriP6n87BJNO7EhsZHas= - dependencies: - - arrays - - bifunctors - - const - - control - - distributive - - either - - foldable-traversable - - foreign-object - - functors - - identity - - lists - - maybe - - newtype - - ordered-collections - - partial - - prelude - - profunctor - - record - - transformers - - tuples quickcheck: type: registry version: 8.0.1 @@ -1028,8 +1436,8 @@ packages: - refs transformers: type: registry - version: 6.0.0 - integrity: sha256-Pzw40HjthX77tdPAYzjx43LK3X5Bb7ZspYAp27wksFA= + version: 6.1.0 + integrity: sha256-3Bm+Z6tsC/paG888XkywDngJ2JMos+JfOhRlkVfb7gI= dependencies: - control - distributive @@ -1042,6 +1450,7 @@ packages: - maybe - newtype - prelude + - st - tailrec - tuples - unfoldable @@ -1083,21 +1492,6 @@ packages: - foldable-traversable - maybe - strings - unordered-collections: - type: registry - version: 3.1.0 - integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q= - dependencies: - - arrays - - enums - - functions - - integers - - lists - - prelude - - record - - tuples - - typelevel-prelude - - unfoldable unsafe-coerce: type: registry version: 6.0.0 diff --git a/spago.yaml b/spago.yaml index ba6e607..af7d09c 100644 --- a/spago.yaml +++ b/spago.yaml @@ -1,42 +1,32 @@ package: - name: cbor-stream + name: threading publish: - version: '1.3.0' + version: '0.0.1' license: 'GPL-3.0-or-later' location: githubOwner: 'cakekindel' - githubRepo: 'purescript-cbor-stream' + githubRepo: 'purescript-threading' build: strict: true - pedanticPackages: true + # pedanticPackages: true dependencies: + - catenable-lists + - control + - exceptions + - filterable + - refs + - tuples - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - - bifunctors: ">=6.0.0 <7.0.0" - - datetime: ">=6.1.0 <7.0.0" - effect: ">=4.0.0 <5.0.0" - either: ">=6.1.0 <7.0.0" - - exceptions: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0" - - foreign: ">=7.0.0 <8.0.0" - - foreign-object: ">=4.1.0 <5.0.0" - - js-bigints: ">=2.2.1 <3.0.0" - - js-date: ">=8.0.0 <9.0.0" - - js-maps: ">=0.1.2 <0.2.0" - maybe: ">=6.0.0 <7.0.0" - - node-buffer: ">=9.0.0 <10.0.0" - - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=2.1.0 <3.0.0" - - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - prelude: ">=6.0.1 <7.0.0" - - record: ">=4.0.0 <5.0.0" - - simple-json: ">=9.0.0 <10.0.0" - - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0" - - unsafe-coerce: ">=6.0.0 <7.0.0" test: main: Test.Main dependencies: @@ -49,4 +39,6 @@ package: - simple-json - spec workspace: + packageSet: + registry: 53.3.0 extraPackages: {} diff --git a/src/Data.CBOR.purs b/src/Data.CBOR.purs deleted file mode 100644 index 6405e4e..0000000 --- a/src/Data.CBOR.purs +++ /dev/null @@ -1,158 +0,0 @@ -module Data.CBOR where - -import Prelude - -import Control.Monad.Error.Class (liftMaybe, try) -import Control.Monad.Except (ExceptT(..), withExcept) -import Control.Monad.Except.Trans (runExceptT) -import Data.Array as Array -import Data.DateTime (DateTime) -import Data.Either (Either(..), isRight) -import Data.Foldable (class Foldable) -import Data.FoldableWithIndex (foldlWithIndex) -import Data.JSDate (JSDate) -import Data.JSDate as JSDate -import Data.Map (Map) -import Data.Maybe (Maybe(..)) -import Data.Symbol (class IsSymbol, reflectSymbol) -import Data.Traversable (traverse) -import Foreign (F, Foreign, ForeignError(..), readArray, readNullOrUndefined, unsafeReadTagged, unsafeToForeign) -import Foreign.Index (readProp) -import JS.BigInt (BigInt) -import JS.Map (Map) as JS -import JS.Map as JS.Map -import Prim.Row as Row -import Prim.RowList (class RowToList, Cons, Nil, RowList) -import Record (get) -import Record.Builder (Builder) -import Record.Builder as Builder -import Simple.JSON (class ReadForeign, class WriteForeign, readImpl, writeImpl) -import Type.Prelude (Proxy(..)) - -class ReadCBOR :: Type -> Constraint -class ReadCBOR a where - readCBOR :: Foreign -> F a - -class WriteCBOR :: Type -> Constraint -class WriteCBOR a where - writeCBOR :: a -> Foreign - -instance ReadCBOR Foreign where - readCBOR = pure -else instance (RowToList r rl, ReadCBORFields rl () r) => ReadCBOR (Record r) where - readCBOR o = do - flip Builder.build {} <$> getFields (Proxy @rl) o -else instance ReadCBOR BigInt where - readCBOR = unsafeReadTagged "BigInt" -else instance ReadCBOR JSDate where - readCBOR = unsafeReadTagged "Date" -else instance ReadCBOR DateTime where - readCBOR a = do - date :: JSDate <- readCBOR a - liftMaybe (pure $ ForeignError $ "Invalid DateTime: " <> show date) $ JSDate.toDateTime date -else instance ReadCBOR a => ReadCBOR (Array a) where - readCBOR a = do - raws :: Array Foreign <- readArray a - traverse readCBOR raws -else instance ReadCBOR a => ReadCBOR (Maybe a) where - readCBOR a = do - isNull <- isRight <$> try (readNullOrUndefined a) - if isNull then - pure Nothing - else - Just <$> readCBOR @a a -else instance (ReadCBOR v) => ReadCBOR (JS.Map String v) where - readCBOR map = do - map' :: JS.Map String Foreign <- unsafeReadTagged "Map" map - foldlWithIndex (\k b v -> do - map'' <- b - v' <- readCBOR v - pure $ JS.Map.insert k v' map'' - ) (pure JS.Map.empty) map' -else instance (ReadForeign a) => ReadCBOR a where - readCBOR = readImpl - -instance WriteCBOR Foreign where - writeCBOR = identity -else instance (RowToList r rl, WriteCBORFields rl r () to) => WriteCBOR (Record r) where - writeCBOR rec = unsafeToForeign $ Builder.build (writeImplFields (Proxy @rl) rec) {} -else instance WriteCBOR BigInt where - writeCBOR = unsafeToForeign -else instance WriteCBOR JSDate where - writeCBOR = unsafeToForeign -else instance WriteCBOR DateTime where - writeCBOR = unsafeToForeign <<< JSDate.fromDateTime -else instance (WriteCBOR k, WriteCBOR v) => WriteCBOR (JS.Map k v) where - writeCBOR = unsafeToForeign -else instance (WriteCBOR a) => WriteCBOR (Array a) where - writeCBOR as = unsafeToForeign $ writeCBOR <$> as -else instance (Foldable f, WriteCBOR a) => WriteCBOR (f a) where - writeCBOR as = unsafeToForeign $ writeCBOR $ Array.fromFoldable as -else instance (JS.Map.EncodeKey k, WriteCBOR k, WriteCBOR v) => WriteCBOR (Map k v) where - writeCBOR map = writeCBOR $ foldlWithIndex (\k m v -> JS.Map.insert k v m) JS.Map.empty map -else instance (WriteForeign a) => WriteCBOR a where - writeCBOR = writeImpl - -applyEither :: forall e a b. Semigroup e => Either e (a -> b) -> Either e a -> Either e b -applyEither (Left e) (Right _) = Left e -applyEither (Left e1) (Left e2) = Left (e1 <> e2) -applyEither (Right _) (Left e) = Left e -applyEither (Right fun) (Right a) = Right (fun a) - -exceptTApply :: forall a b e m. Semigroup e => Applicative m => ExceptT e m (a -> b) -> ExceptT e m a -> ExceptT e m b -exceptTApply fun a = ExceptT $ applyEither - <$> runExceptT fun - <*> runExceptT a - -class ReadCBORFields (xs :: RowList Type) (from :: Row Type) (to :: Row Type) - | xs -> from to where - getFields :: Proxy xs - -> Foreign - -> F (Builder (Record from) (Record to)) - -instance readFieldsCons :: - ( IsSymbol name - , ReadCBOR ty - , ReadCBORFields tail from from' - , Row.Lacks name from' - , Row.Cons name ty from' to - ) => ReadCBORFields (Cons name ty tail) from to where - getFields _ obj = (compose <$> first) `exceptTApply` rest - where - first = do - value <- withExcept' (readCBOR =<< readProp name obj) - pure $ Builder.insert nameP value - rest = getFields tailP obj - nameP = Proxy :: Proxy name - tailP = Proxy :: Proxy tail - name = reflectSymbol nameP - withExcept' = withExcept <<< map $ ErrorAtProperty name - -instance readFieldsNil :: - ReadCBORFields Nil () () where - getFields _ _ = - pure identity - -class WriteCBORFields (rl :: RowList Type) row (from :: Row Type) (to :: Row Type) - | rl -> row from to where - writeImplFields :: forall g. g rl -> Record row -> Builder (Record from) (Record to) - -instance consWriteCBORFields :: - ( IsSymbol name - , WriteCBOR ty - , WriteCBORFields tail row from from' - , Row.Cons name ty whatever row - , Row.Lacks name from' - , Row.Cons name Foreign from' to - ) => WriteCBORFields (Cons name ty tail) row from to where - writeImplFields _ rec = result - where - namep = Proxy :: Proxy name - value = writeCBOR $ get namep rec - tailp = Proxy :: Proxy tail - rest = writeImplFields tailp rec - result = Builder.insert namep value <<< rest - -instance nilWriteCBORFields :: - WriteCBORFields Nil row () () where - writeImplFields _ _ = identity diff --git a/src/Effect.CBOR.js b/src/Effect.CBOR.js deleted file mode 100644 index cf999ac..0000000 --- a/src/Effect.CBOR.js +++ /dev/null @@ -1,7 +0,0 @@ -import {decode, encode} from 'cbor-x' - -/** @type {(a: Buffer) => () => unknown} */ -export const decodeImpl = buf => () => decode(buf) - -/** @type {(a: unknown) => () => Buffer} */ -export const encodeImpl = buf => () => encode(buf) diff --git a/src/Effect.CBOR.purs b/src/Effect.CBOR.purs deleted file mode 100644 index 057f8c8..0000000 --- a/src/Effect.CBOR.purs +++ /dev/null @@ -1,21 +0,0 @@ -module Effect.CBOR where - -import Prelude - -import Control.Monad.Error.Class (liftEither) -import Control.Monad.Except (runExcept) -import Data.Bifunctor (lmap) -import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR) -import Effect (Effect) -import Effect.Exception (error) -import Foreign (Foreign) -import Node.Buffer (Buffer) - -foreign import decodeImpl :: Buffer -> Effect Foreign -foreign import encodeImpl :: Foreign -> Effect Buffer - -decode :: forall a. ReadCBOR a => Buffer -> Effect a -decode = (liftEither <<< lmap (error <<< show) <<< runExcept <<< readCBOR) <=< decodeImpl - -encode :: forall a. WriteCBOR a => a -> Effect Buffer -encode = encodeImpl <<< writeCBOR diff --git a/src/JS.Finalization.js b/src/JS.Finalization.js new file mode 100644 index 0000000..50d2817 --- /dev/null +++ b/src/JS.Finalization.js @@ -0,0 +1,8 @@ +/** @type {(cb: (t: T) => void) => () => FinalizationRegistry} */ +export const registry = (cb) => () => new FinalizationRegistry(cb); + +/** @type {(f: FinalizationRegistry) => (a: WeakRef) => (b: T) => () => void} */ +export const register = (fin) => (a) => (b) => () => fin.register(a, b); + +/** @type {(f: FinalizationRegistry) => (a: WeakRef) => () => void} */ +export const unregister = (fin) => (a) => () => fin.unregister(a); diff --git a/src/JS.Finalization.purs b/src/JS.Finalization.purs new file mode 100644 index 0000000..36c11ae --- /dev/null +++ b/src/JS.Finalization.purs @@ -0,0 +1,15 @@ +module JS.Drop where + +import Prelude + +import Effect (Effect) +import JS.WeakRef (WeakRef) + +type Registry_ a = Registry a Unit + +foreign import data Registry :: Type -> Type -> Type + +foreign import registry :: forall a b. (b -> Effect Unit) -> Effect (Registry a b) + +foreign import register :: forall a b. Registry a b -> WeakRef a -> b -> Effect Unit +foreign import unregister :: forall a b. Registry a b -> WeakRef a -> Effect Unit diff --git a/src/JS.WeakRef.js b/src/JS.WeakRef.js new file mode 100644 index 0000000..ac94199 --- /dev/null +++ b/src/JS.WeakRef.js @@ -0,0 +1,5 @@ +/** @type {(_: T) => () => WeakRef} */ +export const make = (a) => () => new WeakRef(a); + +/** @type {(_: WeakRef) => () => T | undefined} */ +export const _deref = (a) => () => a.deref(); diff --git a/src/JS.WeakRef.purs b/src/JS.WeakRef.purs new file mode 100644 index 0000000..f58671b --- /dev/null +++ b/src/JS.WeakRef.purs @@ -0,0 +1,17 @@ +module JS.WeakRef where + +import Prelude + +import Data.Maybe (Maybe) +import Data.Nullable (Nullable) +import Data.Nullable as Nullable +import Effect (Effect) + +foreign import data WeakRef :: Type -> Type + +foreign import make :: forall a. a -> Effect (WeakRef a) + +deref :: forall a. WeakRef a -> Effect (Maybe a) +deref = map Nullable.toMaybe <<< _deref + +foreign import _deref :: forall a. WeakRef a -> Effect (Nullable a) diff --git a/src/Node.Stream.CBOR.Decode.js b/src/Node.Stream.CBOR.Decode.js deleted file mode 100644 index 6de9555..0000000 --- a/src/Node.Stream.CBOR.Decode.js +++ /dev/null @@ -1,7 +0,0 @@ -import { DecoderStream } from "cbor-x"; - -/** @type {(s: import('cbor-x').Options) => () => DecoderStream} */ -export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c, allowHalfOpen: true}); - -/** @type {(s: DecoderStream) => () => unknown | null} */ -export const readImpl = (p) => () => p.read(); diff --git a/src/Node.Stream.CBOR.Decode.purs b/src/Node.Stream.CBOR.Decode.purs deleted file mode 100644 index fe74683..0000000 --- a/src/Node.Stream.CBOR.Decode.purs +++ /dev/null @@ -1,52 +0,0 @@ -module Node.Stream.CBOR.Decode where - -import Prelude hiding (join) - -import Data.Nullable (Nullable) -import Effect (Effect) -import Effect.Uncurried (mkEffectFn1) -import Foreign (Foreign) -import Foreign.Object (Object) -import Node.Buffer (Buffer) -import Node.EventEmitter (EventHandle(..)) -import Node.EventEmitter.UtilTypes (EventHandle1) -import Node.Stream (Read, Stream, Write) -import Node.Stream.CBOR.Options (F32, Options, prepareOptions) -import Node.Stream.Object (Transform) as Object -import Prim.Row (class Nub, class Union) -import Unsafe.Coerce (unsafeCoerce) - -data CBORDecode - --- | CBOR decoding transform stream --- | --- | Accepts unencoded `Buffer` chunks, and transforms them --- | to JS values. -type CBORDecoder :: Row Type -> Type -type CBORDecoder r = Stream (read :: Read, write :: Write, cbor :: CBORDecode | r) - -make - :: forall r missing extra minimal minimalExtra - . Union r missing (Options extra) - => Union r (useFloat32 :: F32) minimal - => Nub minimal (useFloat32 :: F32 | minimalExtra) - => { | r } - -> Effect (CBORDecoder ()) -make = makeImpl <<< prepareOptions @r @missing - -toObjectStream :: forall r. CBORDecoder r -> Object.Transform Buffer Foreign -toObjectStream = unsafeCoerce - --- | `data` event. Emitted when a CSV record has been parsed. -dataH :: forall a. EventHandle1 (CBORDecoder a) Foreign -dataH = EventHandle "data" mkEffectFn1 - --- | FFI -foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) - --- | FFI -foreign import readImpl :: forall r. Stream r -> Effect (Nullable Foreign) - --- | FFI -recordToForeign :: forall r. Record r -> Object Foreign -recordToForeign = unsafeCoerce diff --git a/src/Node.Stream.CBOR.Encode.js b/src/Node.Stream.CBOR.Encode.js deleted file mode 100644 index a2177f6..0000000 --- a/src/Node.Stream.CBOR.Encode.js +++ /dev/null @@ -1,7 +0,0 @@ -import { EncoderStream } from "cbor-x"; - -/** @type {(s: import('cbor-x').Options) => () => EncoderStream} */ -export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c, allowHalfOpen: true}); - -/** @type {(s: EncoderStream) => (a: unknown) => () => void} */ -export const writeImpl = (s) => (a) => () => s.write(a); diff --git a/src/Node.Stream.CBOR.Encode.purs b/src/Node.Stream.CBOR.Encode.purs deleted file mode 100644 index 69c5217..0000000 --- a/src/Node.Stream.CBOR.Encode.purs +++ /dev/null @@ -1,49 +0,0 @@ -module Node.Stream.CBOR.Encode where - -import Prelude - -import Data.CBOR (class WriteCBOR, writeCBOR) -import Effect (Effect) -import Foreign (Foreign) -import Foreign.Object (Object) -import Node.Buffer (Buffer) -import Node.Stream (Read, Stream, Write) -import Node.Stream.CBOR.Options (F32, Options, prepareOptions) -import Node.Stream.Object (Transform) as Object -import Prim.Row (class Nub, class Union) -import Unsafe.Coerce (unsafeCoerce) - -data CBOREncode - -type CBOREncoder :: Row Type -> Type -type CBOREncoder r = Stream (read :: Read, write :: Write, csv :: CBOREncode | r) - -foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) -foreign import writeImpl :: forall r. Stream r -> Foreign -> Effect Unit - -recordToForeign :: forall r. Record r -> Object Foreign -recordToForeign = unsafeCoerce - --- | Create a raw Transform stream that accepts chunks of `Array String`, --- | and transforms them into string CSV rows. --- | --- | Requires an ordered array of column names. -make - :: forall r missing extra minimal minimalExtra - . Union r missing (Options extra) - => Union r (useFloat32 :: F32) minimal - => Nub minimal (useFloat32 :: F32 | minimalExtra) - => { | r } - -> Effect (CBOREncoder ()) -make = makeImpl <<< prepareOptions @r @missing - --- | Convert the raw stream to a typed ObjectStream -toObjectStream :: CBOREncoder () -> Object.Transform Foreign Buffer -toObjectStream = unsafeCoerce - --- | Write a record to a CSVStringifier. --- | --- | The record will be emitted on the `Readable` end --- | of the stream as a string chunk. -write :: forall a r. WriteCBOR a => CBOREncoder r -> a -> Effect Unit -write s a = writeImpl s $ writeCBOR a diff --git a/src/Node.Stream.CBOR.Options.js b/src/Node.Stream.CBOR.Options.js deleted file mode 100644 index 73aa0bf..0000000 --- a/src/Node.Stream.CBOR.Options.js +++ /dev/null @@ -1,11 +0,0 @@ -import {FLOAT32_OPTIONS} from 'cbor-x' - -/** @type {(o: {round: (_a: F32) => boolean, fit: (_a: F32) => boolean, always: (_a: F32) => boolean}) => (f: F32) => FLOAT32_OPTIONS} */ -export const f32ToConst = ({round, fit, always}) => a => - round(a) - ? FLOAT32_OPTIONS.ALWAYS - : fit(a) - ? FLOAT32_OPTIONS.DECIMAL_FIT - : round(a) - ? FLOAT32_OPTIONS.DECIMAL_ROUND - : FLOAT32_OPTIONS.NEVER diff --git a/src/Node.Stream.CBOR.Options.purs b/src/Node.Stream.CBOR.Options.purs deleted file mode 100644 index e37b4cb..0000000 --- a/src/Node.Stream.CBOR.Options.purs +++ /dev/null @@ -1,50 +0,0 @@ -module Node.Stream.CBOR.Options where - -import Prelude - -import Foreign (Foreign, unsafeToForeign) -import Prim.Row (class Nub, class Union) -import Record (merge, modify) -import Type.Prelude (Proxy(..)) - -data F32 - = F32Always - | F32DecimalRound - | F32DecimalFit - | F32Never - -derive instance Eq F32 - -foreign import data CBORStruct :: Type -foreign import f32ToConst :: {always :: F32 -> Boolean, round :: F32 -> Boolean, fit :: F32 -> Boolean} -> F32 -> Foreign - -type Options r = - ( useRecords :: Boolean - , structures :: Array CBORStruct - , structuredClone :: Boolean - , mapsAsObject :: Boolean - , useFloat32 :: F32 - , alwaysUseFloat :: Boolean - , pack :: Boolean - , variableMapSize :: Boolean - , copyBuffers :: Boolean - , bundleStrings :: Boolean - , useTimestamp32 :: Boolean - , largeBigIntToFloat :: Boolean - , useTag259ForMaps :: Boolean - , tagUint8Array :: Boolean - , int64AsNumber :: Boolean - | r - ) - -prepareOptions - :: forall @r @missing extra minimal minimalExtra - . Union r missing (Options extra) - => Union r (useFloat32 :: F32) minimal - => Nub minimal (useFloat32 :: F32 | minimalExtra) - => { | r } - -> Foreign -prepareOptions a = - unsafeToForeign - $ modify (Proxy @"useFloat32") (f32ToConst {fit: eq F32DecimalFit, round: eq F32DecimalRound, always: eq F32Always}) - $ merge a {useFloat32: F32Never} diff --git a/src/Pipes.CBOR.purs b/src/Pipes.CBOR.purs deleted file mode 100644 index 979a791..0000000 --- a/src/Pipes.CBOR.purs +++ /dev/null @@ -1,47 +0,0 @@ -module Pipes.CBOR where - -import Prelude - -import Control.Monad.Error.Class (class MonadThrow, liftEither) -import Control.Monad.Except (runExcept) -import Control.Monad.Rec.Class (class MonadRec) -import Data.Bifunctor (lmap) -import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR) -import Data.Maybe (Maybe) -import Data.Traversable (traverse) -import Effect.Aff.Class (class MonadAff) -import Effect.Exception (Error, error) -import Node.Buffer (Buffer) -import Node.Stream.CBOR.Decode as CBOR.Decode -import Node.Stream.CBOR.Encode as CBOR.Encode -import Pipes.Async (AsyncPipe, bindIO, mapIO) -import Pipes.Node.Stream as Pipes.Stream - --- | Transforms buffer chunks of a CBOR file to parsed values --- | of type `a`. -decode - :: forall m @a - . MonadRec m - => MonadAff m - => MonadThrow Error m - => ReadCBOR a - => AsyncPipe (Maybe Buffer) (Maybe a) m Unit -decode = do - let - decoder = Pipes.Stream.fromTransformEffect $ CBOR.Decode.toObjectStream <$> CBOR.Decode.make {} - parse = liftEither <<< lmap (error <<< show) <<< runExcept <<< readCBOR @a - bindIO pure (traverse parse) decoder - --- | Encode purescript values as CBOR buffers -encode - :: forall m a - . MonadAff m - => MonadThrow Error m - => MonadRec m - => WriteCBOR a - => AsyncPipe (Maybe a) (Maybe Buffer) m Unit -encode = - let - p = Pipes.Stream.fromTransformEffect $ CBOR.Encode.toObjectStream <$> CBOR.Encode.make {} - in - mapIO (map writeCBOR) identity p diff --git a/src/Threading.Ath.purs b/src/Threading.Ath.purs new file mode 100644 index 0000000..a19d0b9 --- /dev/null +++ b/src/Threading.Ath.purs @@ -0,0 +1 @@ +module Threading.Ath where diff --git a/src/Threading.Barrier.purs b/src/Threading.Barrier.purs new file mode 100644 index 0000000..4d94f01 --- /dev/null +++ b/src/Threading.Barrier.purs @@ -0,0 +1,35 @@ +module Threading.Barrier (Barrier, barrier, wait) where + +import Prelude + +import Data.Array as Array +import Data.Either (Either(..)) +import Data.Foldable (sequence_) +import Effect (Effect) +import Effect.Aff (Aff) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Ref (Ref) +import Effect.Ref as Ref +import Type.Function (type ($)) + +-- | A barrier enables multiple threads to synchronize the beginning of some computation. +data Barrier = Barrier Int (Ref $ Array $ Effect Unit) + +-- | Create a new barrier that will only unblock waiting threads +-- | when `n` threads are waiting (including this one) +barrier :: Int -> Effect Barrier +barrier n = Barrier n <$> Ref.new [] + +-- | Wait until the provided number of threads +-- | are also `wait`ing +wait :: Barrier -> Aff Unit +wait (Barrier n wakersRef) = do + wakers <- liftEffect $ Ref.read wakersRef + if n <= 1 then + pure unit + else if Array.length wakers == (n - 1) then + liftEffect $ sequence_ wakers + else Aff.makeAff \cb -> do + Ref.modify_ (_ <> [ cb $ Right unit ]) wakersRef + pure $ Aff.nonCanceler diff --git a/src/Threading.Channel.purs b/src/Threading.Channel.purs new file mode 100644 index 0000000..b5a5104 --- /dev/null +++ b/src/Threading.Channel.purs @@ -0,0 +1,163 @@ +module Threading.Channel + ( Channel + , Sender + , Receiver + , recv + , tryRecv + , send + , peek + , tryPeek + , channel + , sender + , receiver + ) where + +import Prelude + +import Control.Monad.Error.Class (throwError) +import Data.Array as Array +import Data.CatList (CatList) +import Data.CatList as CatList +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..), isJust, maybe) +import Data.Traversable (for) +import Data.Tuple (fst) +import Data.Tuple.Nested ((/\)) +import Data.Witherable (wither) +import Effect (Effect) +import Effect.Aff (Aff) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Exception (error) +import JS.WeakRef (WeakRef) +import JS.WeakRef as WeakRef +import Threading.Data.Mutex (Mutex) +import Threading.Data.Mutex as Mutex +import Type.Function (type ($)) + +-- | A multi-producer multi-consumer channel for communication +-- | between threads. +-- | +-- | Senders will broadcast messages to all living receivers, +-- | doing nothing if there are no receivers. +-- | +-- | Receivers can wait for messages to be sent. Messages that +-- | are sent while the receiver is not waiting will be buffered, +-- | and `recv`d in the order they were sent. +data Channel a = Channel (Mutex $ Array $ WeakRef $ Receiver a) + +data Sender a = Sender (Channel a) +data Receiver a = Receiver (Mutex $ Maybe (a -> Effect Unit)) (Mutex $ CatList a) + +-- | Create a new channel +channel :: forall a. Effect (Channel a) +channel = do + recvs <- Mutex.mutex [] + pure $ Channel recvs + +-- | Create a new message receiver +receiver :: forall a. Channel a -> Aff (Receiver a) +receiver (Channel recvsRef) = do + g <- Mutex.lock recvsRef + liftEffect do + queue <- Mutex.mutex CatList.empty + wake <- Mutex.mutex Nothing + recvs <- Mutex.read g + let r = Receiver wake queue + recvWeak <- WeakRef.make r + Mutex.write g $ Array.cons recvWeak recvs + Mutex.release g + pure r + +-- | Create a new message sender +sender :: forall a. Channel a -> Effect (Sender a) +sender c = pure $ Sender c + +-- | Send a message to all living receivers +send :: forall a. Sender a -> a -> Aff Unit +send (Sender (Channel recvsRef)) a = do + recvsG <- Mutex.lock recvsRef + recvWeaks <- liftEffect $ Mutex.read recvsG + recvs <- liftEffect $ wither WeakRef.deref recvWeaks + void $ for recvs \(Receiver wakeRef queueRef) -> do + wakeG <- Mutex.lock wakeRef + wake <- liftEffect $ Mutex.read wakeG + + queueG <- Mutex.lock queueRef + head /\ tail <- + liftEffect (Mutex.read queueG) + <#> CatList.uncons + <#> maybe (a /\ CatList.empty) (\(head /\ tail) -> head /\ CatList.snoc tail a) + + let + q = CatList.cons head tail + + liftEffect do + maybe (Mutex.write queueG q) (\f -> Mutex.write queueG tail *> f head) wake + Mutex.release wakeG + Mutex.release queueG + liftEffect $ Mutex.release recvsG + +-- | Read a queued message and pop it from the queue. +-- | +-- | If no queued messages have been sent, returns Nothing. +tryRecv :: forall a. Receiver a -> Aff (Maybe a) +tryRecv (Receiver _ queueRef) = do + queueG <- Mutex.lock queueRef + queueM <- CatList.uncons <$> liftEffect (Mutex.read queueG) + for queueM \(a /\ tail) -> liftEffect $ Mutex.write queueG tail *> Mutex.release queueG $> a + +-- | Block until a message is sent, and pop it from the queue. +-- | +-- | If a message has been sent since the +-- | last call to `recv`, then it will +-- | be immediately popped & returned. +recv :: forall a. Receiver a -> Aff a +recv (Receiver wakeRef queueRef) = do + wakeG <- Mutex.lock wakeRef + queueG <- Mutex.lock queueRef + liftEffect + $ whenM (isJust <$> Mutex.read wakeG) + $ throwError + $ error "Receiver has been shared between multiple fibers, which is not supported." + + queueM <- liftEffect $ CatList.uncons <$> Mutex.read queueG + case queueM of + Just (a /\ tail) -> liftEffect do + Mutex.write queueG tail + Mutex.release wakeG + Mutex.release queueG + pure a + Nothing -> Aff.makeAff \cb -> do + Mutex.write wakeG $ Just $ cb <<< Right + Mutex.release wakeG + Mutex.release queueG + pure $ Aff.Canceler $ const $ Mutex.put wakeRef Nothing + +-- | Read a queued message without altering the queue. +-- | +-- | If no queued messages have been sent, returns Nothing. +tryPeek :: forall a. Receiver a -> Aff (Maybe a) +tryPeek (Receiver _ queueRef) = map fst <$> CatList.uncons <$> Mutex.get queueRef + +-- | Block until a message is sent, and read +-- | it without removing it from the queue. +-- | +-- | If a message has been sent since the +-- | last call to `recv`, then it will +-- | be immediately returned. +peek :: forall a. Receiver a -> Aff a +peek (Receiver wakeRef queueRef) = do + wakeG <- Mutex.lock wakeRef + queueM <- CatList.uncons <$> Mutex.get queueRef + liftEffect + $ whenM (isJust <$> Mutex.read wakeG) + $ throwError + $ error "Receiver has been shared between multiple fibers, which is not supported." + + case queueM of + Just (a /\ _) -> liftEffect $ Mutex.release wakeG $> a + Nothing -> Aff.makeAff \cb -> do + Mutex.write wakeG $ Just $ cb <<< Right + Mutex.release wakeG + pure $ Aff.Canceler $ const $ Mutex.put wakeRef Nothing diff --git a/src/Threading.Data.Mutex.js b/src/Threading.Data.Mutex.js new file mode 100644 index 0000000..071aa61 --- /dev/null +++ b/src/Threading.Data.Mutex.js @@ -0,0 +1,133 @@ +/** + * @template T + * @typedef {(g: Guard) => () => void} + * Waker + */ + +/** @template T */ +class Guard { + released = false; + + /** + * @param {Mutex} mutex + * @param {() => void} onExplicitRelease + */ + constructor(mutex, onExplicitRelease) { + this.mutex = mutex; + this.cb = onExplicitRelease; + } + + read() { + if (this.released) { + throw new Error("Guard#read after explicit release"); + } + return this.mutex.a; + } + + /** @param {T} a */ + write(a) { + if (this.released) { + throw new Error("Guard#write after explicit release"); + } + this.mutex.a = a; + } + + release() { + if (!this.released) { + this.released = true; + this.cb(); + } + } +} + +/** @template T */ +class Mutex { + /** @type {WeakRef> | undefined} */ + guard = undefined; + + /** @type {Array<(g: Guard) => () => void>} */ + wakers = []; + + /** @type {FinalizationRegistry} */ + cleanup = new FinalizationRegistry(() => this._guardReleased()); + + /** + * @param {T} a + */ + constructor(a) { + this.a = a; + } + + _guardReleased() { + this.guard = undefined; + const wake = this.wakers.shift(); + if (wake) { + wake(this._newGuard())(); + } + } + + _newGuard() { + const g = new Guard(this, () => { + if (!this.guard) throw new Error("unreachable"); + this.cleanup.unregister(this.guard); + this._guardReleased(); + }); + + this.guard = new WeakRef(g); + this.cleanup.register(g, undefined); + return g; + } + + locked() { + return !!this.guard; + } + + /** @param {Waker} cb */ + lock(cb) { + if (!this.guard) { + cb(this._newGuard())(); + return undefined; + } else { + this.wakers.push(cb); + return cb; + } + } + + /** @param {Waker} cb */ + releaseWaker(cb) { + const ix = this.wakers.indexOf(cb); + if (ix > -1) { + this.wakers.splice(ix, 1); + } + } + + tryLock() { + if (!this.guard) { + return this._newGuard(); + } + } +} + +/** @type {(t: T) => () => Mutex} */ +export const _make = (a) => () => new Mutex(a); + +/** @type {(mutex: Mutex) => (cb: Waker) => () => Waker | undefined} */ +export const _lock = (mutex) => (cb) => () => mutex.lock(cb); + +/** @type {(mutex: Mutex) => () => boolean} */ +export const _locked = (mutex) => () => mutex.locked(); + +/** @type {(mutex: Mutex) => () => Guard | undefined} */ +export const _tryLock = (mutex) => () => mutex.tryLock(); + +/** @type {(mutex: Mutex) => (cb: Waker) => () => void} */ +export const _releaseWaker = (mutex) => (cb) => () => mutex.releaseWaker(cb); + +/** @type {(guard: Guard) => () => void} */ +export const _guardRelease = (g) => () => g.release(); + +/** @type {(guard: Guard) => () => T} */ +export const _guardRead = (g) => () => g.read(); + +/** @type {(guard: Guard) => (t: T) => () => void} */ +export const _guardWrite = (g) => (a) => () => g.write(a); diff --git a/src/Threading.Data.Mutex.purs b/src/Threading.Data.Mutex.purs new file mode 100644 index 0000000..00f0a98 --- /dev/null +++ b/src/Threading.Data.Mutex.purs @@ -0,0 +1,140 @@ +-- | A Mutex allows any number of threads to share mutable +-- | state, with at most 1 thread having read or write access +-- | at a time. +-- | +-- | Threads can access the data with `lock` or `tryLock`, +-- | which both return a `Guard`. +-- | +-- | The holder of a `Guard` is guaranteed exclusive read & +-- | write access to the data contained in the `Mutex`. +module Threading.Data.Mutex + ( Mutex + , Guard + , mutex + , lock + , tryLock + , locked + , release + , modify + , modify_ + , write + , read + , get + , put + ) where + +import Prelude + +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..)) +import Data.Nullable (Nullable) +import Data.Nullable as Nullable +import Effect (Effect) +import Effect.Aff (Aff) +import Effect.Aff as Aff +import Effect.Class (liftEffect) + +foreign import data Waker :: Type + +foreign import data Mutex :: Type -> Type + +-- | A lock to a Mutex. +-- | +-- | Guards may be read from, written to, and released. Guards **must** be +-- | released in order for other blocking threads to continue. +-- | +-- | _Note: If a Guard reclaimed by the garbage collector without being released, +-- | its Mutex will notice and behave as if the Guard was explicitly released. +-- | This will hopefully catch deadlocks caused by threads that have exited +-- | while holding a Guard._ +foreign import data Guard :: Type -> Type + +foreign import _make :: forall a. a -> Effect (Mutex a) + +foreign import _locked :: forall a. Mutex a -> Effect Boolean +foreign import _lock :: forall a. Mutex a -> (Guard a -> Effect Unit) -> Effect (Nullable Waker) +foreign import _tryLock :: forall a. Mutex a -> Effect (Nullable (Guard a)) + +foreign import _releaseWaker :: forall a. Mutex a -> Waker -> Effect Unit + +foreign import _guardRead :: forall a. Guard a -> Effect a +foreign import _guardWrite :: forall a. Guard a -> a -> Effect Unit +foreign import _guardRelease :: forall a. Guard a -> Effect Unit + +-- | Create a new Mutex +mutex :: forall a. a -> Effect (Mutex a) +mutex = _make + +-- | Is the Mutex currently locked? +locked :: forall a. Mutex a -> Effect Boolean +locked = _locked + +-- | Attempt to acquire a lock without blocking. +-- | +-- | If the Mutex is currently locked, this will return `Nothing`. +tryLock :: forall a. Mutex a -> Effect (Maybe (Guard a)) +tryLock = map Nullable.toMaybe <<< _tryLock + +-- | Acquire a lock, blocking if another thread +-- | currently holds a lock. +-- | +-- | If multiple threads invoke `lock`, they will +-- | be unlocked in the order that they blocked on `lock`. +lock :: forall a. Mutex a -> Aff (Guard a) +lock m = Aff.makeAff \cb -> do + waker <- Nullable.toMaybe <$> _lock m (cb <<< Right) + pure $ case waker of + Just w -> Aff.effectCanceler $ _releaseWaker m w + Nothing -> Aff.nonCanceler + +-- | Take a snapshot of the value in a Mutex +-- | +-- | This is a shorthand for acquiring a lock, reading it, +-- | then immediately releasing the lock. +get :: forall a. Mutex a -> Aff a +get m = do + g <- lock m + a <- liftEffect $ read g <* release g + pure a + +-- | Write a new value to a Mutex +-- | +-- | This is a shorthand for acquiring a lock, writing to it, +-- | then immediately releasing the lock. +put :: forall a. Mutex a -> a -> Aff Unit +put m a = do + g <- lock m + liftEffect $ write g a *> release g + +-- | Modify the value contained in a Mutex +-- | +-- | This is a shorthand for acquiring a lock, +-- | reading from it, writing to it, then +-- | immediately releasing it. +-- | +-- | Returns the new value. +modify :: forall a. Mutex a -> (a -> a) -> Aff a +modify m f = do + g <- lock m + liftEffect $ ((f <$> read g) >>= (\a -> write g a *> release g $> a)) + +-- | `modify` with its return value ignored. +modify_ :: forall a. Mutex a -> (a -> a) -> Aff Unit +modify_ m f = void $ modify m f + +-- | Release the lock +-- | +-- | Attempting to `read` or `write` this `Guard` +-- | will throw an exception. +-- | +-- | Repeated invocations of `release` are ignored. +release :: forall a. Guard a -> Effect Unit +release = _guardRelease + +-- | Read the value in the Mutex via the Guard +read :: forall a. Guard a -> Effect a +read = _guardRead + +-- | Write a new value into the Mutex via the Guard +write :: forall a. Guard a -> a -> Effect Unit +write = _guardWrite diff --git a/src/Threading.Data.RWLock.purs b/src/Threading.Data.RWLock.purs new file mode 100644 index 0000000..69509f6 --- /dev/null +++ b/src/Threading.Data.RWLock.purs @@ -0,0 +1,282 @@ +-- | A RWLock allows threads to share mutable state. +-- | +-- | Any number of threads can concurrently read the state, +-- | when there isn't a thread with write access. +-- | +-- | Get write access with `lockWrite` or `tryLockWrite`, +-- | or read access with `lockRead` or `tryLockRead`. +-- | +-- | `(try)lockWrite` returns a `WriteGuard`, which guarantees +-- | no other threads have read or write access until it is released. +-- | +-- | `(try)lockRead` returns a `ReadGuard`, which guarantees +-- | no threads have write access until it is released. +module Threading.Data.RWLock + ( RWLock + , ReadGuard + , WriteGuard + , rwLock + , lockWrite + , tryLockWrite + , lockRead + , tryLockRead + , locked + , Locked(..) + , get + , put + , modify + , modify_ + , release + , read + , write + , class RWLockGuard + ) where + +import Prelude + +import Control.Alternative (guard) +import Control.Monad.Error.Class (liftMaybe, throwError) +import Control.Monad.Maybe.Trans (runMaybeT) +import Control.Monad.Trans.Class (lift) +import Data.Foldable (elem, traverse_) +import Data.Generic.Rep (class Generic) +import Data.Maybe (Maybe(..)) +import Data.Set (Set) +import Data.Set as Set +import Data.Show.Generic (genericShow) +import Data.Traversable (traverse) +import Effect (Effect) +import Effect.Aff (Aff) +import Effect.Class (liftEffect) +import Effect.Exception (error) +import Effect.Ref (Ref) +import Effect.Ref as Ref +import Threading.Data.Mutex (Mutex) +import Threading.Data.Mutex as Mutex +import Type.Function (type ($)) + +-- | The lock state of the RWLock +data Locked + -- | There are no readers or writers. + = Unlocked + -- | There is a writer, and the RWLock is not + -- | currently readable or writable. + | LockedWriting + -- | There is at least one reader, and the RWLock is not + -- | currently writable. + | LockedReading + +derive instance Generic Locked _ +derive instance Eq Locked +instance Show Locked where + show = genericShow + +newtype WriteLockHeld = WriteLockHeld (Maybe Int) + +-- | A Read-Write lock +-- | +-- | Ensures that there can be at most 1 thread with write +-- | access to the data contained in the RWLock, or any +-- | number of concurrent readers. +data RWLock a = RWLock + -- Guarantee that state transitions are exclusive + { fence :: Mutex Unit + -- Monotonically increasing guard counter + , id :: Ref Int + -- Condvar-style mutex indicating writability. + -- + -- When a lock is held and the mutex contains `WriteLockHeld Nothing`, then there are 1 or more readers. + -- + -- When a lock is held and the mutex contains `WriteLockHeld (Just )`, then the lock is held by a writer. + , w :: Mutex WriteLockHeld + -- Ref containing the MutexGuard for `w`. + -- + -- When a held WriteGuard or the final held ReadGuard is released, the guard contained will be + -- released, and `Nothing` will be written here. + , wLock :: Ref $ Maybe $ Mutex.Guard WriteLockHeld + -- Ref tracking active readers + , readers :: Mutex $ Set Int + -- The data contained in the RWLock + , state :: Ref a + } + +-- | Internal +-- | +-- | Guarantees that no other `fenced` sections +-- | run concurrently with this one. +fenced :: forall a r. RWLock a -> Aff r -> Aff r +fenced (RWLock { fence }) m = do + g <- Mutex.lock fence + m <* liftEffect (Mutex.release g) + +-- | A guard with read access to data of type `a` +data ReadGuard a = ReadGuard Int (RWLock a) + +-- | A guard with read+write access to data of type `a` +data WriteGuard a = WriteGuard Int (RWLock a) + +-- | Acquire a write-access lock to the data +-- | contained in the RWLock. +-- | +-- | If another thread holds a `ReadGuard` or `WriteGuard`, +-- | this will block until the data is writable. +lockWrite :: forall a. RWLock a -> Aff (WriteGuard a) +lockWrite rw@(RWLock { id: idRef, w, wLock }) = do + id <- liftEffect $ Ref.modify (_ + 1) idRef + g <- Mutex.lock w + liftEffect $ Mutex.write g $ WriteLockHeld $ Just id + liftEffect $ Ref.write (Just g) wLock + pure $ WriteGuard id rw + +-- | Acquire a write-access lock to the data +-- | contained in the RWLock. +-- | +-- | If another thread holds a `ReadGuard` or `WriteGuard`, +-- | this will return Nothing. +tryLockWrite :: forall a. RWLock a -> Aff (Maybe (WriteGuard a)) +tryLockWrite rw = + fenced rw + $ liftEffect (locked rw) >>= case _ of + Unlocked -> Just <$> lockWrite rw + _ -> pure Nothing + +-- | Acquire a read-access lock to the data +-- | contained in the RWLock. +-- | +-- | If another thread holds a `WriteGuard`, +-- | this will block until the data is readable. +lockRead :: forall a. RWLock a -> Aff (ReadGuard a) +lockRead rw@(RWLock { fence, id: idRef, w, wLock, readers: readersM }) = do + fenceG <- Mutex.lock fence + id <- liftEffect $ Ref.modify (_ + 1) idRef + l <- liftEffect $ locked rw + let + block = do + wl' <- Mutex.lock w + liftEffect $ Mutex.write wl' (WriteLockHeld Nothing) + liftEffect $ Ref.write (Just wl') wLock + done = liftEffect (Mutex.release fenceG) + + fenceG' <- case l of + LockedReading -> pure fenceG + LockedWriting -> done *> block *> Mutex.lock fence + Unlocked -> block $> fenceG + + readersG <- Mutex.lock readersM + liftEffect do + readers <- Mutex.read readersG + Mutex.write readersG $ Set.insert id readers + Mutex.release readersG + Mutex.release fenceG' + pure $ ReadGuard id rw + +-- | Acquire a read-access lock to the data +-- | contained in the RWLock. +-- | +-- | If another thread holds a `WriteGuard`, +-- | this will return Nothing. +tryLockRead :: forall a. RWLock a -> Aff (Maybe (ReadGuard a)) +tryLockRead rw = + liftEffect (locked rw) >>= case _ of + LockedWriting -> pure Nothing + _ -> Just <$> lockRead rw + +-- | Create a new RWLock +rwLock :: forall a. a -> Effect (RWLock a) +rwLock a = do + fence <- Mutex.mutex unit + id <- liftEffect $ Ref.new 0 + w <- Mutex.mutex $ WriteLockHeld Nothing + wLock <- liftEffect $ Ref.new Nothing + readers <- Mutex.mutex Set.empty + state <- liftEffect $ Ref.new a + pure $ RWLock { fence, id, w, wLock, readers, state } + +-- | Typeclass implemented by `WriteGuard` and `ReadGuard` +-- | allowing a common `release` + `read` function (as opposed +-- | to `releaseRead`, `releaseWrite`, etc.) +class RWLockGuard g where + release :: forall a. g a -> Aff Unit + read :: forall a. g a -> Aff a + +instance RWLockGuard WriteGuard where + release w@(WriteGuard _ rw@(RWLock { wLock })) = + fenced rw $ void $ liftEffect do + g <- _writeGuardOk w + Ref.write Nothing wLock + Mutex.release g + read (WriteGuard id rw@(RWLock { state, wLock })) = + fenced rw $ liftEffect do + mg <- Ref.read wLock + g <- liftMaybe (error "WriteGuard has been released!") mg + WriteLockHeld id' <- Mutex.read g + when (Just id /= id') $ throwError $ error "WriteGuard has been released!" + Ref.read state + +instance RWLockGuard ReadGuard where + release (ReadGuard id rw@(RWLock { wLock, readers: readersM })) = + fenced rw $ void $ runMaybeT do + readersG <- lift $ Mutex.lock readersM + readers <- liftEffect $ Mutex.read readersG + guard $ elem id readers + liftEffect do + Mutex.write readersG $ Set.delete id readers + empty <- ((_ == 0) <<< Set.size) <$> Mutex.read readersG + Mutex.release readersG + when empty $ Ref.read wLock >>= traverse_ \g -> do + Ref.write Nothing wLock + Mutex.release g + read (ReadGuard id rw@(RWLock { readers: readersM, state })) = + fenced rw do + readersG <- Mutex.lock readersM + readers <- liftEffect $ Mutex.read readersG + when (not $ elem id readers) $ throwError $ error "ReadGuard has been released!" + liftEffect $ Mutex.release readersG + liftEffect $ Ref.read state + +_writeGuardOk :: forall a. WriteGuard a -> Effect (Mutex.Guard WriteLockHeld) +_writeGuardOk (WriteGuard id (RWLock { wLock })) = do + mg <- Ref.read wLock + g <- liftMaybe (error "WriteGuard has been released!") mg + WriteLockHeld id' <- Mutex.read g + when (Just id /= id') $ throwError $ error "WriteGuard has been released!" + pure g + +-- | Writes a new value +write :: forall a. WriteGuard a -> a -> Effect Unit +write w@(WriteGuard _ (RWLock { state })) a = do + void $ _writeGuardOk w + Ref.write a state + +-- | Asks what state the RWLock is currently in +locked :: forall a. RWLock a -> Effect Locked +locked (RWLock { wLock }) = do + Ref.read wLock + >>= traverse Mutex.read + >>= case _ of + Nothing -> pure Unlocked + Just (WriteLockHeld Nothing) -> pure LockedReading + Just (WriteLockHeld (Just _)) -> pure LockedWriting + +-- | Get the value currently in the RWLock. +-- | +-- | Shorthand for `lockRead rw >>= (\l -> read l <* release l)` +get :: forall a. RWLock a -> Aff a +get rw = lockRead rw >>= (\l -> read l <* release l) + +-- | Write a new value to the RWLock. +-- | +-- | Shorthand for `lockWrite rw >>= (\l -> liftEffect (write l a) <* release l)` +put :: forall a. RWLock a -> a -> Aff Unit +put rw a = lockWrite rw >>= (\l -> liftEffect (write l a) <* release l) + +-- | Modify the value in the RWLock using the provided function. +modify :: forall a. RWLock a -> (a -> a) -> Aff a +modify rw f = do + l <- lockWrite rw + a <- f <$> read l + liftEffect (write l a) *> release l $> a + +-- | Shorthand for `void $ modify rw f` +modify_ :: forall a. RWLock a -> (a -> a) -> Aff Unit +modify_ rw f = void $ modify rw f diff --git a/src/Threading.Handle.purs b/src/Threading.Handle.purs new file mode 100644 index 0000000..e074b45 --- /dev/null +++ b/src/Threading.Handle.purs @@ -0,0 +1,3 @@ +module Threading.Handle where + +data Handle = Handle diff --git a/src/Threading.purs b/src/Threading.purs new file mode 100644 index 0000000..329831d --- /dev/null +++ b/src/Threading.purs @@ -0,0 +1 @@ +module Threading where diff --git a/test/Test/Main.purs b/test/Test/Main.purs index c8cd6d4..087cc6f 100644 --- a/test/Test/Main.purs +++ b/test/Test/Main.purs @@ -5,10 +5,19 @@ import Prelude import Data.Maybe (Maybe(..)) import Effect (Effect) import Effect.Aff (launchAff_) -import Test.Pipes.CBOR as Test.Pipes.CBOR +import Effect.Aff as Aff +import Test.Spec (it) import Test.Spec.Reporter (specReporter) import Test.Spec.Runner (defaultConfig, runSpec') +import Test.Threading.Barrier as Test.Threading.Barrier +import Test.Threading.Channel as Test.Threading.Channel +import Test.Threading.Data.Mutex as Test.Threading.Data.Mutex +import Test.Threading.Data.RWLock as Test.Threading.Data.RWLock main :: Effect Unit -main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do - Test.Pipes.CBOR.spec +main = launchAff_ $ Aff.supervise $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do + Test.Threading.Data.Mutex.spec + Test.Threading.Data.RWLock.spec + Test.Threading.Channel.spec + Test.Threading.Barrier.spec + it "all tests were run" $ pure unit diff --git a/test/Test/Pipes.CSV.purs b/test/Test/Pipes.CSV.purs deleted file mode 100644 index ac7e9ec..0000000 --- a/test/Test/Pipes.CSV.purs +++ /dev/null @@ -1,85 +0,0 @@ -module Test.Pipes.CBOR where - -import Prelude - -import Control.Monad.Gen (chooseInt) -import Data.DateTime (DateTime) -import Data.List ((:)) -import Data.List as List -import Data.Maybe (Maybe(..), fromJust) -import Data.Newtype (wrap) -import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy) -import Data.Tuple.Nested ((/\)) -import Effect (Effect) -import Effect.CBOR as CBOR -import Effect.Class (liftEffect) -import Node.Buffer (Buffer) -import Node.Buffer as Buffer -import Node.Encoding (Encoding(..)) -import Partial.Unsafe (unsafePartial) -import Pipes (yield, (>->)) -import Pipes.Async (debug, (>-/->)) -import Pipes.CBOR as Pipes.CBOR -import Pipes.Collect as Pipes.Collect -import Pipes.Node.Stream as Pipes.Stream -import Pipes.Prelude (toListM) as Pipes -import Test.QuickCheck.Gen (randomSample') -import Test.Spec (Spec, before, describe, it) -import Test.Spec.Assertions (shouldEqual) - -cborHex :: String -cborHex = "82b90002646e616d656568656e72796174c1fb41d990ee6d671aa0b90002646e616d65656a756c696f6174c1fbc1d756dad0bbb646" - -cborBuf :: Effect Buffer -cborBuf = Buffer.fromString cborHex Hex - -exp :: Array {name :: String, t :: DateTime} -exp = - [{name: "henry", t: toDateTimeLossy $ unsafePartial fromJust $ fromRFC3339String $ wrap "2024-05-14T19:21:25.611Z"} - ,{name: "julio", t: toDateTimeLossy $ unsafePartial fromJust $ fromRFC3339String $ wrap "1920-05-14T20:21:17.067Z"} - ] - - -dt :: String -> DateTime -dt = toDateTimeLossy <<< unsafePartial fromJust <<< fromRFC3339String <<< wrap - -spec :: Spec Unit -spec = - describe "Pipes.CBOR" do - it "encode" do - bytes - <- Pipes.Collect.toBuffer - $ Pipes.Stream.withEOS (yield exp) - >-/-> Pipes.CBOR.encode - >-> Pipes.Stream.unEOS - act <- liftEffect $ CBOR.decode bytes - act `shouldEqual` exp - - describe "parse" do - it "parses csv" do - buf <- liftEffect $ cborBuf - rows <- Pipes.toListM - $ (yield (Just buf) *> yield Nothing) - >-/-> debug "cbor" Pipes.CBOR.decode - - rows `shouldEqual` ((Just exp) : Nothing : List.Nil) - before - (do - nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9) - let - objs = (\n -> {id: n}) <$> nums - bytes <- - Pipes.Collect.toBuffer - $ Pipes.Stream.withEOS (yield objs) - >-/-> Pipes.CBOR.encode - >-> Pipes.Stream.unEOS - pure $ nums /\ bytes - ) - $ it "parses large csv" \(nums /\ bytes) -> do - rows <- - Pipes.Collect.toArray - $ Pipes.Stream.withEOS (yield bytes) - >-/-> Pipes.CBOR.decode @(Array {id :: Int}) - >-> Pipes.Stream.unEOS - - rows `shouldEqual` [(\id -> { id }) <$> nums] diff --git a/test/Test/Threading.Barrier.purs b/test/Test/Threading.Barrier.purs new file mode 100644 index 0000000..96a7abd --- /dev/null +++ b/test/Test/Threading.Barrier.purs @@ -0,0 +1,41 @@ +module Test.Threading.Barrier where + +import Prelude + +import Data.Newtype (wrap) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Ref as Ref +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (shouldEqual) +import Threading.Barrier as Barrier + +spec :: Spec Unit +spec = + describe "Threading.Barrier" do + it "creates" do + void $ liftEffect $ Barrier.barrier 1 + it "barrer 1 >>= wait immediately resolves" do + b <- liftEffect $ Barrier.barrier 1 + Barrier.wait b + it "barrer only resolves when all 3 threads wait" do + barrier <- liftEffect $ Barrier.barrier 3 + + aDone <- liftEffect $ Ref.new false + bDone <- liftEffect $ Ref.new false + a <- Aff.forkAff do + Barrier.wait barrier + liftEffect $ Ref.write true aDone + b <- Aff.forkAff do + Barrier.wait barrier + liftEffect $ Ref.write true bDone + + Aff.delay $ wrap 10.0 + liftEffect (Ref.read aDone) >>= shouldEqual false + liftEffect (Ref.read bDone) >>= shouldEqual false + + Barrier.wait barrier + Aff.joinFiber a + Aff.joinFiber b + liftEffect (Ref.read aDone) >>= shouldEqual true + liftEffect (Ref.read bDone) >>= shouldEqual true diff --git a/test/Test/Threading.Channel.purs b/test/Test/Threading.Channel.purs new file mode 100644 index 0000000..6f5de87 --- /dev/null +++ b/test/Test/Threading.Channel.purs @@ -0,0 +1,91 @@ +module Test.Threading.Channel where + +import Prelude + +import Control.Monad.Rec.Class (Step(..), tailRecM) +import Data.Array as Array +import Data.Maybe (Maybe(..), isNothing, maybe) +import Data.Traversable (traverse) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Console as Console +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (expectError, shouldEqual) +import Threading.Channel as Channel + +spec :: Spec Unit +spec = + describe "Threading.Channel" do + describe "channel" do + it "creates" $ liftEffect $ void $ Channel.channel + describe "receiver" do + it "creates" do + c <- liftEffect $ Channel.channel + void $ Channel.receiver c + describe "Sender" do + describe "send" do + it "does nothing when no receivers" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + Channel.send s 0 + it "broadcasts to multiple receivers" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + ra <- Channel.receiver c + rb <- Channel.receiver c + fiber <- Aff.forkAff $ traverse Channel.recv [ ra, rb ] + Channel.send s 100 + as <- Aff.joinFiber fiber + as `shouldEqual` [ 100, 100 ] + describe "Receiver" do + describe "recv" do + it "throws if multiple fibers blocking" do + c <- liftEffect $ Channel.channel + r <- Channel.receiver c + void $ Aff.forkAff $ Channel.recv r + expectError $ Channel.recv r + it "recv resolves with messages in the order they were sent" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + r <- Channel.receiver c + Channel.send s $ Just 1 + Channel.send s $ Just 2 + Channel.send s $ Just 3 + Channel.send s $ Just 4 + fiber <- Aff.forkAff $ flip tailRecM [] \as -> maybe (Done as) (Loop <<< Array.snoc as) <$> Channel.recv r + Channel.send s $ Just 5 + Channel.send s Nothing + as <- Aff.joinFiber fiber + as `shouldEqual` [ 1, 2, 3, 4, 5 ] + it "blocks until a message is sent" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + r <- Channel.receiver c + fiber <- Aff.forkAff $ Channel.recv r + Channel.send s 10 + a <- Aff.joinFiber fiber + a `shouldEqual` 10 + it "immediately resolves if a message buffered" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + r <- Channel.receiver c + Channel.send s 10 + a <- Channel.recv r + a `shouldEqual` 10 + describe "tryRecv" do + it "returns Nothing when no data has been sent" do + c <- liftEffect $ Channel.channel + r <- Channel.receiver c + ma <- Channel.tryRecv r + isNothing ma `shouldEqual` true + it "returns Just when a message has been buffered" do + c <- liftEffect $ Channel.channel + s <- liftEffect $ Channel.sender c + r <- Channel.receiver c + Channel.send s 10 + ma <- Channel.tryRecv r + ma `shouldEqual` (Just 10) + describe "sender" do + it "creates" do + c <- liftEffect $ Channel.channel + void $ liftEffect $ Channel.sender c diff --git a/test/Test/Threading.Data.Mutex.purs b/test/Test/Threading.Data.Mutex.purs new file mode 100644 index 0000000..aae23ed --- /dev/null +++ b/test/Test/Threading.Data.Mutex.purs @@ -0,0 +1,157 @@ +module Test.Threading.Data.Mutex where + +import Prelude + +import Control.Monad.Error.Class (liftEither, liftMaybe) +import Control.Parallel (parOneOf) +import Data.Either (Either(..)) +import Data.Maybe (isNothing) +import Data.Time.Duration (Milliseconds(..)) +import Data.Traversable (for_) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Exception (error) +import Effect.Ref as Ref +import Test.Spec (Spec, describe, it, pending') +import Test.Spec.Assertions (expectError, shouldEqual) +import Threading.Data.Mutex as Mutex + +spec :: Spec Unit +spec = + describe "Threading.Data.Mutex" do + describe "mutex" do + it "creates" $ liftEffect $ void $ Mutex.mutex 0 + describe "read" do + it "reads the value" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + v <- liftEffect $ Mutex.read g + v `shouldEqual` 0 + it "throws if released" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + liftEffect $ Mutex.release g + expectError $ liftEffect $ Mutex.read g + describe "write" do + it "writes the value" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + liftEffect $ Mutex.write g 1 + v <- liftEffect $ Mutex.read g + v `shouldEqual` 1 + it "throws if released" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + liftEffect $ Mutex.release g + expectError $ liftEffect $ Mutex.write g 1 + describe "get" do + it "yields immediately when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + val <- Mutex.get m + val `shouldEqual` 0 + it "blocks until unlocked" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + getFiber <- Aff.forkAff $ Mutex.get m + liftEffect $ Mutex.write g 1 + liftEffect $ Mutex.release g + read <- Aff.joinFiber getFiber + read `shouldEqual` 1 + describe "put" do + it "yields immediately when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + Mutex.put m 1 + val <- Mutex.get m + val `shouldEqual` 1 + it "blocks until unlocked" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + getFiber <- Aff.forkAff $ Mutex.put m 2 + liftEffect $ Mutex.write g 1 + liftEffect $ Mutex.release g + Aff.joinFiber getFiber + val <- Mutex.get m + val `shouldEqual` 2 + describe "modify" do + it "yields immediately when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + val <- Mutex.modify m (_ + 1) + val `shouldEqual` 1 + it "blocks until unlocked" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + getFiber <- Aff.forkAff $ Mutex.modify m (_ * 10) + liftEffect $ Mutex.write g 1 + liftEffect $ Mutex.release g + val <- Aff.joinFiber getFiber + val `shouldEqual` 10 + describe "lock" do + it "yields immediately when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + void $ Mutex.lock m + it "blocks when locked" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + finished <- liftEffect $ Ref.new false + fiber <- Aff.forkAff do + void $ Mutex.lock m + void $ liftEffect $ Ref.write true finished + Aff.delay $ Milliseconds 5.0 + f1 <- liftEffect $ Ref.read finished + f1 `shouldEqual` false + liftEffect $ Mutex.release g + Aff.joinFiber fiber + f2 <- liftEffect $ Ref.read finished + f2 `shouldEqual` true + it "locks are acquired in the order they were requested" do + m <- liftEffect $ Mutex.mutex 0 + g <- Mutex.lock m + a <- Aff.forkAff $ Mutex.modify_ m (_ + 1) -- 1 + b <- Aff.forkAff $ Mutex.modify_ m (_ * 10) -- 10 + c <- Aff.forkAff $ Mutex.modify_ m (_ + 10) -- 20 + d <- Aff.forkAff $ Mutex.modify_ m (_ * 10) -- 200 + liftEffect $ Mutex.release g + for_ [ a, b, c, d ] Aff.joinFiber + n <- Mutex.get m + n `shouldEqual` 200 + pending' "should be (eventually) unlocked if a fiber exits without releasing the lock" do + m <- liftEffect $ Mutex.mutex 0 + -- Fiber acquires a lock then immediately resolves without releasing. + -- + -- When the GC reclaims the guard object, the Mutex should notice and behave + -- as if it was explicitly released. + void $ Aff.forkAff $ void $ Mutex.lock m + liftEither =<< parOneOf + [ Aff.delay (Milliseconds 20000.0) $> Left (error "timed out waiting for GC to reclaim lock") + , Mutex.lock m $> Right unit + ] + describe "tryLock" do + it "returns Just when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + void $ liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m) + it "returns Nothing when locked" do + m <- liftEffect $ Mutex.mutex 0 + _ <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m) + g <- liftEffect (Mutex.tryLock m) + isNothing g `shouldEqual` true + it "returns Just after release" do + m <- liftEffect $ Mutex.mutex 0 + g <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m) + liftEffect $ Mutex.release g + void $ liftMaybe (error $ "Mutex.tryLock returned Nothing after lock released") =<< liftEffect (Mutex.tryLock m) + describe "locked" do + it "is false when unlocked" do + m <- liftEffect $ Mutex.mutex 0 + l <- liftEffect $ Mutex.locked m + l `shouldEqual` false + it "is true when locked" do + m <- liftEffect $ Mutex.mutex 0 + _ <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m) + l <- liftEffect $ Mutex.locked m + l `shouldEqual` true + it "is false after lock released" do + m <- liftEffect $ Mutex.mutex 0 + g <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m) + liftEffect $ Mutex.release g + l' <- liftEffect $ Mutex.locked m + l' `shouldEqual` false diff --git a/test/Test/Threading.Data.RWLock.purs b/test/Test/Threading.Data.RWLock.purs new file mode 100644 index 0000000..56f3eb1 --- /dev/null +++ b/test/Test/Threading.Data.RWLock.purs @@ -0,0 +1,203 @@ +module Test.Threading.Data.RWLock where + +import Prelude + +import Control.Monad.Error.Class (liftMaybe) +import Data.Maybe (isNothing) +import Data.Time.Duration (Milliseconds(..)) +import Data.Traversable (for_) +import Effect.Aff as Aff +import Effect.Class (liftEffect) +import Effect.Console as Console +import Effect.Exception (error) +import Effect.Ref as Ref +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (expectError, shouldEqual) +import Threading.Data.RWLock as RWLock + +spec :: Spec Unit +spec = + describe "Threading.Data.RWLock" do + describe "rwLock" do + it "creates" $ liftEffect $ void $ RWLock.rwLock 0 + describe "read" do + it "reads the value" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockRead m + v <- RWLock.read g + v `shouldEqual` 0 + it "throws if released" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockRead m + RWLock.release g + expectError $ RWLock.read g + describe "write" do + it "writes the value" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + liftEffect $ RWLock.write g 1 + v <- RWLock.read g + v `shouldEqual` 1 + it "throws if released" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + RWLock.release g + expectError $ liftEffect $ RWLock.write g 1 + describe "get" do + it "yields immediately when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + val <- RWLock.get m + val `shouldEqual` 0 + it "blocks until unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + getFiber <- Aff.forkAff $ RWLock.get m + liftEffect $ RWLock.write g 1 + RWLock.release g + read <- Aff.joinFiber getFiber + read `shouldEqual` 1 + describe "put" do + it "yields immediately when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + RWLock.put m 1 + val <- RWLock.get m + val `shouldEqual` 1 + it "blocks until unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + getFiber <- Aff.forkAff $ RWLock.put m 2 + liftEffect $ RWLock.write g 1 + RWLock.release g + Aff.joinFiber getFiber + val <- RWLock.get m + val `shouldEqual` 2 + describe "modify" do + it "yields immediately when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + val <- RWLock.modify m (_ + 1) + val `shouldEqual` 1 + it "blocks until unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + getFiber <- Aff.forkAff $ RWLock.modify m (_ * 10) + liftEffect $ RWLock.write g 1 + RWLock.release g + val <- Aff.joinFiber getFiber + val `shouldEqual` 10 + describe "lockRead" do + it "yields immediately when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + void $ RWLock.lockRead m + it "blocks when write locked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + finished <- liftEffect $ Ref.new false + fiber <- Aff.forkAff do + void $ RWLock.lockRead m + void $ liftEffect $ Ref.write true finished + Aff.delay $ Milliseconds 5.0 + f1 <- liftEffect $ Ref.read finished + f1 `shouldEqual` false + RWLock.release g + Aff.joinFiber fiber + f2 <- liftEffect $ Ref.read finished + f2 `shouldEqual` true + it "does not block when read locked" do + m <- liftEffect $ RWLock.rwLock 0 + void $ Aff.forkAff $ void $ RWLock.lockRead m + void $ Aff.forkAff $ void $ RWLock.lockRead m + void $ RWLock.lockRead m + n <- RWLock.get m + n `shouldEqual` 0 + it "blocks when write locked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + finished <- liftEffect $ Ref.new false + fiber <- Aff.forkAff do + g' <- RWLock.lockRead m + liftEffect $ Ref.write true finished + RWLock.read g' + liftEffect $ RWLock.write g 1 + f <- liftEffect $ Ref.read finished + f `shouldEqual` false + RWLock.release g + n <- Aff.joinFiber fiber + n `shouldEqual` 1 + describe "lockWrite" do + it "yields immediately when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + void $ RWLock.lockWrite m + it "blocks when write locked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + finished <- liftEffect $ Ref.new false + fiber <- Aff.forkAff do + void $ RWLock.lockWrite m + void $ liftEffect $ Ref.write true finished + Aff.delay $ Milliseconds 5.0 + f1 <- liftEffect $ Ref.read finished + f1 `shouldEqual` false + RWLock.release g + Aff.joinFiber fiber + f2 <- liftEffect $ Ref.read finished + f2 `shouldEqual` true + it "blocks when read locked" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockRead m + finished <- liftEffect $ Ref.new false + fiber <- Aff.forkAff do + void $ RWLock.lockWrite m + void $ liftEffect $ Ref.write true finished + Aff.delay $ Milliseconds 5.0 + f1 <- liftEffect $ Ref.read finished + f1 `shouldEqual` false + RWLock.release g + Aff.joinFiber fiber + f2 <- liftEffect $ Ref.read finished + f2 `shouldEqual` true + it "locks are acquired in the order they were requested" do + m <- liftEffect $ RWLock.rwLock 0 + g <- RWLock.lockWrite m + a <- Aff.forkAff $ RWLock.modify_ m (_ + 1) -- 1 + b <- Aff.forkAff $ RWLock.modify_ m (_ * 10) -- 10 + c <- Aff.forkAff $ RWLock.modify_ m (_ + 10) -- 20 + d <- Aff.forkAff $ RWLock.modify_ m (_ * 10) -- 200 + RWLock.release g + for_ [ a, b, c, d ] Aff.joinFiber + n <- RWLock.get m + n `shouldEqual` 200 + describe "tryLockWrite" do + it "returns Just when unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + void $ liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m + it "returns Nothing when locked" do + m <- liftEffect $ RWLock.rwLock 0 + _ <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m + g <- RWLock.tryLockWrite m + isNothing g `shouldEqual` true + it "returns Just after release" do + m <- liftEffect $ RWLock.rwLock 0 + g <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m + RWLock.release g + void $ liftMaybe (error $ "RWLock.tryLockWrite returned Nothing after lock released") =<< RWLock.tryLockWrite m + describe "locked" do + it "Unlocked" do + m <- liftEffect $ RWLock.rwLock 0 + l <- liftEffect $ RWLock.locked m + l `shouldEqual` RWLock.Unlocked + it "LockedWriting" do + m <- liftEffect $ RWLock.rwLock 0 + _ <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m + l <- liftEffect $ RWLock.locked m + l `shouldEqual` RWLock.LockedWriting + it "LockedReading" do + m <- liftEffect $ RWLock.rwLock 0 + _ <- liftMaybe (error $ "RWLock.tryLockRead returned Nothing on new mutex") =<< RWLock.tryLockRead m + l <- liftEffect $ RWLock.locked m + l `shouldEqual` RWLock.LockedReading + it "Unlocked after lock released" do + m <- liftEffect $ RWLock.rwLock 0 + g <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m + RWLock.release g + l' <- liftEffect $ RWLock.locked m + l' `shouldEqual` RWLock.Unlocked