feat: message stuff equals, clientobservestream
This commit is contained in:
parent
c71993ba24
commit
73d2233ff9
@ -2,7 +2,10 @@ package dev.toad;
|
||||
|
||||
import dev.toad.msg.Code;
|
||||
import dev.toad.msg.Message;
|
||||
import dev.toad.msg.Payload;
|
||||
import dev.toad.msg.Token;
|
||||
import dev.toad.msg.Type;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Optional;
|
||||
@ -22,12 +25,72 @@ public final class Client implements AutoCloseable {
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> get(Type ty, String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.get(ty, uri, new Payload());
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> get(Type ty, String uri, Payload p)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.send(
|
||||
Message.builder().uri(uri).type(ty).code(Code.GET).build()
|
||||
Message.builder().uri(uri).type(ty).code(Code.GET).payload(p).build()
|
||||
);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> post(String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.post(Type.CON, uri);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> post(Type ty, String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.post(ty, uri, new Payload());
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> post(Type ty, String uri, Payload p)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.send(
|
||||
Message.builder().uri(uri).type(ty).code(Code.POST).payload(p).build()
|
||||
);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> put(String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.put(Type.CON, uri);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> put(Type ty, String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.put(ty, uri, new Payload());
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> put(Type ty, String uri, Payload p)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.send(
|
||||
Message.builder().uri(uri).type(ty).code(Code.PUT).payload(p).build()
|
||||
);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> delete(String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.delete(Type.CON, uri);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> delete(Type ty, String uri)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.delete(ty, uri, new Payload());
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> delete(Type ty, String uri, Payload p)
|
||||
throws URISyntaxException, UnknownHostException {
|
||||
return this.send(
|
||||
Message.builder().uri(uri).type(ty).code(Code.DELETE).payload(p).build()
|
||||
);
|
||||
}
|
||||
|
||||
public ClientObserveStream observe(Message message) {
|
||||
return new ClientObserveStream(this, message);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> send(Message message) {
|
||||
if (message.addr().isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
@ -38,10 +101,16 @@ public final class Client implements AutoCloseable {
|
||||
return Async
|
||||
.pollCompletable(() -> this.toad.sendMessage(message))
|
||||
.thenCompose((Toad.IdAndToken sent) ->
|
||||
Async.pollCompletable(() ->
|
||||
this.toad.pollResp(sent.token, message.addr().get())
|
||||
)
|
||||
)
|
||||
this.awaitResponse(sent.token, message.addr().get())
|
||||
);
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> awaitResponse(
|
||||
Token t,
|
||||
InetSocketAddress addr
|
||||
) {
|
||||
return Async
|
||||
.pollCompletable(() -> this.toad.pollResp(t, addr))
|
||||
.thenApply(msg -> msg.toOwned());
|
||||
}
|
||||
|
||||
|
71
src/main/java/dev.toad/ClientObserveStream.java
Normal file
71
src/main/java/dev.toad/ClientObserveStream.java
Normal file
@ -0,0 +1,71 @@
|
||||
package dev.toad;
|
||||
|
||||
import dev.toad.msg.Message;
|
||||
import dev.toad.msg.option.Observe;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class ClientObserveStream {
|
||||
|
||||
State state;
|
||||
Optional<CompletableFuture<Message>> buffered;
|
||||
final Client client;
|
||||
final Message message;
|
||||
|
||||
public ClientObserveStream(Client client, Message message) {
|
||||
this.state = State.OPEN;
|
||||
this.client = client;
|
||||
this.message = message.modify().option(Observe.REGISTER).build();
|
||||
this.buffered = Optional.of(client.send(this.message));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> close() {
|
||||
return this.client.send(
|
||||
this.message.modify().option(Observe.DEREGISTER).unsetId().build()
|
||||
)
|
||||
.thenAccept(m -> {
|
||||
this.state = State.CLOSED;
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<Message> next() {
|
||||
if (this.state == State.CLOSED) {
|
||||
throw new RuntimeException(
|
||||
"ClientObserveStream.next() invoked after .close()"
|
||||
);
|
||||
} else if (this.buffered.isEmpty()) {
|
||||
var buffered = this.buffered.get();
|
||||
this.buffered = Optional.empty();
|
||||
return buffered;
|
||||
} else {
|
||||
return this.client.awaitResponse(
|
||||
this.message.token(),
|
||||
this.message.addr().get()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class State {
|
||||
|
||||
public static final State OPEN = new State(0);
|
||||
public static final State CLOSED = new State(1);
|
||||
|
||||
final int state;
|
||||
|
||||
State(int state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public boolean equals(State other) {
|
||||
return this.state == other.state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case State s -> this.equals(s);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
@ -32,6 +32,10 @@ public interface Message {
|
||||
|
||||
public byte[] toBytes();
|
||||
|
||||
public default dev.toad.msg.build.Message modify() {
|
||||
return dev.toad.msg.build.Message.from(this);
|
||||
}
|
||||
|
||||
public default Optional<Option> getOption(long number) {
|
||||
return this.options().stream().filter(o -> o.number() == number).findAny();
|
||||
}
|
||||
@ -55,4 +59,15 @@ public interface Message {
|
||||
public default Optional<Query> getQuery() {
|
||||
return this.getOption(Query.number).map(o -> new Query(o));
|
||||
}
|
||||
|
||||
public default boolean equals(Message o) {
|
||||
return (
|
||||
this.addr().equals(o.addr()) &&
|
||||
this.options().equals(o.options()) &&
|
||||
this.id().equals(o.id()) &&
|
||||
this.token().equals(o.token()) &&
|
||||
this.type().equals(o.type()) &&
|
||||
this.payload().equals(o.payload())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -6,4 +6,8 @@ public interface Option {
|
||||
public long number();
|
||||
|
||||
public List<OptionValue> values();
|
||||
|
||||
public default boolean equals(Option o) {
|
||||
return this.number() == o.number() && this.values().equals(o.values());
|
||||
}
|
||||
}
|
||||
|
@ -6,4 +6,8 @@ public interface OptionValue {
|
||||
public String asString();
|
||||
|
||||
public dev.toad.msg.owned.OptionValue toOwned();
|
||||
|
||||
public default boolean equals(OptionValue o) {
|
||||
return this.asBytes().equals(o.asBytes());
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -34,6 +35,40 @@ public final class Message
|
||||
|
||||
Message() {}
|
||||
|
||||
public static Message from(dev.toad.msg.Message other) {
|
||||
var builder = new Message();
|
||||
|
||||
Function<dev.toad.msg.Option, Long> key = o -> o.number();
|
||||
Function<dev.toad.msg.Option, ArrayList<dev.toad.msg.owned.OptionValue>> value =
|
||||
o ->
|
||||
o
|
||||
.values()
|
||||
.stream()
|
||||
.map(v -> v.toOwned())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
BinaryOperator<ArrayList<dev.toad.msg.owned.OptionValue>> merge = (
|
||||
a,
|
||||
b
|
||||
) -> {
|
||||
a.addAll(b);
|
||||
return a;
|
||||
};
|
||||
|
||||
builder.options =
|
||||
other
|
||||
.options()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(key, value, merge, () -> new HashMap<>()));
|
||||
builder.id = Optional.of(other.id());
|
||||
builder.code = Optional.of(other.code());
|
||||
builder.token = Optional.of(other.token());
|
||||
builder.type = Optional.of(other.type());
|
||||
builder.payload = Optional.of(other.payload());
|
||||
builder.addr = other.addr();
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static MessageNeeds.Destination builder() {
|
||||
return new Message();
|
||||
}
|
||||
@ -72,11 +107,26 @@ public final class Message
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message unsetId() {
|
||||
this.id = Optional.empty();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message token(Token token) {
|
||||
this.token = Optional.of(token);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message unsetToken() {
|
||||
this.token = Optional.empty();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message unsetOption(long num) {
|
||||
this.options.remove(num);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message payload(Payload payload) {
|
||||
this.payload = Optional.of(payload);
|
||||
return this.option(payload.contentFormat().get());
|
||||
@ -90,17 +140,26 @@ public final class Message
|
||||
}
|
||||
|
||||
public Message option(dev.toad.msg.Option opt) {
|
||||
return this.option(opt.number(), opt.values());
|
||||
return this.putOption(opt.number(), opt.values());
|
||||
}
|
||||
|
||||
public Message putOption(long number, List<dev.toad.msg.OptionValue> values) {
|
||||
this.options.put(
|
||||
number,
|
||||
values
|
||||
.stream()
|
||||
.map(v -> v.toOwned())
|
||||
.collect(Collectors.toCollection(ArrayList::new))
|
||||
);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Message option(long number, List<dev.toad.msg.OptionValue> values) {
|
||||
if (this.options.get(number) == null) {
|
||||
this.options.put(number, new ArrayList<>());
|
||||
}
|
||||
|
||||
this.options.get(number)
|
||||
.addAll(values.stream().map(v -> v.toOwned()).toList());
|
||||
return this;
|
||||
var vals = Optional
|
||||
.ofNullable(this.options.get(number))
|
||||
.orElse(new ArrayList<>());
|
||||
vals.addAll(values.stream().map(v -> v.toOwned()).toList());
|
||||
return this.putOption(number, List.copyOf(vals));
|
||||
}
|
||||
|
||||
public dev.toad.msg.Message build() {
|
||||
|
76
src/main/java/dev.toad/msg/option/Observe.java
Normal file
76
src/main/java/dev.toad/msg/option/Observe.java
Normal file
@ -0,0 +1,76 @@
|
||||
package dev.toad.msg.option;
|
||||
|
||||
import dev.toad.msg.Option;
|
||||
import dev.toad.msg.OptionValue;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class Observe implements Option {
|
||||
|
||||
final boolean register;
|
||||
|
||||
public static final Observe REGISTER = new Observe(true);
|
||||
public static final Observe DEREGISTER = new Observe(false);
|
||||
|
||||
public static final long number = 6;
|
||||
|
||||
Observe(boolean register) {
|
||||
this.register = register;
|
||||
}
|
||||
|
||||
public Observe(Option o) {
|
||||
if (o.number() != Observe.number) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("%d != Observe number %d", o.number(), Path.number)
|
||||
);
|
||||
}
|
||||
|
||||
if (o.values().size() > 1) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"Observe is not repeatable, %s",
|
||||
o
|
||||
.values()
|
||||
.stream()
|
||||
.map(v -> v.asString())
|
||||
.collect(Collectors.toList())
|
||||
)
|
||||
);
|
||||
} else if (o.values().size() == 0) {
|
||||
this.register = false;
|
||||
} else {
|
||||
this.register = o.values().get(0).asBytes()[0] == 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long number() {
|
||||
return Host.number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.register ? "Observe.REGISTER" : "Observe.DEREGISTER";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case Host h -> this.toString().equals(h.toString());
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OptionValue> values() {
|
||||
var list = new ArrayList<OptionValue>();
|
||||
list.add(
|
||||
new dev.toad.msg.owned.OptionValue(
|
||||
new byte[] { this.register ? (byte) 1 : (byte) 0 }
|
||||
)
|
||||
);
|
||||
return list;
|
||||
}
|
||||
}
|
@ -85,4 +85,11 @@ public class Message implements dev.toad.msg.Message {
|
||||
public Payload payload() {
|
||||
return this.payload;
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.Message m -> m.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -35,4 +35,11 @@ public class Option implements dev.toad.msg.Option {
|
||||
public List<dev.toad.msg.OptionValue> values() {
|
||||
return List.copyOf(this.values);
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.Option o -> o.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -29,4 +29,11 @@ public class OptionValue implements dev.toad.msg.OptionValue {
|
||||
public dev.toad.msg.owned.OptionValue toOwned() {
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.OptionValue o -> o.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -46,6 +46,13 @@ public final class Message implements dev.toad.msg.Message, AutoCloseable {
|
||||
return Arrays.asList(this.optionRefs());
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.Message m -> m.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.ptr.release();
|
||||
|
@ -29,6 +29,13 @@ public class Option implements dev.toad.msg.Option, AutoCloseable {
|
||||
return new dev.toad.msg.owned.Option(this);
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.Option o -> o.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.ptr.release();
|
||||
|
@ -21,6 +21,13 @@ public final class OptionValue
|
||||
return new dev.toad.msg.owned.OptionValue(this);
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return switch (other) {
|
||||
case dev.toad.msg.OptionValue o -> o.equals(this);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.ptr.release();
|
||||
|
@ -6,6 +6,18 @@ import dev.toad.msg.option.Path
|
||||
import dev.toad.msg.option.Query
|
||||
|
||||
class MessageBuilder extends munit.FunSuite {
|
||||
test("from(Message) copies another message") {
|
||||
val msg = dev.toad.msg.build.Message
|
||||
.builder()
|
||||
.uri("coap://localhost")
|
||||
.`type`(Type.NON)
|
||||
.code(Code.GET)
|
||||
.payload(Payload.json("[\"fart\"]"))
|
||||
.build
|
||||
|
||||
assertEquals(msg, dev.toad.msg.build.Message.from(msg).build)
|
||||
}
|
||||
|
||||
test("payload(Payload) sets content format to ContentFormat.JSON") {
|
||||
val msg = dev.toad.msg.build.Message
|
||||
.builder()
|
||||
|
Loading…
Reference in New Issue
Block a user