diff --git a/glue/Cargo.lock b/glue/Cargo.lock index a220312..dbafa35 100644 --- a/glue/Cargo.lock +++ b/glue/Cargo.lock @@ -504,9 +504,9 @@ checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" [[package]] name = "toad" -version = "0.17.4" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eeb47a7e30efe198acc91ee919d420b478d9874c9362611d61f2ad9a3d27848" +checksum = "7a19447d2692bd6d8b1c0c10457f42394da7b3f633b96c8705825aca91c080ad" dependencies = [ "embedded-time", "log", @@ -582,9 +582,9 @@ dependencies = [ [[package]] name = "toad-jni" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0bdcbce1dfef5ac76668490d12944aec41816999b2bd6ab889a5afbcbf13eb5" +checksum = "15d9feb2f3a24a6d67aa5672d37893d160a57b9dbbc62d09a0f243abaec6c11f" dependencies = [ "embedded-time", "jni", diff --git a/glue/Cargo.toml b/glue/Cargo.toml index 8674be8..1284e93 100644 --- a/glue/Cargo.toml +++ b/glue/Cargo.toml @@ -14,8 +14,8 @@ e2e = [] [dependencies] jni = "0.21.1" nb = "1" -toad = "0.17.4" -toad-jni = "0.14.0" +toad = "0.17.5" +toad-jni = "0.14.1" no-std-net = "0.6" toad-msg = "0.18.1" tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]} diff --git a/glue/src/dev/toad/mod.rs b/glue/src/dev/toad/mod.rs index bfd0e2f..7be8188 100644 --- a/glue/src/dev/toad/mod.rs +++ b/glue/src/dev/toad/mod.rs @@ -3,18 +3,21 @@ pub mod msg; mod retry_strategy; -use jni::objects::{JClass, JObject}; +use jni::objects::{JClass, JObject, JThrowable}; use jni::sys::jobject; pub use retry_strategy::RetryStrategy; use toad::net::Addrd; -use toad::platform::Platform; +use toad::platform::{Platform, PlatformError}; use toad::retry::{Attempts, Strategy}; +use toad::step::Step; use toad::time::Millis; +use toad_jni::java::io::IOException; use toad_jni::java::net::InetSocketAddress; use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel}; use toad_jni::java::util::Optional; -use toad_jni::java::{self, Object}; +use toad_jni::java::{self, Object, ResultYieldToJavaOrThrow}; +use self::ffi::Ptr; use crate::mem::{Shared, SharedMemoryRegion}; use crate::Runtime; @@ -38,11 +41,27 @@ impl Toad { CONFIG.invoke(e, self) } + pub fn ptr(&self, e: &mut java::Env) -> Ptr { + static PTR: java::Field = java::Field::new("ptr"); + PTR.get(e, self) + } + fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 { let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel); unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 } } + fn notify_impl(&self, + e: &mut java::Env, + path: impl AsRef + Clone) + -> Result<(), java::io::IOException> { + self.ptr(e) + .addr(e) + .map_err(|err| IOException::new_caused_by(e, "", err)) + .map(|addr| unsafe { Shared::deref::(addr.inner(e)).as_ref().unwrap() }) + .and_then(|r| r.notify(path).map_err(PlatformError::step)) + } + fn poll_req_impl(e: &mut java::Env, addr: i64) -> java::util::Optional { let r = unsafe { Shared::deref::(addr).as_ref().unwrap() }; match r.poll_req() { @@ -375,6 +394,21 @@ pub extern "system" fn Java_dev_toad_Toad_pollResp<'local>(mut e: java::Env<'loc Toad::poll_resp_impl(e, addr, token, sock).yield_to_java(e) } +#[no_mangle] +pub extern "system" fn Java_dev_toad_Toad_notify<'local>(mut e: java::Env<'local>, + toad: JObject<'local>, + path: JObject<'local>) + -> () { + let e = &mut e; + let toad = java::lang::Object::from_local(e, toad).upcast_to::(e); + let path = java::lang::Object::from_local(e, path).upcast_to::(e); + + if let Err(err) = toad.notify_impl(e, path) { + let err = JThrowable::from(err.downcast(e).to_local(e)); + e.throw(err).unwrap() + } +} + #[no_mangle] pub extern "system" fn Java_dev_toad_Toad_teardown<'local>(_: java::Env<'local>, _: JClass<'local>) diff --git a/glue/src/lib.rs b/glue/src/lib.rs index 708a32b..af18af0 100644 --- a/glue/src/lib.rs +++ b/glue/src/lib.rs @@ -46,13 +46,16 @@ mod runtime { config: Config, channel: PeekableDatagramChannel) -> Self { - let handler = ConsoleHandler::new(e); - handler.set_level(e, log_level); - let logger = Logger::get_logger(e, "dev.toad"); - logger.use_parent_handlers(e, false); - logger.add_handler(e, handler.to_handler()); - logger.set_level(e, log_level); + + if logger.uses_parent_handlers(e) { + let handler = ConsoleHandler::new(e); + handler.set_level(e, log_level); + + logger.use_parent_handlers(e, false); + logger.add_handler(e, handler.to_handler()); + logger.set_level(e, log_level); + } Self { steps: Default::default(), config, diff --git a/src/main/java/dev.toad/ClientObserveStream.java b/src/main/java/dev.toad/ClientObserveStream.java index d816baf..e5fac4e 100644 --- a/src/main/java/dev.toad/ClientObserveStream.java +++ b/src/main/java/dev.toad/ClientObserveStream.java @@ -1,14 +1,16 @@ package dev.toad; import dev.toad.msg.Message; +import dev.toad.msg.Token; import dev.toad.msg.option.Observe; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class ClientObserveStream { +public class ClientObserveStream implements AutoCloseable { State state; - Optional> buffered; + CompletableFuture initial; + Optional token = Optional.empty(); final Client client; final Message message; @@ -16,16 +18,7 @@ public class ClientObserveStream { this.state = State.OPEN; this.client = client; this.message = message.buildCopy().option(Observe.REGISTER).build(); - this.buffered = Optional.of(client.send(this.message)); - } - - public CompletableFuture close() { - return this.client.send( - this.message.buildCopy().option(Observe.DEREGISTER).unsetId().build() - ) - .thenAccept(m -> { - this.state = State.CLOSED; - }); + this.initial = client.send(this.message); } public CompletableFuture next() { @@ -33,18 +26,31 @@ public class ClientObserveStream { throw new RuntimeException( "ClientObserveStream.next() invoked after .close()" ); - } else if (this.buffered.isEmpty()) { - var buffered = this.buffered.get(); - this.buffered = Optional.empty(); - return buffered; + } else if (this.token.isEmpty()) { + return this.initial.whenComplete((rep, e) -> { + if (rep != null) { + this.token = Optional.of(rep.token()); + } + }); } else { - return this.client.awaitResponse( - this.message.token(), - this.message.addr().get() + return this.initial.thenCompose(_i -> + this.client.awaitResponse(this.token.get(), this.message.addr().get()) ); } } + @Override + public void close() { + try { + this.client.send( + this.message.buildCopy().option(Observe.DEREGISTER).unsetId().build() + ) + .get(); + } catch (Throwable t) {} + + this.state = State.CLOSED; + } + public static final class State { public static final Eq eq = Eq.int_.contramap((State s) -> s.state); diff --git a/src/main/java/dev.toad/Eq.java b/src/main/java/dev.toad/Eq.java index ebb26c9..671fb99 100644 --- a/src/main/java/dev.toad/Eq.java +++ b/src/main/java/dev.toad/Eq.java @@ -9,6 +9,7 @@ import java.util.function.Function; public final class Eq { + public static final Eq bool = new Eq<>((a, b) -> a == b); public static final Eq short_ = new Eq<>((a, b) -> a == b); public static final Eq int_ = new Eq<>((a, b) -> a == b); public static final Eq long_ = new Eq<>((a, b) -> a == b); diff --git a/src/main/java/dev.toad/Server.java b/src/main/java/dev.toad/Server.java index 13a8389..87554e1 100644 --- a/src/main/java/dev.toad/Server.java +++ b/src/main/java/dev.toad/Server.java @@ -3,207 +3,305 @@ package dev.toad; import dev.toad.msg.Code; import dev.toad.msg.Message; import dev.toad.msg.Payload; +import java.io.IOException; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Level; -public final class Server { +public final class Server implements AutoCloseable { + boolean exit = false; + Optional thread = Optional.empty(); final Toad toad; - final ArrayList> middlewares; - final Function notFoundHandler; - final BiFunction exceptionHandler; + final ArrayList middlewares; - Server( - Toad toad, - ArrayList> ms, - Function notFoundHandler, - BiFunction exHandler - ) { + Server(Toad toad, ArrayList ms) { this.toad = toad; this.middlewares = ms; - this.notFoundHandler = notFoundHandler; - this.exceptionHandler = exHandler; } - public void run() { - while (true) { + public void exit() { + this.exit = true; + } + + public void notify(String path) { + this.toad.notify(path); + } + + public Thread run() { + if (!this.thread.isEmpty()) { + return this.thread.get(); + } + + var thread = new Thread(() -> { try { - dev.toad.msg.ref.Message req = Async - .pollCompletable(() -> this.toad.pollReq()) - .get(); + Toad + .logger() + .log( + Level.INFO, + String.format( + "Server listening on %s", + this.toad.localAddress().toString() + ) + ); + } catch (Throwable t) {} - Middleware.Result result = Middleware.next(); - for (var f : this.middlewares) { - if (!result.shouldContinue()) { - break; - } + while (true) { + try { + dev.toad.msg.ref.Message req = Async + .pollCompletable(() -> { + if (this.exit) { + throw new Exit(); + } - if (result.isAsync()) { - try { - result.response().get(); - } catch (Throwable e) { - result = Middleware.error(e); - break; - } - // Toad.ack(req); - } + return this.toad.pollReq(); + }) + .get(); + var addr = req.addr().get(); - try { - result = f.apply(req); - } catch (Throwable e) { - result = Middleware.error(e); - } - } - - switch (result) { - case Middleware.ResultExit e: - this.toad.close(); - return; - case Middleware.ResultError e: - result = this.exceptionHandler.apply(req, e.error); - break; - case Middleware.ResultNextSync n: - result = this.notFoundHandler.apply(req); - break; - case Middleware.ResultNextAsync n: - result = this.notFoundHandler.apply(req); - break; - default: - break; - } - - req.close(); - - var resp = result.response().get(); - if (resp.isEmpty()) { Toad .logger() .log( - Level.SEVERE, - String.format( - "Server never generated response for message\n%s", - req.toDebugString() - ) + Level.FINE, + String.format("<== %s\n%s", addr.toString(), req.toDebugString()) ); - } else { - Async - .pollCompletable(() -> this.toad.sendMessage(resp.get().toOwned())) - .get(); + + Middleware.Result result = Middleware.Result.next(); + for (var m : this.middlewares) { + if (!m.shouldRun.test(result, req)) { + continue; + } + + try { + if (result.isAsync()) { + result.response().get(); + // Toad.ack(req); + } + + result = m.run(result, req); + } catch (Throwable e) { + result = Middleware.Result.error(e); + } + } + + var resp = result.response().get(); + if (resp.isEmpty()) { + Toad + .logger() + .log( + Level.SEVERE, + String.format( + "Server never generated response for message\n%s", + req.toDebugString() + ) + ); + } else { + var resp_ = resp.get().toOwned(); + Toad + .logger() + .log( + Level.FINE, + String.format("==> %s\n%s", addr, resp_.toDebugString()) + ); + Async.pollCompletable(() -> this.toad.sendMessage(resp_)).get(); + } + + req.close(); + } catch (Throwable e) { + if (e.getCause() instanceof Exit || e instanceof Exit) { + return; + } + + Toad.logger().log(Level.SEVERE, e.toString()); + e.printStackTrace(); } - } catch (Throwable e) { - Toad.logger().log(Level.SEVERE, e.toString()); } - } + }); + + thread.start(); + + this.thread = Optional.of(thread); + return thread; } + @Override + public void close() { + this.toad.close(); + } + + static final class Exit extends RuntimeException {} + public static final class Middleware { - public static final CompletableFuture> noop = + final BiPredicate shouldRun; + final BiFunction fun; + + static final CompletableFuture> noop = CompletableFuture.completedFuture(Optional.empty()); - public static final Function notFound = m -> { - return Middleware.respond(m.buildResponse().code(Code.NOT_FOUND).build()); - }; + public static final Middleware respondNotFound = + Middleware.requestHandler(m -> + Optional.of(m.buildResponse().code(Code.NOT_FOUND).build()) + ); + + public static final Middleware handleException = new Middleware( + (r, _m) -> r instanceof ResultError, + (r, m) -> { + var e = ((ResultError) r).error; - public static final BiFunction debugExceptionHandler = - (m, e) -> { Toad .logger() .log( Level.SEVERE, - String.format("while handling %s", m.toDebugString()), + String.format( + "Exception thrown while handling:\n%s\nException:", + m.toDebugString() + ), e ); - var rep = m - .buildResponse() - .code(Code.INTERNAL_SERVER_ERROR) - .payload(Payload.text(e.toString())) - .build(); + return Middleware.Result.respond( + m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build() + ); + } + ); - return Middleware.respond(rep); - }; + public static final Middleware handleExceptionDebug = new Middleware( + (r, _m) -> r instanceof ResultError, + (r, m) -> { + var e = ((ResultError) r).error; - public static final BiFunction exceptionHandler = - (m, e) -> { Toad .logger() .log( Level.SEVERE, - String.format("while handling %s", m.toDebugString()), + String.format( + "Exception thrown while handling:\n%s\nException:", + m.toDebugString() + ), e ); - var rep = m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build(); - return Middleware.respond(rep); - }; - public static Result respond(Message m) { - return new ResultRespondSync(m); + return Middleware.Result.respond( + m + .buildResponse() + .code(Code.INTERNAL_SERVER_ERROR) + .payload(Payload.text(e.toString())) + .build() + ); + } + ); + + public static Middleware requestHandler( + Function> f + ) { + return new Middleware( + (r, _m) -> !r.isFinal(), + (_result, request) -> + f + .apply(request) + .map(Middleware.Result::respond) + .orElse(Middleware.Result.next()) + ); } - public static Result respond(CompletableFuture m) { - return new ResultRespondAsync(m); + public static Middleware requestHandlerAsync( + Function>> f + ) { + return new Middleware( + (r, _m) -> !r.isFinal(), + (_result, request) -> + f + .apply(request) + .map(Middleware.Result::respond) + .orElse(Middleware.Result.next()) + ); } - public static Result error(Throwable e) { - return new ResultError(e); + public static Middleware responseConsumer(Consumer f) { + return new Middleware( + (r, _m) -> + r instanceof ResultRespondSync || r instanceof ResultRespondAsync, + (res, request) -> { + try { + var rep = res.response().get().get(); + f.accept(rep); + return res; + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + ); } - public static Result exit() { - return new ResultExit(); + public static Middleware exceptionHandler(Function f) { + return new Middleware( + (r, _m) -> r instanceof ResultError, + (res, request) -> f.apply(((ResultError) res).error) + ); } - public static Result next() { - return new ResultNextSync(); + public Middleware( + BiPredicate shouldRun, + BiFunction fun + ) { + this.fun = fun; + this.shouldRun = shouldRun; } - public static Result next(CompletableFuture work) { - return new ResultNextAsync(work); + public Result run(Result res, Message msg) { + return this.fun.apply(res, msg); + } + + public Middleware filter(BiPredicate f) { + return new Middleware( + (r, m) -> f.test(r, m) && this.shouldRun.test(r, m), + this.fun + ); } public static sealed interface Result permits - ResultExit, ResultError, ResultNextSync, ResultNextAsync, ResultRespondSync, ResultRespondAsync { - public boolean shouldContinue(); + public static Result respond(Message m) { + return new ResultRespondSync(m); + } + + public static Result respond(CompletableFuture m) { + return new ResultRespondAsync(m); + } + + public static Result error(Throwable e) { + return new ResultError(e); + } + + public static Result next() { + return new ResultNextSync(); + } + + public static Result next(CompletableFuture work) { + return new ResultNextAsync(work); + } + + public boolean isFinal(); public boolean isAsync(); public CompletableFuture> response(); } - public static final class ResultExit implements Result { - - public ResultExit() {} - - @Override - public boolean shouldContinue() { - return false; - } - - @Override - public boolean isAsync() { - return false; - } - - @Override - public CompletableFuture> response() { - return Middleware.noop; - } - } - public static final class ResultError implements Result { public final Throwable error; @@ -213,8 +311,8 @@ public final class Server { } @Override - public boolean shouldContinue() { - return false; + public boolean isFinal() { + return true; } @Override @@ -233,8 +331,8 @@ public final class Server { public ResultNextSync() {} @Override - public boolean shouldContinue() { - return true; + public boolean isFinal() { + return false; } @Override @@ -257,8 +355,8 @@ public final class Server { } @Override - public boolean shouldContinue() { - return true; + public boolean isFinal() { + return false; } @Override @@ -281,8 +379,8 @@ public final class Server { } @Override - public boolean shouldContinue() { - return false; + public boolean isFinal() { + return true; } @Override @@ -305,8 +403,8 @@ public final class Server { } @Override - public boolean shouldContinue() { - return false; + public boolean isFinal() { + return true; } @Override @@ -324,120 +422,104 @@ public final class Server { public static final class Builder { final Toad toad; - final ArrayList> middlewares = - new ArrayList<>(); - Function notFoundHandler = Middleware.notFound; - BiFunction exceptionHandler = - Middleware.exceptionHandler; + final ArrayList middlewares = new ArrayList<>(); Builder(Toad toad) { this.toad = toad; } - public Builder middleware(Function f) { - this.middlewares.add(f); + public Builder middleware(Middleware m) { + this.middlewares.add(m); return this; } - public Builder when( - Predicate pred, - Function f - ) { - return this.middleware(m -> { - if (pred.test(m)) { - return f.apply(m); - } else { - return Middleware.next(); - } - }); + public Builder put(String path, Function> f) { + return this.put(path, Middleware.requestHandler(f)); } - public Builder put(String path, Function f) { - return this.when( - m -> - Code.eq.test(m.code(), Code.PUT) && - m + public Builder put(String path, Middleware m) { + return this.middleware( + m.filter((_r, req) -> + Code.eq.test(req.code(), Code.PUT) && + req .getPath() .map(p -> p.matches(path)) - .orElse(path == null || path.isEmpty()), - f + .orElse(path == null || path.isEmpty()) + ) ); } - public Builder post(String path, Function f) { - return this.when( - m -> - Code.eq.test(m.code(), Code.POST) && - m + public Builder post(String path, Function> f) { + return this.post(path, Middleware.requestHandler(f)); + } + + public Builder post(String path, Middleware m) { + return this.middleware( + m.filter((_r, req) -> + Code.eq.test(req.code(), Code.POST) && + req .getPath() .map(p -> p.matches(path)) - .orElse(path == null || path.isEmpty()), - f + .orElse(path == null || path.isEmpty()) + ) ); } - public Builder delete(String path, Function f) { - return this.when( - m -> - Code.eq.test(m.code(), Code.DELETE) && - m + public Builder get(String path, Function> f) { + return this.get(path, Middleware.requestHandler(f)); + } + + public Builder get(String path, Middleware m) { + return this.middleware( + m.filter((_r, req) -> + Code.eq.test(req.code(), Code.GET) && + req .getPath() .map(p -> p.matches(path)) - .orElse(path == null || path.isEmpty()), - f + .orElse(path == null || path.isEmpty()) + ) ); } - public Builder get(String path, Function f) { - return this.when( - m -> - Code.eq.test(m.code(), Code.GET) && - m + public Builder delete(String path, Function> f) { + return this.delete(path, Middleware.requestHandler(f)); + } + + public Builder delete(String path, Middleware m) { + return this.middleware( + m.filter((_r, req) -> + Code.eq.test(req.code(), Code.DELETE) && + req .getPath() .map(p -> p.matches(path)) - .orElse(path == null || path.isEmpty()), - f + .orElse(path == null || path.isEmpty()) + ) ); } - public Builder tap(Consumer f) { - return this.middleware(m -> { - f.accept(m); - return Middleware.next(); - }); + public Builder onRequest(Consumer f) { + return this.middleware( + Middleware.requestHandler(m -> { + f.accept(m); + return Optional.empty(); + }) + ); } - public Builder tapAsync(Function> f) { - return this.middleware(m -> { - return Middleware.next(f.apply(m).thenAccept(_void -> {})); - }); + public Builder onResponse(Consumer f) { + return this.middleware(Middleware.responseConsumer(f)); } - public Builder debugExceptions() { - return this.exceptionHandler(Middleware.debugExceptionHandler); - } - - public Builder exceptionHandler( - BiFunction handler - ) { - this.exceptionHandler = handler; - return this; - } - - public Builder notFoundHandler( - Function handler - ) { - this.notFoundHandler = handler; - return this; + public Server buildDebugExceptionHandler() { + this.middleware(Middleware.handleExceptionDebug); + this.middleware(Middleware.respondNotFound); + return new Server(this.toad, this.middlewares); } public Server build() { - return new Server( - this.toad, - this.middlewares, - this.notFoundHandler, - this.exceptionHandler - ); + this.middleware(Middleware.handleException); + this.middleware(Middleware.respondNotFound); + return new Server(this.toad, this.middlewares); } } } diff --git a/src/main/java/dev.toad/Toad.java b/src/main/java/dev.toad/Toad.java index ea58e8b..dacf33f 100644 --- a/src/main/java/dev.toad/Toad.java +++ b/src/main/java/dev.toad/Toad.java @@ -17,7 +17,9 @@ public final class Toad implements AutoCloseable { public static Logger logger() { // Configured in `glue::Runtime::new()` - return Logger.getLogger("dev.toad"); + var l = Logger.getLogger("dev.toad"); + l.setUseParentHandlers(false); + return l; } static native Config defaultConfigImpl(); @@ -53,6 +55,8 @@ public final class Toad implements AutoCloseable { dev.toad.msg.owned.Message msg ); + native void notify(String path); + static native Optional pollReq(long ptr); static native Optional pollResp( @@ -90,6 +94,10 @@ public final class Toad implements AutoCloseable { return this.config; } + public InetSocketAddress localAddress() throws IOException { + return (InetSocketAddress) this.channel.getLocalAddress(); + } + @Override public void close() { Toad.teardown(); diff --git a/src/main/java/dev.toad/msg/Code.java b/src/main/java/dev.toad/msg/Code.java index 49c0898..2df2633 100644 --- a/src/main/java/dev.toad/msg/Code.java +++ b/src/main/java/dev.toad/msg/Code.java @@ -72,7 +72,7 @@ public final class Code implements Debug { }; } else { var str = String.format( - "%d.%d", + "%d.%02d", this.clazz.shortValue(), this.detail.shortValue() ); diff --git a/src/main/java/dev.toad/msg/Message.java b/src/main/java/dev.toad/msg/Message.java index 2ed0d0e..d5d0e52 100644 --- a/src/main/java/dev.toad/msg/Message.java +++ b/src/main/java/dev.toad/msg/Message.java @@ -72,8 +72,12 @@ public interface Message extends Debug { int port = this.addr().map(a -> a.getPort()).orElse(5683); String scheme = port == 5684 ? "coaps" : "coap"; String hostAddr = - this.addr().map(a -> a.getAddress().toString()).orElse(null); - String host = this.getHost().map(h -> h.toString()).orElse(hostAddr); + this.addr().map(a -> a.getAddress().getHostAddress()).orElse(null); + String host = + this.getHost() + .map(h -> h.toString()) + .filter(h -> h != null && !h.trim().isEmpty()) + .orElse(hostAddr); String path = this.getPath() .map(p -> p.toString()) @@ -130,8 +134,8 @@ public interface Message extends Debug { .stream() .map(Debug::toDebugString) .reduce("", (b, a) -> b + "\n " + a) + - "\n\n" + - this.payload().toDebugString() + "\n" + + (!this.payload().isEmpty() ? "\n" + this.payload().toDebugString() : "") ); } } diff --git a/src/main/java/dev.toad/msg/Option.java b/src/main/java/dev.toad/msg/Option.java index 084f718..4c1b5c1 100644 --- a/src/main/java/dev.toad/msg/Option.java +++ b/src/main/java/dev.toad/msg/Option.java @@ -26,7 +26,9 @@ public interface Option extends Debug { @Override public default String toDebugString() { - if (this.number() == Path.number) { + if (this.number() == Observe.number) { + return new Observe(this).toDebugString(); + } else if (this.number() == Path.number) { return new Path(this).toDebugString(); } else if (this.number() == Host.number) { return new Host(this).toDebugString(); diff --git a/src/main/java/dev.toad/msg/Payload.java b/src/main/java/dev.toad/msg/Payload.java index ba96b9f..a84aa91 100644 --- a/src/main/java/dev.toad/msg/Payload.java +++ b/src/main/java/dev.toad/msg/Payload.java @@ -42,6 +42,10 @@ public final class Payload implements Debug { this.bytes = bytes; } + public boolean isEmpty() { + return this.bytes.length == 0; + } + @Override public String toString() { return new String(this.bytes, StandardCharsets.UTF_8); diff --git a/src/main/java/dev.toad/msg/build/Message.java b/src/main/java/dev.toad/msg/build/Message.java index 3c90634..5cb9e32 100644 --- a/src/main/java/dev.toad/msg/build/Message.java +++ b/src/main/java/dev.toad/msg/build/Message.java @@ -6,6 +6,7 @@ import dev.toad.msg.Payload; import dev.toad.msg.Token; import dev.toad.msg.Type; import dev.toad.msg.option.Host; +import dev.toad.msg.option.Observe; import dev.toad.msg.option.Path; import dev.toad.msg.option.Query; import java.net.InetAddress; @@ -48,7 +49,7 @@ public final class Message : conResponseToNonRequest ? Type.CON : Type.NON; - return Message.copyOf(other).unsetId().type(type); + return Message.copyOf(other).unsetId().unsetOption(Observe.number).type(type); } public static Message copyOf(dev.toad.msg.Message other) { diff --git a/src/main/java/dev.toad/msg/option/Observe.java b/src/main/java/dev.toad/msg/option/Observe.java index 7d3262d..3a832da 100644 --- a/src/main/java/dev.toad/msg/option/Observe.java +++ b/src/main/java/dev.toad/msg/option/Observe.java @@ -1,5 +1,6 @@ package dev.toad.msg.option; +import dev.toad.Eq; import dev.toad.msg.Option; import dev.toad.msg.OptionValue; import java.util.ArrayList; @@ -11,6 +12,8 @@ public final class Observe implements Option { final boolean register; + public static final Eq eq = Eq.bool.contramap(Observe::isRegister); + public static final Observe REGISTER = new Observe(true); public static final Observe DEREGISTER = new Observe(false); @@ -41,13 +44,17 @@ public final class Observe implements Option { } else if (o.values().size() == 0) { this.register = false; } else { - this.register = o.values().get(0).asBytes()[0] == 1; + this.register = o.values().get(0).asBytes()[0] == 0; } } @Override public long number() { - return Host.number; + return Observe.number; + } + + public boolean isRegister() { + return this.register; } @Override @@ -68,9 +75,14 @@ public final class Observe implements Option { var list = new ArrayList(); list.add( new dev.toad.msg.owned.OptionValue( - new byte[] { this.register ? (byte) 1 : (byte) 0 } + new byte[] { this.register ? (byte) 0 : (byte) 1 } ) ); return list; } + + @Override + public String toDebugString() { + return this.register ? "Observe: Register" : "Observe: Deregister"; + } } diff --git a/src/main/java/dev.toad/msg/ref/OptionValue.java b/src/main/java/dev.toad/msg/ref/OptionValue.java index 36cbb4a..b23f2f6 100644 --- a/src/main/java/dev.toad/msg/ref/OptionValue.java +++ b/src/main/java/dev.toad/msg/ref/OptionValue.java @@ -1,6 +1,7 @@ package dev.toad.msg.ref; import dev.toad.ffi.Ptr; +import java.util.ArrayList; public final class OptionValue implements dev.toad.msg.OptionValue, AutoCloseable { diff --git a/src/test/scala/E2E.scala b/src/test/scala/E2E.scala index c5ea4ad..2b0a287 100644 --- a/src/test/scala/E2E.scala +++ b/src/test/scala/E2E.scala @@ -9,58 +9,149 @@ import java.net.InetSocketAddress import java.util.logging.Logger import java.util.logging.Level import java.util.ArrayList +import java.util.Optional import java.nio.ByteBuffer import java.util.concurrent.TimeUnit import java.net.InetAddress +val logLevel = Level.INFO; +val serverPort = 10102; +val clientPort = 10101; + class E2E extends munit.FunSuite { - test("little baby server") { - Toad.loadNativeLib() + val client = new Fixture[Client]("client") { + private var client: Client = null; - val server = Toad.builder - .port(10102) - .logLevel(Level.INFO) - .server - .post("exit", _msg => Server.Middleware.exit) - .get( - "hello", - msg => { - val rep = msg.buildResponse - .code(Code.OK_CONTENT) - .`type`(Type.NON) - .payload(Payload.text(s"Hello, ${msg.payload.toString}!")) - .build + def apply() = this.client - Server.Middleware.respond(rep) - } - ) - .build; - - val serverThread = Thread((() => server.run()): java.lang.Runnable) - serverThread.start() - - val req = Message.builder - .uri("coap://localhost:10102/hello") - .`type`(Type.NON) - .code(Code.GET) - .payload(Payload.text("Fred")) - .build - - val client = Toad.builder.port(10101).logLevel(Level.INFO).buildClient - val respFuture = client.send(req) - - try { - val respActual = respFuture.get(1, TimeUnit.SECONDS) - assertNoDiff(respActual.payload.toString, "Hello, Fred!") - } finally { - val exit = Message.builder - .uri("coap://localhost:10102/exit") - .`type`(Type.NON) - .code(Code.POST) - .build - client.sendNoResponse(exit) + override def beforeAll(): Unit = { + this.client = + Toad.builder.port(clientPort.shortValue).logLevel(logLevel).buildClient } - serverThread.join() + override def afterAll(): Unit = { + this.client.close() + this.client = null + } + } + + val server = new Fixture[Server]("server") { + private var thread: Thread = null; + private var server: Server = null; + + def apply() = this.server + + override def beforeAll(): Unit = { + Toad.loadNativeLib() + + var counterN = 0 + this.server = Toad.builder + .port(serverPort.shortValue) + .logLevel(logLevel) + .server + .put( + "failing", + _msg => { + throw java.lang.RuntimeException("fart") + } + ) + .get( + "counter", + msg => { + counterN += 1 + Optional.of( + msg.buildResponse + .code(Code.OK_CONTENT) + .payload(Payload.text(s"$counterN")) + .build + ) + } + ) + .get( + "greetings", + msg => { + Optional.of( + msg.buildResponse + .code(Code.OK_CONTENT) + .payload(Payload.text(s"Hello, ${msg.payload.toString}!")) + .build + ) + } + ) + .build + + this.thread = this.server.run() + } + + override def afterAll(): Unit = { + this.server.exit() + this.thread.join() + this.server.close() + } + } + + override def munitFixtures = List(client, server) + + test("server responds 2.05 Ok Content") { + server() + + val rep = client() + .get( + Type.NON, + s"coap://localhost:$serverPort/greetings", + Payload.text("Fred") + ) + .get() + + assertNoDiff( + rep.payload.toString, + "Hello, Fred!" + ) + assertEquals( + rep.code, + Code.OK_CONTENT + ) + } + + test( + "ClientObserveStream yields new resource states when server is notified of new state" + ) { + val stream = + client() + .observe( + Message.builder + .uri(s"coap://localhost:$serverPort/counter") + .`type`(Type.NON) + .code(Code.GET) + .build + ); + + try { + assertNoDiff(stream.next().get().payload.toString, "1") + server().notify("counter") + assertNoDiff(stream.next().get().payload.toString, "2") + server().notify("counter") + assertNoDiff(stream.next().get().payload.toString, "3") + } finally { + stream.close() + } + } + + test("unregistered resource responds 4.04 Not Found") { + server() + val c = client() + assertEquals( + c.get(Type.NON, s"coap://localhost:$serverPort/not-found").get().code, + Code.NOT_FOUND + ) + } + + test("handler throwing exception responds 5.00 Internal Server Error") { + server() + val c = client() + assertEquals( + c.put(Type.NON, s"coap://localhost:$serverPort/failing").get().code, + Code.INTERNAL_SERVER_ERROR + ) } }