fix: init

This commit is contained in:
orion 2024-07-16 12:55:45 -05:00
parent 22c966626a
commit a4368b944a
Signed by: orion
GPG Key ID: 6D4165AE4C928719
34 changed files with 1866 additions and 683 deletions

View File

@ -1 +1,2 @@
purescript 0.15.15
bun 1.1.18

View File

@ -1,28 +1,16 @@
# purescript-cbor-stream
# purescript-threading
Concurrency primitives inspired by python's multithreading and rust, allowing for
predictable concurrency with `Aff`
Type-safe bindings for the streaming API of `cbor-x`
## Use Cases
* Create a background worker thread
* Communicate between threads (`Threading.Channel`)
* Limit access to a resource _(eg. a database connection pool, file handle)_ to 1 concurrent actor (`Threading.RWLock`, `Threading.Mutex`)
* Coordinate concurrent threads, waiting for some common goal to be reached before continuing (`Threading.Barrier`)
* Create a pool of concurrent "threads" that can pull work from a queue, with graceful exiting and error handling
* Remotely kill a thread, or non-blockingly ask if it has exited
## Installing
```bash
spago install cbor-stream
{bun|yarn|npm|pnpm} install cbor-x
```
## Examples
### Convert a cbor-encoded dataset to csv
```purescript
import Pipes.Node.Stream as Pipes.Stream
import Pipes.Node.FS as Pipes.FS
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.CBOR as Pipes.CBOR
import Pipes.CSV as Pipes.CSV
import Pipes.Prelude ((>->))
import Pipes.Prelude as Pipes
Pipes.runEffect
$ Pipes.FS.read "foo.bin"
>-> Pipes.CBOR.decode @{id :: Int, name :: String}
>-> Pipes.CSV.stringify
>-> Pipes.FS.write "foo.csv"
spago install threading
```

View File

@ -22,8 +22,8 @@ await writeFile("./spago.yaml", spagonew);
const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace(
/packages\/purescript-cbor-stream\/.+?\//g,
`/packages/purescript-cbor-stream/${ver}/`,
/packages\/purescript-threading\/.+?\//g,
`/packages/purescript-threading/${ver}/`,
);
await writeFile("./README.md", readmenew);

View File

@ -1,12 +1,10 @@
{
"name": "purescript-cbor-stream",
"version": "v1.3.0",
"name": "purescript-threading",
"version": "v0.0.1",
"type": "module",
"dependencies": {
"cbor-x": "^1.5.9",
"decimal.js": "^10.4.3"
},
"dependencies": {},
"devDependencies": {
"typescript": "^5.4.5"
"typescript": "^5.4.5",
"bun": "1.1.18"
}
}

View File

@ -1,35 +1,25 @@
workspace:
packages:
cbor-stream:
threading:
path: ./
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- catenable-lists
- control
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- exceptions
- filterable
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- js-bigints: ">=2.2.1 <3.0.0"
- js-date: ">=8.0.0 <9.0.0"
- js-maps: ">=0.1.2 <0.2.0"
- maybe: ">=6.0.0 <7.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=2.1.0 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
- simple-json: ">=9.0.0 <10.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- refs
- transformers: ">=6.0.0 <7.0.0"
- tuples
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
- gen
@ -59,6 +49,7 @@ workspace:
- enums
- exceptions
- exists
- filterable
- fixed-points
- foldable-traversable
- foreign
@ -72,9 +63,7 @@ workspace:
- identity
- integers
- invariant
- js-bigints
- js-date
- js-maps
- lazy
- lcg
- lists
@ -85,7 +74,6 @@ workspace:
- node-event-emitter
- node-fs
- node-path
- node-stream-pipes
- node-streams
- node-zlib
- nonempty
@ -101,7 +89,6 @@ workspace:
- precise-datetime
- prelude
- profunctor
- profunctor-lenses
- quickcheck
- random
- record
@ -118,9 +105,508 @@ workspace:
- typelevel-prelude
- unfoldable
- unicode
- unordered-collections
- unsafe-coerce
- variant
package_set:
address:
registry: 53.3.0
compiler: ">=0.15.15 <0.16.0"
content:
abc-parser: 2.0.1
ace: 9.1.0
address-rfc2821: 0.1.1
aff: 7.1.0
aff-bus: 6.0.0
aff-coroutines: 9.0.0
aff-promise: 4.0.0
aff-retry: 2.0.0
affjax: 13.0.0
affjax-node: 1.0.0
affjax-web: 1.0.0
ansi: 7.0.0
apexcharts: 0.5.0
applicative-phases: 1.0.0
argonaut: 9.0.0
argonaut-aeson-generic: 0.4.1
argonaut-codecs: 9.1.0
argonaut-core: 7.0.0
argonaut-generic: 8.0.0
argonaut-traversals: 10.0.0
argparse-basic: 2.0.0
array-builder: 0.1.2
array-search: 0.6.0
arraybuffer: 13.2.0
arraybuffer-builder: 3.1.0
arraybuffer-types: 3.0.2
arrays: 7.3.0
arrays-extra: 0.6.1
arrays-zipper: 2.0.1
ask: 1.0.0
assert: 6.0.0
assert-multiple: 0.4.0
avar: 5.0.0
b64: 0.0.8
barbies: 1.0.1
barlow-lens: 0.9.0
bifunctors: 6.0.0
bigints: 7.0.1
bolson: 0.3.9
bookhound: 0.1.7
bower-json: 3.0.0
call-by-name: 4.0.1
canvas: 6.0.0
canvas-action: 9.0.0
cartesian: 1.0.6
catenable-lists: 7.0.0
cbor-stream: 1.3.0
chameleon: 1.0.0
chameleon-halogen: 1.0.3
chameleon-react-basic: 1.1.0
chameleon-styled: 2.5.0
chameleon-transformers: 1.0.0
channel: 1.0.0
checked-exceptions: 3.1.1
choku: 1.0.1
classless: 0.1.1
classless-arbitrary: 0.1.1
classless-decode-json: 0.1.1
classless-encode-json: 0.1.3
classnames: 2.0.0
codec: 6.1.0
codec-argonaut: 10.0.0
codec-json: 1.2.0
colors: 7.0.1
concur-core: 0.5.0
concur-react: 0.5.0
concurrent-queues: 3.0.0
console: 6.1.0
const: 6.0.0
contravariant: 6.0.0
control: 6.0.0
convertable-options: 1.0.0
coroutines: 7.0.0
css: 6.0.0
css-frameworks: 1.0.1
csv-stream: 2.3.0
data-mvc: 0.0.2
datetime: 6.1.0
datetime-parsing: 0.2.0
debounce: 0.1.0
debug: 6.0.2
decimals: 7.1.0
default-values: 1.0.1
deku: 0.9.23
deno: 0.0.5
dissect: 1.0.0
distributive: 6.0.0
dom-filereader: 7.0.0
dom-indexed: 12.0.0
dom-simple: 0.4.0
dotenv: 4.0.3
droplet: 0.6.0
dts: 1.0.0
dual-numbers: 1.0.2
dynamic-buffer: 3.0.1
echarts-simple: 0.0.1
effect: 4.0.0
either: 6.1.0
elmish: 0.11.4
elmish-enzyme: 0.1.1
elmish-hooks: 0.10.0
elmish-html: 0.8.3
elmish-testing-library: 0.3.2
email-validate: 7.0.0
encoding: 0.0.9
enums: 6.0.1
env-names: 0.4.0
error: 2.0.0
eta-conversion: 0.3.2
exceptions: 6.1.0
exists: 6.0.0
exitcodes: 4.0.0
expect-inferred: 3.0.0
ezfetch: 1.0.0
fahrtwind: 2.0.0
fallback: 0.1.0
fast-vect: 1.2.0
fetch: 4.1.0
fetch-argonaut: 1.0.1
fetch-core: 5.1.0
fetch-yoga-json: 1.1.0
ffi-simple: 0.5.1
fft-js: 0.1.0
filterable: 5.0.0
fix-functor: 0.1.0
fixed-points: 7.0.0
fixed-precision: 5.0.0
flame: 1.3.0
float32: 2.0.0
fmt: 0.2.1
foldable-traversable: 6.0.0
foldable-traversable-extra: 0.0.6
foreign: 7.0.0
foreign-object: 4.1.0
foreign-readwrite: 3.4.0
forgetmenot: 0.1.0
fork: 6.0.0
form-urlencoded: 7.0.0
formatters: 7.0.0
framer-motion: 1.0.1
free: 7.1.0
freeap: 7.0.0
freer-free: 0.0.1
freet: 7.0.0
functions: 6.0.0
functor1: 3.0.0
functors: 5.0.0
fuzzy: 0.4.0
gen: 4.0.0
generate-values: 1.0.1
generic-router: 0.0.1
geojson: 0.0.5
geometry-plane: 1.0.3
gojs: 0.1.1
grain: 3.0.0
grain-router: 3.0.0
grain-virtualized: 3.0.0
graphs: 8.1.0
group: 4.1.1
halogen: 7.0.0
halogen-bootstrap5: 5.3.2
halogen-canvas: 1.0.0
halogen-css: 10.0.0
halogen-echarts-simple: 0.0.4
halogen-formless: 4.0.3
halogen-helix: 1.0.0
halogen-hooks: 0.6.3
halogen-hooks-extra: 0.9.0
halogen-infinite-scroll: 1.1.0
halogen-store: 0.5.4
halogen-storybook: 2.0.0
halogen-subscriptions: 2.0.0
halogen-svg-elems: 8.0.0
halogen-typewriter: 1.0.4
halogen-vdom: 8.0.0
halogen-vdom-string-renderer: 0.5.0
halogen-xterm: 2.0.0
heckin: 2.0.1
heterogeneous: 0.6.0
homogeneous: 0.4.0
http-methods: 6.0.0
httpurple: 4.0.0
huffman: 0.4.0
humdrum: 0.0.1
hyrule: 2.3.8
identity: 6.0.0
identy: 4.0.1
indexed-db: 1.0.0
indexed-monad: 3.0.0
int64: 3.0.0
integers: 6.0.0
interpolate: 5.0.2
intersection-observer: 1.0.1
invariant: 6.0.0
jarilo: 1.0.1
jelly: 0.10.0
jelly-router: 0.3.0
jelly-signal: 0.4.0
jest: 1.0.0
js-abort-controller: 1.0.0
js-bigints: 2.2.1
js-date: 8.0.0
js-fetch: 0.2.1
js-fileio: 3.0.0
js-intl: 1.0.4
js-iterators: 0.1.1
js-maps: 0.1.2
js-promise: 1.0.0
js-promise-aff: 1.0.0
js-timers: 6.1.0
js-uri: 3.1.0
json: 1.1.0
json-codecs: 5.0.0
justifill: 0.5.0
jwt: 0.0.9
labeled-data: 0.2.0
language-cst-parser: 0.14.0
lazy: 6.0.0
lazy-joe: 1.0.0
lcg: 4.0.0
leibniz: 5.0.0
leveldb: 1.0.1
liminal: 1.0.1
linalg: 6.0.0
lists: 7.0.0
literals: 1.0.2
logging: 3.0.0
logging-journald: 0.4.0
lumi-components: 18.0.0
machines: 7.0.0
maps-eager: 0.5.0
marionette: 1.0.0
marionette-react-basic-hooks: 0.1.1
marked: 0.1.0
matrices: 5.0.1
matryoshka: 1.0.0
maybe: 6.0.0
media-types: 6.0.0
meowclient: 1.0.0
midi: 4.0.0
milkis: 9.0.0
minibench: 4.0.1
mmorph: 7.0.0
monad-control: 5.0.0
monad-logger: 1.3.1
monad-loops: 0.5.0
monad-unlift: 1.0.1
monoid-extras: 0.0.1
monoidal: 0.16.0
morello: 0.4.0
mote: 3.0.0
motsunabe: 2.0.0
mvc: 0.0.1
mysql: 6.0.1
n3: 0.1.0
nano-id: 1.1.0
nanoid: 0.1.0
naturals: 3.0.0
nested-functor: 0.2.1
newtype: 5.0.0
nextjs: 0.1.1
nextui: 0.2.0
node-buffer: 9.0.0
node-child-process: 11.1.0
node-event-emitter: 3.0.0
node-execa: 5.0.0
node-fs: 9.2.0
node-glob-basic: 1.3.0
node-http: 9.1.0
node-http2: 1.1.1
node-human-signals: 1.0.0
node-net: 5.1.0
node-os: 5.1.0
node-path: 5.0.0
node-process: 11.2.0
node-readline: 8.1.1
node-sqlite3: 8.0.0
node-stream-pipes: 2.1.5
node-streams: 9.0.0
node-tls: 0.3.1
node-url: 7.0.1
node-zlib: 0.4.0
nonempty: 7.0.0
now: 6.0.0
npm-package-json: 2.0.0
nullable: 6.0.0
numberfield: 0.1.0
numbers: 9.0.1
oak: 3.1.1
oak-debug: 1.2.2
object-maps: 0.3.0
ocarina: 1.5.4
open-folds: 6.3.0
open-memoize: 6.1.0
open-pairing: 6.1.0
options: 7.0.0
optparse: 5.0.1
ordered-collections: 3.2.0
ordered-set: 0.4.0
orders: 6.0.0
owoify: 1.2.0
pairs: 9.0.1
parallel: 7.0.0
parsing: 10.2.0
parsing-dataview: 3.2.4
partial: 4.0.0
pathy: 9.0.0
pha: 0.13.0
phaser: 0.7.0
phylio: 1.1.2
pipes: 8.0.0
pirates-charm: 0.0.1
pmock: 0.9.0
point-free: 1.0.0
pointed-list: 0.5.1
polymorphic-vectors: 4.0.0
posix-types: 6.0.0
postgresql: 2.0.17
precise: 6.0.0
precise-datetime: 7.0.0
prelude: 6.0.1
prettier-printer: 3.0.0
profunctor: 6.0.1
profunctor-lenses: 8.0.0
protobuf: 4.3.0
psa-utils: 8.0.0
psci-support: 6.0.0
punycode: 1.0.0
qualified-do: 2.2.0
quantities: 12.2.0
quickcheck: 8.0.1
quickcheck-combinators: 0.1.3
quickcheck-laws: 7.0.0
quickcheck-utf8: 0.0.0
random: 6.0.0
rationals: 6.0.0
rdf: 0.1.0
react: 11.0.0
react-aria: 0.2.0
react-basic: 17.0.0
react-basic-classic: 3.0.0
react-basic-dnd: 10.1.0
react-basic-dom: 6.1.0
react-basic-emotion: 7.1.0
react-basic-hooks: 8.2.0
react-basic-storybook: 2.0.0
react-dom: 8.0.0
react-halo: 3.0.0
react-icons: 1.1.5
react-markdown: 0.1.0
react-testing-library: 4.0.1
react-virtuoso: 1.0.0
reactix: 0.6.1
read: 1.0.1
recharts: 1.1.0
record: 4.0.0
record-extra: 5.0.1
record-ptional-fields: 0.1.2
record-studio: 1.0.4
refs: 6.0.0
remotedata: 5.0.1
repr: 0.5.0
resize-observer: 1.0.0
resource: 2.0.1
resourcet: 1.0.0
result: 1.0.3
return: 0.2.0
ring-modules: 5.0.1
rito: 0.3.4
roman: 0.4.0
rough-notation: 1.0.2
routing: 11.0.0
routing-duplex: 0.7.0
run: 5.0.0
safe-coerce: 2.0.0
safely: 4.0.1
school-of-music: 1.3.0
selection-foldable: 0.2.0
selective-functors: 1.0.1
semirings: 7.0.0
signal: 13.0.0
simple-emitter: 3.0.1
simple-i18n: 2.0.1
simple-json: 9.0.0
simple-json-generics: 0.2.1
simple-ulid: 3.0.0
sized-matrices: 1.0.0
sized-vectors: 5.0.2
slug: 3.0.8
small-ffi: 4.0.1
soundfonts: 4.1.0
sparse-matrices: 1.3.0
sparse-polynomials: 2.0.5
spec: 7.6.0
spec-discovery: 8.3.0
spec-mocha: 5.1.0
spec-quickcheck: 5.0.0
splitmix: 2.1.0
ssrs: 1.0.0
st: 6.2.0
statistics: 0.3.2
strictlypositiveint: 1.0.1
string-parsers: 8.0.0
strings: 6.0.1
strings-extra: 4.0.0
stringutils: 0.0.12
substitute: 0.2.3
supply: 0.2.0
svg-parser: 3.0.0
systemd-journald: 0.3.0
tagged: 4.0.2
tailrec: 6.1.0
tecton: 0.2.1
tecton-halogen: 0.2.0
test-unit: 17.0.0
thermite: 6.3.1
thermite-dom: 0.3.1
these: 6.0.0
toestand: 0.9.0
transformation-matrix: 1.0.1
transformers: 6.1.0
tree-rose: 4.0.2
ts-bridge: 4.0.0
tuples: 7.0.0
two-or-more: 1.0.0
type-equality: 4.0.1
typedenv: 2.0.1
typelevel: 6.0.0
typelevel-lists: 2.1.0
typelevel-peano: 1.0.1
typelevel-prelude: 7.0.0
typelevel-regex: 0.0.3
typelevel-rows: 0.1.0
typisch: 0.4.0
uint: 7.0.0
ulid: 3.0.1
uncurried-transformers: 1.1.0
undefined: 2.0.0
undefined-is-not-a-problem: 1.1.0
unfoldable: 6.0.0
unicode: 6.0.0
unique: 0.6.1
unlift: 1.0.1
unordered-collections: 3.1.0
unsafe-coerce: 6.0.0
unsafe-reference: 5.0.0
untagged-to-tagged: 0.1.4
untagged-union: 1.0.0
uri: 9.0.0
url-immutable: 1.0.0
uuid: 9.0.0
uuidv4: 1.0.0
validation: 6.0.0
variant: 8.0.0
variant-encodings: 2.0.0
vectorfield: 1.0.1
vectors: 2.1.0
versions: 7.0.0
visx: 0.0.2
web-clipboard: 5.0.0
web-cssom: 2.0.0
web-cssom-view: 0.1.0
web-dom: 6.0.0
web-dom-parser: 8.0.0
web-dom-xpath: 3.0.0
web-encoding: 3.0.0
web-events: 4.0.0
web-fetch: 4.0.1
web-file: 4.0.0
web-geometry: 0.1.0
web-html: 4.1.0
web-pointerevents: 2.0.0
web-proletarian: 1.0.0
web-promise: 3.2.0
web-resize-observer: 2.1.0
web-router: 1.0.0
web-socket: 4.0.0
web-storage: 5.0.0
web-streams: 4.0.0
web-touchevents: 4.0.0
web-uievents: 5.0.0
web-url: 2.0.0
web-workers: 1.1.0
web-xhr: 5.0.1
webextension-polyfill: 0.1.0
webgpu: 0.0.1
which: 2.0.0
xterm: 1.0.0
yoga-fetch: 1.0.1
yoga-json: 5.1.0
yoga-om: 0.1.0
yoga-postgres: 6.0.0
yoga-tree: 1.0.0
z3: 0.0.2
zipperarray: 2.0.0
extra_packages: {}
packages:
aff:
@ -311,8 +797,8 @@ packages:
- unfoldable
exceptions:
type: registry
version: 6.0.0
integrity: sha256-y/xTAEIZIARCE+50/u1di0ncebJ+CIwNOLswyOWzMTw=
version: 6.1.0
integrity: sha256-K0T89IHtF3vBY7eSAO7eDOqSb2J9kZGAcDN5+IKsF8E=
dependencies:
- effect
- either
@ -324,6 +810,17 @@ packages:
integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8=
dependencies:
- unsafe-coerce
filterable:
type: registry
version: 5.0.0
integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng=
dependencies:
- arrays
- either
- foldable-traversable
- identity
- lists
- ordered-collections
fixed-points:
type: registry
version: 7.0.0
@ -480,14 +977,6 @@ packages:
dependencies:
- control
- prelude
js-bigints:
type: registry
version: 2.2.1
integrity: sha256-hKWZo9NxtsAaHmNXr6B8GY4c0olQbYLXPVGWm4TF2Ss=
dependencies:
- integers
- maybe
- prelude
js-date:
type: registry
version: 8.0.0
@ -499,24 +988,6 @@ packages:
- foreign
- integers
- now
js-maps:
type: registry
version: 0.1.2
integrity: sha256-xQDZf88nQEiZNmkCVEi3YQGB19hu6Oju6laEi8Os/oM=
dependencies:
- arrays
- either
- foldable-traversable
- functions
- js-bigints
- maybe
- prelude
- profunctor-lenses
- st
- strings
- tuples
- unfoldable
- unsafe-coerce
lazy:
type: registry
version: 6.0.0
@ -603,8 +1074,8 @@ packages:
- unsafe-coerce
node-fs:
type: registry
version: 9.1.0
integrity: sha256-TzhvGdrwcM0bazDvrWSqh+M/H8GKYf1Na6aGm2Qg4+c=
version: 9.2.0
integrity: sha256-Sg0vkXycEzkEerX6hLccz21Ygd9w1+QSk1thotRZPGI=
dependencies:
- datetime
- effect
@ -629,44 +1100,6 @@ packages:
integrity: sha256-pd82nQ+2l5UThzaxPdKttgDt7xlsgIDLpPG0yxDEdyE=
dependencies:
- effect
node-stream-pipes:
type: registry
version: 2.1.0
integrity: sha256-pYBOQY4bGEZzI5UHsUxJAhsKqtmE6CC1sHmFqgj64V8=
dependencies:
- aff
- arrays
- console
- control
- datetime
- effect
- either
- exceptions
- foldable-traversable
- foreign-object
- fork
- lists
- maybe
- mmorph
- newtype
- node-buffer
- node-event-emitter
- node-fs
- node-path
- node-streams
- node-zlib
- now
- ordered-collections
- parallel
- pipes
- prelude
- st
- strings
- tailrec
- transformers
- tuples
- unordered-collections
- unsafe-coerce
node-streams:
type: registry
version: 9.0.0
@ -751,8 +1184,8 @@ packages:
- prelude
parallel:
type: registry
version: 6.0.0
integrity: sha256-VJbkGD0rAKX+NUEeBJbYJ78bEKaZbgow+QwQEfPB6ko=
version: 7.0.0
integrity: sha256-gUC9i4Txnx9K9RcMLsjujbwZz6BB1bnE2MLvw4GIw5o=
dependencies:
- control
- effect
@ -852,31 +1285,6 @@ packages:
- newtype
- prelude
- tuples
profunctor-lenses:
type: registry
version: 8.0.0
integrity: sha256-K7f29rHRHgVSb2Y/PaSKtfYPriP6n87BJNO7EhsZHas=
dependencies:
- arrays
- bifunctors
- const
- control
- distributive
- either
- foldable-traversable
- foreign-object
- functors
- identity
- lists
- maybe
- newtype
- ordered-collections
- partial
- prelude
- profunctor
- record
- transformers
- tuples
quickcheck:
type: registry
version: 8.0.1
@ -1028,8 +1436,8 @@ packages:
- refs
transformers:
type: registry
version: 6.0.0
integrity: sha256-Pzw40HjthX77tdPAYzjx43LK3X5Bb7ZspYAp27wksFA=
version: 6.1.0
integrity: sha256-3Bm+Z6tsC/paG888XkywDngJ2JMos+JfOhRlkVfb7gI=
dependencies:
- control
- distributive
@ -1042,6 +1450,7 @@ packages:
- maybe
- newtype
- prelude
- st
- tailrec
- tuples
- unfoldable
@ -1083,21 +1492,6 @@ packages:
- foldable-traversable
- maybe
- strings
unordered-collections:
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce:
type: registry
version: 6.0.0

View File

@ -1,42 +1,32 @@
package:
name: cbor-stream
name: threading
publish:
version: '1.3.0'
version: '0.0.1'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
githubRepo: 'purescript-cbor-stream'
githubRepo: 'purescript-threading'
build:
strict: true
pedanticPackages: true
# pedanticPackages: true
dependencies:
- catenable-lists
- control
- exceptions
- filterable
- refs
- tuples
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- js-bigints: ">=2.2.1 <3.0.0"
- js-date: ">=8.0.0 <9.0.0"
- js-maps: ">=0.1.2 <0.2.0"
- maybe: ">=6.0.0 <7.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=2.1.0 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
- simple-json: ">=9.0.0 <10.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main
dependencies:
@ -49,4 +39,6 @@ package:
- simple-json
- spec
workspace:
packageSet:
registry: 53.3.0
extraPackages: {}

View File

@ -1,158 +0,0 @@
module Data.CBOR where
import Prelude
import Control.Monad.Error.Class (liftMaybe, try)
import Control.Monad.Except (ExceptT(..), withExcept)
import Control.Monad.Except.Trans (runExceptT)
import Data.Array as Array
import Data.DateTime (DateTime)
import Data.Either (Either(..), isRight)
import Data.Foldable (class Foldable)
import Data.FoldableWithIndex (foldlWithIndex)
import Data.JSDate (JSDate)
import Data.JSDate as JSDate
import Data.Map (Map)
import Data.Maybe (Maybe(..))
import Data.Symbol (class IsSymbol, reflectSymbol)
import Data.Traversable (traverse)
import Foreign (F, Foreign, ForeignError(..), readArray, readNullOrUndefined, unsafeReadTagged, unsafeToForeign)
import Foreign.Index (readProp)
import JS.BigInt (BigInt)
import JS.Map (Map) as JS
import JS.Map as JS.Map
import Prim.Row as Row
import Prim.RowList (class RowToList, Cons, Nil, RowList)
import Record (get)
import Record.Builder (Builder)
import Record.Builder as Builder
import Simple.JSON (class ReadForeign, class WriteForeign, readImpl, writeImpl)
import Type.Prelude (Proxy(..))
class ReadCBOR :: Type -> Constraint
class ReadCBOR a where
readCBOR :: Foreign -> F a
class WriteCBOR :: Type -> Constraint
class WriteCBOR a where
writeCBOR :: a -> Foreign
instance ReadCBOR Foreign where
readCBOR = pure
else instance (RowToList r rl, ReadCBORFields rl () r) => ReadCBOR (Record r) where
readCBOR o = do
flip Builder.build {} <$> getFields (Proxy @rl) o
else instance ReadCBOR BigInt where
readCBOR = unsafeReadTagged "BigInt"
else instance ReadCBOR JSDate where
readCBOR = unsafeReadTagged "Date"
else instance ReadCBOR DateTime where
readCBOR a = do
date :: JSDate <- readCBOR a
liftMaybe (pure $ ForeignError $ "Invalid DateTime: " <> show date) $ JSDate.toDateTime date
else instance ReadCBOR a => ReadCBOR (Array a) where
readCBOR a = do
raws :: Array Foreign <- readArray a
traverse readCBOR raws
else instance ReadCBOR a => ReadCBOR (Maybe a) where
readCBOR a = do
isNull <- isRight <$> try (readNullOrUndefined a)
if isNull then
pure Nothing
else
Just <$> readCBOR @a a
else instance (ReadCBOR v) => ReadCBOR (JS.Map String v) where
readCBOR map = do
map' :: JS.Map String Foreign <- unsafeReadTagged "Map" map
foldlWithIndex (\k b v -> do
map'' <- b
v' <- readCBOR v
pure $ JS.Map.insert k v' map''
) (pure JS.Map.empty) map'
else instance (ReadForeign a) => ReadCBOR a where
readCBOR = readImpl
instance WriteCBOR Foreign where
writeCBOR = identity
else instance (RowToList r rl, WriteCBORFields rl r () to) => WriteCBOR (Record r) where
writeCBOR rec = unsafeToForeign $ Builder.build (writeImplFields (Proxy @rl) rec) {}
else instance WriteCBOR BigInt where
writeCBOR = unsafeToForeign
else instance WriteCBOR JSDate where
writeCBOR = unsafeToForeign
else instance WriteCBOR DateTime where
writeCBOR = unsafeToForeign <<< JSDate.fromDateTime
else instance (WriteCBOR k, WriteCBOR v) => WriteCBOR (JS.Map k v) where
writeCBOR = unsafeToForeign
else instance (WriteCBOR a) => WriteCBOR (Array a) where
writeCBOR as = unsafeToForeign $ writeCBOR <$> as
else instance (Foldable f, WriteCBOR a) => WriteCBOR (f a) where
writeCBOR as = unsafeToForeign $ writeCBOR $ Array.fromFoldable as
else instance (JS.Map.EncodeKey k, WriteCBOR k, WriteCBOR v) => WriteCBOR (Map k v) where
writeCBOR map = writeCBOR $ foldlWithIndex (\k m v -> JS.Map.insert k v m) JS.Map.empty map
else instance (WriteForeign a) => WriteCBOR a where
writeCBOR = writeImpl
applyEither :: forall e a b. Semigroup e => Either e (a -> b) -> Either e a -> Either e b
applyEither (Left e) (Right _) = Left e
applyEither (Left e1) (Left e2) = Left (e1 <> e2)
applyEither (Right _) (Left e) = Left e
applyEither (Right fun) (Right a) = Right (fun a)
exceptTApply :: forall a b e m. Semigroup e => Applicative m => ExceptT e m (a -> b) -> ExceptT e m a -> ExceptT e m b
exceptTApply fun a = ExceptT $ applyEither
<$> runExceptT fun
<*> runExceptT a
class ReadCBORFields (xs :: RowList Type) (from :: Row Type) (to :: Row Type)
| xs -> from to where
getFields :: Proxy xs
-> Foreign
-> F (Builder (Record from) (Record to))
instance readFieldsCons ::
( IsSymbol name
, ReadCBOR ty
, ReadCBORFields tail from from'
, Row.Lacks name from'
, Row.Cons name ty from' to
) => ReadCBORFields (Cons name ty tail) from to where
getFields _ obj = (compose <$> first) `exceptTApply` rest
where
first = do
value <- withExcept' (readCBOR =<< readProp name obj)
pure $ Builder.insert nameP value
rest = getFields tailP obj
nameP = Proxy :: Proxy name
tailP = Proxy :: Proxy tail
name = reflectSymbol nameP
withExcept' = withExcept <<< map $ ErrorAtProperty name
instance readFieldsNil ::
ReadCBORFields Nil () () where
getFields _ _ =
pure identity
class WriteCBORFields (rl :: RowList Type) row (from :: Row Type) (to :: Row Type)
| rl -> row from to where
writeImplFields :: forall g. g rl -> Record row -> Builder (Record from) (Record to)
instance consWriteCBORFields ::
( IsSymbol name
, WriteCBOR ty
, WriteCBORFields tail row from from'
, Row.Cons name ty whatever row
, Row.Lacks name from'
, Row.Cons name Foreign from' to
) => WriteCBORFields (Cons name ty tail) row from to where
writeImplFields _ rec = result
where
namep = Proxy :: Proxy name
value = writeCBOR $ get namep rec
tailp = Proxy :: Proxy tail
rest = writeImplFields tailp rec
result = Builder.insert namep value <<< rest
instance nilWriteCBORFields ::
WriteCBORFields Nil row () () where
writeImplFields _ _ = identity

View File

@ -1,7 +0,0 @@
import {decode, encode} from 'cbor-x'
/** @type {(a: Buffer) => () => unknown} */
export const decodeImpl = buf => () => decode(buf)
/** @type {(a: unknown) => () => Buffer} */
export const encodeImpl = buf => () => encode(buf)

View File

@ -1,21 +0,0 @@
module Effect.CBOR where
import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Data.Bifunctor (lmap)
import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR)
import Effect (Effect)
import Effect.Exception (error)
import Foreign (Foreign)
import Node.Buffer (Buffer)
foreign import decodeImpl :: Buffer -> Effect Foreign
foreign import encodeImpl :: Foreign -> Effect Buffer
decode :: forall a. ReadCBOR a => Buffer -> Effect a
decode = (liftEither <<< lmap (error <<< show) <<< runExcept <<< readCBOR) <=< decodeImpl
encode :: forall a. WriteCBOR a => a -> Effect Buffer
encode = encodeImpl <<< writeCBOR

8
src/JS.Finalization.js Normal file
View File

@ -0,0 +1,8 @@
/** @type {<T>(cb: (t: T) => void) => () => FinalizationRegistry<T>} */
export const registry = (cb) => () => new FinalizationRegistry(cb);
/** @type {<T>(f: FinalizationRegistry<T>) => <O extends WeakKey>(a: WeakRef<O>) => (b: T) => () => void} */
export const register = (fin) => (a) => (b) => () => fin.register(a, b);
/** @type {<T>(f: FinalizationRegistry<T>) => <O extends WeakKey>(a: WeakRef<O>) => () => void} */
export const unregister = (fin) => (a) => () => fin.unregister(a);

15
src/JS.Finalization.purs Normal file
View File

@ -0,0 +1,15 @@
module JS.Drop where
import Prelude
import Effect (Effect)
import JS.WeakRef (WeakRef)
type Registry_ a = Registry a Unit
foreign import data Registry :: Type -> Type -> Type
foreign import registry :: forall a b. (b -> Effect Unit) -> Effect (Registry a b)
foreign import register :: forall a b. Registry a b -> WeakRef a -> b -> Effect Unit
foreign import unregister :: forall a b. Registry a b -> WeakRef a -> Effect Unit

5
src/JS.WeakRef.js Normal file
View File

@ -0,0 +1,5 @@
/** @type {<T extends WeakKey>(_: T) => () => WeakRef<T>} */
export const make = (a) => () => new WeakRef(a);
/** @type {<T extends WeakKey>(_: WeakRef<T>) => () => T | undefined} */
export const _deref = (a) => () => a.deref();

17
src/JS.WeakRef.purs Normal file
View File

@ -0,0 +1,17 @@
module JS.WeakRef where
import Prelude
import Data.Maybe (Maybe)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Effect (Effect)
foreign import data WeakRef :: Type -> Type
foreign import make :: forall a. a -> Effect (WeakRef a)
deref :: forall a. WeakRef a -> Effect (Maybe a)
deref = map Nullable.toMaybe <<< _deref
foreign import _deref :: forall a. WeakRef a -> Effect (Nullable a)

View File

@ -1,7 +0,0 @@
import { DecoderStream } from "cbor-x";
/** @type {(s: import('cbor-x').Options) => () => DecoderStream} */
export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c, allowHalfOpen: true});
/** @type {(s: DecoderStream) => () => unknown | null} */
export const readImpl = (p) => () => p.read();

View File

@ -1,52 +0,0 @@
module Node.Stream.CBOR.Decode where
import Prelude hiding (join)
import Data.Nullable (Nullable)
import Effect (Effect)
import Effect.Uncurried (mkEffectFn1)
import Foreign (Foreign)
import Foreign.Object (Object)
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
import Node.EventEmitter.UtilTypes (EventHandle1)
import Node.Stream (Read, Stream, Write)
import Node.Stream.CBOR.Options (F32, Options, prepareOptions)
import Node.Stream.Object (Transform) as Object
import Prim.Row (class Nub, class Union)
import Unsafe.Coerce (unsafeCoerce)
data CBORDecode
-- | CBOR decoding transform stream
-- |
-- | Accepts unencoded `Buffer` chunks, and transforms them
-- | to JS values.
type CBORDecoder :: Row Type -> Type
type CBORDecoder r = Stream (read :: Read, write :: Write, cbor :: CBORDecode | r)
make
:: forall r missing extra minimal minimalExtra
. Union r missing (Options extra)
=> Union r (useFloat32 :: F32) minimal
=> Nub minimal (useFloat32 :: F32 | minimalExtra)
=> { | r }
-> Effect (CBORDecoder ())
make = makeImpl <<< prepareOptions @r @missing
toObjectStream :: forall r. CBORDecoder r -> Object.Transform Buffer Foreign
toObjectStream = unsafeCoerce
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall a. EventHandle1 (CBORDecoder a) Foreign
dataH = EventHandle "data" mkEffectFn1
-- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
-- | FFI
foreign import readImpl :: forall r. Stream r -> Effect (Nullable Foreign)
-- | FFI
recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce

View File

@ -1,7 +0,0 @@
import { EncoderStream } from "cbor-x";
/** @type {(s: import('cbor-x').Options) => () => EncoderStream} */
export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c, allowHalfOpen: true});
/** @type {(s: EncoderStream) => (a: unknown) => () => void} */
export const writeImpl = (s) => (a) => () => s.write(a);

View File

@ -1,49 +0,0 @@
module Node.Stream.CBOR.Encode where
import Prelude
import Data.CBOR (class WriteCBOR, writeCBOR)
import Effect (Effect)
import Foreign (Foreign)
import Foreign.Object (Object)
import Node.Buffer (Buffer)
import Node.Stream (Read, Stream, Write)
import Node.Stream.CBOR.Options (F32, Options, prepareOptions)
import Node.Stream.Object (Transform) as Object
import Prim.Row (class Nub, class Union)
import Unsafe.Coerce (unsafeCoerce)
data CBOREncode
type CBOREncoder :: Row Type -> Type
type CBOREncoder r = Stream (read :: Read, write :: Write, csv :: CBOREncode | r)
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
foreign import writeImpl :: forall r. Stream r -> Foreign -> Effect Unit
recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce
-- | Create a raw Transform stream that accepts chunks of `Array String`,
-- | and transforms them into string CSV rows.
-- |
-- | Requires an ordered array of column names.
make
:: forall r missing extra minimal minimalExtra
. Union r missing (Options extra)
=> Union r (useFloat32 :: F32) minimal
=> Nub minimal (useFloat32 :: F32 | minimalExtra)
=> { | r }
-> Effect (CBOREncoder ())
make = makeImpl <<< prepareOptions @r @missing
-- | Convert the raw stream to a typed ObjectStream
toObjectStream :: CBOREncoder () -> Object.Transform Foreign Buffer
toObjectStream = unsafeCoerce
-- | Write a record to a CSVStringifier.
-- |
-- | The record will be emitted on the `Readable` end
-- | of the stream as a string chunk.
write :: forall a r. WriteCBOR a => CBOREncoder r -> a -> Effect Unit
write s a = writeImpl s $ writeCBOR a

View File

@ -1,11 +0,0 @@
import {FLOAT32_OPTIONS} from 'cbor-x'
/** @type {<F32>(o: {round: (_a: F32) => boolean, fit: (_a: F32) => boolean, always: (_a: F32) => boolean}) => (f: F32) => FLOAT32_OPTIONS} */
export const f32ToConst = ({round, fit, always}) => a =>
round(a)
? FLOAT32_OPTIONS.ALWAYS
: fit(a)
? FLOAT32_OPTIONS.DECIMAL_FIT
: round(a)
? FLOAT32_OPTIONS.DECIMAL_ROUND
: FLOAT32_OPTIONS.NEVER

View File

@ -1,50 +0,0 @@
module Node.Stream.CBOR.Options where
import Prelude
import Foreign (Foreign, unsafeToForeign)
import Prim.Row (class Nub, class Union)
import Record (merge, modify)
import Type.Prelude (Proxy(..))
data F32
= F32Always
| F32DecimalRound
| F32DecimalFit
| F32Never
derive instance Eq F32
foreign import data CBORStruct :: Type
foreign import f32ToConst :: {always :: F32 -> Boolean, round :: F32 -> Boolean, fit :: F32 -> Boolean} -> F32 -> Foreign
type Options r =
( useRecords :: Boolean
, structures :: Array CBORStruct
, structuredClone :: Boolean
, mapsAsObject :: Boolean
, useFloat32 :: F32
, alwaysUseFloat :: Boolean
, pack :: Boolean
, variableMapSize :: Boolean
, copyBuffers :: Boolean
, bundleStrings :: Boolean
, useTimestamp32 :: Boolean
, largeBigIntToFloat :: Boolean
, useTag259ForMaps :: Boolean
, tagUint8Array :: Boolean
, int64AsNumber :: Boolean
| r
)
prepareOptions
:: forall @r @missing extra minimal minimalExtra
. Union r missing (Options extra)
=> Union r (useFloat32 :: F32) minimal
=> Nub minimal (useFloat32 :: F32 | minimalExtra)
=> { | r }
-> Foreign
prepareOptions a =
unsafeToForeign
$ modify (Proxy @"useFloat32") (f32ToConst {fit: eq F32DecimalFit, round: eq F32DecimalRound, always: eq F32Always})
$ merge a {useFloat32: F32Never}

View File

@ -1,47 +0,0 @@
module Pipes.CBOR where
import Prelude
import Control.Monad.Error.Class (class MonadThrow, liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Rec.Class (class MonadRec)
import Data.Bifunctor (lmap)
import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR)
import Data.Maybe (Maybe)
import Data.Traversable (traverse)
import Effect.Aff.Class (class MonadAff)
import Effect.Exception (Error, error)
import Node.Buffer (Buffer)
import Node.Stream.CBOR.Decode as CBOR.Decode
import Node.Stream.CBOR.Encode as CBOR.Encode
import Pipes.Async (AsyncPipe, bindIO, mapIO)
import Pipes.Node.Stream as Pipes.Stream
-- | Transforms buffer chunks of a CBOR file to parsed values
-- | of type `a`.
decode
:: forall m @a
. MonadRec m
=> MonadAff m
=> MonadThrow Error m
=> ReadCBOR a
=> AsyncPipe (Maybe Buffer) (Maybe a) m Unit
decode = do
let
decoder = Pipes.Stream.fromTransformEffect $ CBOR.Decode.toObjectStream <$> CBOR.Decode.make {}
parse = liftEither <<< lmap (error <<< show) <<< runExcept <<< readCBOR @a
bindIO pure (traverse parse) decoder
-- | Encode purescript values as CBOR buffers
encode
:: forall m a
. MonadAff m
=> MonadThrow Error m
=> MonadRec m
=> WriteCBOR a
=> AsyncPipe (Maybe a) (Maybe Buffer) m Unit
encode =
let
p = Pipes.Stream.fromTransformEffect $ CBOR.Encode.toObjectStream <$> CBOR.Encode.make {}
in
mapIO (map writeCBOR) identity p

1
src/Threading.Ath.purs Normal file
View File

@ -0,0 +1 @@
module Threading.Ath where

View File

@ -0,0 +1,35 @@
module Threading.Barrier (Barrier, barrier, wait) where
import Prelude
import Data.Array as Array
import Data.Either (Either(..))
import Data.Foldable (sequence_)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Ref (Ref)
import Effect.Ref as Ref
import Type.Function (type ($))
-- | A barrier enables multiple threads to synchronize the beginning of some computation.
data Barrier = Barrier Int (Ref $ Array $ Effect Unit)
-- | Create a new barrier that will only unblock waiting threads
-- | when `n` threads are waiting (including this one)
barrier :: Int -> Effect Barrier
barrier n = Barrier n <$> Ref.new []
-- | Wait until the provided number of threads
-- | are also `wait`ing
wait :: Barrier -> Aff Unit
wait (Barrier n wakersRef) = do
wakers <- liftEffect $ Ref.read wakersRef
if n <= 1 then
pure unit
else if Array.length wakers == (n - 1) then
liftEffect $ sequence_ wakers
else Aff.makeAff \cb -> do
Ref.modify_ (_ <> [ cb $ Right unit ]) wakersRef
pure $ Aff.nonCanceler

163
src/Threading.Channel.purs Normal file
View File

@ -0,0 +1,163 @@
module Threading.Channel
( Channel
, Sender
, Receiver
, recv
, tryRecv
, send
, peek
, tryPeek
, channel
, sender
, receiver
) where
import Prelude
import Control.Monad.Error.Class (throwError)
import Data.Array as Array
import Data.CatList (CatList)
import Data.CatList as CatList
import Data.Either (Either(..))
import Data.Maybe (Maybe(..), isJust, maybe)
import Data.Traversable (for)
import Data.Tuple (fst)
import Data.Tuple.Nested ((/\))
import Data.Witherable (wither)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (error)
import JS.WeakRef (WeakRef)
import JS.WeakRef as WeakRef
import Threading.Data.Mutex (Mutex)
import Threading.Data.Mutex as Mutex
import Type.Function (type ($))
-- | A multi-producer multi-consumer channel for communication
-- | between threads.
-- |
-- | Senders will broadcast messages to all living receivers,
-- | doing nothing if there are no receivers.
-- |
-- | Receivers can wait for messages to be sent. Messages that
-- | are sent while the receiver is not waiting will be buffered,
-- | and `recv`d in the order they were sent.
data Channel a = Channel (Mutex $ Array $ WeakRef $ Receiver a)
data Sender a = Sender (Channel a)
data Receiver a = Receiver (Mutex $ Maybe (a -> Effect Unit)) (Mutex $ CatList a)
-- | Create a new channel
channel :: forall a. Effect (Channel a)
channel = do
recvs <- Mutex.mutex []
pure $ Channel recvs
-- | Create a new message receiver
receiver :: forall a. Channel a -> Aff (Receiver a)
receiver (Channel recvsRef) = do
g <- Mutex.lock recvsRef
liftEffect do
queue <- Mutex.mutex CatList.empty
wake <- Mutex.mutex Nothing
recvs <- Mutex.read g
let r = Receiver wake queue
recvWeak <- WeakRef.make r
Mutex.write g $ Array.cons recvWeak recvs
Mutex.release g
pure r
-- | Create a new message sender
sender :: forall a. Channel a -> Effect (Sender a)
sender c = pure $ Sender c
-- | Send a message to all living receivers
send :: forall a. Sender a -> a -> Aff Unit
send (Sender (Channel recvsRef)) a = do
recvsG <- Mutex.lock recvsRef
recvWeaks <- liftEffect $ Mutex.read recvsG
recvs <- liftEffect $ wither WeakRef.deref recvWeaks
void $ for recvs \(Receiver wakeRef queueRef) -> do
wakeG <- Mutex.lock wakeRef
wake <- liftEffect $ Mutex.read wakeG
queueG <- Mutex.lock queueRef
head /\ tail <-
liftEffect (Mutex.read queueG)
<#> CatList.uncons
<#> maybe (a /\ CatList.empty) (\(head /\ tail) -> head /\ CatList.snoc tail a)
let
q = CatList.cons head tail
liftEffect do
maybe (Mutex.write queueG q) (\f -> Mutex.write queueG tail *> f head) wake
Mutex.release wakeG
Mutex.release queueG
liftEffect $ Mutex.release recvsG
-- | Read a queued message and pop it from the queue.
-- |
-- | If no queued messages have been sent, returns Nothing.
tryRecv :: forall a. Receiver a -> Aff (Maybe a)
tryRecv (Receiver _ queueRef) = do
queueG <- Mutex.lock queueRef
queueM <- CatList.uncons <$> liftEffect (Mutex.read queueG)
for queueM \(a /\ tail) -> liftEffect $ Mutex.write queueG tail *> Mutex.release queueG $> a
-- | Block until a message is sent, and pop it from the queue.
-- |
-- | If a message has been sent since the
-- | last call to `recv`, then it will
-- | be immediately popped & returned.
recv :: forall a. Receiver a -> Aff a
recv (Receiver wakeRef queueRef) = do
wakeG <- Mutex.lock wakeRef
queueG <- Mutex.lock queueRef
liftEffect
$ whenM (isJust <$> Mutex.read wakeG)
$ throwError
$ error "Receiver has been shared between multiple fibers, which is not supported."
queueM <- liftEffect $ CatList.uncons <$> Mutex.read queueG
case queueM of
Just (a /\ tail) -> liftEffect do
Mutex.write queueG tail
Mutex.release wakeG
Mutex.release queueG
pure a
Nothing -> Aff.makeAff \cb -> do
Mutex.write wakeG $ Just $ cb <<< Right
Mutex.release wakeG
Mutex.release queueG
pure $ Aff.Canceler $ const $ Mutex.put wakeRef Nothing
-- | Read a queued message without altering the queue.
-- |
-- | If no queued messages have been sent, returns Nothing.
tryPeek :: forall a. Receiver a -> Aff (Maybe a)
tryPeek (Receiver _ queueRef) = map fst <$> CatList.uncons <$> Mutex.get queueRef
-- | Block until a message is sent, and read
-- | it without removing it from the queue.
-- |
-- | If a message has been sent since the
-- | last call to `recv`, then it will
-- | be immediately returned.
peek :: forall a. Receiver a -> Aff a
peek (Receiver wakeRef queueRef) = do
wakeG <- Mutex.lock wakeRef
queueM <- CatList.uncons <$> Mutex.get queueRef
liftEffect
$ whenM (isJust <$> Mutex.read wakeG)
$ throwError
$ error "Receiver has been shared between multiple fibers, which is not supported."
case queueM of
Just (a /\ _) -> liftEffect $ Mutex.release wakeG $> a
Nothing -> Aff.makeAff \cb -> do
Mutex.write wakeG $ Just $ cb <<< Right
Mutex.release wakeG
pure $ Aff.Canceler $ const $ Mutex.put wakeRef Nothing

133
src/Threading.Data.Mutex.js Normal file
View File

@ -0,0 +1,133 @@
/**
* @template T
* @typedef {(g: Guard<T>) => () => void}
* Waker
*/
/** @template T */
class Guard {
released = false;
/**
* @param {Mutex<T>} mutex
* @param {() => void} onExplicitRelease
*/
constructor(mutex, onExplicitRelease) {
this.mutex = mutex;
this.cb = onExplicitRelease;
}
read() {
if (this.released) {
throw new Error("Guard#read after explicit release");
}
return this.mutex.a;
}
/** @param {T} a */
write(a) {
if (this.released) {
throw new Error("Guard#write after explicit release");
}
this.mutex.a = a;
}
release() {
if (!this.released) {
this.released = true;
this.cb();
}
}
}
/** @template T */
class Mutex {
/** @type {WeakRef<Guard<T>> | undefined} */
guard = undefined;
/** @type {Array<(g: Guard<T>) => () => void>} */
wakers = [];
/** @type {FinalizationRegistry<undefined>} */
cleanup = new FinalizationRegistry(() => this._guardReleased());
/**
* @param {T} a
*/
constructor(a) {
this.a = a;
}
_guardReleased() {
this.guard = undefined;
const wake = this.wakers.shift();
if (wake) {
wake(this._newGuard())();
}
}
_newGuard() {
const g = new Guard(this, () => {
if (!this.guard) throw new Error("unreachable");
this.cleanup.unregister(this.guard);
this._guardReleased();
});
this.guard = new WeakRef(g);
this.cleanup.register(g, undefined);
return g;
}
locked() {
return !!this.guard;
}
/** @param {Waker<T>} cb */
lock(cb) {
if (!this.guard) {
cb(this._newGuard())();
return undefined;
} else {
this.wakers.push(cb);
return cb;
}
}
/** @param {Waker<T>} cb */
releaseWaker(cb) {
const ix = this.wakers.indexOf(cb);
if (ix > -1) {
this.wakers.splice(ix, 1);
}
}
tryLock() {
if (!this.guard) {
return this._newGuard();
}
}
}
/** @type {<T>(t: T) => () => Mutex<T>} */
export const _make = (a) => () => new Mutex(a);
/** @type {<T>(mutex: Mutex<T>) => (cb: Waker<T>) => () => Waker<T> | undefined} */
export const _lock = (mutex) => (cb) => () => mutex.lock(cb);
/** @type {<T>(mutex: Mutex<T>) => () => boolean} */
export const _locked = (mutex) => () => mutex.locked();
/** @type {<T>(mutex: Mutex<T>) => () => Guard<T> | undefined} */
export const _tryLock = (mutex) => () => mutex.tryLock();
/** @type {<T>(mutex: Mutex<T>) => (cb: Waker<T>) => () => void} */
export const _releaseWaker = (mutex) => (cb) => () => mutex.releaseWaker(cb);
/** @type {<T>(guard: Guard<T>) => () => void} */
export const _guardRelease = (g) => () => g.release();
/** @type {<T>(guard: Guard<T>) => () => T} */
export const _guardRead = (g) => () => g.read();
/** @type {<T>(guard: Guard<T>) => (t: T) => () => void} */
export const _guardWrite = (g) => (a) => () => g.write(a);

View File

@ -0,0 +1,140 @@
-- | A Mutex allows any number of threads to share mutable
-- | state, with at most 1 thread having read or write access
-- | at a time.
-- |
-- | Threads can access the data with `lock` or `tryLock`,
-- | which both return a `Guard`.
-- |
-- | The holder of a `Guard` is guaranteed exclusive read &
-- | write access to the data contained in the `Mutex`.
module Threading.Data.Mutex
( Mutex
, Guard
, mutex
, lock
, tryLock
, locked
, release
, modify
, modify_
, write
, read
, get
, put
) where
import Prelude
import Data.Either (Either(..))
import Data.Maybe (Maybe(..))
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
foreign import data Waker :: Type
foreign import data Mutex :: Type -> Type
-- | A lock to a Mutex.
-- |
-- | Guards may be read from, written to, and released. Guards **must** be
-- | released in order for other blocking threads to continue.
-- |
-- | _Note: If a Guard reclaimed by the garbage collector without being released,
-- | its Mutex will notice and behave as if the Guard was explicitly released.
-- | This will hopefully catch deadlocks caused by threads that have exited
-- | while holding a Guard._
foreign import data Guard :: Type -> Type
foreign import _make :: forall a. a -> Effect (Mutex a)
foreign import _locked :: forall a. Mutex a -> Effect Boolean
foreign import _lock :: forall a. Mutex a -> (Guard a -> Effect Unit) -> Effect (Nullable Waker)
foreign import _tryLock :: forall a. Mutex a -> Effect (Nullable (Guard a))
foreign import _releaseWaker :: forall a. Mutex a -> Waker -> Effect Unit
foreign import _guardRead :: forall a. Guard a -> Effect a
foreign import _guardWrite :: forall a. Guard a -> a -> Effect Unit
foreign import _guardRelease :: forall a. Guard a -> Effect Unit
-- | Create a new Mutex
mutex :: forall a. a -> Effect (Mutex a)
mutex = _make
-- | Is the Mutex currently locked?
locked :: forall a. Mutex a -> Effect Boolean
locked = _locked
-- | Attempt to acquire a lock without blocking.
-- |
-- | If the Mutex is currently locked, this will return `Nothing`.
tryLock :: forall a. Mutex a -> Effect (Maybe (Guard a))
tryLock = map Nullable.toMaybe <<< _tryLock
-- | Acquire a lock, blocking if another thread
-- | currently holds a lock.
-- |
-- | If multiple threads invoke `lock`, they will
-- | be unlocked in the order that they blocked on `lock`.
lock :: forall a. Mutex a -> Aff (Guard a)
lock m = Aff.makeAff \cb -> do
waker <- Nullable.toMaybe <$> _lock m (cb <<< Right)
pure $ case waker of
Just w -> Aff.effectCanceler $ _releaseWaker m w
Nothing -> Aff.nonCanceler
-- | Take a snapshot of the value in a Mutex
-- |
-- | This is a shorthand for acquiring a lock, reading it,
-- | then immediately releasing the lock.
get :: forall a. Mutex a -> Aff a
get m = do
g <- lock m
a <- liftEffect $ read g <* release g
pure a
-- | Write a new value to a Mutex
-- |
-- | This is a shorthand for acquiring a lock, writing to it,
-- | then immediately releasing the lock.
put :: forall a. Mutex a -> a -> Aff Unit
put m a = do
g <- lock m
liftEffect $ write g a *> release g
-- | Modify the value contained in a Mutex
-- |
-- | This is a shorthand for acquiring a lock,
-- | reading from it, writing to it, then
-- | immediately releasing it.
-- |
-- | Returns the new value.
modify :: forall a. Mutex a -> (a -> a) -> Aff a
modify m f = do
g <- lock m
liftEffect $ ((f <$> read g) >>= (\a -> write g a *> release g $> a))
-- | `modify` with its return value ignored.
modify_ :: forall a. Mutex a -> (a -> a) -> Aff Unit
modify_ m f = void $ modify m f
-- | Release the lock
-- |
-- | Attempting to `read` or `write` this `Guard`
-- | will throw an exception.
-- |
-- | Repeated invocations of `release` are ignored.
release :: forall a. Guard a -> Effect Unit
release = _guardRelease
-- | Read the value in the Mutex via the Guard
read :: forall a. Guard a -> Effect a
read = _guardRead
-- | Write a new value into the Mutex via the Guard
write :: forall a. Guard a -> a -> Effect Unit
write = _guardWrite

View File

@ -0,0 +1,282 @@
-- | A RWLock allows threads to share mutable state.
-- |
-- | Any number of threads can concurrently read the state,
-- | when there isn't a thread with write access.
-- |
-- | Get write access with `lockWrite` or `tryLockWrite`,
-- | or read access with `lockRead` or `tryLockRead`.
-- |
-- | `(try)lockWrite` returns a `WriteGuard`, which guarantees
-- | no other threads have read or write access until it is released.
-- |
-- | `(try)lockRead` returns a `ReadGuard`, which guarantees
-- | no threads have write access until it is released.
module Threading.Data.RWLock
( RWLock
, ReadGuard
, WriteGuard
, rwLock
, lockWrite
, tryLockWrite
, lockRead
, tryLockRead
, locked
, Locked(..)
, get
, put
, modify
, modify_
, release
, read
, write
, class RWLockGuard
) where
import Prelude
import Control.Alternative (guard)
import Control.Monad.Error.Class (liftMaybe, throwError)
import Control.Monad.Maybe.Trans (runMaybeT)
import Control.Monad.Trans.Class (lift)
import Data.Foldable (elem, traverse_)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..))
import Data.Set (Set)
import Data.Set as Set
import Data.Show.Generic (genericShow)
import Data.Traversable (traverse)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Ref (Ref)
import Effect.Ref as Ref
import Threading.Data.Mutex (Mutex)
import Threading.Data.Mutex as Mutex
import Type.Function (type ($))
-- | The lock state of the RWLock
data Locked
-- | There are no readers or writers.
= Unlocked
-- | There is a writer, and the RWLock is not
-- | currently readable or writable.
| LockedWriting
-- | There is at least one reader, and the RWLock is not
-- | currently writable.
| LockedReading
derive instance Generic Locked _
derive instance Eq Locked
instance Show Locked where
show = genericShow
newtype WriteLockHeld = WriteLockHeld (Maybe Int)
-- | A Read-Write lock
-- |
-- | Ensures that there can be at most 1 thread with write
-- | access to the data contained in the RWLock, or any
-- | number of concurrent readers.
data RWLock a = RWLock
-- Guarantee that state transitions are exclusive
{ fence :: Mutex Unit
-- Monotonically increasing guard counter
, id :: Ref Int
-- Condvar-style mutex indicating writability.
--
-- When a lock is held and the mutex contains `WriteLockHeld Nothing`, then there are 1 or more readers.
--
-- When a lock is held and the mutex contains `WriteLockHeld (Just <id>)`, then the lock is held by a writer.
, w :: Mutex WriteLockHeld
-- Ref containing the MutexGuard for `w`.
--
-- When a held WriteGuard or the final held ReadGuard is released, the guard contained will be
-- released, and `Nothing` will be written here.
, wLock :: Ref $ Maybe $ Mutex.Guard WriteLockHeld
-- Ref tracking active readers
, readers :: Mutex $ Set Int
-- The data contained in the RWLock
, state :: Ref a
}
-- | Internal
-- |
-- | Guarantees that no other `fenced` sections
-- | run concurrently with this one.
fenced :: forall a r. RWLock a -> Aff r -> Aff r
fenced (RWLock { fence }) m = do
g <- Mutex.lock fence
m <* liftEffect (Mutex.release g)
-- | A guard with read access to data of type `a`
data ReadGuard a = ReadGuard Int (RWLock a)
-- | A guard with read+write access to data of type `a`
data WriteGuard a = WriteGuard Int (RWLock a)
-- | Acquire a write-access lock to the data
-- | contained in the RWLock.
-- |
-- | If another thread holds a `ReadGuard` or `WriteGuard`,
-- | this will block until the data is writable.
lockWrite :: forall a. RWLock a -> Aff (WriteGuard a)
lockWrite rw@(RWLock { id: idRef, w, wLock }) = do
id <- liftEffect $ Ref.modify (_ + 1) idRef
g <- Mutex.lock w
liftEffect $ Mutex.write g $ WriteLockHeld $ Just id
liftEffect $ Ref.write (Just g) wLock
pure $ WriteGuard id rw
-- | Acquire a write-access lock to the data
-- | contained in the RWLock.
-- |
-- | If another thread holds a `ReadGuard` or `WriteGuard`,
-- | this will return Nothing.
tryLockWrite :: forall a. RWLock a -> Aff (Maybe (WriteGuard a))
tryLockWrite rw =
fenced rw
$ liftEffect (locked rw) >>= case _ of
Unlocked -> Just <$> lockWrite rw
_ -> pure Nothing
-- | Acquire a read-access lock to the data
-- | contained in the RWLock.
-- |
-- | If another thread holds a `WriteGuard`,
-- | this will block until the data is readable.
lockRead :: forall a. RWLock a -> Aff (ReadGuard a)
lockRead rw@(RWLock { fence, id: idRef, w, wLock, readers: readersM }) = do
fenceG <- Mutex.lock fence
id <- liftEffect $ Ref.modify (_ + 1) idRef
l <- liftEffect $ locked rw
let
block = do
wl' <- Mutex.lock w
liftEffect $ Mutex.write wl' (WriteLockHeld Nothing)
liftEffect $ Ref.write (Just wl') wLock
done = liftEffect (Mutex.release fenceG)
fenceG' <- case l of
LockedReading -> pure fenceG
LockedWriting -> done *> block *> Mutex.lock fence
Unlocked -> block $> fenceG
readersG <- Mutex.lock readersM
liftEffect do
readers <- Mutex.read readersG
Mutex.write readersG $ Set.insert id readers
Mutex.release readersG
Mutex.release fenceG'
pure $ ReadGuard id rw
-- | Acquire a read-access lock to the data
-- | contained in the RWLock.
-- |
-- | If another thread holds a `WriteGuard`,
-- | this will return Nothing.
tryLockRead :: forall a. RWLock a -> Aff (Maybe (ReadGuard a))
tryLockRead rw =
liftEffect (locked rw) >>= case _ of
LockedWriting -> pure Nothing
_ -> Just <$> lockRead rw
-- | Create a new RWLock
rwLock :: forall a. a -> Effect (RWLock a)
rwLock a = do
fence <- Mutex.mutex unit
id <- liftEffect $ Ref.new 0
w <- Mutex.mutex $ WriteLockHeld Nothing
wLock <- liftEffect $ Ref.new Nothing
readers <- Mutex.mutex Set.empty
state <- liftEffect $ Ref.new a
pure $ RWLock { fence, id, w, wLock, readers, state }
-- | Typeclass implemented by `WriteGuard` and `ReadGuard`
-- | allowing a common `release` + `read` function (as opposed
-- | to `releaseRead`, `releaseWrite`, etc.)
class RWLockGuard g where
release :: forall a. g a -> Aff Unit
read :: forall a. g a -> Aff a
instance RWLockGuard WriteGuard where
release w@(WriteGuard _ rw@(RWLock { wLock })) =
fenced rw $ void $ liftEffect do
g <- _writeGuardOk w
Ref.write Nothing wLock
Mutex.release g
read (WriteGuard id rw@(RWLock { state, wLock })) =
fenced rw $ liftEffect do
mg <- Ref.read wLock
g <- liftMaybe (error "WriteGuard has been released!") mg
WriteLockHeld id' <- Mutex.read g
when (Just id /= id') $ throwError $ error "WriteGuard has been released!"
Ref.read state
instance RWLockGuard ReadGuard where
release (ReadGuard id rw@(RWLock { wLock, readers: readersM })) =
fenced rw $ void $ runMaybeT do
readersG <- lift $ Mutex.lock readersM
readers <- liftEffect $ Mutex.read readersG
guard $ elem id readers
liftEffect do
Mutex.write readersG $ Set.delete id readers
empty <- ((_ == 0) <<< Set.size) <$> Mutex.read readersG
Mutex.release readersG
when empty $ Ref.read wLock >>= traverse_ \g -> do
Ref.write Nothing wLock
Mutex.release g
read (ReadGuard id rw@(RWLock { readers: readersM, state })) =
fenced rw do
readersG <- Mutex.lock readersM
readers <- liftEffect $ Mutex.read readersG
when (not $ elem id readers) $ throwError $ error "ReadGuard has been released!"
liftEffect $ Mutex.release readersG
liftEffect $ Ref.read state
_writeGuardOk :: forall a. WriteGuard a -> Effect (Mutex.Guard WriteLockHeld)
_writeGuardOk (WriteGuard id (RWLock { wLock })) = do
mg <- Ref.read wLock
g <- liftMaybe (error "WriteGuard has been released!") mg
WriteLockHeld id' <- Mutex.read g
when (Just id /= id') $ throwError $ error "WriteGuard has been released!"
pure g
-- | Writes a new value
write :: forall a. WriteGuard a -> a -> Effect Unit
write w@(WriteGuard _ (RWLock { state })) a = do
void $ _writeGuardOk w
Ref.write a state
-- | Asks what state the RWLock is currently in
locked :: forall a. RWLock a -> Effect Locked
locked (RWLock { wLock }) = do
Ref.read wLock
>>= traverse Mutex.read
>>= case _ of
Nothing -> pure Unlocked
Just (WriteLockHeld Nothing) -> pure LockedReading
Just (WriteLockHeld (Just _)) -> pure LockedWriting
-- | Get the value currently in the RWLock.
-- |
-- | Shorthand for `lockRead rw >>= (\l -> read l <* release l)`
get :: forall a. RWLock a -> Aff a
get rw = lockRead rw >>= (\l -> read l <* release l)
-- | Write a new value to the RWLock.
-- |
-- | Shorthand for `lockWrite rw >>= (\l -> liftEffect (write l a) <* release l)`
put :: forall a. RWLock a -> a -> Aff Unit
put rw a = lockWrite rw >>= (\l -> liftEffect (write l a) <* release l)
-- | Modify the value in the RWLock using the provided function.
modify :: forall a. RWLock a -> (a -> a) -> Aff a
modify rw f = do
l <- lockWrite rw
a <- f <$> read l
liftEffect (write l a) *> release l $> a
-- | Shorthand for `void $ modify rw f`
modify_ :: forall a. RWLock a -> (a -> a) -> Aff Unit
modify_ rw f = void $ modify rw f

View File

@ -0,0 +1,3 @@
module Threading.Handle where
data Handle = Handle

1
src/Threading.purs Normal file
View File

@ -0,0 +1 @@
module Threading where

View File

@ -5,10 +5,19 @@ import Prelude
import Data.Maybe (Maybe(..))
import Effect (Effect)
import Effect.Aff (launchAff_)
import Test.Pipes.CBOR as Test.Pipes.CBOR
import Effect.Aff as Aff
import Test.Spec (it)
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
import Test.Threading.Barrier as Test.Threading.Barrier
import Test.Threading.Channel as Test.Threading.Channel
import Test.Threading.Data.Mutex as Test.Threading.Data.Mutex
import Test.Threading.Data.RWLock as Test.Threading.Data.RWLock
main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
Test.Pipes.CBOR.spec
main = launchAff_ $ Aff.supervise $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
Test.Threading.Data.Mutex.spec
Test.Threading.Data.RWLock.spec
Test.Threading.Channel.spec
Test.Threading.Barrier.spec
it "all tests were run" $ pure unit

View File

@ -1,85 +0,0 @@
module Test.Pipes.CBOR where
import Prelude
import Control.Monad.Gen (chooseInt)
import Data.DateTime (DateTime)
import Data.List ((:))
import Data.List as List
import Data.Maybe (Maybe(..), fromJust)
import Data.Newtype (wrap)
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
import Data.Tuple.Nested ((/\))
import Effect (Effect)
import Effect.CBOR as CBOR
import Effect.Class (liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Partial.Unsafe (unsafePartial)
import Pipes (yield, (>->))
import Pipes.Async (debug, (>-/->))
import Pipes.CBOR as Pipes.CBOR
import Pipes.Collect as Pipes.Collect
import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude (toListM) as Pipes
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, before, describe, it)
import Test.Spec.Assertions (shouldEqual)
cborHex :: String
cborHex = "82b90002646e616d656568656e72796174c1fb41d990ee6d671aa0b90002646e616d65656a756c696f6174c1fbc1d756dad0bbb646"
cborBuf :: Effect Buffer
cborBuf = Buffer.fromString cborHex Hex
exp :: Array {name :: String, t :: DateTime}
exp =
[{name: "henry", t: toDateTimeLossy $ unsafePartial fromJust $ fromRFC3339String $ wrap "2024-05-14T19:21:25.611Z"}
,{name: "julio", t: toDateTimeLossy $ unsafePartial fromJust $ fromRFC3339String $ wrap "1920-05-14T20:21:17.067Z"}
]
dt :: String -> DateTime
dt = toDateTimeLossy <<< unsafePartial fromJust <<< fromRFC3339String <<< wrap
spec :: Spec Unit
spec =
describe "Pipes.CBOR" do
it "encode" do
bytes
<- Pipes.Collect.toBuffer
$ Pipes.Stream.withEOS (yield exp)
>-/-> Pipes.CBOR.encode
>-> Pipes.Stream.unEOS
act <- liftEffect $ CBOR.decode bytes
act `shouldEqual` exp
describe "parse" do
it "parses csv" do
buf <- liftEffect $ cborBuf
rows <- Pipes.toListM
$ (yield (Just buf) *> yield Nothing)
>-/-> debug "cbor" Pipes.CBOR.decode
rows `shouldEqual` ((Just exp) : Nothing : List.Nil)
before
(do
nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9)
let
objs = (\n -> {id: n}) <$> nums
bytes <-
Pipes.Collect.toBuffer
$ Pipes.Stream.withEOS (yield objs)
>-/-> Pipes.CBOR.encode
>-> Pipes.Stream.unEOS
pure $ nums /\ bytes
)
$ it "parses large csv" \(nums /\ bytes) -> do
rows <-
Pipes.Collect.toArray
$ Pipes.Stream.withEOS (yield bytes)
>-/-> Pipes.CBOR.decode @(Array {id :: Int})
>-> Pipes.Stream.unEOS
rows `shouldEqual` [(\id -> { id }) <$> nums]

View File

@ -0,0 +1,41 @@
module Test.Threading.Barrier where
import Prelude
import Data.Newtype (wrap)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Ref as Ref
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
import Threading.Barrier as Barrier
spec :: Spec Unit
spec =
describe "Threading.Barrier" do
it "creates" do
void $ liftEffect $ Barrier.barrier 1
it "barrer 1 >>= wait immediately resolves" do
b <- liftEffect $ Barrier.barrier 1
Barrier.wait b
it "barrer only resolves when all 3 threads wait" do
barrier <- liftEffect $ Barrier.barrier 3
aDone <- liftEffect $ Ref.new false
bDone <- liftEffect $ Ref.new false
a <- Aff.forkAff do
Barrier.wait barrier
liftEffect $ Ref.write true aDone
b <- Aff.forkAff do
Barrier.wait barrier
liftEffect $ Ref.write true bDone
Aff.delay $ wrap 10.0
liftEffect (Ref.read aDone) >>= shouldEqual false
liftEffect (Ref.read bDone) >>= shouldEqual false
Barrier.wait barrier
Aff.joinFiber a
Aff.joinFiber b
liftEffect (Ref.read aDone) >>= shouldEqual true
liftEffect (Ref.read bDone) >>= shouldEqual true

View File

@ -0,0 +1,91 @@
module Test.Threading.Channel where
import Prelude
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Data.Array as Array
import Data.Maybe (Maybe(..), isNothing, maybe)
import Data.Traversable (traverse)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Console as Console
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (expectError, shouldEqual)
import Threading.Channel as Channel
spec :: Spec Unit
spec =
describe "Threading.Channel" do
describe "channel" do
it "creates" $ liftEffect $ void $ Channel.channel
describe "receiver" do
it "creates" do
c <- liftEffect $ Channel.channel
void $ Channel.receiver c
describe "Sender" do
describe "send" do
it "does nothing when no receivers" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
Channel.send s 0
it "broadcasts to multiple receivers" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
ra <- Channel.receiver c
rb <- Channel.receiver c
fiber <- Aff.forkAff $ traverse Channel.recv [ ra, rb ]
Channel.send s 100
as <- Aff.joinFiber fiber
as `shouldEqual` [ 100, 100 ]
describe "Receiver" do
describe "recv" do
it "throws if multiple fibers blocking" do
c <- liftEffect $ Channel.channel
r <- Channel.receiver c
void $ Aff.forkAff $ Channel.recv r
expectError $ Channel.recv r
it "recv resolves with messages in the order they were sent" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
r <- Channel.receiver c
Channel.send s $ Just 1
Channel.send s $ Just 2
Channel.send s $ Just 3
Channel.send s $ Just 4
fiber <- Aff.forkAff $ flip tailRecM [] \as -> maybe (Done as) (Loop <<< Array.snoc as) <$> Channel.recv r
Channel.send s $ Just 5
Channel.send s Nothing
as <- Aff.joinFiber fiber
as `shouldEqual` [ 1, 2, 3, 4, 5 ]
it "blocks until a message is sent" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
r <- Channel.receiver c
fiber <- Aff.forkAff $ Channel.recv r
Channel.send s 10
a <- Aff.joinFiber fiber
a `shouldEqual` 10
it "immediately resolves if a message buffered" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
r <- Channel.receiver c
Channel.send s 10
a <- Channel.recv r
a `shouldEqual` 10
describe "tryRecv" do
it "returns Nothing when no data has been sent" do
c <- liftEffect $ Channel.channel
r <- Channel.receiver c
ma <- Channel.tryRecv r
isNothing ma `shouldEqual` true
it "returns Just when a message has been buffered" do
c <- liftEffect $ Channel.channel
s <- liftEffect $ Channel.sender c
r <- Channel.receiver c
Channel.send s 10
ma <- Channel.tryRecv r
ma `shouldEqual` (Just 10)
describe "sender" do
it "creates" do
c <- liftEffect $ Channel.channel
void $ liftEffect $ Channel.sender c

View File

@ -0,0 +1,157 @@
module Test.Threading.Data.Mutex where
import Prelude
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Parallel (parOneOf)
import Data.Either (Either(..))
import Data.Maybe (isNothing)
import Data.Time.Duration (Milliseconds(..))
import Data.Traversable (for_)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Ref as Ref
import Test.Spec (Spec, describe, it, pending')
import Test.Spec.Assertions (expectError, shouldEqual)
import Threading.Data.Mutex as Mutex
spec :: Spec Unit
spec =
describe "Threading.Data.Mutex" do
describe "mutex" do
it "creates" $ liftEffect $ void $ Mutex.mutex 0
describe "read" do
it "reads the value" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
v <- liftEffect $ Mutex.read g
v `shouldEqual` 0
it "throws if released" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
liftEffect $ Mutex.release g
expectError $ liftEffect $ Mutex.read g
describe "write" do
it "writes the value" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
liftEffect $ Mutex.write g 1
v <- liftEffect $ Mutex.read g
v `shouldEqual` 1
it "throws if released" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
liftEffect $ Mutex.release g
expectError $ liftEffect $ Mutex.write g 1
describe "get" do
it "yields immediately when unlocked" do
m <- liftEffect $ Mutex.mutex 0
val <- Mutex.get m
val `shouldEqual` 0
it "blocks until unlocked" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
getFiber <- Aff.forkAff $ Mutex.get m
liftEffect $ Mutex.write g 1
liftEffect $ Mutex.release g
read <- Aff.joinFiber getFiber
read `shouldEqual` 1
describe "put" do
it "yields immediately when unlocked" do
m <- liftEffect $ Mutex.mutex 0
Mutex.put m 1
val <- Mutex.get m
val `shouldEqual` 1
it "blocks until unlocked" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
getFiber <- Aff.forkAff $ Mutex.put m 2
liftEffect $ Mutex.write g 1
liftEffect $ Mutex.release g
Aff.joinFiber getFiber
val <- Mutex.get m
val `shouldEqual` 2
describe "modify" do
it "yields immediately when unlocked" do
m <- liftEffect $ Mutex.mutex 0
val <- Mutex.modify m (_ + 1)
val `shouldEqual` 1
it "blocks until unlocked" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
getFiber <- Aff.forkAff $ Mutex.modify m (_ * 10)
liftEffect $ Mutex.write g 1
liftEffect $ Mutex.release g
val <- Aff.joinFiber getFiber
val `shouldEqual` 10
describe "lock" do
it "yields immediately when unlocked" do
m <- liftEffect $ Mutex.mutex 0
void $ Mutex.lock m
it "blocks when locked" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
finished <- liftEffect $ Ref.new false
fiber <- Aff.forkAff do
void $ Mutex.lock m
void $ liftEffect $ Ref.write true finished
Aff.delay $ Milliseconds 5.0
f1 <- liftEffect $ Ref.read finished
f1 `shouldEqual` false
liftEffect $ Mutex.release g
Aff.joinFiber fiber
f2 <- liftEffect $ Ref.read finished
f2 `shouldEqual` true
it "locks are acquired in the order they were requested" do
m <- liftEffect $ Mutex.mutex 0
g <- Mutex.lock m
a <- Aff.forkAff $ Mutex.modify_ m (_ + 1) -- 1
b <- Aff.forkAff $ Mutex.modify_ m (_ * 10) -- 10
c <- Aff.forkAff $ Mutex.modify_ m (_ + 10) -- 20
d <- Aff.forkAff $ Mutex.modify_ m (_ * 10) -- 200
liftEffect $ Mutex.release g
for_ [ a, b, c, d ] Aff.joinFiber
n <- Mutex.get m
n `shouldEqual` 200
pending' "should be (eventually) unlocked if a fiber exits without releasing the lock" do
m <- liftEffect $ Mutex.mutex 0
-- Fiber acquires a lock then immediately resolves without releasing.
--
-- When the GC reclaims the guard object, the Mutex should notice and behave
-- as if it was explicitly released.
void $ Aff.forkAff $ void $ Mutex.lock m
liftEither =<< parOneOf
[ Aff.delay (Milliseconds 20000.0) $> Left (error "timed out waiting for GC to reclaim lock")
, Mutex.lock m $> Right unit
]
describe "tryLock" do
it "returns Just when unlocked" do
m <- liftEffect $ Mutex.mutex 0
void $ liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m)
it "returns Nothing when locked" do
m <- liftEffect $ Mutex.mutex 0
_ <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m)
g <- liftEffect (Mutex.tryLock m)
isNothing g `shouldEqual` true
it "returns Just after release" do
m <- liftEffect $ Mutex.mutex 0
g <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m)
liftEffect $ Mutex.release g
void $ liftMaybe (error $ "Mutex.tryLock returned Nothing after lock released") =<< liftEffect (Mutex.tryLock m)
describe "locked" do
it "is false when unlocked" do
m <- liftEffect $ Mutex.mutex 0
l <- liftEffect $ Mutex.locked m
l `shouldEqual` false
it "is true when locked" do
m <- liftEffect $ Mutex.mutex 0
_ <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m)
l <- liftEffect $ Mutex.locked m
l `shouldEqual` true
it "is false after lock released" do
m <- liftEffect $ Mutex.mutex 0
g <- liftMaybe (error $ "Mutex.tryLock returned Nothing on new mutex") =<< liftEffect (Mutex.tryLock m)
liftEffect $ Mutex.release g
l' <- liftEffect $ Mutex.locked m
l' `shouldEqual` false

View File

@ -0,0 +1,203 @@
module Test.Threading.Data.RWLock where
import Prelude
import Control.Monad.Error.Class (liftMaybe)
import Data.Maybe (isNothing)
import Data.Time.Duration (Milliseconds(..))
import Data.Traversable (for_)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Console as Console
import Effect.Exception (error)
import Effect.Ref as Ref
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (expectError, shouldEqual)
import Threading.Data.RWLock as RWLock
spec :: Spec Unit
spec =
describe "Threading.Data.RWLock" do
describe "rwLock" do
it "creates" $ liftEffect $ void $ RWLock.rwLock 0
describe "read" do
it "reads the value" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockRead m
v <- RWLock.read g
v `shouldEqual` 0
it "throws if released" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockRead m
RWLock.release g
expectError $ RWLock.read g
describe "write" do
it "writes the value" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
liftEffect $ RWLock.write g 1
v <- RWLock.read g
v `shouldEqual` 1
it "throws if released" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
RWLock.release g
expectError $ liftEffect $ RWLock.write g 1
describe "get" do
it "yields immediately when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
val <- RWLock.get m
val `shouldEqual` 0
it "blocks until unlocked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
getFiber <- Aff.forkAff $ RWLock.get m
liftEffect $ RWLock.write g 1
RWLock.release g
read <- Aff.joinFiber getFiber
read `shouldEqual` 1
describe "put" do
it "yields immediately when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
RWLock.put m 1
val <- RWLock.get m
val `shouldEqual` 1
it "blocks until unlocked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
getFiber <- Aff.forkAff $ RWLock.put m 2
liftEffect $ RWLock.write g 1
RWLock.release g
Aff.joinFiber getFiber
val <- RWLock.get m
val `shouldEqual` 2
describe "modify" do
it "yields immediately when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
val <- RWLock.modify m (_ + 1)
val `shouldEqual` 1
it "blocks until unlocked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
getFiber <- Aff.forkAff $ RWLock.modify m (_ * 10)
liftEffect $ RWLock.write g 1
RWLock.release g
val <- Aff.joinFiber getFiber
val `shouldEqual` 10
describe "lockRead" do
it "yields immediately when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
void $ RWLock.lockRead m
it "blocks when write locked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
finished <- liftEffect $ Ref.new false
fiber <- Aff.forkAff do
void $ RWLock.lockRead m
void $ liftEffect $ Ref.write true finished
Aff.delay $ Milliseconds 5.0
f1 <- liftEffect $ Ref.read finished
f1 `shouldEqual` false
RWLock.release g
Aff.joinFiber fiber
f2 <- liftEffect $ Ref.read finished
f2 `shouldEqual` true
it "does not block when read locked" do
m <- liftEffect $ RWLock.rwLock 0
void $ Aff.forkAff $ void $ RWLock.lockRead m
void $ Aff.forkAff $ void $ RWLock.lockRead m
void $ RWLock.lockRead m
n <- RWLock.get m
n `shouldEqual` 0
it "blocks when write locked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
finished <- liftEffect $ Ref.new false
fiber <- Aff.forkAff do
g' <- RWLock.lockRead m
liftEffect $ Ref.write true finished
RWLock.read g'
liftEffect $ RWLock.write g 1
f <- liftEffect $ Ref.read finished
f `shouldEqual` false
RWLock.release g
n <- Aff.joinFiber fiber
n `shouldEqual` 1
describe "lockWrite" do
it "yields immediately when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
void $ RWLock.lockWrite m
it "blocks when write locked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
finished <- liftEffect $ Ref.new false
fiber <- Aff.forkAff do
void $ RWLock.lockWrite m
void $ liftEffect $ Ref.write true finished
Aff.delay $ Milliseconds 5.0
f1 <- liftEffect $ Ref.read finished
f1 `shouldEqual` false
RWLock.release g
Aff.joinFiber fiber
f2 <- liftEffect $ Ref.read finished
f2 `shouldEqual` true
it "blocks when read locked" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockRead m
finished <- liftEffect $ Ref.new false
fiber <- Aff.forkAff do
void $ RWLock.lockWrite m
void $ liftEffect $ Ref.write true finished
Aff.delay $ Milliseconds 5.0
f1 <- liftEffect $ Ref.read finished
f1 `shouldEqual` false
RWLock.release g
Aff.joinFiber fiber
f2 <- liftEffect $ Ref.read finished
f2 `shouldEqual` true
it "locks are acquired in the order they were requested" do
m <- liftEffect $ RWLock.rwLock 0
g <- RWLock.lockWrite m
a <- Aff.forkAff $ RWLock.modify_ m (_ + 1) -- 1
b <- Aff.forkAff $ RWLock.modify_ m (_ * 10) -- 10
c <- Aff.forkAff $ RWLock.modify_ m (_ + 10) -- 20
d <- Aff.forkAff $ RWLock.modify_ m (_ * 10) -- 200
RWLock.release g
for_ [ a, b, c, d ] Aff.joinFiber
n <- RWLock.get m
n `shouldEqual` 200
describe "tryLockWrite" do
it "returns Just when unlocked" do
m <- liftEffect $ RWLock.rwLock 0
void $ liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m
it "returns Nothing when locked" do
m <- liftEffect $ RWLock.rwLock 0
_ <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m
g <- RWLock.tryLockWrite m
isNothing g `shouldEqual` true
it "returns Just after release" do
m <- liftEffect $ RWLock.rwLock 0
g <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m
RWLock.release g
void $ liftMaybe (error $ "RWLock.tryLockWrite returned Nothing after lock released") =<< RWLock.tryLockWrite m
describe "locked" do
it "Unlocked" do
m <- liftEffect $ RWLock.rwLock 0
l <- liftEffect $ RWLock.locked m
l `shouldEqual` RWLock.Unlocked
it "LockedWriting" do
m <- liftEffect $ RWLock.rwLock 0
_ <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m
l <- liftEffect $ RWLock.locked m
l `shouldEqual` RWLock.LockedWriting
it "LockedReading" do
m <- liftEffect $ RWLock.rwLock 0
_ <- liftMaybe (error $ "RWLock.tryLockRead returned Nothing on new mutex") =<< RWLock.tryLockRead m
l <- liftEffect $ RWLock.locked m
l `shouldEqual` RWLock.LockedReading
it "Unlocked after lock released" do
m <- liftEffect $ RWLock.rwLock 0
g <- liftMaybe (error $ "RWLock.tryLockWrite returned Nothing on new mutex") =<< RWLock.tryLockWrite m
RWLock.release g
l' <- liftEffect $ RWLock.locked m
l' `shouldEqual` RWLock.Unlocked