fix: tests, logging

This commit is contained in:
Orion Kindel 2023-04-26 09:48:28 -05:00
parent fde857a563
commit 03f1512458
Signed by untrusted user who does not match committer: orion
GPG Key ID: 6D4165AE4C928719
12 changed files with 602 additions and 254 deletions

21
glue/Cargo.lock generated
View File

@ -504,9 +504,9 @@ checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
[[package]] [[package]]
name = "toad" name = "toad"
version = "0.17.5" version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a19447d2692bd6d8b1c0c10457f42394da7b3f633b96c8705825aca91c080ad" checksum = "3d2a40ef725839735e253c13664aef9eb9989c1c1f79ea42f51f9b0e3076e6e6"
dependencies = [ dependencies = [
"embedded-time", "embedded-time",
"log", "log",
@ -527,6 +527,7 @@ dependencies = [
"toad-map", "toad-map",
"toad-msg", "toad-msg",
"toad-stem", "toad-stem",
"toad-string",
"toad-writable", "toad-writable",
] ]
@ -582,9 +583,9 @@ dependencies = [
[[package]] [[package]]
name = "toad-jni" name = "toad-jni"
version = "0.14.1" version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d9feb2f3a24a6d67aa5672d37893d160a57b9dbbc62d09a0f243abaec6c11f" checksum = "667f57c04355c23f3d295c8a21da8de70eaebd11b075d890d051bfc6381261ca"
dependencies = [ dependencies = [
"embedded-time", "embedded-time",
"jni", "jni",
@ -651,6 +652,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "834a7cc46ed626b24dfb861d3de19cdd666fa945f5bd1b4d2d675551afef927c" checksum = "834a7cc46ed626b24dfb861d3de19cdd666fa945f5bd1b4d2d675551afef927c"
[[package]]
name = "toad-string"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdad9197bf6974091949a6d3e00946558d69e0d7b3d159283c6c43fffdef59dc"
dependencies = [
"tinyvec",
"toad-array 0.2.3",
"toad-len",
"toad-writable",
]
[[package]] [[package]]
name = "toad-writable" name = "toad-writable"
version = "0.1.1" version = "0.1.1"

View File

@ -12,10 +12,10 @@ default = ["e2e"]
e2e = [] e2e = []
[dependencies] [dependencies]
toad = "0.19.1"
toad-jni = "0.16.1"
jni = "0.21.1" jni = "0.21.1"
nb = "1" nb = "1"
toad = "0.17.5"
toad-jni = "0.14.1"
no-std-net = "0.6" no-std-net = "0.6"
toad-msg = "0.18.1" toad-msg = "0.18.1"
tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]} tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]}

View File

@ -14,6 +14,7 @@ use toad::time::Millis;
use toad_jni::java::io::IOException; use toad_jni::java::io::IOException;
use toad_jni::java::net::InetSocketAddress; use toad_jni::java::net::InetSocketAddress;
use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel}; use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel};
use toad_jni::java::util::logging::Logger;
use toad_jni::java::util::Optional; use toad_jni::java::util::Optional;
use toad_jni::java::{self, Object, ResultYieldToJavaOrThrow}; use toad_jni::java::{self, Object, ResultYieldToJavaOrThrow};
@ -46,8 +47,12 @@ impl Toad {
PTR.get(e, self) PTR.get(e, self)
} }
fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 { fn init_impl(e: &mut java::Env,
let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel); logger: Logger,
cfg: Config,
channel: PeekableDatagramChannel)
-> i64 {
let r = Runtime::new(logger, cfg.to_toad(e), channel);
unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 } unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 }
} }
@ -157,20 +162,13 @@ impl Config {
RUNTIME_CONFIG_CONCURRENCY.get(e, self).to_rust(e) RUNTIME_CONFIG_CONCURRENCY.get(e, self).to_rust(e)
} }
pub fn log_level(&self, e: &mut java::Env) -> java::util::logging::Level {
static LOG_LEVEL: java::Method<Config, fn() -> java::util::logging::Level> =
java::Method::new("logLevel");
LOG_LEVEL.invoke(e, self)
}
pub fn msg(&self, e: &mut java::Env) -> Msg { pub fn msg(&self, e: &mut java::Env) -> Msg {
static RUNTIME_CONFIG_MSG: java::Method<Config, fn() -> Msg> = java::Method::new("msg"); static RUNTIME_CONFIG_MSG: java::Method<Config, fn() -> Msg> = java::Method::new("msg");
RUNTIME_CONFIG_MSG.invoke(e, self) RUNTIME_CONFIG_MSG.invoke(e, self)
} }
pub fn new(e: &mut java::Env, c: toad::config::Config) -> Self { pub fn new(e: &mut java::Env, c: toad::config::Config) -> Self {
static CTOR: java::Constructor<Config, fn(Optional<java::util::logging::Level>, ffi::u8, Msg)> = static CTOR: java::Constructor<Config, fn(ffi::u8, Msg)> = java::Constructor::new();
java::Constructor::new();
let con = Con::new(e, let con = Con::new(e,
c.msg.con.unacked_retry_strategy, c.msg.con.unacked_retry_strategy,
@ -186,8 +184,7 @@ impl Config {
let concurrency = ffi::u8::from_rust(e, c.max_concurrent_requests); let concurrency = ffi::u8::from_rust(e, c.max_concurrent_requests);
let log_level: Optional<java::util::logging::Level> = Optional::empty(e); let jcfg = CTOR.invoke(e, concurrency, msg);
let jcfg = CTOR.invoke(e, log_level, concurrency, msg);
jcfg jcfg
} }
@ -351,14 +348,16 @@ pub extern "system" fn Java_dev_toad_Toad_defaultConfigImpl<'local>(mut env: jav
#[no_mangle] #[no_mangle]
pub extern "system" fn Java_dev_toad_Toad_init<'local>(mut e: java::Env<'local>, pub extern "system" fn Java_dev_toad_Toad_init<'local>(mut e: java::Env<'local>,
_: JClass<'local>, _: JClass<'local>,
logger: JObject<'local>,
channel: JObject<'local>, channel: JObject<'local>,
cfg: JObject<'local>) cfg: JObject<'local>)
-> i64 { -> i64 {
let e = &mut e; let e = &mut e;
let cfg = java::lang::Object::from_local(e, cfg).upcast_to::<Config>(e); let cfg = java::lang::Object::from_local(e, cfg).upcast_to::<Config>(e);
let channel = java::lang::Object::from_local(e, channel).upcast_to::<DatagramChannel>(e); let channel = java::lang::Object::from_local(e, channel).upcast_to::<DatagramChannel>(e);
let logger = java::lang::Object::from_local(e, logger).upcast_to::<Logger>(e);
Toad::init_impl(e, cfg, channel.peekable()) Toad::init_impl(e, logger, cfg, channel.peekable())
} }
#[no_mangle] #[no_mangle]

View File

@ -4,7 +4,9 @@ use no_std_net::SocketAddr;
use toad::config::Config; use toad::config::Config;
use toad::net::Addrd; use toad::net::Addrd;
use toad::platform::Platform; use toad::platform::Platform;
use toad_jni::java::lang::System;
use toad_jni::java::nio::channels::PeekableDatagramChannel; use toad_jni::java::nio::channels::PeekableDatagramChannel;
use toad_jni::java::util::logging::Logger;
use toad_jni::java::{self}; use toad_jni::java::{self};
use toad_msg::alloc::Message; use toad_msg::alloc::Message;
use toad_msg::{Code, Id, Token, Type}; use toad_msg::{Code, Id, Token, Type};

View File

@ -10,7 +10,6 @@ mod runtime {
use toad::config::Config; use toad::config::Config;
use toad::platform::{Effect, Platform}; use toad::platform::{Effect, Platform};
use toad::step::runtime::Runtime as DefaultSteps;
use toad_jni::java::io::IOException; use toad_jni::java::io::IOException;
use toad_jni::java::nio::channels::PeekableDatagramChannel; use toad_jni::java::nio::channels::PeekableDatagramChannel;
use toad_jni::java::util::logging::{ConsoleHandler, Level, Logger}; use toad_jni::java::util::logging::{ConsoleHandler, Level, Logger};
@ -30,7 +29,17 @@ mod runtime {
type Effects = Vec<Effect<Self>>; type Effects = Vec<Effect<Self>>;
} }
type Steps = DefaultSteps<PlatformTypes, naan::hkt::Vec, naan::hkt::BTreeMap>; #[rustfmt::skip]
pub type Steps =
toad::step::runtime::Observe<PlatformTypes, naan::hkt::Vec,
toad::step::runtime::BufferResponses<PlatformTypes, naan::hkt::BTreeMap,
toad::step::runtime::HandleAcks<naan::hkt::BTreeMap,
toad::step::runtime::Retry<PlatformTypes, naan::hkt::Vec,
toad::step::provision_tokens::ProvisionTokens<
toad::step::runtime::ProvisionIds<PlatformTypes, naan::hkt::BTreeMap, naan::hkt::Vec,
toad::step::parse::Parse<
()
>>>>>>>;
pub struct Runtime { pub struct Runtime {
steps: Steps, steps: Steps,
@ -41,22 +50,7 @@ mod runtime {
} }
impl Runtime { impl Runtime {
pub fn new(e: &mut java::Env, pub fn new(logger: Logger, config: Config, channel: PeekableDatagramChannel) -> Self {
log_level: Level,
config: Config,
channel: PeekableDatagramChannel)
-> Self {
let logger = Logger::get_logger(e, "dev.toad");
if logger.uses_parent_handlers(e) {
let handler = ConsoleHandler::new(e);
handler.set_level(e, log_level);
logger.use_parent_handlers(e, false);
logger.add_handler(e, handler.to_handler());
logger.set_level(e, log_level);
}
Self { steps: Default::default(), Self { steps: Default::default(),
config, config,
channel, channel,

View File

@ -2,15 +2,69 @@ package dev.toad;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
public class Async { public class Async {
public static ScheduledExecutorService executor = public static ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(); Executors.newScheduledThreadPool(32);
public static class LoopHandle implements AutoCloseable {
CompletableFuture<Object> fut = null;
final AtomicBoolean break_ = new AtomicBoolean(false);
final Runnable onStop;
LoopHandle(Runnable onStop) {
this.onStop = onStop;
}
public void join() throws Throwable, InterruptedException {
try {
this.fut.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}
public void stop() throws Throwable, InterruptedException {
this.break_.set(true);
this.join();
this.onStop.run();
}
@Override
public void close() {
try {
this.stop();
} catch (Throwable t) {}
}
}
public static LoopHandle loop(Runnable f) {
return Async.loop(f, () -> {});
}
public static LoopHandle loop(Runnable f, Runnable onStop) {
var handle = new LoopHandle(onStop);
var fut = Async.pollCompletable(() -> {
if (handle.break_.get()) {
return Optional.of(new Object());
} else {
f.run();
return Optional.empty();
}
});
handle.fut = fut;
return handle;
}
public static <T> CompletableFuture<T> pollCompletable( public static <T> CompletableFuture<T> pollCompletable(
Supplier<Optional<T>> sup Supplier<Optional<T>> sup

View File

@ -10,6 +10,7 @@ import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
public final class Client implements AutoCloseable { public final class Client implements AutoCloseable {
@ -19,6 +20,14 @@ public final class Client implements AutoCloseable {
this.toad = toad; this.toad = toad;
} }
public CompletableFuture<Void> ping(String uri)
throws URISyntaxException, UnknownHostException {
return this.send(
Message.builder().uri(uri).type(Type.CON).code(Code.EMPTY).build()
)
.thenAccept(_m -> {});
}
public CompletableFuture<Message> get(String uri) public CompletableFuture<Message> get(String uri)
throws URISyntaxException, UnknownHostException { throws URISyntaxException, UnknownHostException {
return this.get(Type.CON, uri); return this.get(Type.CON, uri);
@ -98,6 +107,7 @@ public final class Client implements AutoCloseable {
); );
} }
this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(message));
return Async.pollCompletable(() -> this.toad.sendMessage(message)); return Async.pollCompletable(() -> this.toad.sendMessage(message));
} }
@ -114,7 +124,10 @@ public final class Client implements AutoCloseable {
) { ) {
return Async return Async
.pollCompletable(() -> this.toad.pollResp(t, addr)) .pollCompletable(() -> this.toad.pollResp(t, addr))
.thenApply(msg -> msg.toOwned()); .thenApply(msg -> {
this.toad.logger().log(Level.FINE, Toad.LogMessage.rx(msg));
return msg.toOwned();
});
} }
@Override @Override

View File

@ -1,28 +1,46 @@
package dev.toad; package dev.toad;
import dev.toad.Async;
import dev.toad.msg.Message; import dev.toad.msg.Message;
import dev.toad.msg.Token; import dev.toad.msg.Token;
import dev.toad.msg.option.Observe; import dev.toad.msg.option.Observe;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
public class ClientObserveStream implements AutoCloseable { public class ClientObserveStream implements AutoCloseable {
State state; boolean closed;
CompletableFuture<Message> initial; CompletableFuture<Message> initial;
Optional<Token> token = Optional.empty(); Optional<Token> token = Optional.empty();
final Client client; final Client client;
final Message message; final Message message;
public ClientObserveStream(Client client, Message message) { public ClientObserveStream(Client client, Message message) {
this.state = State.OPEN; this.closed = false;
this.client = client; this.client = client;
this.message = message.buildCopy().option(Observe.REGISTER).build(); this.message = message.buildCopy().option(Observe.REGISTER).build();
this.initial = client.send(this.message); this.initial = client.send(this.message);
} }
public Async.LoopHandle subscribe(Consumer<Message> event) {
return Async.loop(
() -> {
try {
event.accept(this.next().get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
},
() -> {
this.close();
}
);
}
public CompletableFuture<Message> next() { public CompletableFuture<Message> next() {
if (State.eq.test(State.CLOSED, this.state)) { if (this.closed) {
throw new RuntimeException( throw new RuntimeException(
"ClientObserveStream.next() invoked after .close()" "ClientObserveStream.next() invoked after .close()"
); );
@ -48,28 +66,6 @@ public class ClientObserveStream implements AutoCloseable {
.get(); .get();
} catch (Throwable t) {} } catch (Throwable t) {}
this.state = State.CLOSED; this.closed = true;
}
public static final class State {
public static final Eq<State> eq = Eq.int_.contramap((State s) -> s.state);
public static final State OPEN = new State(0);
public static final State CLOSED = new State(1);
final int state;
State(int state) {
this.state = state;
}
@Override
public boolean equals(Object other) {
return switch (other) {
case State s -> State.eq.test(this, s);
default -> false;
};
}
} }
} }

View File

@ -1,25 +1,30 @@
package dev.toad; package dev.toad;
import dev.toad.Async;
import dev.toad.msg.Code; import dev.toad.msg.Code;
import dev.toad.msg.Message; import dev.toad.msg.Message;
import dev.toad.msg.Payload; import dev.toad.msg.Payload;
import dev.toad.msg.Type;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.BiPredicate; import java.util.function.BiPredicate;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Level; import java.util.logging.Logger;
public final class Server implements AutoCloseable { public final class Server implements AutoCloseable {
boolean exit = false; boolean closed = false;
Optional<Thread> thread = Optional.empty(); Optional<Async.LoopHandle> loopHandle = Optional.empty();
final Toad toad; final Toad toad;
final ArrayList<Middleware> middlewares; final ArrayList<Middleware> middlewares;
@ -28,121 +33,145 @@ public final class Server implements AutoCloseable {
this.middlewares = ms; this.middlewares = ms;
} }
public void exit() { public DatagramChannel channel() {
this.exit = true; return this.toad.channel();
}
dev.toad.msg.ref.Message blockUntilRequest()
throws InterruptedException, ExecutionException {
return Async
.pollCompletable(() -> {
if (this.closed) {
throw new Exit();
}
return this.toad.pollReq();
})
.get();
}
void tick() {
try {
var req = this.blockUntilRequest();
var addr = req.addr().get();
this.toad.logger().log(Level.FINE, Toad.LogMessage.rx(req));
var result = Middleware.Result.next();
var acked = new AtomicBoolean(false);
for (var m : this.middlewares) {
var ctx = new Middleware.Context(this.toad.logger(), result, req);
result =
Middleware.tick(
m,
ctx,
() -> {
if (req.type() != Type.CON) {
return;
}
acked.set(true);
this.toad.logger().log(Level.FINE, LogMessage.earlyAck(req));
var ack = req
.buildResponse()
.code(Code.EMPTY)
.type(Type.ACK)
.build();
this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(ack));
try {
Async.pollCompletable(() -> this.toad.sendMessage(ack)).get();
} catch (Throwable t) {
this.toad.logger().log(Level.SEVERE, Toad.LogMessage.tx(ack));
}
}
);
}
var resp = result.response().get();
if (resp.isEmpty()) {
this.toad.logger().log(Level.SEVERE, LogMessage.unhandledRequest(req));
} else {
var resp_ = resp.get();
var resp__ = resp_.type() == Type.ACK && acked.get()
? resp_.buildCopy().type(Type.CON).build()
: resp_;
this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(resp__));
Async.pollCompletable(() -> this.toad.sendMessage(resp__)).get();
}
req.close();
} catch (Throwable e) {
if (e.getCause() instanceof Exit || e instanceof Exit) {
return;
}
this.toad.logger().log(Level.SEVERE, e.toString());
e.printStackTrace();
}
} }
public void notify(String path) { public void notify(String path) {
this.toad.notify(path); this.toad.notify(path);
} }
public Thread run() { public Async.LoopHandle run() {
if (!this.thread.isEmpty()) { if (!this.loopHandle.isEmpty()) {
return this.thread.get(); return this.loopHandle.get();
} }
var thread = new Thread(() -> { try {
try { this.toad.logger()
Toad .log(Level.INFO, LogMessage.startup(this.toad.localAddress()));
.logger() } catch (Throwable t) {}
.log(
Level.INFO,
String.format(
"Server listening on %s",
this.toad.localAddress().toString()
)
);
} catch (Throwable t) {}
while (true) { this.loopHandle = Optional.of(Async.loop(() -> this.tick()));
try { return this.loopHandle.get();
dev.toad.msg.ref.Message req = Async
.pollCompletable(() -> {
if (this.exit) {
throw new Exit();
}
return this.toad.pollReq();
})
.get();
var addr = req.addr().get();
Toad
.logger()
.log(
Level.FINE,
String.format("<== %s\n%s", addr.toString(), req.toDebugString())
);
Middleware.Result result = Middleware.Result.next();
for (var m : this.middlewares) {
if (!m.shouldRun.test(result, req)) {
continue;
}
try {
if (result.isAsync()) {
result.response().get();
// Toad.ack(req);
}
result = m.run(result, req);
} catch (Throwable e) {
result = Middleware.Result.error(e);
}
}
var resp = result.response().get();
if (resp.isEmpty()) {
Toad
.logger()
.log(
Level.SEVERE,
String.format(
"Server never generated response for message\n%s",
req.toDebugString()
)
);
} else {
var resp_ = resp.get().toOwned();
Toad
.logger()
.log(
Level.FINE,
String.format("==> %s\n%s", addr, resp_.toDebugString())
);
Async.pollCompletable(() -> this.toad.sendMessage(resp_)).get();
}
req.close();
} catch (Throwable e) {
if (e.getCause() instanceof Exit || e instanceof Exit) {
return;
}
Toad.logger().log(Level.SEVERE, e.toString());
e.printStackTrace();
}
}
});
thread.start();
this.thread = Optional.of(thread);
return thread;
} }
@Override @Override
public void close() { public void close() {
this.closed = true;
try {
if (!this.loopHandle.isEmpty()) {
this.loopHandle.get().stop();
}
} catch (Throwable e) {}
this.toad.close(); this.toad.close();
} }
static final class LogMessage {
static String startup(InetSocketAddress addr) {
return String.format("Server listening on %s", addr.toString());
}
static String earlyAck(Message req) {
return String.format("Separately ACKing request %s", req.toDebugString());
}
static String unhandledRequest(Message req) {
return String.format(
"Server never generated response for message\n%s",
req
);
}
static String ex(Message m) {
return String.format(
"Exception thrown while handling:\n%s\nException:",
m.toDebugString()
);
}
}
static final class Exit extends RuntimeException {} static final class Exit extends RuntimeException {}
public static final class Middleware { public static final class Middleware {
final BiPredicate<Result, Message> shouldRun; final BiPredicate<Result, Message> shouldRun;
final BiFunction<Result, Message, Result> fun; final Function<Context, Result> fun;
static final CompletableFuture<Optional<Message>> noop = static final CompletableFuture<Optional<Message>> noop =
CompletableFuture.completedFuture(Optional.empty()); CompletableFuture.completedFuture(Optional.empty());
@ -152,46 +181,36 @@ public final class Server implements AutoCloseable {
Optional.of(m.buildResponse().code(Code.NOT_FOUND).build()) Optional.of(m.buildResponse().code(Code.NOT_FOUND).build())
); );
public static final Middleware handlePing = Middleware.requestHandler(m ->
Type.eq.test(m.type(), Type.CON) && Code.eq.test(m.code(), Code.EMPTY)
? Optional.of(
m.buildResponse().code(Code.EMPTY).type(Type.RESET).build()
)
: Optional.empty()
);
public static final Middleware handleException = new Middleware( public static final Middleware handleException = new Middleware(
(r, _m) -> r instanceof ResultError, (r, _m) -> r instanceof ResultError,
(r, m) -> { ctx -> {
var e = ((ResultError) r).error; var e = ((ResultError) ctx.result).error;
Toad ctx.logger.log(Level.SEVERE, LogMessage.ex(ctx.request), e);
.logger()
.log(
Level.SEVERE,
String.format(
"Exception thrown while handling:\n%s\nException:",
m.toDebugString()
),
e
);
return Middleware.Result.respond( return Middleware.Result.respond(
m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build() ctx.request.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build()
); );
} }
); );
public static final Middleware handleExceptionDebug = new Middleware( public static final Middleware handleExceptionDebug = new Middleware(
(r, _m) -> r instanceof ResultError, (r, _m) -> r instanceof ResultError,
(r, m) -> { ctx -> {
var e = ((ResultError) r).error; var e = ((ResultError) ctx.result).error;
Toad ctx.logger.log(Level.SEVERE, LogMessage.ex(ctx.request), e);
.logger()
.log(
Level.SEVERE,
String.format(
"Exception thrown while handling:\n%s\nException:",
m.toDebugString()
),
e
);
return Middleware.Result.respond( return Middleware.Result.respond(
m ctx.request
.buildResponse() .buildResponse()
.code(Code.INTERNAL_SERVER_ERROR) .code(Code.INTERNAL_SERVER_ERROR)
.payload(Payload.text(e.toString())) .payload(Payload.text(e.toString()))
@ -200,14 +219,31 @@ public final class Server implements AutoCloseable {
} }
); );
static Result tick(Middleware m, Context ctx, Runnable separatelyAck) {
if (!m.shouldRun.test(ctx.result, ctx.request)) {
return ctx.result;
}
try {
if (ctx.result.isAsync()) {
ctx.result.response().get();
separatelyAck.run();
}
return m.run(ctx);
} catch (Throwable e) {
return Middleware.Result.error(e);
}
}
public static Middleware requestHandler( public static Middleware requestHandler(
Function<Message, Optional<Message>> f Function<Message, Optional<Message>> f
) { ) {
return new Middleware( return new Middleware(
(r, _m) -> !r.isFinal(), (r, _m) -> !r.isFinal(),
(_result, request) -> ctx ->
f f
.apply(request) .apply(ctx.request)
.map(Middleware.Result::respond) .map(Middleware.Result::respond)
.orElse(Middleware.Result.next()) .orElse(Middleware.Result.next())
); );
@ -218,9 +254,9 @@ public final class Server implements AutoCloseable {
) { ) {
return new Middleware( return new Middleware(
(r, _m) -> !r.isFinal(), (r, _m) -> !r.isFinal(),
(_result, request) -> ctx ->
f f
.apply(request) .apply(ctx.request)
.map(Middleware.Result::respond) .map(Middleware.Result::respond)
.orElse(Middleware.Result.next()) .orElse(Middleware.Result.next())
); );
@ -230,11 +266,11 @@ public final class Server implements AutoCloseable {
return new Middleware( return new Middleware(
(r, _m) -> (r, _m) ->
r instanceof ResultRespondSync || r instanceof ResultRespondAsync, r instanceof ResultRespondSync || r instanceof ResultRespondAsync,
(res, request) -> { ctx -> {
try { try {
var rep = res.response().get().get(); var rep = ctx.result.response().get().get();
f.accept(rep); f.accept(rep);
return res; return ctx.result;
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -245,20 +281,20 @@ public final class Server implements AutoCloseable {
public static Middleware exceptionHandler(Function<Throwable, Result> f) { public static Middleware exceptionHandler(Function<Throwable, Result> f) {
return new Middleware( return new Middleware(
(r, _m) -> r instanceof ResultError, (r, _m) -> r instanceof ResultError,
(res, request) -> f.apply(((ResultError) res).error) ctx -> f.apply(((ResultError) ctx.result).error)
); );
} }
public Middleware( public Middleware(
BiPredicate<Result, Message> shouldRun, BiPredicate<Result, Message> shouldRun,
BiFunction<Result, Message, Result> fun Function<Context, Result> fun
) { ) {
this.fun = fun; this.fun = fun;
this.shouldRun = shouldRun; this.shouldRun = shouldRun;
} }
public Result run(Result res, Message msg) { public Result run(Context context) {
return this.fun.apply(res, msg); return this.fun.apply(context);
} }
public Middleware filter(BiPredicate<Result, Message> f) { public Middleware filter(BiPredicate<Result, Message> f) {
@ -268,6 +304,19 @@ public final class Server implements AutoCloseable {
); );
} }
public static final class Context {
public final Logger logger;
public final Result result;
public final Message request;
public Context(Logger logger, Result result, Message request) {
this.logger = logger;
this.result = result;
this.request = request;
}
}
public static sealed interface Result public static sealed interface Result
permits permits
ResultError, ResultError,
@ -512,12 +561,14 @@ public final class Server implements AutoCloseable {
public Server buildDebugExceptionHandler() { public Server buildDebugExceptionHandler() {
this.middleware(Middleware.handleExceptionDebug); this.middleware(Middleware.handleExceptionDebug);
this.middleware(Middleware.handlePing);
this.middleware(Middleware.respondNotFound); this.middleware(Middleware.respondNotFound);
return new Server(this.toad, this.middlewares); return new Server(this.toad, this.middlewares);
} }
public Server build() { public Server build() {
this.middleware(Middleware.handleException); this.middleware(Middleware.handleException);
this.middleware(Middleware.handlePing);
this.middleware(Middleware.respondNotFound); this.middleware(Middleware.respondNotFound);
return new Server(this.toad, this.middlewares); return new Server(this.toad, this.middlewares);
} }

View File

@ -11,17 +11,13 @@ import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.ConsoleHandler; import java.util.logging.ConsoleHandler;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
public final class Toad implements AutoCloseable { public final class Toad implements AutoCloseable {
public static Logger logger() {
// Configured in `glue::Runtime::new()`
var l = Logger.getLogger("dev.toad");
l.setUseParentHandlers(false);
return l;
}
static native Config defaultConfigImpl(); static native Config defaultConfigImpl();
static Config defaultConfig = null; static Config defaultConfig = null;
@ -42,13 +38,14 @@ public final class Toad implements AutoCloseable {
System.loadLibrary("toad_java_glue"); System.loadLibrary("toad_java_glue");
} }
final Logger logger;
final Ptr ptr; final Ptr ptr;
final Config config; final Config config;
final DatagramChannel channel; final DatagramChannel channel;
static native void teardown(); static native void teardown();
static native long init(DatagramChannel chan, Config o); static native long init(Logger logger, DatagramChannel chan, Config o);
static native Optional<IdAndToken> sendMessage( static native Optional<IdAndToken> sendMessage(
long ptr, long ptr,
@ -85,15 +82,33 @@ public final class Toad implements AutoCloseable {
} }
Toad(Config o, DatagramChannel channel) { Toad(Config o, DatagramChannel channel) {
this.logger = Logger.getLogger("dev.toad");
this.config = o; this.config = o;
this.channel = channel; this.channel = channel;
this.ptr = Ptr.register(this.getClass(), this.init(this.channel, o)); this.ptr =
Ptr.register(this.getClass(), this.init(this.logger, this.channel, o));
}
Toad(Config o, Logger logger, DatagramChannel channel) {
this.logger = logger;
this.config = o;
this.channel = channel;
this.ptr =
Ptr.register(this.getClass(), this.init(this.logger, this.channel, o));
}
public DatagramChannel channel() {
return this.channel;
} }
public Config config() { public Config config() {
return this.config; return this.config;
} }
public Logger logger() {
return this.logger;
}
public InetSocketAddress localAddress() throws IOException { public InetSocketAddress localAddress() throws IOException {
return (InetSocketAddress) this.channel.getLocalAddress(); return (InetSocketAddress) this.channel.getLocalAddress();
} }
@ -122,6 +137,25 @@ public final class Toad implements AutoCloseable {
} }
} }
static final class LogMessage {
static String rx(Message msg) {
return String.format(
"<== %s\n%s",
msg.addr().get().toString(),
msg.toDebugString()
);
}
static String tx(Message msg) {
return String.format(
"==> %s\n%s",
msg.addr().get().toString(),
msg.toDebugString()
);
}
}
public interface BuilderRequiresSocket { public interface BuilderRequiresSocket {
Toad.Builder port(short port); Toad.Builder port(short port);
Toad.Builder address(InetSocketAddress addr); Toad.Builder address(InetSocketAddress addr);
@ -134,14 +168,33 @@ public final class Toad implements AutoCloseable {
Config.Msg.Builder msg = Config.Msg.builder(); Config.Msg.Builder msg = Config.Msg.builder();
Optional<DatagramChannel> channel = Optional.empty(); Optional<DatagramChannel> channel = Optional.empty();
Optional<java.util.logging.Level> logLevel = Optional.empty(); Optional<java.util.logging.Level> logLevel = Optional.empty();
Optional<java.util.logging.Formatter> logFormatter = Optional.empty();
Optional<String> loggerName = Optional.empty();
u8 concurrency = Toad.defaultConfig().concurrency; u8 concurrency = Toad.defaultConfig().concurrency;
Builder() {} Builder() {}
Logger logger() {
var logger = Logger.getLogger(this.loggerName.orElse("dev.toad"));
if (logger.getUseParentHandlers()) {
var level = this.logLevel.orElse(Level.INFO);
var handler = new ConsoleHandler();
handler.setFormatter(this.logFormatter.orElse(new SimpleFormatter()));
handler.setLevel(level);
logger.setUseParentHandlers(false);
logger.addHandler(handler);
logger.setLevel(level);
}
return logger;
}
public Client buildClient() throws IOException { public Client buildClient() throws IOException {
if (this.ioException.isEmpty()) { if (this.ioException.isEmpty()) {
var cfg = new Config(this.logLevel, this.concurrency, this.msg.build()); var cfg = new Config(this.concurrency, this.msg.build());
var toad = new Toad(cfg, this.channel.get()); var toad = new Toad(cfg, this.logger(), this.channel.get());
return new Client(toad); return new Client(toad);
} else { } else {
throw this.ioException.get(); throw this.ioException.get();
@ -150,8 +203,8 @@ public final class Toad implements AutoCloseable {
public Server.Builder server() throws IOException { public Server.Builder server() throws IOException {
if (this.ioException.isEmpty()) { if (this.ioException.isEmpty()) {
var cfg = new Config(this.logLevel, this.concurrency, this.msg.build()); var cfg = new Config(this.concurrency, this.msg.build());
var toad = new Toad(cfg, this.channel.get()); var toad = new Toad(cfg, this.logger(), this.channel.get());
return new Server.Builder(toad); return new Server.Builder(toad);
} else { } else {
throw this.ioException.get(); throw this.ioException.get();
@ -172,6 +225,16 @@ public final class Toad implements AutoCloseable {
return this; return this;
} }
public Builder logFormatter(java.util.logging.Formatter f) {
this.logFormatter = Optional.of(f);
return this;
}
public Builder loggerName(String name) {
this.loggerName = Optional.of(name);
return this;
}
public Builder address(InetSocketAddress addr) { public Builder address(InetSocketAddress addr) {
try { try {
DatagramChannel channel = DatagramChannel.open( DatagramChannel channel = DatagramChannel.open(
@ -199,16 +262,10 @@ public final class Toad implements AutoCloseable {
public static final class Config { public static final class Config {
final Optional<java.util.logging.Level> logLevel;
final u8 concurrency; final u8 concurrency;
final Msg msg; final Msg msg;
Config( Config(u8 concurrency, Msg msg) {
Optional<java.util.logging.Level> logLevel,
u8 concurrency,
Msg msg
) {
this.logLevel = logLevel;
this.concurrency = concurrency; this.concurrency = concurrency;
this.msg = msg; this.msg = msg;
} }
@ -222,10 +279,6 @@ public final class Toad implements AutoCloseable {
}; };
} }
public java.util.logging.Level logLevel() {
return this.logLevel.orElse(java.util.logging.Level.INFO);
}
public InetSocketAddress addr() { public InetSocketAddress addr() {
return this.addr(); return this.addr();
} }

View File

@ -1,9 +1,53 @@
import dev.toad.Async import dev.toad.Async
import java.util.Date
import java.util.Optional import java.util.Optional
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.time.Duration
class AsyncTest extends munit.FunSuite { class AsyncTest extends munit.FunSuite {
test("loop() runs until LoopHandle.stop() invoked") {
var counter = 0;
val loop = Async.loop(() => {
counter += 1
});
val start = Date().getTime();
while (counter < 10 && Date().getTime() < start + 1000) {
Thread.sleep(Duration.ofMillis(10))
}
loop.stop()
assert(counter >= 10)
}
test("LoopHandle.stop() prevents further runs") {
var stopped = false;
val loop = Async.loop(() => {
if stopped then throw new Error()
});
loop.stop()
stopped = true
}
test("LoopHandle.join() rethrows exceptions") {
case class LoopThrew() extends Throwable {}
case class LoopDidNotThrow() extends Throwable {}
try {
val loop = Async.loop((() => {
throw LoopThrew()
}): java.lang.Runnable)
loop.join();
throw LoopDidNotThrow()
} catch {
case LoopThrew() => {}
case e => throw e
}
}
test("pollCompletable polls then completes") { test("pollCompletable polls then completes") {
var polls = 0 var polls = 0

View File

@ -7,16 +7,79 @@ import java.lang.Thread
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.logging.Logger import java.util.logging.Logger
import java.util.logging.LogRecord
import java.util.logging.Level import java.util.logging.Level
import java.util.logging.Formatter
import java.util.ArrayList import java.util.ArrayList
import java.util.Optional import java.util.Optional
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.net.InetAddress import java.net.InetAddress
val logLevel = Level.INFO; val logLevel = Level.OFF;
val serverPort = 10102; val serverPort = 10102;
val clientPort = 10101; val clientPort = 10101;
val client2Port = 10100;
val totalWidth = 120
val padding = 1
val sep = "||"
val startMillis = java.time.Instant.now.toEpochMilli
enum Position:
case Left, Center, Right
def lines(pos: Position, r: LogRecord): String = {
val width = (totalWidth / 3).intValue
val logger = r.getLoggerName
val level = r.getLevel.toString
val message =
s"[$logger($level)] ${r.getMillis - startMillis}\n${r.getMessage}"
0.until(message.length)
.foldLeft(Seq(""))((lines, ix) => {
val lines_ = lines.init
val last = lines.last
(last, String.valueOf(message.charAt(ix))) match {
case (_, "\n") => lines :+ ""
case (line, s) if line.length == width => lines :+ s
case (line, s) => lines_ :+ (line ++ s)
}
})
.map(lineText => line(pos, lineText))
.foldLeft("")((m, line) => s"$m$line\n") ++ "\n"
}
def line(pos: Position, text: String): String = {
val width = (totalWidth / 2).intValue
val empty = " ".repeat(width)
val after = " ".repeat(width - text.length)
val pad = " ".repeat(padding)
pos match {
case Position.Left => s"$text$after$pad$sep$pad$empty$pad$sep"
case Position.Center => s"$empty$pad$sep$pad$text$after$pad$sep"
case Position.Right => s"$empty$pad$sep$pad$empty$pad$sep$pad$text"
}
}
class ServerLogFormatter extends Formatter {
override def format(r: LogRecord): String = {
lines(Position.Left, r)
}
}
class Client1LogFormatter extends Formatter {
override def format(r: LogRecord): String = {
lines(Position.Center, r)
}
}
class Client2LogFormatter extends Formatter {
override def format(r: LogRecord): String = {
lines(Position.Right, r)
}
}
class E2E extends munit.FunSuite { class E2E extends munit.FunSuite {
val client = new Fixture[Client]("client") { val client = new Fixture[Client]("client") {
@ -25,8 +88,32 @@ class E2E extends munit.FunSuite {
def apply() = this.client def apply() = this.client
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
this.client = this.client = Toad.builder
Toad.builder.port(clientPort.shortValue).logLevel(logLevel).buildClient .port(clientPort.shortValue)
.logLevel(logLevel)
.loggerName("client")
.logFormatter(Client1LogFormatter())
.buildClient
}
override def afterAll(): Unit = {
this.client.close()
this.client = null
}
}
val client2 = new Fixture[Client]("client2") {
private var client: Client = null;
def apply() = this.client
override def beforeAll(): Unit = {
this.client = Toad.builder
.port(client2Port.shortValue)
.logLevel(logLevel)
.loggerName("client2")
.logFormatter(Client2LogFormatter())
.buildClient
} }
override def afterAll(): Unit = { override def afterAll(): Unit = {
@ -36,7 +123,6 @@ class E2E extends munit.FunSuite {
} }
val server = new Fixture[Server]("server") { val server = new Fixture[Server]("server") {
private var thread: Thread = null;
private var server: Server = null; private var server: Server = null;
def apply() = this.server def apply() = this.server
@ -44,10 +130,12 @@ class E2E extends munit.FunSuite {
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
Toad.loadNativeLib() Toad.loadNativeLib()
var counterN = 0 var number = AtomicInteger(0)
this.server = Toad.builder this.server = Toad.builder
.port(serverPort.shortValue) .port(serverPort.shortValue)
.logLevel(logLevel) .logLevel(logLevel)
.loggerName("server")
.logFormatter(ServerLogFormatter())
.server .server
.put( .put(
"failing", "failing",
@ -55,14 +143,21 @@ class E2E extends munit.FunSuite {
throw java.lang.RuntimeException("fart") throw java.lang.RuntimeException("fart")
} }
) )
.get( .post(
"counter", "number",
msg => {
number.set(Integer.decode(msg.payload.toString))
this.server.notify("number")
Optional.of(msg.buildResponse.code(Code.OK_CHANGED).build)
}
)
.get(
"number",
msg => { msg => {
counterN += 1
Optional.of( Optional.of(
msg.buildResponse msg.buildResponse
.code(Code.OK_CONTENT) .code(Code.OK_CONTENT)
.payload(Payload.text(s"$counterN")) .payload(Payload.text(s"${number.get()}"))
.build .build
) )
} }
@ -80,17 +175,15 @@ class E2E extends munit.FunSuite {
) )
.build .build
this.thread = this.server.run() this.server.run()
} }
override def afterAll(): Unit = { override def afterAll(): Unit = {
this.server.exit()
this.thread.join()
this.server.close() this.server.close()
} }
} }
override def munitFixtures = List(client, server) override def munitFixtures = List(client, client2, server)
test("server responds 2.05 Ok Content") { test("server responds 2.05 Ok Content") {
server() server()
@ -116,25 +209,61 @@ class E2E extends munit.FunSuite {
test( test(
"ClientObserveStream yields new resource states when server is notified of new state" "ClientObserveStream yields new resource states when server is notified of new state"
) { ) {
val stream = val n = AtomicInteger(-1)
client() val subscription = client()
.observe( .observe(
Message.builder Message.builder
.uri(s"coap://localhost:$serverPort/counter") .uri(s"coap://localhost:$serverPort/number")
.`type`(Type.NON) .`type`(Type.NON)
.code(Code.GET) .code(Code.GET)
.build .build
); )
.subscribe(rep => {
n.set(Integer.decode(rep.payload.toString))
})
try { def shouldBe(x: Int) =
assertNoDiff(stream.next().get().payload.toString, "1") Async
server().notify("counter") .pollCompletable(() => {
assertNoDiff(stream.next().get().payload.toString, "2") Optional.of(Object()).filter({ case _ => n.get() == x })
server().notify("counter") })
assertNoDiff(stream.next().get().payload.toString, "3") .get(1, TimeUnit.SECONDS)
} finally {
stream.close() shouldBe(0)
} client2()
.post(
Type.CON,
s"coap://localhost:$serverPort/number",
Payload.text(1.toString)
)
.get()
shouldBe(1)
client2()
.post(
Type.CON,
s"coap://localhost:$serverPort/number",
Payload.text(2.toString)
)
.get()
shouldBe(2)
client2()
.post(
Type.CON,
s"coap://localhost:$serverPort/number",
Payload.text(3.toString)
)
.get()
shouldBe(3)
client2()
.post(
Type.CON,
s"coap://localhost:$serverPort/number",
Payload.text(4.toString)
)
.get()
subscription.stop()
subscription.close()
} }
test("unregistered resource responds 4.04 Not Found") { test("unregistered resource responds 4.04 Not Found") {