feat: test ClientObserveStream, improve server middleware modularity
This commit is contained in:
parent
f718d9b482
commit
fde857a563
8
glue/Cargo.lock
generated
8
glue/Cargo.lock
generated
@ -504,9 +504,9 @@ checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toad"
|
name = "toad"
|
||||||
version = "0.17.4"
|
version = "0.17.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4eeb47a7e30efe198acc91ee919d420b478d9874c9362611d61f2ad9a3d27848"
|
checksum = "7a19447d2692bd6d8b1c0c10457f42394da7b3f633b96c8705825aca91c080ad"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"embedded-time",
|
"embedded-time",
|
||||||
"log",
|
"log",
|
||||||
@ -582,9 +582,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toad-jni"
|
name = "toad-jni"
|
||||||
version = "0.14.0"
|
version = "0.14.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c0bdcbce1dfef5ac76668490d12944aec41816999b2bd6ab889a5afbcbf13eb5"
|
checksum = "15d9feb2f3a24a6d67aa5672d37893d160a57b9dbbc62d09a0f243abaec6c11f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"embedded-time",
|
"embedded-time",
|
||||||
"jni",
|
"jni",
|
||||||
|
@ -14,8 +14,8 @@ e2e = []
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
jni = "0.21.1"
|
jni = "0.21.1"
|
||||||
nb = "1"
|
nb = "1"
|
||||||
toad = "0.17.4"
|
toad = "0.17.5"
|
||||||
toad-jni = "0.14.0"
|
toad-jni = "0.14.1"
|
||||||
no-std-net = "0.6"
|
no-std-net = "0.6"
|
||||||
toad-msg = "0.18.1"
|
toad-msg = "0.18.1"
|
||||||
tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]}
|
tinyvec = {version = "1.5", default_features = false, features = ["rustc_1_55"]}
|
||||||
|
@ -3,18 +3,21 @@ pub mod msg;
|
|||||||
|
|
||||||
mod retry_strategy;
|
mod retry_strategy;
|
||||||
|
|
||||||
use jni::objects::{JClass, JObject};
|
use jni::objects::{JClass, JObject, JThrowable};
|
||||||
use jni::sys::jobject;
|
use jni::sys::jobject;
|
||||||
pub use retry_strategy::RetryStrategy;
|
pub use retry_strategy::RetryStrategy;
|
||||||
use toad::net::Addrd;
|
use toad::net::Addrd;
|
||||||
use toad::platform::Platform;
|
use toad::platform::{Platform, PlatformError};
|
||||||
use toad::retry::{Attempts, Strategy};
|
use toad::retry::{Attempts, Strategy};
|
||||||
|
use toad::step::Step;
|
||||||
use toad::time::Millis;
|
use toad::time::Millis;
|
||||||
|
use toad_jni::java::io::IOException;
|
||||||
use toad_jni::java::net::InetSocketAddress;
|
use toad_jni::java::net::InetSocketAddress;
|
||||||
use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel};
|
use toad_jni::java::nio::channels::{DatagramChannel, PeekableDatagramChannel};
|
||||||
use toad_jni::java::util::Optional;
|
use toad_jni::java::util::Optional;
|
||||||
use toad_jni::java::{self, Object};
|
use toad_jni::java::{self, Object, ResultYieldToJavaOrThrow};
|
||||||
|
|
||||||
|
use self::ffi::Ptr;
|
||||||
use crate::mem::{Shared, SharedMemoryRegion};
|
use crate::mem::{Shared, SharedMemoryRegion};
|
||||||
use crate::Runtime;
|
use crate::Runtime;
|
||||||
|
|
||||||
@ -38,11 +41,27 @@ impl Toad {
|
|||||||
CONFIG.invoke(e, self)
|
CONFIG.invoke(e, self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ptr(&self, e: &mut java::Env) -> Ptr {
|
||||||
|
static PTR: java::Field<Toad, Ptr> = java::Field::new("ptr");
|
||||||
|
PTR.get(e, self)
|
||||||
|
}
|
||||||
|
|
||||||
fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 {
|
fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 {
|
||||||
let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel);
|
let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel);
|
||||||
unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 }
|
unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn notify_impl(&self,
|
||||||
|
e: &mut java::Env,
|
||||||
|
path: impl AsRef<str> + Clone)
|
||||||
|
-> Result<(), java::io::IOException> {
|
||||||
|
self.ptr(e)
|
||||||
|
.addr(e)
|
||||||
|
.map_err(|err| IOException::new_caused_by(e, "", err))
|
||||||
|
.map(|addr| unsafe { Shared::deref::<Runtime>(addr.inner(e)).as_ref().unwrap() })
|
||||||
|
.and_then(|r| r.notify(path).map_err(PlatformError::step))
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_req_impl(e: &mut java::Env, addr: i64) -> java::util::Optional<msg::ref_::Message> {
|
fn poll_req_impl(e: &mut java::Env, addr: i64) -> java::util::Optional<msg::ref_::Message> {
|
||||||
let r = unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() };
|
let r = unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() };
|
||||||
match r.poll_req() {
|
match r.poll_req() {
|
||||||
@ -375,6 +394,21 @@ pub extern "system" fn Java_dev_toad_Toad_pollResp<'local>(mut e: java::Env<'loc
|
|||||||
Toad::poll_resp_impl(e, addr, token, sock).yield_to_java(e)
|
Toad::poll_resp_impl(e, addr, token, sock).yield_to_java(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub extern "system" fn Java_dev_toad_Toad_notify<'local>(mut e: java::Env<'local>,
|
||||||
|
toad: JObject<'local>,
|
||||||
|
path: JObject<'local>)
|
||||||
|
-> () {
|
||||||
|
let e = &mut e;
|
||||||
|
let toad = java::lang::Object::from_local(e, toad).upcast_to::<Toad>(e);
|
||||||
|
let path = java::lang::Object::from_local(e, path).upcast_to::<String>(e);
|
||||||
|
|
||||||
|
if let Err(err) = toad.notify_impl(e, path) {
|
||||||
|
let err = JThrowable::from(err.downcast(e).to_local(e));
|
||||||
|
e.throw(err).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
pub extern "system" fn Java_dev_toad_Toad_teardown<'local>(_: java::Env<'local>,
|
pub extern "system" fn Java_dev_toad_Toad_teardown<'local>(_: java::Env<'local>,
|
||||||
_: JClass<'local>)
|
_: JClass<'local>)
|
||||||
|
@ -46,13 +46,16 @@ mod runtime {
|
|||||||
config: Config,
|
config: Config,
|
||||||
channel: PeekableDatagramChannel)
|
channel: PeekableDatagramChannel)
|
||||||
-> Self {
|
-> Self {
|
||||||
let handler = ConsoleHandler::new(e);
|
|
||||||
handler.set_level(e, log_level);
|
|
||||||
|
|
||||||
let logger = Logger::get_logger(e, "dev.toad");
|
let logger = Logger::get_logger(e, "dev.toad");
|
||||||
logger.use_parent_handlers(e, false);
|
|
||||||
logger.add_handler(e, handler.to_handler());
|
if logger.uses_parent_handlers(e) {
|
||||||
logger.set_level(e, log_level);
|
let handler = ConsoleHandler::new(e);
|
||||||
|
handler.set_level(e, log_level);
|
||||||
|
|
||||||
|
logger.use_parent_handlers(e, false);
|
||||||
|
logger.add_handler(e, handler.to_handler());
|
||||||
|
logger.set_level(e, log_level);
|
||||||
|
}
|
||||||
|
|
||||||
Self { steps: Default::default(),
|
Self { steps: Default::default(),
|
||||||
config,
|
config,
|
||||||
|
@ -1,14 +1,16 @@
|
|||||||
package dev.toad;
|
package dev.toad;
|
||||||
|
|
||||||
import dev.toad.msg.Message;
|
import dev.toad.msg.Message;
|
||||||
|
import dev.toad.msg.Token;
|
||||||
import dev.toad.msg.option.Observe;
|
import dev.toad.msg.option.Observe;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public class ClientObserveStream {
|
public class ClientObserveStream implements AutoCloseable {
|
||||||
|
|
||||||
State state;
|
State state;
|
||||||
Optional<CompletableFuture<Message>> buffered;
|
CompletableFuture<Message> initial;
|
||||||
|
Optional<Token> token = Optional.empty();
|
||||||
final Client client;
|
final Client client;
|
||||||
final Message message;
|
final Message message;
|
||||||
|
|
||||||
@ -16,16 +18,7 @@ public class ClientObserveStream {
|
|||||||
this.state = State.OPEN;
|
this.state = State.OPEN;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.message = message.buildCopy().option(Observe.REGISTER).build();
|
this.message = message.buildCopy().option(Observe.REGISTER).build();
|
||||||
this.buffered = Optional.of(client.send(this.message));
|
this.initial = client.send(this.message);
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Void> close() {
|
|
||||||
return this.client.send(
|
|
||||||
this.message.buildCopy().option(Observe.DEREGISTER).unsetId().build()
|
|
||||||
)
|
|
||||||
.thenAccept(m -> {
|
|
||||||
this.state = State.CLOSED;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Message> next() {
|
public CompletableFuture<Message> next() {
|
||||||
@ -33,18 +26,31 @@ public class ClientObserveStream {
|
|||||||
throw new RuntimeException(
|
throw new RuntimeException(
|
||||||
"ClientObserveStream.next() invoked after .close()"
|
"ClientObserveStream.next() invoked after .close()"
|
||||||
);
|
);
|
||||||
} else if (this.buffered.isEmpty()) {
|
} else if (this.token.isEmpty()) {
|
||||||
var buffered = this.buffered.get();
|
return this.initial.whenComplete((rep, e) -> {
|
||||||
this.buffered = Optional.empty();
|
if (rep != null) {
|
||||||
return buffered;
|
this.token = Optional.of(rep.token());
|
||||||
|
}
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
return this.client.awaitResponse(
|
return this.initial.thenCompose(_i ->
|
||||||
this.message.token(),
|
this.client.awaitResponse(this.token.get(), this.message.addr().get())
|
||||||
this.message.addr().get()
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
try {
|
||||||
|
this.client.send(
|
||||||
|
this.message.buildCopy().option(Observe.DEREGISTER).unsetId().build()
|
||||||
|
)
|
||||||
|
.get();
|
||||||
|
} catch (Throwable t) {}
|
||||||
|
|
||||||
|
this.state = State.CLOSED;
|
||||||
|
}
|
||||||
|
|
||||||
public static final class State {
|
public static final class State {
|
||||||
|
|
||||||
public static final Eq<State> eq = Eq.int_.contramap((State s) -> s.state);
|
public static final Eq<State> eq = Eq.int_.contramap((State s) -> s.state);
|
||||||
|
@ -9,6 +9,7 @@ import java.util.function.Function;
|
|||||||
|
|
||||||
public final class Eq<T> {
|
public final class Eq<T> {
|
||||||
|
|
||||||
|
public static final Eq<Boolean> bool = new Eq<>((a, b) -> a == b);
|
||||||
public static final Eq<Short> short_ = new Eq<>((a, b) -> a == b);
|
public static final Eq<Short> short_ = new Eq<>((a, b) -> a == b);
|
||||||
public static final Eq<Integer> int_ = new Eq<>((a, b) -> a == b);
|
public static final Eq<Integer> int_ = new Eq<>((a, b) -> a == b);
|
||||||
public static final Eq<Long> long_ = new Eq<>((a, b) -> a == b);
|
public static final Eq<Long> long_ = new Eq<>((a, b) -> a == b);
|
||||||
|
@ -3,207 +3,305 @@ package dev.toad;
|
|||||||
import dev.toad.msg.Code;
|
import dev.toad.msg.Code;
|
||||||
import dev.toad.msg.Message;
|
import dev.toad.msg.Message;
|
||||||
import dev.toad.msg.Payload;
|
import dev.toad.msg.Payload;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.BiPredicate;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
|
|
||||||
public final class Server {
|
public final class Server implements AutoCloseable {
|
||||||
|
|
||||||
|
boolean exit = false;
|
||||||
|
Optional<Thread> thread = Optional.empty();
|
||||||
final Toad toad;
|
final Toad toad;
|
||||||
final ArrayList<Function<Message, Middleware.Result>> middlewares;
|
final ArrayList<Middleware> middlewares;
|
||||||
final Function<Message, Middleware.Result> notFoundHandler;
|
|
||||||
final BiFunction<Message, Throwable, Middleware.Result> exceptionHandler;
|
|
||||||
|
|
||||||
Server(
|
Server(Toad toad, ArrayList<Middleware> ms) {
|
||||||
Toad toad,
|
|
||||||
ArrayList<Function<Message, Middleware.Result>> ms,
|
|
||||||
Function<Message, Middleware.Result> notFoundHandler,
|
|
||||||
BiFunction<Message, Throwable, Middleware.Result> exHandler
|
|
||||||
) {
|
|
||||||
this.toad = toad;
|
this.toad = toad;
|
||||||
this.middlewares = ms;
|
this.middlewares = ms;
|
||||||
this.notFoundHandler = notFoundHandler;
|
|
||||||
this.exceptionHandler = exHandler;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void exit() {
|
||||||
while (true) {
|
this.exit = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notify(String path) {
|
||||||
|
this.toad.notify(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Thread run() {
|
||||||
|
if (!this.thread.isEmpty()) {
|
||||||
|
return this.thread.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
var thread = new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
dev.toad.msg.ref.Message req = Async
|
Toad
|
||||||
.pollCompletable(() -> this.toad.pollReq())
|
.logger()
|
||||||
.get();
|
.log(
|
||||||
|
Level.INFO,
|
||||||
|
String.format(
|
||||||
|
"Server listening on %s",
|
||||||
|
this.toad.localAddress().toString()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} catch (Throwable t) {}
|
||||||
|
|
||||||
Middleware.Result result = Middleware.next();
|
while (true) {
|
||||||
for (var f : this.middlewares) {
|
try {
|
||||||
if (!result.shouldContinue()) {
|
dev.toad.msg.ref.Message req = Async
|
||||||
break;
|
.pollCompletable(() -> {
|
||||||
}
|
if (this.exit) {
|
||||||
|
throw new Exit();
|
||||||
|
}
|
||||||
|
|
||||||
if (result.isAsync()) {
|
return this.toad.pollReq();
|
||||||
try {
|
})
|
||||||
result.response().get();
|
.get();
|
||||||
} catch (Throwable e) {
|
var addr = req.addr().get();
|
||||||
result = Middleware.error(e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// Toad.ack(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
result = f.apply(req);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
result = Middleware.error(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (result) {
|
|
||||||
case Middleware.ResultExit e:
|
|
||||||
this.toad.close();
|
|
||||||
return;
|
|
||||||
case Middleware.ResultError e:
|
|
||||||
result = this.exceptionHandler.apply(req, e.error);
|
|
||||||
break;
|
|
||||||
case Middleware.ResultNextSync n:
|
|
||||||
result = this.notFoundHandler.apply(req);
|
|
||||||
break;
|
|
||||||
case Middleware.ResultNextAsync n:
|
|
||||||
result = this.notFoundHandler.apply(req);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
req.close();
|
|
||||||
|
|
||||||
var resp = result.response().get();
|
|
||||||
if (resp.isEmpty()) {
|
|
||||||
Toad
|
Toad
|
||||||
.logger()
|
.logger()
|
||||||
.log(
|
.log(
|
||||||
Level.SEVERE,
|
Level.FINE,
|
||||||
String.format(
|
String.format("<== %s\n%s", addr.toString(), req.toDebugString())
|
||||||
"Server never generated response for message\n%s",
|
|
||||||
req.toDebugString()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
} else {
|
|
||||||
Async
|
Middleware.Result result = Middleware.Result.next();
|
||||||
.pollCompletable(() -> this.toad.sendMessage(resp.get().toOwned()))
|
for (var m : this.middlewares) {
|
||||||
.get();
|
if (!m.shouldRun.test(result, req)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (result.isAsync()) {
|
||||||
|
result.response().get();
|
||||||
|
// Toad.ack(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
result = m.run(result, req);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
result = Middleware.Result.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp = result.response().get();
|
||||||
|
if (resp.isEmpty()) {
|
||||||
|
Toad
|
||||||
|
.logger()
|
||||||
|
.log(
|
||||||
|
Level.SEVERE,
|
||||||
|
String.format(
|
||||||
|
"Server never generated response for message\n%s",
|
||||||
|
req.toDebugString()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
var resp_ = resp.get().toOwned();
|
||||||
|
Toad
|
||||||
|
.logger()
|
||||||
|
.log(
|
||||||
|
Level.FINE,
|
||||||
|
String.format("==> %s\n%s", addr, resp_.toDebugString())
|
||||||
|
);
|
||||||
|
Async.pollCompletable(() -> this.toad.sendMessage(resp_)).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
req.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
if (e.getCause() instanceof Exit || e instanceof Exit) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Toad.logger().log(Level.SEVERE, e.toString());
|
||||||
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
|
||||||
Toad.logger().log(Level.SEVERE, e.toString());
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
this.thread = Optional.of(thread);
|
||||||
|
return thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
this.toad.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class Exit extends RuntimeException {}
|
||||||
|
|
||||||
public static final class Middleware {
|
public static final class Middleware {
|
||||||
|
|
||||||
public static final CompletableFuture<Optional<Message>> noop =
|
final BiPredicate<Result, Message> shouldRun;
|
||||||
|
final BiFunction<Result, Message, Result> fun;
|
||||||
|
|
||||||
|
static final CompletableFuture<Optional<Message>> noop =
|
||||||
CompletableFuture.completedFuture(Optional.empty());
|
CompletableFuture.completedFuture(Optional.empty());
|
||||||
|
|
||||||
public static final Function<Message, Result> notFound = m -> {
|
public static final Middleware respondNotFound =
|
||||||
return Middleware.respond(m.buildResponse().code(Code.NOT_FOUND).build());
|
Middleware.requestHandler(m ->
|
||||||
};
|
Optional.of(m.buildResponse().code(Code.NOT_FOUND).build())
|
||||||
|
);
|
||||||
|
|
||||||
|
public static final Middleware handleException = new Middleware(
|
||||||
|
(r, _m) -> r instanceof ResultError,
|
||||||
|
(r, m) -> {
|
||||||
|
var e = ((ResultError) r).error;
|
||||||
|
|
||||||
public static final BiFunction<Message, Throwable, Result> debugExceptionHandler =
|
|
||||||
(m, e) -> {
|
|
||||||
Toad
|
Toad
|
||||||
.logger()
|
.logger()
|
||||||
.log(
|
.log(
|
||||||
Level.SEVERE,
|
Level.SEVERE,
|
||||||
String.format("while handling %s", m.toDebugString()),
|
String.format(
|
||||||
|
"Exception thrown while handling:\n%s\nException:",
|
||||||
|
m.toDebugString()
|
||||||
|
),
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
|
|
||||||
var rep = m
|
return Middleware.Result.respond(
|
||||||
.buildResponse()
|
m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build()
|
||||||
.code(Code.INTERNAL_SERVER_ERROR)
|
);
|
||||||
.payload(Payload.text(e.toString()))
|
}
|
||||||
.build();
|
);
|
||||||
|
|
||||||
return Middleware.respond(rep);
|
public static final Middleware handleExceptionDebug = new Middleware(
|
||||||
};
|
(r, _m) -> r instanceof ResultError,
|
||||||
|
(r, m) -> {
|
||||||
|
var e = ((ResultError) r).error;
|
||||||
|
|
||||||
public static final BiFunction<Message, Throwable, Result> exceptionHandler =
|
|
||||||
(m, e) -> {
|
|
||||||
Toad
|
Toad
|
||||||
.logger()
|
.logger()
|
||||||
.log(
|
.log(
|
||||||
Level.SEVERE,
|
Level.SEVERE,
|
||||||
String.format("while handling %s", m.toDebugString()),
|
String.format(
|
||||||
|
"Exception thrown while handling:\n%s\nException:",
|
||||||
|
m.toDebugString()
|
||||||
|
),
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
var rep = m.buildResponse().code(Code.INTERNAL_SERVER_ERROR).build();
|
|
||||||
return Middleware.respond(rep);
|
|
||||||
};
|
|
||||||
|
|
||||||
public static Result respond(Message m) {
|
return Middleware.Result.respond(
|
||||||
return new ResultRespondSync(m);
|
m
|
||||||
|
.buildResponse()
|
||||||
|
.code(Code.INTERNAL_SERVER_ERROR)
|
||||||
|
.payload(Payload.text(e.toString()))
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
public static Middleware requestHandler(
|
||||||
|
Function<Message, Optional<Message>> f
|
||||||
|
) {
|
||||||
|
return new Middleware(
|
||||||
|
(r, _m) -> !r.isFinal(),
|
||||||
|
(_result, request) ->
|
||||||
|
f
|
||||||
|
.apply(request)
|
||||||
|
.map(Middleware.Result::respond)
|
||||||
|
.orElse(Middleware.Result.next())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Result respond(CompletableFuture<Message> m) {
|
public static Middleware requestHandlerAsync(
|
||||||
return new ResultRespondAsync(m);
|
Function<Message, Optional<CompletableFuture<Message>>> f
|
||||||
|
) {
|
||||||
|
return new Middleware(
|
||||||
|
(r, _m) -> !r.isFinal(),
|
||||||
|
(_result, request) ->
|
||||||
|
f
|
||||||
|
.apply(request)
|
||||||
|
.map(Middleware.Result::respond)
|
||||||
|
.orElse(Middleware.Result.next())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Result error(Throwable e) {
|
public static Middleware responseConsumer(Consumer<Message> f) {
|
||||||
return new ResultError(e);
|
return new Middleware(
|
||||||
|
(r, _m) ->
|
||||||
|
r instanceof ResultRespondSync || r instanceof ResultRespondAsync,
|
||||||
|
(res, request) -> {
|
||||||
|
try {
|
||||||
|
var rep = res.response().get().get();
|
||||||
|
f.accept(rep);
|
||||||
|
return res;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Result exit() {
|
public static Middleware exceptionHandler(Function<Throwable, Result> f) {
|
||||||
return new ResultExit();
|
return new Middleware(
|
||||||
|
(r, _m) -> r instanceof ResultError,
|
||||||
|
(res, request) -> f.apply(((ResultError) res).error)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Result next() {
|
public Middleware(
|
||||||
return new ResultNextSync();
|
BiPredicate<Result, Message> shouldRun,
|
||||||
|
BiFunction<Result, Message, Result> fun
|
||||||
|
) {
|
||||||
|
this.fun = fun;
|
||||||
|
this.shouldRun = shouldRun;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Result next(CompletableFuture<Void> work) {
|
public Result run(Result res, Message msg) {
|
||||||
return new ResultNextAsync(work);
|
return this.fun.apply(res, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Middleware filter(BiPredicate<Result, Message> f) {
|
||||||
|
return new Middleware(
|
||||||
|
(r, m) -> f.test(r, m) && this.shouldRun.test(r, m),
|
||||||
|
this.fun
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static sealed interface Result
|
public static sealed interface Result
|
||||||
permits
|
permits
|
||||||
ResultExit,
|
|
||||||
ResultError,
|
ResultError,
|
||||||
ResultNextSync,
|
ResultNextSync,
|
||||||
ResultNextAsync,
|
ResultNextAsync,
|
||||||
ResultRespondSync,
|
ResultRespondSync,
|
||||||
ResultRespondAsync {
|
ResultRespondAsync {
|
||||||
public boolean shouldContinue();
|
public static Result respond(Message m) {
|
||||||
|
return new ResultRespondSync(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Result respond(CompletableFuture<Message> m) {
|
||||||
|
return new ResultRespondAsync(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Result error(Throwable e) {
|
||||||
|
return new ResultError(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Result next() {
|
||||||
|
return new ResultNextSync();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Result next(CompletableFuture<Void> work) {
|
||||||
|
return new ResultNextAsync(work);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFinal();
|
||||||
|
|
||||||
public boolean isAsync();
|
public boolean isAsync();
|
||||||
|
|
||||||
public CompletableFuture<Optional<Message>> response();
|
public CompletableFuture<Optional<Message>> response();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class ResultExit implements Result {
|
|
||||||
|
|
||||||
public ResultExit() {}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean shouldContinue() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsync() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public CompletableFuture<Optional<Message>> response() {
|
|
||||||
return Middleware.noop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class ResultError implements Result {
|
public static final class ResultError implements Result {
|
||||||
|
|
||||||
public final Throwable error;
|
public final Throwable error;
|
||||||
@ -213,8 +311,8 @@ public final class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldContinue() {
|
public boolean isFinal() {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -233,8 +331,8 @@ public final class Server {
|
|||||||
public ResultNextSync() {}
|
public ResultNextSync() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldContinue() {
|
public boolean isFinal() {
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -257,8 +355,8 @@ public final class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldContinue() {
|
public boolean isFinal() {
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -281,8 +379,8 @@ public final class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldContinue() {
|
public boolean isFinal() {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -305,8 +403,8 @@ public final class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldContinue() {
|
public boolean isFinal() {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -324,120 +422,104 @@ public final class Server {
|
|||||||
public static final class Builder {
|
public static final class Builder {
|
||||||
|
|
||||||
final Toad toad;
|
final Toad toad;
|
||||||
final ArrayList<Function<Message, Middleware.Result>> middlewares =
|
final ArrayList<Middleware> middlewares = new ArrayList<>();
|
||||||
new ArrayList<>();
|
|
||||||
Function<Message, Middleware.Result> notFoundHandler = Middleware.notFound;
|
|
||||||
BiFunction<Message, Throwable, Middleware.Result> exceptionHandler =
|
|
||||||
Middleware.exceptionHandler;
|
|
||||||
|
|
||||||
Builder(Toad toad) {
|
Builder(Toad toad) {
|
||||||
this.toad = toad;
|
this.toad = toad;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder middleware(Function<Message, Middleware.Result> f) {
|
public Builder middleware(Middleware m) {
|
||||||
this.middlewares.add(f);
|
this.middlewares.add(m);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder when(
|
public Builder put(String path, Function<Message, Optional<Message>> f) {
|
||||||
Predicate<Message> pred,
|
return this.put(path, Middleware.requestHandler(f));
|
||||||
Function<Message, Middleware.Result> f
|
|
||||||
) {
|
|
||||||
return this.middleware(m -> {
|
|
||||||
if (pred.test(m)) {
|
|
||||||
return f.apply(m);
|
|
||||||
} else {
|
|
||||||
return Middleware.next();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder put(String path, Function<Message, Middleware.Result> f) {
|
public Builder put(String path, Middleware m) {
|
||||||
return this.when(
|
return this.middleware(
|
||||||
m ->
|
m.filter((_r, req) ->
|
||||||
Code.eq.test(m.code(), Code.PUT) &&
|
Code.eq.test(req.code(), Code.PUT) &&
|
||||||
m
|
req
|
||||||
.getPath()
|
.getPath()
|
||||||
.map(p -> p.matches(path))
|
.map(p -> p.matches(path))
|
||||||
.orElse(path == null || path.isEmpty()),
|
.orElse(path == null || path.isEmpty())
|
||||||
f
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder post(String path, Function<Message, Middleware.Result> f) {
|
public Builder post(String path, Function<Message, Optional<Message>> f) {
|
||||||
return this.when(
|
return this.post(path, Middleware.requestHandler(f));
|
||||||
m ->
|
}
|
||||||
Code.eq.test(m.code(), Code.POST) &&
|
|
||||||
m
|
public Builder post(String path, Middleware m) {
|
||||||
|
return this.middleware(
|
||||||
|
m.filter((_r, req) ->
|
||||||
|
Code.eq.test(req.code(), Code.POST) &&
|
||||||
|
req
|
||||||
.getPath()
|
.getPath()
|
||||||
.map(p -> p.matches(path))
|
.map(p -> p.matches(path))
|
||||||
.orElse(path == null || path.isEmpty()),
|
.orElse(path == null || path.isEmpty())
|
||||||
f
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder delete(String path, Function<Message, Middleware.Result> f) {
|
public Builder get(String path, Function<Message, Optional<Message>> f) {
|
||||||
return this.when(
|
return this.get(path, Middleware.requestHandler(f));
|
||||||
m ->
|
}
|
||||||
Code.eq.test(m.code(), Code.DELETE) &&
|
|
||||||
m
|
public Builder get(String path, Middleware m) {
|
||||||
|
return this.middleware(
|
||||||
|
m.filter((_r, req) ->
|
||||||
|
Code.eq.test(req.code(), Code.GET) &&
|
||||||
|
req
|
||||||
.getPath()
|
.getPath()
|
||||||
.map(p -> p.matches(path))
|
.map(p -> p.matches(path))
|
||||||
.orElse(path == null || path.isEmpty()),
|
.orElse(path == null || path.isEmpty())
|
||||||
f
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder get(String path, Function<Message, Middleware.Result> f) {
|
public Builder delete(String path, Function<Message, Optional<Message>> f) {
|
||||||
return this.when(
|
return this.delete(path, Middleware.requestHandler(f));
|
||||||
m ->
|
}
|
||||||
Code.eq.test(m.code(), Code.GET) &&
|
|
||||||
m
|
public Builder delete(String path, Middleware m) {
|
||||||
|
return this.middleware(
|
||||||
|
m.filter((_r, req) ->
|
||||||
|
Code.eq.test(req.code(), Code.DELETE) &&
|
||||||
|
req
|
||||||
.getPath()
|
.getPath()
|
||||||
.map(p -> p.matches(path))
|
.map(p -> p.matches(path))
|
||||||
.orElse(path == null || path.isEmpty()),
|
.orElse(path == null || path.isEmpty())
|
||||||
f
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder tap(Consumer<Message> f) {
|
public Builder onRequest(Consumer<Message> f) {
|
||||||
return this.middleware(m -> {
|
return this.middleware(
|
||||||
f.accept(m);
|
Middleware.requestHandler(m -> {
|
||||||
return Middleware.next();
|
f.accept(m);
|
||||||
});
|
return Optional.empty();
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder tapAsync(Function<Message, CompletableFuture<?>> f) {
|
public Builder onResponse(Consumer<Message> f) {
|
||||||
return this.middleware(m -> {
|
return this.middleware(Middleware.responseConsumer(f));
|
||||||
return Middleware.next(f.apply(m).thenAccept(_void -> {}));
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder debugExceptions() {
|
public Server buildDebugExceptionHandler() {
|
||||||
return this.exceptionHandler(Middleware.debugExceptionHandler);
|
this.middleware(Middleware.handleExceptionDebug);
|
||||||
}
|
this.middleware(Middleware.respondNotFound);
|
||||||
|
return new Server(this.toad, this.middlewares);
|
||||||
public Builder exceptionHandler(
|
|
||||||
BiFunction<Message, Throwable, Middleware.Result> handler
|
|
||||||
) {
|
|
||||||
this.exceptionHandler = handler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder notFoundHandler(
|
|
||||||
Function<Message, Middleware.Result> handler
|
|
||||||
) {
|
|
||||||
this.notFoundHandler = handler;
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Server build() {
|
public Server build() {
|
||||||
return new Server(
|
this.middleware(Middleware.handleException);
|
||||||
this.toad,
|
this.middleware(Middleware.respondNotFound);
|
||||||
this.middlewares,
|
return new Server(this.toad, this.middlewares);
|
||||||
this.notFoundHandler,
|
|
||||||
this.exceptionHandler
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,9 @@ public final class Toad implements AutoCloseable {
|
|||||||
|
|
||||||
public static Logger logger() {
|
public static Logger logger() {
|
||||||
// Configured in `glue::Runtime::new()`
|
// Configured in `glue::Runtime::new()`
|
||||||
return Logger.getLogger("dev.toad");
|
var l = Logger.getLogger("dev.toad");
|
||||||
|
l.setUseParentHandlers(false);
|
||||||
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
static native Config defaultConfigImpl();
|
static native Config defaultConfigImpl();
|
||||||
@ -53,6 +55,8 @@ public final class Toad implements AutoCloseable {
|
|||||||
dev.toad.msg.owned.Message msg
|
dev.toad.msg.owned.Message msg
|
||||||
);
|
);
|
||||||
|
|
||||||
|
native void notify(String path);
|
||||||
|
|
||||||
static native Optional<dev.toad.msg.ref.Message> pollReq(long ptr);
|
static native Optional<dev.toad.msg.ref.Message> pollReq(long ptr);
|
||||||
|
|
||||||
static native Optional<dev.toad.msg.ref.Message> pollResp(
|
static native Optional<dev.toad.msg.ref.Message> pollResp(
|
||||||
@ -90,6 +94,10 @@ public final class Toad implements AutoCloseable {
|
|||||||
return this.config;
|
return this.config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress localAddress() throws IOException {
|
||||||
|
return (InetSocketAddress) this.channel.getLocalAddress();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
Toad.teardown();
|
Toad.teardown();
|
||||||
|
@ -72,7 +72,7 @@ public final class Code implements Debug {
|
|||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
var str = String.format(
|
var str = String.format(
|
||||||
"%d.%d",
|
"%d.%02d",
|
||||||
this.clazz.shortValue(),
|
this.clazz.shortValue(),
|
||||||
this.detail.shortValue()
|
this.detail.shortValue()
|
||||||
);
|
);
|
||||||
|
@ -72,8 +72,12 @@ public interface Message extends Debug {
|
|||||||
int port = this.addr().map(a -> a.getPort()).orElse(5683);
|
int port = this.addr().map(a -> a.getPort()).orElse(5683);
|
||||||
String scheme = port == 5684 ? "coaps" : "coap";
|
String scheme = port == 5684 ? "coaps" : "coap";
|
||||||
String hostAddr =
|
String hostAddr =
|
||||||
this.addr().map(a -> a.getAddress().toString()).orElse(null);
|
this.addr().map(a -> a.getAddress().getHostAddress()).orElse(null);
|
||||||
String host = this.getHost().map(h -> h.toString()).orElse(hostAddr);
|
String host =
|
||||||
|
this.getHost()
|
||||||
|
.map(h -> h.toString())
|
||||||
|
.filter(h -> h != null && !h.trim().isEmpty())
|
||||||
|
.orElse(hostAddr);
|
||||||
String path =
|
String path =
|
||||||
this.getPath()
|
this.getPath()
|
||||||
.map(p -> p.toString())
|
.map(p -> p.toString())
|
||||||
@ -130,8 +134,8 @@ public interface Message extends Debug {
|
|||||||
.stream()
|
.stream()
|
||||||
.map(Debug::toDebugString)
|
.map(Debug::toDebugString)
|
||||||
.reduce("", (b, a) -> b + "\n " + a) +
|
.reduce("", (b, a) -> b + "\n " + a) +
|
||||||
"\n\n" +
|
"\n" +
|
||||||
this.payload().toDebugString()
|
(!this.payload().isEmpty() ? "\n" + this.payload().toDebugString() : "")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,9 @@ public interface Option extends Debug {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public default String toDebugString() {
|
public default String toDebugString() {
|
||||||
if (this.number() == Path.number) {
|
if (this.number() == Observe.number) {
|
||||||
|
return new Observe(this).toDebugString();
|
||||||
|
} else if (this.number() == Path.number) {
|
||||||
return new Path(this).toDebugString();
|
return new Path(this).toDebugString();
|
||||||
} else if (this.number() == Host.number) {
|
} else if (this.number() == Host.number) {
|
||||||
return new Host(this).toDebugString();
|
return new Host(this).toDebugString();
|
||||||
|
@ -42,6 +42,10 @@ public final class Payload implements Debug {
|
|||||||
this.bytes = bytes;
|
this.bytes = bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return this.bytes.length == 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new String(this.bytes, StandardCharsets.UTF_8);
|
return new String(this.bytes, StandardCharsets.UTF_8);
|
||||||
|
@ -6,6 +6,7 @@ import dev.toad.msg.Payload;
|
|||||||
import dev.toad.msg.Token;
|
import dev.toad.msg.Token;
|
||||||
import dev.toad.msg.Type;
|
import dev.toad.msg.Type;
|
||||||
import dev.toad.msg.option.Host;
|
import dev.toad.msg.option.Host;
|
||||||
|
import dev.toad.msg.option.Observe;
|
||||||
import dev.toad.msg.option.Path;
|
import dev.toad.msg.option.Path;
|
||||||
import dev.toad.msg.option.Query;
|
import dev.toad.msg.option.Query;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
@ -48,7 +49,7 @@ public final class Message
|
|||||||
: conResponseToNonRequest ? Type.CON
|
: conResponseToNonRequest ? Type.CON
|
||||||
: Type.NON;
|
: Type.NON;
|
||||||
|
|
||||||
return Message.copyOf(other).unsetId().type(type);
|
return Message.copyOf(other).unsetId().unsetOption(Observe.number).type(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Message copyOf(dev.toad.msg.Message other) {
|
public static Message copyOf(dev.toad.msg.Message other) {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package dev.toad.msg.option;
|
package dev.toad.msg.option;
|
||||||
|
|
||||||
|
import dev.toad.Eq;
|
||||||
import dev.toad.msg.Option;
|
import dev.toad.msg.Option;
|
||||||
import dev.toad.msg.OptionValue;
|
import dev.toad.msg.OptionValue;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -11,6 +12,8 @@ public final class Observe implements Option {
|
|||||||
|
|
||||||
final boolean register;
|
final boolean register;
|
||||||
|
|
||||||
|
public static final Eq<Observe> eq = Eq.bool.contramap(Observe::isRegister);
|
||||||
|
|
||||||
public static final Observe REGISTER = new Observe(true);
|
public static final Observe REGISTER = new Observe(true);
|
||||||
public static final Observe DEREGISTER = new Observe(false);
|
public static final Observe DEREGISTER = new Observe(false);
|
||||||
|
|
||||||
@ -41,13 +44,17 @@ public final class Observe implements Option {
|
|||||||
} else if (o.values().size() == 0) {
|
} else if (o.values().size() == 0) {
|
||||||
this.register = false;
|
this.register = false;
|
||||||
} else {
|
} else {
|
||||||
this.register = o.values().get(0).asBytes()[0] == 1;
|
this.register = o.values().get(0).asBytes()[0] == 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long number() {
|
public long number() {
|
||||||
return Host.number;
|
return Observe.number;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRegister() {
|
||||||
|
return this.register;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -68,9 +75,14 @@ public final class Observe implements Option {
|
|||||||
var list = new ArrayList<OptionValue>();
|
var list = new ArrayList<OptionValue>();
|
||||||
list.add(
|
list.add(
|
||||||
new dev.toad.msg.owned.OptionValue(
|
new dev.toad.msg.owned.OptionValue(
|
||||||
new byte[] { this.register ? (byte) 1 : (byte) 0 }
|
new byte[] { this.register ? (byte) 0 : (byte) 1 }
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toDebugString() {
|
||||||
|
return this.register ? "Observe: Register" : "Observe: Deregister";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package dev.toad.msg.ref;
|
package dev.toad.msg.ref;
|
||||||
|
|
||||||
import dev.toad.ffi.Ptr;
|
import dev.toad.ffi.Ptr;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
public final class OptionValue
|
public final class OptionValue
|
||||||
implements dev.toad.msg.OptionValue, AutoCloseable {
|
implements dev.toad.msg.OptionValue, AutoCloseable {
|
||||||
|
@ -9,58 +9,149 @@ import java.net.InetSocketAddress
|
|||||||
import java.util.logging.Logger
|
import java.util.logging.Logger
|
||||||
import java.util.logging.Level
|
import java.util.logging.Level
|
||||||
import java.util.ArrayList
|
import java.util.ArrayList
|
||||||
|
import java.util.Optional
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
|
|
||||||
|
val logLevel = Level.INFO;
|
||||||
|
val serverPort = 10102;
|
||||||
|
val clientPort = 10101;
|
||||||
|
|
||||||
class E2E extends munit.FunSuite {
|
class E2E extends munit.FunSuite {
|
||||||
test("little baby server") {
|
val client = new Fixture[Client]("client") {
|
||||||
Toad.loadNativeLib()
|
private var client: Client = null;
|
||||||
|
|
||||||
val server = Toad.builder
|
def apply() = this.client
|
||||||
.port(10102)
|
|
||||||
.logLevel(Level.INFO)
|
|
||||||
.server
|
|
||||||
.post("exit", _msg => Server.Middleware.exit)
|
|
||||||
.get(
|
|
||||||
"hello",
|
|
||||||
msg => {
|
|
||||||
val rep = msg.buildResponse
|
|
||||||
.code(Code.OK_CONTENT)
|
|
||||||
.`type`(Type.NON)
|
|
||||||
.payload(Payload.text(s"Hello, ${msg.payload.toString}!"))
|
|
||||||
.build
|
|
||||||
|
|
||||||
Server.Middleware.respond(rep)
|
override def beforeAll(): Unit = {
|
||||||
}
|
this.client =
|
||||||
)
|
Toad.builder.port(clientPort.shortValue).logLevel(logLevel).buildClient
|
||||||
.build;
|
|
||||||
|
|
||||||
val serverThread = Thread((() => server.run()): java.lang.Runnable)
|
|
||||||
serverThread.start()
|
|
||||||
|
|
||||||
val req = Message.builder
|
|
||||||
.uri("coap://localhost:10102/hello")
|
|
||||||
.`type`(Type.NON)
|
|
||||||
.code(Code.GET)
|
|
||||||
.payload(Payload.text("Fred"))
|
|
||||||
.build
|
|
||||||
|
|
||||||
val client = Toad.builder.port(10101).logLevel(Level.INFO).buildClient
|
|
||||||
val respFuture = client.send(req)
|
|
||||||
|
|
||||||
try {
|
|
||||||
val respActual = respFuture.get(1, TimeUnit.SECONDS)
|
|
||||||
assertNoDiff(respActual.payload.toString, "Hello, Fred!")
|
|
||||||
} finally {
|
|
||||||
val exit = Message.builder
|
|
||||||
.uri("coap://localhost:10102/exit")
|
|
||||||
.`type`(Type.NON)
|
|
||||||
.code(Code.POST)
|
|
||||||
.build
|
|
||||||
client.sendNoResponse(exit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
serverThread.join()
|
override def afterAll(): Unit = {
|
||||||
|
this.client.close()
|
||||||
|
this.client = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val server = new Fixture[Server]("server") {
|
||||||
|
private var thread: Thread = null;
|
||||||
|
private var server: Server = null;
|
||||||
|
|
||||||
|
def apply() = this.server
|
||||||
|
|
||||||
|
override def beforeAll(): Unit = {
|
||||||
|
Toad.loadNativeLib()
|
||||||
|
|
||||||
|
var counterN = 0
|
||||||
|
this.server = Toad.builder
|
||||||
|
.port(serverPort.shortValue)
|
||||||
|
.logLevel(logLevel)
|
||||||
|
.server
|
||||||
|
.put(
|
||||||
|
"failing",
|
||||||
|
_msg => {
|
||||||
|
throw java.lang.RuntimeException("fart")
|
||||||
|
}
|
||||||
|
)
|
||||||
|
.get(
|
||||||
|
"counter",
|
||||||
|
msg => {
|
||||||
|
counterN += 1
|
||||||
|
Optional.of(
|
||||||
|
msg.buildResponse
|
||||||
|
.code(Code.OK_CONTENT)
|
||||||
|
.payload(Payload.text(s"$counterN"))
|
||||||
|
.build
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
.get(
|
||||||
|
"greetings",
|
||||||
|
msg => {
|
||||||
|
Optional.of(
|
||||||
|
msg.buildResponse
|
||||||
|
.code(Code.OK_CONTENT)
|
||||||
|
.payload(Payload.text(s"Hello, ${msg.payload.toString}!"))
|
||||||
|
.build
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
.build
|
||||||
|
|
||||||
|
this.thread = this.server.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def afterAll(): Unit = {
|
||||||
|
this.server.exit()
|
||||||
|
this.thread.join()
|
||||||
|
this.server.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def munitFixtures = List(client, server)
|
||||||
|
|
||||||
|
test("server responds 2.05 Ok Content") {
|
||||||
|
server()
|
||||||
|
|
||||||
|
val rep = client()
|
||||||
|
.get(
|
||||||
|
Type.NON,
|
||||||
|
s"coap://localhost:$serverPort/greetings",
|
||||||
|
Payload.text("Fred")
|
||||||
|
)
|
||||||
|
.get()
|
||||||
|
|
||||||
|
assertNoDiff(
|
||||||
|
rep.payload.toString,
|
||||||
|
"Hello, Fred!"
|
||||||
|
)
|
||||||
|
assertEquals(
|
||||||
|
rep.code,
|
||||||
|
Code.OK_CONTENT
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
test(
|
||||||
|
"ClientObserveStream yields new resource states when server is notified of new state"
|
||||||
|
) {
|
||||||
|
val stream =
|
||||||
|
client()
|
||||||
|
.observe(
|
||||||
|
Message.builder
|
||||||
|
.uri(s"coap://localhost:$serverPort/counter")
|
||||||
|
.`type`(Type.NON)
|
||||||
|
.code(Code.GET)
|
||||||
|
.build
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertNoDiff(stream.next().get().payload.toString, "1")
|
||||||
|
server().notify("counter")
|
||||||
|
assertNoDiff(stream.next().get().payload.toString, "2")
|
||||||
|
server().notify("counter")
|
||||||
|
assertNoDiff(stream.next().get().payload.toString, "3")
|
||||||
|
} finally {
|
||||||
|
stream.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test("unregistered resource responds 4.04 Not Found") {
|
||||||
|
server()
|
||||||
|
val c = client()
|
||||||
|
assertEquals(
|
||||||
|
c.get(Type.NON, s"coap://localhost:$serverPort/not-found").get().code,
|
||||||
|
Code.NOT_FOUND
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("handler throwing exception responds 5.00 Internal Server Error") {
|
||||||
|
server()
|
||||||
|
val c = client()
|
||||||
|
assertEquals(
|
||||||
|
c.put(Type.NON, s"coap://localhost:$serverPort/failing").get().code,
|
||||||
|
Code.INTERNAL_SERVER_ERROR
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user