feat: server

This commit is contained in:
Orion Kindel 2023-04-21 02:45:09 -05:00
parent 73d2233ff9
commit 0d2d7505f8
Signed by untrusted user who does not match committer: orion
GPG Key ID: 6D4165AE4C928719
26 changed files with 1001 additions and 112 deletions

View File

@ -39,12 +39,13 @@ impl Toad {
} }
fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 { fn init_impl(e: &mut java::Env, cfg: Config, channel: PeekableDatagramChannel) -> i64 {
let r = || Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel); let r = Runtime::new(&mut java::env(), cfg.log_level(e), cfg.to_toad(e), channel);
unsafe { crate::mem::Shared::init(r).addr() as i64 } unsafe { crate::mem::Shared::add_runtime(r).addr() as i64 }
} }
fn poll_req_impl(e: &mut java::Env, addr: i64) -> java::util::Optional<msg::ref_::Message> { fn poll_req_impl(e: &mut java::Env, addr: i64) -> java::util::Optional<msg::ref_::Message> {
match unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() }.poll_req() { let r = unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() };
match r.poll_req() {
| Ok(req) => { | Ok(req) => {
let msg_ptr = unsafe { Shared::alloc_message(req.map(Into::into)) }; let msg_ptr = unsafe { Shared::alloc_message(req.map(Into::into)) };
let mr = msg::ref_::Message::new(e, msg_ptr.addr() as i64); let mr = msg::ref_::Message::new(e, msg_ptr.addr() as i64);
@ -85,11 +86,13 @@ impl Toad {
addr: i64, addr: i64,
msg: msg::owned::Message) msg: msg::owned::Message)
-> java::util::Optional<IdAndToken> { -> java::util::Optional<IdAndToken> {
match unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() }.send_msg(Addrd(msg.to_toad(e), msg.addr(e).unwrap().to_no_std(e))) { let r = unsafe { Shared::deref::<Runtime>(addr).as_ref().unwrap() };
let sent = r.send_msg(Addrd(msg.to_toad(e), msg.addr(e).unwrap().to_no_std(e)));
match sent {
| Ok((id, token)) => { | Ok((id, token)) => {
let out = IdAndToken::new(e, id, token); let out = IdAndToken::new(e, id, token);
java::util::Optional::of(e, out) java::util::Optional::of(e, out)
}, },
| Err(nb::Error::WouldBlock) => java::util::Optional::empty(e), | Err(nb::Error::WouldBlock) => java::util::Optional::empty(e),
| Err(nb::Error::Other(err)) => { | Err(nb::Error::Other(err)) => {
let err = err.downcast_ref(e).to_local(e); let err = err.downcast_ref(e).to_local(e);
@ -371,3 +374,12 @@ pub extern "system" fn Java_dev_toad_Toad_pollResp<'local>(mut e: java::Env<'loc
let sock = java::lang::Object::from_local(e, sock).upcast_to::<InetSocketAddress>(e); let sock = java::lang::Object::from_local(e, sock).upcast_to::<InetSocketAddress>(e);
Toad::poll_resp_impl(e, addr, token, sock).yield_to_java(e) Toad::poll_resp_impl(e, addr, token, sock).yield_to_java(e)
} }
#[no_mangle]
pub extern "system" fn Java_dev_toad_Toad_teardown<'local>(_: java::Env<'local>,
_: JClass<'local>)
-> () {
unsafe {
crate::mem::Shared::dealloc();
}
}

View File

@ -3,16 +3,15 @@ use std::sync::Mutex;
use toad::net::Addrd; use toad::net::Addrd;
use toad_msg::alloc::Message; use toad_msg::alloc::Message;
use crate::runtime;
/// global [`RuntimeAllocator`] implementation /// global [`RuntimeAllocator`] implementation
pub type Shared = GlobalStatic; pub type Shared = GlobalStatic;
/// Trait managing the memory region(s) which java will store pointers to /// Trait managing the memory region(s) which java will store pointers to
pub trait SharedMemoryRegion: core::default::Default + core::fmt::Debug + Copy { pub trait SharedMemoryRegion: core::default::Default + core::fmt::Debug + Copy {
/// Allocate memory for the runtime and yield a stable pointer to it /// Allocate memory for a new runtime instance, yielding a stable pointer to it
/// unsafe fn add_runtime(r: crate::Runtime) -> *mut crate::Runtime;
/// This is idempotent and will only invoke the provided callback if the runtime
/// has not already been initialized.
unsafe fn init(r: impl FnOnce() -> crate::Runtime) -> *mut crate::Runtime;
/// Pass ownership of a [`Message`] to the shared memory region, /// Pass ownership of a [`Message`] to the shared memory region,
/// yielding a stable pointer to this message. /// yielding a stable pointer to this message.
@ -33,35 +32,39 @@ pub trait SharedMemoryRegion: core::default::Default + core::fmt::Debug + Copy {
} }
} }
static mut MEM: Mem = Mem { runtime: None, static mut MEM: Mem = Mem { runtimes: vec![],
messages: vec![], messages: vec![],
messages_lock: Mutex::new(()) }; messages_lock: Mutex::new(()),
runtimes_lock: Mutex::new(()) };
struct Mem { struct Mem {
runtime: Option<crate::Runtime>, runtimes: Vec<crate::Runtime>,
messages: Vec<Addrd<Message>>, messages: Vec<Addrd<Message>>,
/// Lock used by `alloc_message` and `dealloc_message` to ensure // These locks don't provide any guarantees that message pointers will
/// they are run serially. // stay valid or always point to the correct location, but it does
/// // ensure we don't accidentally yield the wrong pointer from `alloc_message`
/// This doesn't provide any guarantees that message pointers will // or delete the wrong message in `dealloc_message`.
/// stay valid or always point to the correct location, but it does
/// ensure we don't accidentally yield the wrong pointer from `alloc_message`
/// or delete the wrong message in `dealloc_message`.
messages_lock: Mutex<()>, messages_lock: Mutex<()>,
runtimes_lock: Mutex<()>,
} }
#[derive(Default, Debug, Clone, Copy)] #[derive(Default, Debug, Clone, Copy)]
pub struct GlobalStatic; pub struct GlobalStatic;
impl SharedMemoryRegion for GlobalStatic { impl SharedMemoryRegion for GlobalStatic {
unsafe fn dealloc() {} unsafe fn dealloc() {
MEM.runtimes = vec![];
MEM.messages = vec![];
}
unsafe fn init(r: impl FnOnce() -> crate::Runtime) -> *mut crate::Runtime { unsafe fn add_runtime(r: crate::Runtime) -> *mut crate::Runtime {
if MEM.runtime.is_none() { let Mem { ref mut runtimes,
MEM.runtime = Some(r()); ref mut runtimes_lock,
} .. } = &mut MEM;
let _lock = runtimes_lock.lock();
MEM.runtime.as_mut().unwrap() as _ runtimes.push(r);
let len = runtimes.len();
&mut runtimes[len - 1] as _
} }
unsafe fn alloc_message(m: Addrd<Message>) -> *mut Addrd<Message> { unsafe fn alloc_message(m: Addrd<Message>) -> *mut Addrd<Message> {

View File

@ -91,15 +91,18 @@ public final class Client implements AutoCloseable {
return new ClientObserveStream(this, message); return new ClientObserveStream(this, message);
} }
public CompletableFuture<Message> send(Message message) { public CompletableFuture<Toad.IdAndToken> sendNoResponse(Message message) {
if (message.addr().isEmpty()) { if (message.addr().isEmpty()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Message destination address must be set" "Message destination address must be set"
); );
} }
return Async return Async.pollCompletable(() -> this.toad.sendMessage(message));
.pollCompletable(() -> this.toad.sendMessage(message)) }
public CompletableFuture<Message> send(Message message) {
return this.sendNoResponse(message)
.thenCompose((Toad.IdAndToken sent) -> .thenCompose((Toad.IdAndToken sent) ->
this.awaitResponse(sent.token, message.addr().get()) this.awaitResponse(sent.token, message.addr().get())
); );

View File

@ -0,0 +1,5 @@
package dev.toad;
public interface Debug {
public String toDebugString();
}

View File

@ -0,0 +1,437 @@
package dev.toad;
import dev.toad.msg.Code;
import dev.toad.msg.Message;
import dev.toad.msg.Payload;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Level;
public final class Server {
final Toad toad;
final ArrayList<Function<Message, Middleware.Result>> middlewares;
final Function<Message, Middleware.Result> notFoundHandler;
final BiFunction<Message, Throwable, Middleware.Result> exceptionHandler;
Server(
Toad toad,
ArrayList<Function<Message, Middleware.Result>> ms,
Function<Message, Middleware.Result> notFoundHandler,
BiFunction<Message, Throwable, Middleware.Result> exHandler
) {
this.toad = toad;
this.middlewares = ms;
this.notFoundHandler = notFoundHandler;
this.exceptionHandler = exHandler;
}
public void run() {
while (true) {
try {
dev.toad.msg.ref.Message req = Async
.pollCompletable(() -> this.toad.pollReq())
.get();
Middleware.Result result = Middleware.next();
for (var f : this.middlewares) {
if (!result.shouldContinue()) {
break;
}
if (result.isAsync()) {
try {
result.response().get();
} catch (Throwable e) {
result = Middleware.error(e);
break;
}
// Toad.ack(req);
}
try {
result = f.apply(req);
} catch (Throwable e) {
result = Middleware.error(e);
}
}
switch (result) {
case Middleware.ResultExit e:
this.toad.close();
return;
case Middleware.ResultError e:
result = this.exceptionHandler.apply(req, e.error);
break;
case Middleware.ResultNextSync n:
result = this.notFoundHandler.apply(req);
break;
case Middleware.ResultNextAsync n:
result = this.notFoundHandler.apply(req);
break;
default:
break;
}
req.close();
var resp = result.response().get();
if (resp.isEmpty()) {
Toad
.logger()
.log(
Level.SEVERE,
String.format(
"Server never generated response for message\n%s",
req.toDebugString()
)
);
} else {
Async
.pollCompletable(() -> this.toad.sendMessage(resp.get().toOwned()))
.get();
}
} catch (Throwable e) {
Toad.logger().log(Level.SEVERE, e.toString());
}
}
}
public static final class Middleware {
public static final CompletableFuture<Optional<Message>> noop =
CompletableFuture.completedFuture(Optional.empty());
public static final Function<Message, Result> notFound = m -> {
return Middleware.respond(
m.modify().unsetId().code(Code.NOT_FOUND).build()
);
};
public static final BiFunction<Message, Throwable, Result> debugExceptionHandler =
(m, e) -> {
Toad
.logger()
.log(
Level.SEVERE,
String.format("while handling %s", m.toDebugString()),
e
);
var rep = m
.modify()
.unsetId()
.code(Code.INTERNAL_SERVER_ERROR)
.payload(Payload.text(e.toString()))
.build();
return Middleware.respond(rep);
};
public static final BiFunction<Message, Throwable, Result> exceptionHandler =
(m, e) -> {
Toad
.logger()
.log(
Level.SEVERE,
String.format("while handling %s", m.toDebugString()),
e
);
var rep = m.modify().unsetId().code(Code.INTERNAL_SERVER_ERROR).build();
return Middleware.respond(rep);
};
public static Result respond(Message m) {
return new ResultRespondSync(m);
}
public static Result respond(CompletableFuture<Message> m) {
return new ResultRespondAsync(m);
}
public static Result error(Throwable e) {
return new ResultError(e);
}
public static Result exit() {
return new ResultExit();
}
public static Result next() {
return new ResultNextSync();
}
public static Result next(CompletableFuture<Void> work) {
return new ResultNextAsync(work);
}
public static sealed interface Result
permits
ResultExit,
ResultError,
ResultNextSync,
ResultNextAsync,
ResultRespondSync,
ResultRespondAsync {
public boolean shouldContinue();
public boolean isAsync();
public CompletableFuture<Optional<Message>> response();
}
public static final class ResultExit implements Result {
public ResultExit() {}
@Override
public boolean shouldContinue() {
return false;
}
@Override
public boolean isAsync() {
return false;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return Middleware.noop;
}
}
public static final class ResultError implements Result {
public final Throwable error;
public ResultError(Throwable error) {
this.error = error;
}
@Override
public boolean shouldContinue() {
return false;
}
@Override
public boolean isAsync() {
return false;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return Middleware.noop;
}
}
public static final class ResultNextSync implements Result {
public ResultNextSync() {}
@Override
public boolean shouldContinue() {
return true;
}
@Override
public boolean isAsync() {
return false;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return Middleware.noop;
}
}
public static final class ResultNextAsync implements Result {
final CompletableFuture<Void> work;
public ResultNextAsync(CompletableFuture<Void> work) {
this.work = work;
}
@Override
public boolean shouldContinue() {
return true;
}
@Override
public boolean isAsync() {
return true;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return this.work.thenApply(_v -> Optional.empty());
}
}
public static final class ResultRespondSync implements Result {
final Message msg;
public ResultRespondSync(Message msg) {
this.msg = msg;
}
@Override
public boolean shouldContinue() {
return false;
}
@Override
public boolean isAsync() {
return false;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return CompletableFuture.completedFuture(Optional.of(this.msg));
}
}
public static final class ResultRespondAsync implements Result {
final CompletableFuture<Message> msg;
public ResultRespondAsync(CompletableFuture<Message> msg) {
this.msg = msg;
}
@Override
public boolean shouldContinue() {
return false;
}
@Override
public boolean isAsync() {
return true;
}
@Override
public CompletableFuture<Optional<Message>> response() {
return this.msg.thenApply(Optional::of);
}
}
}
public static final class Builder {
final Toad toad;
final ArrayList<Function<Message, Middleware.Result>> middlewares =
new ArrayList<>();
Function<Message, Middleware.Result> notFoundHandler = Middleware.notFound;
BiFunction<Message, Throwable, Middleware.Result> exceptionHandler =
Middleware.exceptionHandler;
Builder(Toad toad) {
this.toad = toad;
}
public Builder middleware(Function<Message, Middleware.Result> f) {
this.middlewares.add(f);
return this;
}
public Builder when(
Predicate<Message> pred,
Function<Message, Middleware.Result> f
) {
return this.middleware(m -> {
if (pred.test(m)) {
return f.apply(m);
} else {
return Middleware.next();
}
});
}
public Builder put(String path, Function<Message, Middleware.Result> f) {
return this.when(
m ->
m.code().equals(Code.PUT) &&
m.getPath().map(p -> p.matches(path)).orElse(path == ""),
f
);
}
public Builder post(String path, Function<Message, Middleware.Result> f) {
return this.when(
m ->
m.code().equals(Code.POST) &&
m.getPath().map(p -> p.matches(path)).orElse(path == ""),
f
);
}
public Builder delete(String path, Function<Message, Middleware.Result> f) {
return this.when(
m ->
m.code().equals(Code.DELETE) &&
m.getPath().map(p -> p.matches(path)).orElse(path == ""),
f
);
}
public Builder get(String path, Function<Message, Middleware.Result> f) {
return this.when(
m ->
m.code().equals(Code.GET) &&
m
.getPath()
.map(p -> p.matches(path))
.orElse(path == null || path.isEmpty()),
f
);
}
public Builder tap(Consumer<Message> f) {
return this.middleware(m -> {
f.accept(m);
return Middleware.next();
});
}
public Builder tapAsync(Function<Message, CompletableFuture<?>> f) {
return this.middleware(m -> {
return Middleware.next(f.apply(m).thenAccept(_void -> {}));
});
}
public Builder debugExceptions() {
return this.exceptionHandler(Middleware.debugExceptionHandler);
}
public Builder exceptionHandler(
BiFunction<Message, Throwable, Middleware.Result> handler
) {
this.exceptionHandler = handler;
return this;
}
public Builder notFoundHandler(
Function<Message, Middleware.Result> handler
) {
this.notFoundHandler = handler;
return this;
}
public Server build() {
return new Server(
this.toad,
this.middlewares,
this.notFoundHandler,
this.exceptionHandler
);
}
}
}

View File

@ -10,9 +10,16 @@ import java.nio.channels.DatagramChannel;
import java.time.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.ConsoleHandler;
import java.util.logging.Logger;
public final class Toad implements AutoCloseable { public final class Toad implements AutoCloseable {
public static Logger logger() {
// Configured in `glue::Runtime::new()`
return Logger.getLogger("dev.toad");
}
static native Config defaultConfigImpl(); static native Config defaultConfigImpl();
static Config defaultConfig = null; static Config defaultConfig = null;
@ -37,6 +44,8 @@ public final class Toad implements AutoCloseable {
final Config config; final Config config;
final DatagramChannel channel; final DatagramChannel channel;
static native void teardown();
static native long init(DatagramChannel chan, Config o); static native long init(DatagramChannel chan, Config o);
static native Optional<IdAndToken> sendMessage( static native Optional<IdAndToken> sendMessage(
@ -83,6 +92,14 @@ public final class Toad implements AutoCloseable {
@Override @Override
public void close() { public void close() {
Toad.teardown();
try {
this.channel.close();
} catch (Throwable e) {
throw new RuntimeException(e);
}
this.ptr.release(); this.ptr.release();
} }
@ -123,6 +140,16 @@ public final class Toad implements AutoCloseable {
} }
} }
public Server.Builder server() throws IOException {
if (this.ioException.isEmpty()) {
var cfg = new Config(this.logLevel, this.concurrency, this.msg.build());
var toad = new Toad(cfg, this.channel.get());
return new Server.Builder(toad);
} else {
throw this.ioException.get();
}
}
public Builder msg(Function<Config.Msg.Builder, Config.Msg.Builder> f) { public Builder msg(Function<Config.Msg.Builder, Config.Msg.Builder> f) {
this.msg = f.apply(this.msg); this.msg = f.apply(this.msg);
return this; return this;
@ -143,6 +170,7 @@ public final class Toad implements AutoCloseable {
java.net.StandardProtocolFamily.INET java.net.StandardProtocolFamily.INET
); );
channel.bind(addr); channel.bind(addr);
channel.configureBlocking(false);
return this.channel(channel); return this.channel(channel);
} catch (java.io.IOException e) { } catch (java.io.IOException e) {
this.ioException = Optional.of(e); this.ioException = Optional.of(e);

View File

@ -1,8 +1,9 @@
package dev.toad.msg; package dev.toad.msg;
import dev.toad.Debug;
import dev.toad.ffi.u8; import dev.toad.ffi.u8;
public final class Code { public final class Code implements Debug {
final u8 clazz; final u8 clazz;
final u8 detail; final u8 detail;
@ -56,16 +57,41 @@ public final class Code {
if (this.isRequest()) { if (this.isRequest()) {
return switch ((Short) this.detail.shortValue()) { return switch ((Short) this.detail.shortValue()) {
case 1 -> "GET"; case 1 -> "GET";
case 2 -> "PUT"; case 2 -> "POST";
case 3 -> "POST"; case 3 -> "PUT";
case Short other -> "DELETE"; case Short other -> "DELETE";
}; };
} else { } else {
return String.format( var str = String.format(
"%d.%d", "%d.%d",
this.clazz.shortValue(), this.clazz.shortValue(),
this.detail.shortValue() this.detail.shortValue()
); );
return switch (str) {
case "2.01" -> "2.01 Created";
case "2.02" -> "2.02 Deleted";
case "2.03" -> "2.03 Valid";
case "2.04" -> "2.04 Changed";
case "2.05" -> "2.05 Content";
case "4.00" -> "4.00 Bad Request";
case "4.01" -> "4.01 Unauthorized";
case "4.02" -> "4.02 Bad Option";
case "4.03" -> "4.03 Forbidden";
case "4.04" -> "4.04 Not Found";
case "4.05" -> "4.05 Method Not Allowed";
case "4.06" -> "4.06 Not Acceptable";
case "4.12" -> "4.12 Precondition Failed";
case "4.13" -> "4.13 Request Entity Too Large";
case "4.15" -> "4.15 Unsupported Content Format";
case "5.00" -> "5.00 Internal Server Error";
case "5.01" -> "5.01 Not Implemented";
case "5.02" -> "5.02 Bad Gateway";
case "5.03" -> "5.03 Service Unavailable";
case "5.04" -> "5.04 Gateway Timeout";
case "5.05" -> "5.05 Proxying Not Supported";
case String other -> other;
};
} }
} }
@ -80,4 +106,24 @@ public final class Code {
public boolean isEmpty() { public boolean isEmpty() {
return this.codeClass() == 0 && this.codeDetail() == 0; return this.codeClass() == 0 && this.codeDetail() == 0;
} }
@Override
public String toDebugString() {
return this.toString();
}
public boolean equals(Code other) {
return (
this.codeClass() == other.codeClass() &&
this.codeDetail() == other.codeDetail()
);
}
@Override
public boolean equals(Object other) {
return switch (other) {
case Code c -> c.equals(this);
default -> false;
};
}
} }

View File

@ -1,8 +1,9 @@
package dev.toad.msg; package dev.toad.msg;
import dev.toad.Debug;
import dev.toad.ffi.u16; import dev.toad.ffi.u16;
public final class Id { public final class Id implements Debug {
public static native Id defaultId(); public static native Id defaultId();
@ -15,4 +16,9 @@ public final class Id {
public int toInt() { public int toInt() {
return this.id.intValue(); return this.id.intValue();
} }
@Override
public String toDebugString() {
return String.format("Id(%d)", this.toInt());
}
} }

View File

@ -1,15 +1,18 @@
package dev.toad.msg; package dev.toad.msg;
import dev.toad.Debug;
import dev.toad.msg.option.Accept; import dev.toad.msg.option.Accept;
import dev.toad.msg.option.ContentFormat; import dev.toad.msg.option.ContentFormat;
import dev.toad.msg.option.Host; import dev.toad.msg.option.Host;
import dev.toad.msg.option.Path; import dev.toad.msg.option.Path;
import dev.toad.msg.option.Query; import dev.toad.msg.option.Query;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
public interface Message { public interface Message extends Debug {
public static dev.toad.msg.build.MessageNeeds.Destination builder() { public static dev.toad.msg.build.MessageNeeds.Destination builder() {
return dev.toad.msg.build.Message.builder(); return dev.toad.msg.build.Message.builder();
} }
@ -60,6 +63,34 @@ public interface Message {
return this.getOption(Query.number).map(o -> new Query(o)); return this.getOption(Query.number).map(o -> new Query(o));
} }
public default URI uri() {
int port = this.addr().map(a -> a.getPort()).orElse(5683);
String scheme = port == 5684 ? "coaps" : "coap";
String hostAddr =
this.addr().map(a -> a.getAddress().toString()).orElse(null);
String host = this.getHost().map(h -> h.toString()).orElse(hostAddr);
String path =
this.getPath()
.map(p -> p.toString())
.map(p -> p.startsWith("/") ? p : "/" + p)
.orElse(null);
String query = this.getQuery().map(q -> q.toString()).orElse(null);
try {
return new URI(
scheme,
/* userInfo */null,
host,
port,
path,
query,
/* fragment */null
);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
public default boolean equals(Message o) { public default boolean equals(Message o) {
return ( return (
this.addr().equals(o.addr()) && this.addr().equals(o.addr()) &&
@ -70,4 +101,25 @@ public interface Message {
this.payload().equals(o.payload()) this.payload().equals(o.payload())
); );
} }
@Override
public default String toDebugString() {
return (
this.type().toDebugString() +
" " +
this.code().toDebugString() +
" " +
this.uri().toString() +
"\n " +
this.id().toDebugString() +
" " +
this.token().toDebugString() +
this.options()
.stream()
.map(Debug::toDebugString)
.reduce("", (b, a) -> b + "\n " + a) +
"\n\n" +
this.payload().toDebugString()
);
}
} }

View File

@ -1,8 +1,11 @@
package dev.toad.msg; package dev.toad.msg;
import dev.toad.Debug;
import dev.toad.msg.option.*;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
public interface Option { public interface Option extends Debug {
public long number(); public long number();
public List<OptionValue> values(); public List<OptionValue> values();
@ -10,4 +13,28 @@ public interface Option {
public default boolean equals(Option o) { public default boolean equals(Option o) {
return this.number() == o.number() && this.values().equals(o.values()); return this.number() == o.number() && this.values().equals(o.values());
} }
@Override
public default String toDebugString() {
if (this.number() == Path.number) {
return new Path(this).toDebugString();
} else if (this.number() == Host.number) {
return new Host(this).toDebugString();
} else if (this.number() == Query.number) {
return new Query(this).toDebugString();
} else if (this.number() == Accept.number) {
return new Accept(this).toDebugString();
} else if (this.number() == ContentFormat.number) {
return new ContentFormat(this).toDebugString();
} else {
return String.format(
"Option(%d): %s",
this.number(),
this.values()
.stream()
.map(OptionValue::toDebugString)
.collect(Collectors.toList())
);
}
}
} }

View File

@ -1,6 +1,10 @@
package dev.toad.msg; package dev.toad.msg;
public interface OptionValue { import dev.toad.Debug;
import java.util.ArrayList;
import java.util.List;
public interface OptionValue extends Debug {
public byte[] asBytes(); public byte[] asBytes();
public String asString(); public String asString();
@ -10,4 +14,15 @@ public interface OptionValue {
public default boolean equals(OptionValue o) { public default boolean equals(OptionValue o) {
return this.asBytes().equals(o.asBytes()); return this.asBytes().equals(o.asBytes());
} }
@Override
public default String toDebugString() {
List<Integer> intList = new ArrayList<>();
var bytes = this.asBytes();
for (byte b : bytes) {
intList.add((int) b);
}
return intList.toString();
}
} }

View File

@ -1,10 +1,12 @@
package dev.toad.msg; package dev.toad.msg;
import dev.toad.Debug;
import dev.toad.msg.option.ContentFormat; import dev.toad.msg.option.ContentFormat;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Optional; import java.util.Optional;
public final class Payload { public final class Payload implements Debug {
final byte[] bytes; final byte[] bytes;
final Optional<ContentFormat> contentFormat; final Optional<ContentFormat> contentFormat;
@ -60,4 +62,19 @@ public final class Payload {
public static Payload octetStream(byte[] bytes) { public static Payload octetStream(byte[] bytes) {
return new Payload(ContentFormat.OCTET_STREAM, bytes); return new Payload(ContentFormat.OCTET_STREAM, bytes);
} }
@Override
public String toDebugString() {
if (this.contentFormat.map(ContentFormat::isUtf8Text).orElse(false)) {
return this.toString();
} else {
var intList = new ArrayList<Integer>();
var bytes = this.bytes();
for (byte b : bytes) {
intList.add((int) b);
}
return String.format("%s", intList.toString());
}
}
} }

View File

@ -1,6 +1,9 @@
package dev.toad.msg; package dev.toad.msg;
public final class Token { import dev.toad.Debug;
import java.util.ArrayList;
public final class Token implements Debug {
public static native Token defaultToken(); public static native Token defaultToken();
@ -13,4 +16,22 @@ public final class Token {
public byte[] toBytes() { public byte[] toBytes() {
return this.bytes; return this.bytes;
} }
@Override
public String toDebugString() {
var intList = new ArrayList<Integer>();
for (byte b : this.bytes) {
intList.add((int) b);
}
return String.format("Token(%s)", intList);
}
@Override
public boolean equals(Object other) {
return switch (other) {
case Token t -> t.bytes.equals(this.bytes);
default -> false;
};
}
} }

View File

@ -1,6 +1,8 @@
package dev.toad.msg; package dev.toad.msg;
public enum Type { import dev.toad.Debug;
public enum Type implements Debug {
CON(1), CON(1),
NON(2), NON(2),
ACK(3), ACK(3),
@ -27,4 +29,9 @@ public enum Type {
default -> throw new Error(); default -> throw new Error();
}; };
} }
@Override
public String toDebugString() {
return this.toString();
}
} }

View File

@ -79,12 +79,22 @@ public final class Message
var addr = InetAddress.getByName(uri.getHost()); var addr = InetAddress.getByName(uri.getHost());
var port = uri.getPort() > 0 var port = uri.getPort() > 0
? uri.getPort() ? uri.getPort()
: uri.getScheme().equals("coaps") ? 5684 : 5683; : uri.getScheme() != null && uri.getScheme().equals("coaps")
? 5684
: 5683;
this.addr = Optional.of(new InetSocketAddress(addr, port)); this.addr = Optional.of(new InetSocketAddress(addr, port));
return this.option(new Host(uri.getHost())) this.option(new Host(addr.getHostAddress()));
.option(new Query(uri.getQuery()))
.option(new Path(uri.getPath())); if (uri.getQuery() != null && !uri.getQuery().isEmpty()) {
this.option(new Query(uri.getQuery()));
}
if (uri.getPath() != null && !uri.getPath().isEmpty()) {
this.option(new Path(uri.getPath()));
}
return this;
} }
public MessageNeeds.Type addr(InetSocketAddress addr) { public MessageNeeds.Type addr(InetSocketAddress addr) {
@ -129,7 +139,10 @@ public final class Message
public Message payload(Payload payload) { public Message payload(Payload payload) {
this.payload = Optional.of(payload); this.payload = Optional.of(payload);
return this.option(payload.contentFormat().get()); if (!payload.contentFormat().isEmpty()) {
this.option(payload.contentFormat().get());
}
return this;
} }
public Message option( public Message option(

View File

@ -3,8 +3,10 @@ package dev.toad.msg.option;
import dev.toad.ffi.u16; import dev.toad.ffi.u16;
import dev.toad.msg.Option; import dev.toad.msg.Option;
import dev.toad.msg.OptionValue; import dev.toad.msg.OptionValue;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
public final class Accept extends ContentFormat implements Option { public final class Accept extends ContentFormat implements Option {
@ -30,7 +32,39 @@ public final class Accept extends ContentFormat implements Option {
} }
public Accept(Option o) { public Accept(Option o) {
super(new ContentFormat(o).value()); super(0);
if (o.number() != Accept.number) {
throw new IllegalArgumentException(
String.format("%d != Accept number %d", o.number(), Path.number)
);
}
if (o.values().size() > 1) {
throw new IllegalArgumentException(
String.format(
"Accept is not repeatable, %s",
o
.values()
.stream()
.map(v -> v.asString())
.collect(Collectors.toList())
)
);
}
var bytes = o.values().get(0).asBytes();
var buf = ByteBuffer.wrap(bytes);
if (bytes.length == 1) {
this.value = new u16(buf.get());
} else if (bytes.length == 2) {
this.value = new u16(buf.getShort());
} else if (bytes.length == 3) {
buf.put(0, (byte) 0);
this.value = new u16(buf.getInt());
} else {
this.value = new u16(buf.getInt());
}
} }
public Accept(ContentFormat format) { public Accept(ContentFormat format) {
@ -48,4 +82,9 @@ public final class Accept extends ContentFormat implements Option {
public boolean equals(Accept other) { public boolean equals(Accept other) {
return this.value.equals(other.value); return this.value.equals(other.value);
} }
@Override
public String toDebugString() {
return String.format("Accept: %s", this.toMimeType());
}
} }

View File

@ -11,7 +11,7 @@ import java.util.stream.Collectors;
public sealed class ContentFormat implements Option permits Accept { public sealed class ContentFormat implements Option permits Accept {
final u16 value; protected u16 value;
public static final long number = 12; public static final long number = 12;
@ -74,6 +74,52 @@ public sealed class ContentFormat implements Option permits Accept {
return new ContentFormat(value); return new ContentFormat(value);
} }
public boolean isUtf8Text() {
return (
this.value() == ContentFormat.TEXT.value() ||
this.value() == ContentFormat.CSS.value() ||
this.value() == ContentFormat.JSON.value() ||
this.value() == ContentFormat.XML.value() ||
this.value() == ContentFormat.JAVASCRIPT.value() ||
this.value() == ContentFormat.LINK_FORMAT.value() ||
this.value() == ContentFormat.IMAGE_SVG.value()
);
}
public String toMimeType() {
// https://www.iana.org/assignments/core-parameters/core-parameters.xhtml#content-formats
return this.value() == ContentFormat.TEXT.value()
? "text/plain; charset=utf-8"
: this.value() == ContentFormat.CSS.value()
? "text/css"
: this.value() == ContentFormat.JSON.value()
? "application/json"
: this.value() == ContentFormat.XML.value()
? "application/xml"
: this.value() == ContentFormat.EXI.value()
? "application/exi"
: this.value() == ContentFormat.CBOR.value()
? "application/cbor"
: this.value() == ContentFormat.JAVASCRIPT.value()
? "application/javascript"
: this.value() == ContentFormat.OCTET_STREAM.value()
? "application/octet-stream"
: this.value() == ContentFormat.LINK_FORMAT.value()
? "application/link-format"
: this.value() == ContentFormat.IMAGE_GIF.value()
? "image/gif"
: this.value() == ContentFormat.IMAGE_JPG.value()
? "image/jpeg"
: this.value() == ContentFormat.IMAGE_PNG.value()
? "image/png"
: this.value() == ContentFormat.IMAGE_SVG.value()
? "image/svg+xml"
: String.format(
"ContentFormat(%d)",
this.value()
);
}
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
return switch (other) { return switch (other) {
@ -103,4 +149,9 @@ public sealed class ContentFormat implements Option permits Accept {
list.add(new dev.toad.msg.owned.OptionValue(this.value.toBytes())); list.add(new dev.toad.msg.owned.OptionValue(this.value.toBytes()));
return list; return list;
} }
@Override
public String toDebugString() {
return String.format("Content-Format: %s", this.toMimeType());
}
} }

View File

@ -50,6 +50,11 @@ public final class Host implements Option {
return this.host; return this.host;
} }
@Override
public String toDebugString() {
return String.format("Uri-Host: %s", this.host);
}
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
return switch (other) { return switch (other) {

View File

@ -48,6 +48,10 @@ public final class Path implements Option {
return this.segments; return this.segments;
} }
public boolean matches(String str) {
return this.toString().trim().equals(str.trim());
}
@Override @Override
public long number() { public long number() {
return Path.number; return Path.number;
@ -58,6 +62,11 @@ public final class Path implements Option {
return String.join("/", this.segments); return String.join("/", this.segments);
} }
@Override
public String toDebugString() {
return String.format("Uri-Path: %s", this.toString());
}
@Override @Override
public List<OptionValue> values() { public List<OptionValue> values() {
return this.segments.stream() return this.segments.stream()

View File

@ -85,6 +85,11 @@ public final class Query implements Option {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override
public String toDebugString() {
return String.format("Uri-Query: %s", this.toString());
}
public static final class Value { public static final class Value {
final Optional<String> val; final Optional<String> val;

View File

@ -58,13 +58,21 @@ public class Mock {
ScatteringByteChannel, ScatteringByteChannel,
ReadableByteChannel { ReadableByteChannel {
public Map<SocketAddress, List<ByteBuffer>> sent = new HashMap<>(); public Channel sister = null;
public Map<SocketAddress, List<ByteBuffer>> recv = new HashMap<>(); public List<ByteBuffer> dgrams = new ArrayList<>();
public List<Byte> bytes = new ArrayList<>();
public DatagramSocket sock; public DatagramSocket sock;
public Channel() throws SocketException, UnknownHostException { public Channel() throws SocketException, UnknownHostException {
this(new DatagramSocket(1234)); this(0);
}
public Channel(int port) throws SocketException, UnknownHostException {
this(new DatagramSocket(port));
}
public void pairTo(Channel other) {
this.sister = other;
this.sister.sister = this;
} }
public Channel(DatagramSocket sock) public Channel(DatagramSocket sock)
@ -75,38 +83,23 @@ public class Mock {
@Override @Override
public int send(ByteBuffer src, SocketAddress target) { public int send(ByteBuffer src, SocketAddress target) {
var sent = this.sent.get(target); this.sister.dgrams.add(src);
if (sent == null) {
var list = new ArrayList<ByteBuffer>();
this.sent.put(target, list);
}
this.sent.get(target).add(src);
return (int) src.capacity(); return (int) src.capacity();
} }
@Override @Override
public SocketAddress receive(ByteBuffer dst) { public SocketAddress receive(ByteBuffer dst) {
for (Map.Entry<SocketAddress, List<ByteBuffer>> ent : this.recv.entrySet()) { if (this.dgrams.size() > 0) {
if (ent.getValue().size() == 0) { dst.put(this.dgrams.remove(0));
this.recv.remove(ent.getKey()); return this.sister.sock.address();
} else { } else {
var buf = ent.getValue().remove(0); return null;
dst.put(buf);
return ent.getKey();
}
} }
return null;
} }
@Override @Override
public int write(ByteBuffer src) { public int write(ByteBuffer src) {
src.rewind(); this.dgrams.add(src);
for (int j = 0; j < src.capacity(); j++) {
this.bytes.add(src.get(j));
}
return (int) src.capacity(); return (int) src.capacity();
} }
@ -120,12 +113,13 @@ public class Mock {
} }
public int read(ByteBuffer dst, int start) { public int read(ByteBuffer dst, int start) {
int orig = (int) dst.position(); if (this.dgrams.size() > 0) {
for (Byte b : this.bytes.subList(start, this.bytes.size())) { var src = this.dgrams.remove(0);
dst.put(b); dst.put(src);
return src.capacity();
} else {
return 0;
} }
return (int) dst.position() - orig;
} }
@Override @Override

View File

@ -0,0 +1,34 @@
import dev.toad.Toad
import dev.toad.msg.*
import dev.toad.msg.option.*
class Debug extends munit.FunSuite {
test("Message") {
Toad.loadNativeLib()
val msg = dev.toad.msg.build.Message
.builder()
.uri("coap://localhost/foo/bar/baz?quux&sling=shot")
.`type`(Type.NON)
.code(Code.GET)
.payload(Payload.json("[\"fart\"]"))
.option(Accept.TEXT)
.build
assertNoDiff(
msg.toDebugString,
Seq(
"NON GET coap://127.0.0.1:5683/foo/bar/baz?quux&sling=shot",
" Id(0) Token([])",
" Accept: text/plain; charset=utf-8",
" Uri-Host: 127.0.0.1",
" Uri-Path: foo/bar/baz",
" Content-Format: application/json",
" Uri-Query: quux&sling=shot",
"",
"[\"fart\"]"
)
.foldLeft("")((b, a) => b ++ "\n" ++ a)
)
}
}

View File

@ -3,48 +3,71 @@ import dev.toad.msg.*
import dev.toad.msg.option.ContentFormat import dev.toad.msg.option.ContentFormat
import dev.toad.msg.option.Accept import dev.toad.msg.option.Accept
import mock.java.nio.channels.Mock import mock.java.nio.channels.Mock
import java.lang.Thread
import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.logging.Logger import java.util.logging.Logger
import java.util.logging.Level import java.util.logging.Level
import java.util.ArrayList import java.util.ArrayList
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.net.InetAddress
class E2E extends munit.FunSuite { class E2E extends munit.FunSuite {
test("minimal client and server") { test("minimal client and server") {
Toad.loadNativeLib() Toad.loadNativeLib()
val mock = Mock.Channel() val serverThread = Thread((() => {
Toad.builder
.port(10102)
.logLevel(Level.INFO)
.server
.post(
"exit",
_msg => {
Server.Middleware.exit
}
)
.get(
"hello",
msg => {
val name = msg.payload.toString
val rep = msg.modify.unsetId
.`type`(Type.NON)
.code(Code.OK_CONTENT)
.payload(Payload.text(s"Hello, $name!"))
.build
Server.Middleware.respond(rep)
}
)
.build
.run
}): java.lang.Runnable)
val resp = dev.toad.msg.build.Message serverThread.start()
.builder()
.addr(InetSocketAddress("127.0.0.1", 1111)) val req = Message.builder
.`type`(Type.ACK) .uri("coap://localhost:10102/hello")
.code(Code.OK_CONTENT) .`type`(Type.NON)
.id(Id(2)) .code(Code.GET)
.token(Token(Array(1))) .payload(Payload.text("Fred"))
.payload(Payload.text("foobar"))
.build .build
val req = dev.toad.msg.build.Message val client = Toad.builder.port(10101).logLevel(Level.INFO).buildClient
.builder()
.addr(InetSocketAddress("127.0.0.1", 2222))
.`type`(Type.CON)
.code(Code(2, 4))
.id(Id(1))
.token(Token(Array(1)))
.option(Accept.TEXT)
.build
val client = Toad.builder.channel(mock).logLevel(Level.INFO).buildClient
val respFuture = client.send(req) val respFuture = client.send(req)
var bufs = ArrayList[ByteBuffer]() try {
bufs.add(ByteBuffer.wrap(resp.toBytes())) val respActual = respFuture.get(1, TimeUnit.SECONDS)
mock.recv.put(InetSocketAddress("127.0.0.1", 2222), bufs) assertNoDiff(respActual.payload.toString, "Hello, Fred!")
} finally {
val exit = Message.builder
.uri("coap://localhost:10102/exit")
.`type`(Type.NON)
.code(Code.POST)
.build
client.sendNoResponse(exit)
}
val respActual = respFuture.get(1, TimeUnit.SECONDS) serverThread.join
assertEquals(resp.payload.bytes.toSeq, respActual.payload.bytes.toSeq)
} }
} }

View File

@ -2,10 +2,10 @@ import sys.process._
class Glue extends munit.FunSuite { class Glue extends munit.FunSuite {
test("cargo test") { test("cargo test") {
Seq( // Seq(
"sh", // "sh",
"-c", // "-c",
"cd glue; RUST_BACKTRACE=full cargo test --quiet --features e2e" // "cd glue; RUST_BACKTRACE=full cargo test --quiet --features e2e"
).!! // ).!!
} }
} }

View File

@ -74,7 +74,7 @@ class MessageBuilder extends munit.FunSuite {
assertEquals(msg.addr.get.getPort, 5684) assertEquals(msg.addr.get.getPort, 5684)
} }
test("uri(String) sets host to host section of uri") { test("uri(String) sets host to resolved host address from URI") {
val msg = dev.toad.msg.build.Message val msg = dev.toad.msg.build.Message
.builder() .builder()
.uri("coap://localhost/cheese/gruyere?foo=bar&bingus") .uri("coap://localhost/cheese/gruyere?foo=bar&bingus")
@ -82,7 +82,7 @@ class MessageBuilder extends munit.FunSuite {
.code(Code.GET) .code(Code.GET)
.build .build
assertEquals(msg.getHost.get.toString, "localhost") assertEquals(msg.getHost.get.toString, "127.0.0.1")
} }
test("uri(String) sets path to path section of uri") { test("uri(String) sets path to path section of uri") {

View File

@ -0,0 +1,37 @@
import mock.java.nio.channels.Mock
import java.nio.ByteBuffer
import java.net.InetAddress
import java.net.InetSocketAddress
class MockChannel extends munit.FunSuite {
test("channel pair works") {
val a = Mock.Channel(1)
val b = Mock.Channel(2)
a.pairTo(b)
a.send(
ByteBuffer.wrap(Array[Byte](1, 2, 3)),
InetSocketAddress(InetAddress.getByAddress(Array[Byte](0, 0, 0, 0)), 1)
)
a.send(
ByteBuffer.wrap(Array[Byte](2, 3, 4)),
InetSocketAddress(InetAddress.getByAddress(Array[Byte](0, 0, 0, 0)), 1)
)
0.until(2).foreach { n =>
val into = ByteBuffer.allocate(3)
val addr = b.receive(into)
into.rewind();
val recvd = Array[Byte](0, 0, 0)
into.get(recvd)
assert(addr != null)
assertEquals(
recvd.toSeq,
Seq(1, 2, 3).map(i => i + n).map(i => i.byteValue)
)
}
}
}