diff --git a/glue/Cargo.lock b/glue/Cargo.lock index dbafa35..7f60726 100644 --- a/glue/Cargo.lock +++ b/glue/Cargo.lock @@ -504,9 +504,9 @@ checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" [[package]] name = "toad" -version = "0.17.5" +version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a19447d2692bd6d8b1c0c10457f42394da7b3f633b96c8705825aca91c080ad" +checksum = "3d2a40ef725839735e253c13664aef9eb9989c1c1f79ea42f51f9b0e3076e6e6" dependencies = [ "embedded-time", "log", @@ -527,6 +527,7 @@ dependencies = [ "toad-map", "toad-msg", "toad-stem", + "toad-string", "toad-writable", ] @@ -582,9 +583,9 @@ dependencies = [ [[package]] name = "toad-jni" -version = "0.14.1" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d9feb2f3a24a6d67aa5672d37893d160a57b9dbbc62d09a0f243abaec6c11f" +checksum = "667f57c04355c23f3d295c8a21da8de70eaebd11b075d890d051bfc6381261ca" dependencies = [ "embedded-time", "jni", @@ -651,6 +652,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "834a7cc46ed626b24dfb861d3de19cdd666fa945f5bd1b4d2d675551afef927c" +[[package]] +name = "toad-string" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdad9197bf6974091949a6d3e00946558d69e0d7b3d159283c6c43fffdef59dc" +dependencies = [ + "tinyvec", + "toad-array 0.2.3", + "toad-len", + "toad-writable", +] + [[package]] name = "toad-writable" version = "0.1.1" diff --git a/glue/Cargo.toml b/glue/Cargo.toml index 1284e93..363a6ac 100644 --- a/glue/Cargo.toml +++ b/glue/Cargo.toml @@ -12,10 +12,10 @@ default = ["e2e"] e2e = [] [dependencies] +toad = "0.19.1" +toad-jni = "0.16.1" jni = "0.21.1" nb = "1" -toad = "0.17.5" -toad-jni = "0.14.1" no-std-net = "0.6" toad-msg = "0.18.1" tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]} diff --git a/glue/src/dev/toad/mod.rs b/glue/src/dev/toad/mod.rs index 7be8188..c404010 100644 --- a/glue/src/dev/toad/mod.rs +++ b/glue/src/dev/toad/mod.rs @@ -14,6 +14,7 @@ use toad::time::Millis; use toad_jni::java::io::IOException; use toad_jni::java::net::InetSocketAddress; use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel}; +use toad_jni::java::util::logging::Logger; use toad_jni::java::util::Optional; use toad_jni::java::{self, Object, ResultYieldToJavaOrThrow}; @@ -46,8 +47,12 @@ impl Toad { PTR.get(e, self) } - fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 { - let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel); + fn init_impl(e: &mut java::Env, + logger: Logger, + cfg: Config, + channel: PeekableDatagramChannel) + -> i64 { + let r = Runtime::new(logger, cfg.to_toad(e), channel); unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 } } @@ -157,20 +162,13 @@ impl Config { RUNTIME_CONFIG_CONCURRENCY.get(e, self).to_rust(e) } - pub fn log_level(&self, e: &mut java::Env) -> java::util::logging::Level { - static LOG_LEVEL: java::Method java::util::logging::Level> = - java::Method::new("logLevel"); - LOG_LEVEL.invoke(e, self) - } - pub fn msg(&self, e: &mut java::Env) -> Msg { static RUNTIME_CONFIG_MSG: java::Method Msg> = java::Method::new("msg"); RUNTIME_CONFIG_MSG.invoke(e, self) } pub fn new(e: &mut java::Env, c: toad::config::Config) -> Self { - static CTOR: java::Constructor, ffi::u8, Msg)> = - java::Constructor::new(); + static CTOR: java::Constructor = java::Constructor::new(); let con = Con::new(e, c.msg.con.unacked_retry_strategy, @@ -186,8 +184,7 @@ impl Config { let concurrency = ffi::u8::from_rust(e, c.max_concurrent_requests); - let log_level: Optional = Optional::empty(e); - let jcfg = CTOR.invoke(e, log_level, concurrency, msg); + let jcfg = CTOR.invoke(e, concurrency, msg); jcfg } @@ -351,14 +348,16 @@ pub extern "system" fn Java_dev_toad_Toad_defaultConfigImpl<'local>(mut env: jav #[no_mangle] pub extern "system" fn Java_dev_toad_Toad_init<'local>(mut e: java::Env<'local>, _: JClass<'local>, + logger: JObject<'local>, channel: JObject<'local>, cfg: JObject<'local>) -> i64 { let e = &mut e; let cfg = java::lang::Object::from_local(e, cfg).upcast_to::(e); let channel = java::lang::Object::from_local(e, channel).upcast_to::(e); + let logger = java::lang::Object::from_local(e, logger).upcast_to::(e); - Toad::init_impl(e, cfg, channel.peekable()) + Toad::init_impl(e, logger, cfg, channel.peekable()) } #[no_mangle] diff --git a/glue/src/e2e.rs b/glue/src/e2e.rs index 2dc0244..a64f3ea 100644 --- a/glue/src/e2e.rs +++ b/glue/src/e2e.rs @@ -4,7 +4,9 @@ use no_std_net::SocketAddr; use toad::config::Config; use toad::net::Addrd; use toad::platform::Platform; +use toad_jni::java::lang::System; use toad_jni::java::nio::channels::PeekableDatagramChannel; +use toad_jni::java::util::logging::Logger; use toad_jni::java::{self}; use toad_msg::alloc::Message; use toad_msg::{Code, Id, Token, Type}; diff --git a/glue/src/lib.rs b/glue/src/lib.rs index af18af0..6cb78c7 100644 --- a/glue/src/lib.rs +++ b/glue/src/lib.rs @@ -10,7 +10,6 @@ mod runtime { use toad::config::Config; use toad::platform::{Effect, Platform}; - use toad::step::runtime::Runtime as DefaultSteps; use toad_jni::java::io::IOException; use toad_jni::java::nio::channels::PeekableDatagramChannel; use toad_jni::java::util::logging::{ConsoleHandler, Level, Logger}; @@ -30,7 +29,17 @@ mod runtime { type Effects = Vec>; } - type Steps = DefaultSteps; + #[rustfmt::skip] + pub type Steps = + toad::step::runtime::Observe>>>>>>; pub struct Runtime { steps: Steps, @@ -41,22 +50,7 @@ mod runtime { } impl Runtime { - pub fn new(e: &mut java::Env, - log_level: Level, - config: Config, - channel: PeekableDatagramChannel) - -> Self { - let logger = Logger::get_logger(e, "dev.toad"); - - if logger.uses_parent_handlers(e) { - let handler = ConsoleHandler::new(e); - handler.set_level(e, log_level); - - logger.use_parent_handlers(e, false); - logger.add_handler(e, handler.to_handler()); - logger.set_level(e, log_level); - } - + pub fn new(logger: Logger, config: Config, channel: PeekableDatagramChannel) -> Self { Self { steps: Default::default(), config, channel, diff --git a/src/main/java/dev.toad/Async.java b/src/main/java/dev.toad/Async.java index 61b3af1..19b2f40 100644 --- a/src/main/java/dev.toad/Async.java +++ b/src/main/java/dev.toad/Async.java @@ -2,15 +2,69 @@ package dev.toad; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; public class Async { public static ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(); + Executors.newScheduledThreadPool(32); + + public static class LoopHandle implements AutoCloseable { + + CompletableFuture fut = null; + final AtomicBoolean break_ = new AtomicBoolean(false); + final Runnable onStop; + + LoopHandle(Runnable onStop) { + this.onStop = onStop; + } + + public void join() throws Throwable, InterruptedException { + try { + this.fut.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + public void stop() throws Throwable, InterruptedException { + this.break_.set(true); + this.join(); + this.onStop.run(); + } + + @Override + public void close() { + try { + this.stop(); + } catch (Throwable t) {} + } + } + + public static LoopHandle loop(Runnable f) { + return Async.loop(f, () -> {}); + } + + public static LoopHandle loop(Runnable f, Runnable onStop) { + var handle = new LoopHandle(onStop); + var fut = Async.pollCompletable(() -> { + if (handle.break_.get()) { + return Optional.of(new Object()); + } else { + f.run(); + return Optional.empty(); + } + }); + + handle.fut = fut; + + return handle; + } public static CompletableFuture pollCompletable( Supplier> sup diff --git a/src/main/java/dev.toad/Client.java b/src/main/java/dev.toad/Client.java index b830910..699c91d 100644 --- a/src/main/java/dev.toad/Client.java +++ b/src/main/java/dev.toad/Client.java @@ -10,6 +10,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.logging.Level; public final class Client implements AutoCloseable { @@ -19,6 +20,14 @@ public final class Client implements AutoCloseable { this.toad = toad; } + public CompletableFuture ping(String uri) + throws URISyntaxException, UnknownHostException { + return this.send( + Message.builder().uri(uri).type(Type.CON).code(Code.EMPTY).build() + ) + .thenAccept(_m -> {}); + } + public CompletableFuture get(String uri) throws URISyntaxException, UnknownHostException { return this.get(Type.CON, uri); @@ -98,6 +107,7 @@ public final class Client implements AutoCloseable { ); } + this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(message)); return Async.pollCompletable(() -> this.toad.sendMessage(message)); } @@ -114,7 +124,10 @@ public final class Client implements AutoCloseable { ) { return Async .pollCompletable(() -> this.toad.pollResp(t, addr)) - .thenApply(msg -> msg.toOwned()); + .thenApply(msg -> { + this.toad.logger().log(Level.FINE, Toad.LogMessage.rx(msg)); + return msg.toOwned(); + }); } @Override diff --git a/src/main/java/dev.toad/ClientObserveStream.java b/src/main/java/dev.toad/ClientObserveStream.java index e5fac4e..bf05263 100644 --- a/src/main/java/dev.toad/ClientObserveStream.java +++ b/src/main/java/dev.toad/ClientObserveStream.java @@ -1,28 +1,46 @@ package dev.toad; +import dev.toad.Async; import dev.toad.msg.Message; import dev.toad.msg.Token; import dev.toad.msg.option.Observe; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; public class ClientObserveStream implements AutoCloseable { - State state; + boolean closed; CompletableFuture initial; Optional token = Optional.empty(); final Client client; final Message message; public ClientObserveStream(Client client, Message message) { - this.state = State.OPEN; + this.closed = false; this.client = client; this.message = message.buildCopy().option(Observe.REGISTER).build(); this.initial = client.send(this.message); } + public Async.LoopHandle subscribe(Consumer event) { + return Async.loop( + () -> { + try { + event.accept(this.next().get()); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }, + () -> { + this.close(); + } + ); + } + public CompletableFuture next() { - if (State.eq.test(State.CLOSED, this.state)) { + if (this.closed) { throw new RuntimeException( "ClientObserveStream.next() invoked after .close()" ); @@ -48,28 +66,6 @@ public class ClientObserveStream implements AutoCloseable { .get(); } catch (Throwable t) {} - this.state = State.CLOSED; - } - - public static final class State { - - public static final Eq eq = Eq.int_.contramap((State s) -> s.state); - - public static final State OPEN = new State(0); - public static final State CLOSED = new State(1); - - final int state; - - State(int state) { - this.state = state; - } - - @Override - public boolean equals(Object other) { - return switch (other) { - case State s -> State.eq.test(this, s); - default -> false; - }; - } + this.closed = true; } } diff --git a/src/main/java/dev.toad/Server.java b/src/main/java/dev.toad/Server.java index 87554e1..aaeabe2 100644 --- a/src/main/java/dev.toad/Server.java +++ b/src/main/java/dev.toad/Server.java @@ -1,25 +1,30 @@ package dev.toad; +import dev.toad.Async; import dev.toad.msg.Code; import dev.toad.msg.Message; import dev.toad.msg.Payload; +import dev.toad.msg.Type; import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.DatagramChannel; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.logging.Level; -import java.util.logging.Level; +import java.util.logging.Logger; public final class Server implements AutoCloseable { - boolean exit = false; - Optional thread = Optional.empty(); + boolean closed = false; + Optional loopHandle = Optional.empty(); final Toad toad; final ArrayList middlewares; @@ -28,121 +33,145 @@ public final class Server implements AutoCloseable { this.middlewares = ms; } - public void exit() { - this.exit = true; + public DatagramChannel channel() { + return this.toad.channel(); + } + + dev.toad.msg.ref.Message blockUntilRequest() + throws InterruptedException, ExecutionException { + return Async + .pollCompletable(() -> { + if (this.closed) { + throw new Exit(); + } + + return this.toad.pollReq(); + }) + .get(); + } + + void tick() { + try { + var req = this.blockUntilRequest(); + var addr = req.addr().get(); + + this.toad.logger().log(Level.FINE, Toad.LogMessage.rx(req)); + + var result = Middleware.Result.next(); + var acked = new AtomicBoolean(false); + for (var m : this.middlewares) { + var ctx = new Middleware.Context(this.toad.logger(), result, req); + result = + Middleware.tick( + m, + ctx, + () -> { + if (req.type() != Type.CON) { + return; + } + + acked.set(true); + this.toad.logger().log(Level.FINE, LogMessage.earlyAck(req)); + var ack = req + .buildResponse() + .code(Code.EMPTY) + .type(Type.ACK) + .build(); + this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(ack)); + try { + Async.pollCompletable(() -> this.toad.sendMessage(ack)).get(); + } catch (Throwable t) { + this.toad.logger().log(Level.SEVERE, Toad.LogMessage.tx(ack)); + } + } + ); + } + + var resp = result.response().get(); + if (resp.isEmpty()) { + this.toad.logger().log(Level.SEVERE, LogMessage.unhandledRequest(req)); + } else { + var resp_ = resp.get(); + var resp__ = resp_.type() == Type.ACK && acked.get() + ? resp_.buildCopy().type(Type.CON).build() + : resp_; + this.toad.logger().log(Level.FINE, Toad.LogMessage.tx(resp__)); + Async.pollCompletable(() -> this.toad.sendMessage(resp__)).get(); + } + + req.close(); + } catch (Throwable e) { + if (e.getCause() instanceof Exit || e instanceof Exit) { + return; + } + + this.toad.logger().log(Level.SEVERE, e.toString()); + e.printStackTrace(); + } } public void notify(String path) { this.toad.notify(path); } - public Thread run() { - if (!this.thread.isEmpty()) { - return this.thread.get(); + public Async.LoopHandle run() { + if (!this.loopHandle.isEmpty()) { + return this.loopHandle.get(); } - var thread = new Thread(() -> { - try { - Toad - .logger() - .log( - Level.INFO, - String.format( - "Server listening on %s", - this.toad.localAddress().toString() - ) - ); - } catch (Throwable t) {} + try { + this.toad.logger() + .log(Level.INFO, LogMessage.startup(this.toad.localAddress())); + } catch (Throwable t) {} - while (true) { - try { - dev.toad.msg.ref.Message req = Async - .pollCompletable(() -> { - if (this.exit) { - throw new Exit(); - } - - return this.toad.pollReq(); - }) - .get(); - var addr = req.addr().get(); - - Toad - .logger() - .log( - Level.FINE, - String.format("<== %s\n%s", addr.toString(), req.toDebugString()) - ); - - Middleware.Result result = Middleware.Result.next(); - for (var m : this.middlewares) { - if (!m.shouldRun.test(result, req)) { - continue; - } - - try { - if (result.isAsync()) { - result.response().get(); - // Toad.ack(req); - } - - result = m.run(result, req); - } catch (Throwable e) { - result = Middleware.Result.error(e); - } - } - - var resp = result.response().get(); - if (resp.isEmpty()) { - Toad - .logger() - .log( - Level.SEVERE, - String.format( - "Server never generated response for message\n%s", - req.toDebugString() - ) - ); - } else { - var resp_ = resp.get().toOwned(); - Toad - .logger() - .log( - Level.FINE, - String.format("==> %s\n%s", addr, resp_.toDebugString()) - ); - Async.pollCompletable(() -> this.toad.sendMessage(resp_)).get(); - } - - req.close(); - } catch (Throwable e) { - if (e.getCause() instanceof Exit || e instanceof Exit) { - return; - } - - Toad.logger().log(Level.SEVERE, e.toString()); - e.printStackTrace(); - } - } - }); - - thread.start(); - - this.thread = Optional.of(thread); - return thread; + this.loopHandle = Optional.of(Async.loop(() -> this.tick())); + return this.loopHandle.get(); } @Override public void close() { + this.closed = true; + + try { + if (!this.loopHandle.isEmpty()) { + this.loopHandle.get().stop(); + } + } catch (Throwable e) {} + this.toad.close(); } + static final class LogMessage { + + static String startup(InetSocketAddress addr) { + return String.format("Server listening on %s", addr.toString()); + } + + static String earlyAck(Message req) { + return String.format("Separately ACKing request %s", req.toDebugString()); + } + + static String unhandledRequest(Message req) { + return String.format( + "Server never generated response for message\n%s", + req + ); + } + + static String ex(Message m) { + return String.format( + "Exception thrown while handling:\n%s\nException:", + m.toDebugString() + ); + } + } + static final class Exit extends RuntimeException {} public static final class Middleware { final BiPredicate shouldRun; - final BiFunction fun; + final Function fun; static final CompletableFuture> noop = CompletableFuture.completedFuture(Optional.empty()); @@ -152,46 +181,36 @@ public final class Server implements AutoCloseable { Optional.of(m.buildResponse().code(Code.NOT_FOUND).build()) ); + public static final Middleware handlePing = Middleware.requestHandler(m -> + Type.eq.test(m.type(), Type.CON) && Code.eq.test(m.code(), Code.EMPTY) + ? Optional.of( + m.buildResponse().code(Code.EMPTY).type(Type.RESET).build() + ) + : Optional.empty() + ); + public static final Middleware handleException = new Middleware( (r, _m) -> r instanceof ResultError, - (r, m) -> { - var e = ((ResultError) r).error; + ctx -> { + var e = ((ResultError) ctx.result).error; - Toad - .logger() - .log( - Level.SEVERE, - String.format( - "Exception thrown while handling:\n%s\nException:", - m.toDebugString() - ), - e - ); + ctx.logger.log(Level.SEVERE, LogMessage.ex(ctx.request), e); return Middleware.Result.respond( - m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build() + ctx.request.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build() ); } ); public static final Middleware handleExceptionDebug = new Middleware( (r, _m) -> r instanceof ResultError, - (r, m) -> { - var e = ((ResultError) r).error; + ctx -> { + var e = ((ResultError) ctx.result).error; - Toad - .logger() - .log( - Level.SEVERE, - String.format( - "Exception thrown while handling:\n%s\nException:", - m.toDebugString() - ), - e - ); + ctx.logger.log(Level.SEVERE, LogMessage.ex(ctx.request), e); return Middleware.Result.respond( - m + ctx.request .buildResponse() .code(Code.INTERNAL_SERVER_ERROR) .payload(Payload.text(e.toString())) @@ -200,14 +219,31 @@ public final class Server implements AutoCloseable { } ); + static Result tick(Middleware m, Context ctx, Runnable separatelyAck) { + if (!m.shouldRun.test(ctx.result, ctx.request)) { + return ctx.result; + } + + try { + if (ctx.result.isAsync()) { + ctx.result.response().get(); + separatelyAck.run(); + } + + return m.run(ctx); + } catch (Throwable e) { + return Middleware.Result.error(e); + } + } + public static Middleware requestHandler( Function> f ) { return new Middleware( (r, _m) -> !r.isFinal(), - (_result, request) -> + ctx -> f - .apply(request) + .apply(ctx.request) .map(Middleware.Result::respond) .orElse(Middleware.Result.next()) ); @@ -218,9 +254,9 @@ public final class Server implements AutoCloseable { ) { return new Middleware( (r, _m) -> !r.isFinal(), - (_result, request) -> + ctx -> f - .apply(request) + .apply(ctx.request) .map(Middleware.Result::respond) .orElse(Middleware.Result.next()) ); @@ -230,11 +266,11 @@ public final class Server implements AutoCloseable { return new Middleware( (r, _m) -> r instanceof ResultRespondSync || r instanceof ResultRespondAsync, - (res, request) -> { + ctx -> { try { - var rep = res.response().get().get(); + var rep = ctx.result.response().get().get(); f.accept(rep); - return res; + return ctx.result; } catch (Throwable e) { throw new RuntimeException(e); } @@ -245,20 +281,20 @@ public final class Server implements AutoCloseable { public static Middleware exceptionHandler(Function f) { return new Middleware( (r, _m) -> r instanceof ResultError, - (res, request) -> f.apply(((ResultError) res).error) + ctx -> f.apply(((ResultError) ctx.result).error) ); } public Middleware( BiPredicate shouldRun, - BiFunction fun + Function fun ) { this.fun = fun; this.shouldRun = shouldRun; } - public Result run(Result res, Message msg) { - return this.fun.apply(res, msg); + public Result run(Context context) { + return this.fun.apply(context); } public Middleware filter(BiPredicate f) { @@ -268,6 +304,19 @@ public final class Server implements AutoCloseable { ); } + public static final class Context { + + public final Logger logger; + public final Result result; + public final Message request; + + public Context(Logger logger, Result result, Message request) { + this.logger = logger; + this.result = result; + this.request = request; + } + } + public static sealed interface Result permits ResultError, @@ -512,12 +561,14 @@ public final class Server implements AutoCloseable { public Server buildDebugExceptionHandler() { this.middleware(Middleware.handleExceptionDebug); + this.middleware(Middleware.handlePing); this.middleware(Middleware.respondNotFound); return new Server(this.toad, this.middlewares); } public Server build() { this.middleware(Middleware.handleException); + this.middleware(Middleware.handlePing); this.middleware(Middleware.respondNotFound); return new Server(this.toad, this.middlewares); } diff --git a/src/main/java/dev.toad/Toad.java b/src/main/java/dev.toad/Toad.java index dacf33f..fe9e932 100644 --- a/src/main/java/dev.toad/Toad.java +++ b/src/main/java/dev.toad/Toad.java @@ -11,17 +11,13 @@ import java.time.Duration; import java.util.Optional; import java.util.function.Function; import java.util.logging.ConsoleHandler; +import java.util.logging.Formatter; +import java.util.logging.Level; import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; public final class Toad implements AutoCloseable { - public static Logger logger() { - // Configured in `glue::Runtime::new()` - var l = Logger.getLogger("dev.toad"); - l.setUseParentHandlers(false); - return l; - } - static native Config defaultConfigImpl(); static Config defaultConfig = null; @@ -42,13 +38,14 @@ public final class Toad implements AutoCloseable { System.loadLibrary("toad_java_glue"); } + final Logger logger; final Ptr ptr; final Config config; final DatagramChannel channel; static native void teardown(); - static native long init(DatagramChannel chan, Config o); + static native long init(Logger logger, DatagramChannel chan, Config o); static native Optional sendMessage( long ptr, @@ -85,15 +82,33 @@ public final class Toad implements AutoCloseable { } Toad(Config o, DatagramChannel channel) { + this.logger = Logger.getLogger("dev.toad"); this.config = o; this.channel = channel; - this.ptr = Ptr.register(this.getClass(), this.init(this.channel, o)); + this.ptr = + Ptr.register(this.getClass(), this.init(this.logger, this.channel, o)); + } + + Toad(Config o, Logger logger, DatagramChannel channel) { + this.logger = logger; + this.config = o; + this.channel = channel; + this.ptr = + Ptr.register(this.getClass(), this.init(this.logger, this.channel, o)); + } + + public DatagramChannel channel() { + return this.channel; } public Config config() { return this.config; } + public Logger logger() { + return this.logger; + } + public InetSocketAddress localAddress() throws IOException { return (InetSocketAddress) this.channel.getLocalAddress(); } @@ -122,6 +137,25 @@ public final class Toad implements AutoCloseable { } } + static final class LogMessage { + + static String rx(Message msg) { + return String.format( + "<== %s\n%s", + msg.addr().get().toString(), + msg.toDebugString() + ); + } + + static String tx(Message msg) { + return String.format( + "==> %s\n%s", + msg.addr().get().toString(), + msg.toDebugString() + ); + } + } + public interface BuilderRequiresSocket { Toad.Builder port(short port); Toad.Builder address(InetSocketAddress addr); @@ -134,14 +168,33 @@ public final class Toad implements AutoCloseable { Config.Msg.Builder msg = Config.Msg.builder(); Optional channel = Optional.empty(); Optional logLevel = Optional.empty(); + Optional logFormatter = Optional.empty(); + Optional loggerName = Optional.empty(); u8 concurrency = Toad.defaultConfig().concurrency; Builder() {} + Logger logger() { + var logger = Logger.getLogger(this.loggerName.orElse("dev.toad")); + if (logger.getUseParentHandlers()) { + var level = this.logLevel.orElse(Level.INFO); + + var handler = new ConsoleHandler(); + handler.setFormatter(this.logFormatter.orElse(new SimpleFormatter())); + handler.setLevel(level); + + logger.setUseParentHandlers(false); + logger.addHandler(handler); + logger.setLevel(level); + } + + return logger; + } + public Client buildClient() throws IOException { if (this.ioException.isEmpty()) { - var cfg = new Config(this.logLevel, this.concurrency, this.msg.build()); - var toad = new Toad(cfg, this.channel.get()); + var cfg = new Config(this.concurrency, this.msg.build()); + var toad = new Toad(cfg, this.logger(), this.channel.get()); return new Client(toad); } else { throw this.ioException.get(); @@ -150,8 +203,8 @@ public final class Toad implements AutoCloseable { public Server.Builder server() throws IOException { if (this.ioException.isEmpty()) { - var cfg = new Config(this.logLevel, this.concurrency, this.msg.build()); - var toad = new Toad(cfg, this.channel.get()); + var cfg = new Config(this.concurrency, this.msg.build()); + var toad = new Toad(cfg, this.logger(), this.channel.get()); return new Server.Builder(toad); } else { throw this.ioException.get(); @@ -172,6 +225,16 @@ public final class Toad implements AutoCloseable { return this; } + public Builder logFormatter(java.util.logging.Formatter f) { + this.logFormatter = Optional.of(f); + return this; + } + + public Builder loggerName(String name) { + this.loggerName = Optional.of(name); + return this; + } + public Builder address(InetSocketAddress addr) { try { DatagramChannel channel = DatagramChannel.open( @@ -199,16 +262,10 @@ public final class Toad implements AutoCloseable { public static final class Config { - final Optional logLevel; final u8 concurrency; final Msg msg; - Config( - Optional logLevel, - u8 concurrency, - Msg msg - ) { - this.logLevel = logLevel; + Config(u8 concurrency, Msg msg) { this.concurrency = concurrency; this.msg = msg; } @@ -222,10 +279,6 @@ public final class Toad implements AutoCloseable { }; } - public java.util.logging.Level logLevel() { - return this.logLevel.orElse(java.util.logging.Level.INFO); - } - public InetSocketAddress addr() { return this.addr(); } diff --git a/src/test/scala/Async.scala b/src/test/scala/Async.scala index 96d6566..4f44fb3 100644 --- a/src/test/scala/Async.scala +++ b/src/test/scala/Async.scala @@ -1,9 +1,53 @@ import dev.toad.Async +import java.util.Date import java.util.Optional import java.util.concurrent.TimeUnit import java.util.concurrent.CompletableFuture +import java.time.Duration class AsyncTest extends munit.FunSuite { + test("loop() runs until LoopHandle.stop() invoked") { + var counter = 0; + val loop = Async.loop(() => { + counter += 1 + }); + + val start = Date().getTime(); + while (counter < 10 && Date().getTime() < start + 1000) { + Thread.sleep(Duration.ofMillis(10)) + } + + loop.stop() + assert(counter >= 10) + } + + test("LoopHandle.stop() prevents further runs") { + var stopped = false; + val loop = Async.loop(() => { + if stopped then throw new Error() + }); + + loop.stop() + stopped = true + } + + test("LoopHandle.join() rethrows exceptions") { + case class LoopThrew() extends Throwable {} + case class LoopDidNotThrow() extends Throwable {} + + try { + val loop = Async.loop((() => { + throw LoopThrew() + }): java.lang.Runnable) + + loop.join(); + throw LoopDidNotThrow() + } catch { + case LoopThrew() => {} + case e => throw e + } + } + test("pollCompletable polls then completes") { var polls = 0 diff --git a/src/test/scala/E2E.scala b/src/test/scala/E2E.scala index 2b0a287..7efcb6c 100644 --- a/src/test/scala/E2E.scala +++ b/src/test/scala/E2E.scala @@ -7,16 +7,79 @@ import java.lang.Thread import java.net.InetAddress import java.net.InetSocketAddress import java.util.logging.Logger +import java.util.logging.LogRecord import java.util.logging.Level +import java.util.logging.Formatter import java.util.ArrayList import java.util.Optional import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.TimeUnit import java.net.InetAddress -val logLevel = Level.INFO; +val logLevel = Level.OFF; val serverPort = 10102; val clientPort = 10101; +val client2Port = 10100; + +val totalWidth = 120 +val padding = 1 +val sep = "||" +val startMillis = java.time.Instant.now.toEpochMilli + +enum Position: + case Left, Center, Right + +def lines(pos: Position, r: LogRecord): String = { + val width = (totalWidth / 3).intValue + val logger = r.getLoggerName + val level = r.getLevel.toString + val message = + s"[$logger($level)] ${r.getMillis - startMillis}\n${r.getMessage}" + + 0.until(message.length) + .foldLeft(Seq(""))((lines, ix) => { + val lines_ = lines.init + val last = lines.last + (last, String.valueOf(message.charAt(ix))) match { + case (_, "\n") => lines :+ "" + case (line, s) if line.length == width => lines :+ s + case (line, s) => lines_ :+ (line ++ s) + } + }) + .map(lineText => line(pos, lineText)) + .foldLeft("")((m, line) => s"$m$line\n") ++ "\n" +} + +def line(pos: Position, text: String): String = { + val width = (totalWidth / 2).intValue + val empty = " ".repeat(width) + val after = " ".repeat(width - text.length) + val pad = " ".repeat(padding) + pos match { + case Position.Left => s"$text$after$pad$sep$pad$empty$pad$sep" + case Position.Center => s"$empty$pad$sep$pad$text$after$pad$sep" + case Position.Right => s"$empty$pad$sep$pad$empty$pad$sep$pad$text" + } +} + +class ServerLogFormatter extends Formatter { + override def format(r: LogRecord): String = { + lines(Position.Left, r) + } +} + +class Client1LogFormatter extends Formatter { + override def format(r: LogRecord): String = { + lines(Position.Center, r) + } +} + +class Client2LogFormatter extends Formatter { + override def format(r: LogRecord): String = { + lines(Position.Right, r) + } +} class E2E extends munit.FunSuite { val client = new Fixture[Client]("client") { @@ -25,8 +88,32 @@ class E2E extends munit.FunSuite { def apply() = this.client override def beforeAll(): Unit = { - this.client = - Toad.builder.port(clientPort.shortValue).logLevel(logLevel).buildClient + this.client = Toad.builder + .port(clientPort.shortValue) + .logLevel(logLevel) + .loggerName("client") + .logFormatter(Client1LogFormatter()) + .buildClient + } + + override def afterAll(): Unit = { + this.client.close() + this.client = null + } + } + + val client2 = new Fixture[Client]("client2") { + private var client: Client = null; + + def apply() = this.client + + override def beforeAll(): Unit = { + this.client = Toad.builder + .port(client2Port.shortValue) + .logLevel(logLevel) + .loggerName("client2") + .logFormatter(Client2LogFormatter()) + .buildClient } override def afterAll(): Unit = { @@ -36,7 +123,6 @@ class E2E extends munit.FunSuite { } val server = new Fixture[Server]("server") { - private var thread: Thread = null; private var server: Server = null; def apply() = this.server @@ -44,10 +130,12 @@ class E2E extends munit.FunSuite { override def beforeAll(): Unit = { Toad.loadNativeLib() - var counterN = 0 + var number = AtomicInteger(0) this.server = Toad.builder .port(serverPort.shortValue) .logLevel(logLevel) + .loggerName("server") + .logFormatter(ServerLogFormatter()) .server .put( "failing", @@ -55,14 +143,21 @@ class E2E extends munit.FunSuite { throw java.lang.RuntimeException("fart") } ) - .get( - "counter", + .post( + "number", + msg => { + number.set(Integer.decode(msg.payload.toString)) + this.server.notify("number") + Optional.of(msg.buildResponse.code(Code.OK_CHANGED).build) + } + ) + .get( + "number", msg => { - counterN += 1 Optional.of( msg.buildResponse .code(Code.OK_CONTENT) - .payload(Payload.text(s"$counterN")) + .payload(Payload.text(s"${number.get()}")) .build ) } @@ -80,17 +175,15 @@ class E2E extends munit.FunSuite { ) .build - this.thread = this.server.run() + this.server.run() } override def afterAll(): Unit = { - this.server.exit() - this.thread.join() this.server.close() } } - override def munitFixtures = List(client, server) + override def munitFixtures = List(client, client2, server) test("server responds 2.05 Ok Content") { server() @@ -116,25 +209,61 @@ class E2E extends munit.FunSuite { test( "ClientObserveStream yields new resource states when server is notified of new state" ) { - val stream = - client() - .observe( - Message.builder - .uri(s"coap://localhost:$serverPort/counter") - .`type`(Type.NON) - .code(Code.GET) - .build - ); + val n = AtomicInteger(-1) + val subscription = client() + .observe( + Message.builder + .uri(s"coap://localhost:$serverPort/number") + .`type`(Type.NON) + .code(Code.GET) + .build + ) + .subscribe(rep => { + n.set(Integer.decode(rep.payload.toString)) + }) - try { - assertNoDiff(stream.next().get().payload.toString, "1") - server().notify("counter") - assertNoDiff(stream.next().get().payload.toString, "2") - server().notify("counter") - assertNoDiff(stream.next().get().payload.toString, "3") - } finally { - stream.close() - } + def shouldBe(x: Int) = + Async + .pollCompletable(() => { + Optional.of(Object()).filter({ case _ => n.get() == x }) + }) + .get(1, TimeUnit.SECONDS) + + shouldBe(0) + client2() + .post( + Type.CON, + s"coap://localhost:$serverPort/number", + Payload.text(1.toString) + ) + .get() + shouldBe(1) + client2() + .post( + Type.CON, + s"coap://localhost:$serverPort/number", + Payload.text(2.toString) + ) + .get() + shouldBe(2) + client2() + .post( + Type.CON, + s"coap://localhost:$serverPort/number", + Payload.text(3.toString) + ) + .get() + shouldBe(3) + client2() + .post( + Type.CON, + s"coap://localhost:$serverPort/number", + Payload.text(4.toString) + ) + .get() + + subscription.stop() + subscription.close() } test("unregistered resource responds 4.04 Not Found") {