feat: add postgres integration, submodule, start repo pattern

This commit is contained in:
Orion Kindel 2023-07-06 22:28:58 -05:00
commit b3be701339
Signed by untrusted user who does not match committer: orion
GPG Key ID: 6D4165AE4C928719
12 changed files with 1951 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "postgres"]
path = postgres
url = https://github.com/sfackler/rust-postgres

1293
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

16
Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "api"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
toad = "1.0.0-beta.2"
toad-msg = "0.18.1"
simple_logger = "4.2"
nb = "1.1.0"
serde = {version = "1", features = ["derive"]}
serde_json = "1"
log = "0.4"
postgres = {path = "./postgres/postgres"}
rand = "0.8"

1
postgres Submodule

@ -0,0 +1 @@
Subproject commit 790af54a0fdd5c487e77dc9a25d82921ee31ffe6

24
rustfmt.toml Normal file
View File

@ -0,0 +1,24 @@
# General
max_width = 100
newline_style = "Unix"
tab_spaces = 2
indent_style = "Visual"
format_code_in_doc_comments = true
format_macro_bodies = true
# Match statements
match_arm_leading_pipes = "Always"
match_block_trailing_comma = true
# Structs
use_field_init_shorthand = true
struct_field_align_threshold = 0
# Enums
enum_discrim_align_threshold = 0
# Imports
group_imports = "StdExternalCrate"
imports_granularity = "Module"
imports_indent = "Visual"
imports_layout = "HorizontalVertical"

13
src/app.rs Normal file
View File

@ -0,0 +1,13 @@
use crate::hashed_text::{HashedTextExt, HashedTextExtImpl};
use crate::postgres::PostgresImpl;
use crate::user::UserRepo;
trait App: Send + Sync + Sized {
type HashedTextExt: HashedTextExt;
type UserRepo: UserRepo;
}
pub struct AppConcrete {
pg: PostgresImpl<postgres::Client>,
hashed_text_ext: HashedTextExtImpl<PostgresImpl<postgres::Client>>,
}

70
src/hashed_text.rs Normal file
View File

@ -0,0 +1,70 @@
use postgres::{GenericClient, GenericRow};
use crate::postgres::{DbError, Postgres};
use crate::repo::Ext;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct HashedText(String);
pub trait HashedTextExt: Ext {
fn matches<S: AsRef<str>>(&self, this: &HashedText, other: S) -> Result<bool, Self::Error>;
}
pub struct HashedTextExtImpl<Db: Postgres>(&'static Db);
impl<Db> Ext for HashedTextExtImpl<Db> where Db: Postgres
{
type Error = DbError<Db>;
}
impl<Db> HashedTextExt for HashedTextExtImpl<Db> where Db: Postgres
{
fn matches<S: AsRef<str>>(&self, this: &HashedText, other: S) -> Result<bool, DbError<Db>> {
static QUERY: &'static str =
"select public.hashed_text_matches($1, public.hashed_text_of_string($2))";
self.0
.with_client(|client| client.query_one(QUERY, &[&other.as_ref(), &this.0.as_str()]))
.and_then(|row| row.try_get(0))
}
}
#[cfg(test)]
mod test {
use std::{any::Any, panic::AssertUnwindSafe};
use postgres::types::{Type, ToSql, private::BytesMut, FromSql};
use crate::{postgres::{test::{Client, Row}, PostgresImpl, Postgres}};
use super::{HashedTextExtImpl, HashedTextExt, HashedText};
#[test]
fn hashed_text_matches_fn_call() {
let client = || Client {
query_one: Box::new(|_, q, ps| {
assert_eq!(q.unwrap_str(), "select public.hashed_text_matches($1, public.hashed_text_of_string($2))");
let mut p0 = BytesMut::with_capacity(32);
let mut p1 = BytesMut::with_capacity(32);
ps[0].to_sql(&Type::TEXT, &mut p0).unwrap();
ps[1].to_sql(&Type::TEXT, &mut p1).unwrap();
let p0 = <&str as FromSql>::from_sql(&Type::TEXT, &p0).unwrap();
let p1 = <&str as FromSql>::from_sql(&Type::TEXT, &p1).unwrap();
assert_eq!(p1, "XXX");
Ok(Row::new(vec![("", Type::BOOL)]).value(Type::BOOL, p0 == "foo"))
}),
..Client::default()
};
let pg = PostgresImpl::<Client<()>>::try_new(|| Ok(client()), 1).unwrap();
let htext = HashedTextExtImpl(unsafe {std::mem::transmute::<_, &'static PostgresImpl<Client<()>>>(&pg)});
assert!(htext.matches(&HashedText(String::from("XXX")), "foo").unwrap());
assert!(!htext.matches(&HashedText(String::from("XXX")), "foob").unwrap());
}
}

73
src/main.rs Normal file
View File

@ -0,0 +1,73 @@
use hashed_text::HashedTextExt;
use toad::config::Config;
use toad::net::Addrd;
use toad::platform::Platform as _;
use toad::req::Req;
use toad::resp::Resp;
use toad::std::dtls;
mod app;
mod hashed_text;
mod postgres;
mod repo;
mod user;
use repo::Repo;
use user::UserRepo;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Email(String);
type Dtls = toad::std::dtls::N;
type ToadT = toad::std::PlatformTypes<Dtls>;
type Toad = toad::std::Platform<Dtls, toad::step::runtime::std::Runtime<Dtls>>;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, serde::Serialize, serde::Deserialize)]
struct Foo {
id: String,
}
fn handle_request(req: Addrd<Req<ToadT>>) -> Result<Addrd<Resp<ToadT>>, String> {
let path = req.data()
.path()
.map_err(|e| format!("{e:?}"))
.unwrap_or(None)
.unwrap_or("");
let mut path_segments = path.split("/").peekable();
if path_segments.peek() == Some(&"users") {
let mut path_segments = path_segments.clone();
path_segments.next();
let id = path_segments.next();
}
Ok(req.map(|r| Resp::for_request(&r).unwrap()))
}
fn server_worker(p: &'static Toad) {
loop {
match nb::block!(p.poll_req()) {
| Err(e) => log::error!("{e:?}"),
| Ok(req) => match handle_request(req) {
| Err(e) => log::error!("{e:?}"),
| Ok(rep) => {
nb::block!(p.send_msg(rep.clone().map(Into::into))).map_err(|e| log::error!("{e:?}"))
.ok();
},
},
}
}
}
fn main() {
simple_logger::init().unwrap();
static mut TOAD: Option<Toad> = None;
unsafe {
TOAD = Some(Toad::try_new("0.0.0.0:4444", Config::default()).unwrap());
}
let r = unsafe { TOAD.as_ref().unwrap() };
server_worker(r);
}

410
src/postgres.rs Normal file
View File

@ -0,0 +1,410 @@
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Mutex, MutexGuard, TryLockError};
use rand::{Rng, SeedableRng};
pub type DbError<Pg> = <<Pg as Postgres>::Client as postgres::GenericClient>::Error;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionParams {
pub host: String,
pub port: usize,
pub user: String,
pub pass: String,
}
pub trait Postgres
where Self: Send + Sync + Sized + 'static
{
type Client: postgres::GenericClient;
fn try_new<F>(connect: F, pool_size: usize) -> Result<Self, DbError<Self>>
where F: Fn() -> Result<Self::Client, DbError<Self>>;
fn with_client<F, R>(&self, f: F) -> Result<R, DbError<Self>>
where F: FnOnce(&mut Self::Client) -> Result<R, DbError<Self>>;
}
pub struct PostgresImpl<C> {
size: usize,
pool: Pin<Box<Vec<Mutex<C>>>>,
}
impl<C> PostgresImpl<C> {
fn unused_lock(&self) -> Option<MutexGuard<C>> {
self.pool.iter().find_map(|m| m.try_lock().ok())
}
fn block_for_next(&self) -> MutexGuard<C> {
let ix = rand::thread_rng().gen_range(0..self.size);
self.pool[ix].lock().unwrap()
}
}
impl<C> Postgres for PostgresImpl<C>
where Self: Send + Sync,
C: 'static + postgres::GenericClient
{
type Client = C;
fn try_new<F>(connect: F, pool_size: usize) -> Result<Self, C::Error>
where F: Fn() -> Result<C, C::Error>
{
let mut pool = Vec::new();
if pool_size == 0 {
panic!("pool size must be > 0");
}
(0..pool_size).try_for_each(|_| {
// let c = postgres::Client::connect(&format!("user={user} password='{pass}' host={host} port={port}"), postgres::NoTls)?;
let c = connect()?;
pool.push(Mutex::new(c));
Ok(())
})?;
Ok(PostgresImpl { size: pool_size,
pool: Box::pin(pool) })
}
fn with_client<F, R>(&self, f: F) -> Result<R, C::Error>
where F: FnOnce(&mut Self::Client) -> Result<R, C::Error>
{
match self.unused_lock() {
| Some(mut lock) => f(lock.deref_mut()),
| None => f(self.block_for_next().deref_mut()),
}
}
}
#[cfg(test)]
pub mod test {
use std::marker::PhantomData;
pub mod client_function {
use postgres::StatementOrString;
use super::Row;
pub type Params<'a> = &'a [&'a (dyn postgres::types::ToSql + Sync)];
pub trait Execute<C, E>:
Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<u64, E>
{
}
pub trait BatchExecute<C, E>: Fn(&mut C, &str) -> Result<(), E> {}
pub trait Query<C, E>:
Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Vec<Row<E>>, E>
{
}
pub trait QueryOne<C, E>:
Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Row<E>, E>
{
}
pub trait QueryOpt<C, E>:
Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Option<Row<E>>, E>
{
}
pub trait SimpleQuery<C, E>:
Fn(&mut C, &str) -> Result<Vec<postgres::SimpleQueryMessage>, E>
{
}
pub trait CommitOrRollback<C, E>: Fn(C) -> Result<(), E> {}
pub trait Transaction<C, E>: Fn(&C) -> Result<C, E> {}
impl<T, C, E> Execute<C, E> for T
where T: Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<u64, E>
{
}
impl<T, C, E> BatchExecute<C, E> for T where T: Fn(&mut C, &str) -> Result<(), E> {}
impl<T, C, E> Query<C, E> for T
where T: Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Vec<Row<E>>, E>
{
}
impl<T, C, E> QueryOne<C, E> for T
where T: Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Row<E>, E>
{
}
impl<T, C, E> QueryOpt<C, E> for T
where T: Fn(&mut C, StatementOrString<'_>, Params<'_>) -> Result<Option<Row<E>>, E>
{
}
impl<T, C, E> SimpleQuery<C, E> for T
where T: Fn(&mut C, &str) -> Result<Vec<postgres::SimpleQueryMessage>, E>
{
}
impl<T, C, E> CommitOrRollback<C, E> for T where T: Fn(C) -> Result<(), E> {}
impl<T, C, E> Transaction<C, E> for T where T: Fn(&C) -> Result<C, E> {}
}
use client_function::{BatchExecute,
CommitOrRollback,
Execute,
Query,
QueryOne,
QueryOpt,
SimpleQuery,
Transaction};
use postgres::types::private::BytesMut;
use postgres::types::{FromSql, ToSql, Type};
use postgres::{Column, GenericRow, GenericClient};
use super::{Postgres, PostgresImpl};
pub struct Row<E> {
pub columns: Vec<Column>,
pub values: Vec<BytesMut>,
__phantom: PhantomData<E>,
}
impl<E> Row<E> {
pub fn new(cols: Vec<(&'static str, Type)>) -> Self {
Self {
columns: cols.into_iter().map(|(name, ty)| Column::new(name.to_string(), ty)).collect(),
values: vec![],
__phantom: PhantomData,
}
}
pub fn value<V>(mut self, ty: Type, val: V) -> Self where V: ToSql {
let mut bs = BytesMut::with_capacity(128);
val.to_sql(&ty, &mut bs).unwrap();
self.values.push(bs);
self
}
}
impl<E> GenericRow for Row<E> where E: core::fmt::Debug {
type Error = E;
fn columns(&self) -> &[Column] {
&self.columns
}
fn len(&self) -> usize {
self.values.len()
}
fn get<'a, I, T>(&'a self, idx: I) -> T
where I: postgres::row::RowIndex + core::fmt::Display,
T: FromSql<'a>
{
let ix = idx.__idx(self.columns()).unwrap();
let ty = self.columns()[ix].type_();
T::from_sql(ty, &self.values[ix]).unwrap()
}
fn try_get<'a, I, T>(&'a self, idx: I) -> Result<T, E>
where I: postgres::row::RowIndex + core::fmt::Display,
T: FromSql<'a>
{
let ix = idx.__idx(self.columns()).unwrap();
let ty = self.columns()[ix].type_();
Ok(T::from_sql(ty, &self.values[ix]).unwrap())
}
}
#[non_exhaustive]
pub struct Client<E> {
pub __phantom: PhantomData<E>,
pub execute: Box<dyn Execute<Self, E>>,
pub query: Box<dyn Query<Self, E>>,
pub query_one: Box<dyn QueryOne<Self, E>>,
pub query_opt: Box<dyn QueryOpt<Self, E>>,
pub simple_query: Box<dyn SimpleQuery<Self, E>>,
pub batch_execute: Box<dyn BatchExecute<Self, E>>,
pub commit: Box<dyn CommitOrRollback<Self, E>>,
pub rollback: Box<dyn CommitOrRollback<Self, E>>,
pub transaction: Box<dyn Transaction<Self, E>>,
}
unsafe impl<E> Send for Client<E> {}
unsafe impl<E> Sync for Client<E> {}
impl<E> Default for Client<E> where E: core::fmt::Debug
{
fn default() -> Self {
Client { __phantom: PhantomData,
execute: Box::new(|_, _, _| panic!("execute not implemented")),
query: Box::new(|_, _, _| panic!("query not implemented")),
query_one: Box::new(|_, _, _| panic!("query_one not implemented")),
query_opt: Box::new(|_, _, _| panic!("query_opt not implemented")),
simple_query: Box::new(|_, _| panic!("simple_query not implemented")),
batch_execute: Box::new(|_, _| panic!("batch_execute not implemented")),
commit: Box::new(|_| panic!("commit not implemented")),
rollback: Box::new(|_| panic!("rollback not implemented")),
transaction: Box::new(|_| panic!("transaction not implemented")) }
}
}
macro_rules! common {
() => {
fn execute<T>(&mut self,
query: &T,
params: &[&(dyn postgres::types::ToSql + Sync)])
-> Result<u64, Self::Error>
where T: ?Sized + postgres::ToStatement
{
let mut f = Client::<E>::default().execute;
std::mem::swap(&mut self.execute, &mut f);
let res = f(self, query.__convert(), params);
std::mem::swap(&mut self.execute, &mut f);
res
}
fn query<T>(&mut self,
query: &T,
params: &[&(dyn postgres::types::ToSql + Sync)])
-> Result<Vec<Row<E>>, Self::Error>
where T: ?Sized + postgres::ToStatement
{
let mut f = Client::<E>::default().query;
std::mem::swap(&mut self.query, &mut f);
let res = f(self, query.__convert(), params);
std::mem::swap(&mut self.query, &mut f);
res
}
fn query_one<T>(&mut self,
query: &T,
params: &[&(dyn postgres::types::ToSql + Sync)])
-> Result<Row<E>, Self::Error>
where T: ?Sized + postgres::ToStatement
{
let mut f = Client::<E>::default().query_one;
std::mem::swap(&mut self.query_one, &mut f);
let res = f(self, query.__convert(), params);
std::mem::swap(&mut self.query_one, &mut f);
res
}
fn query_opt<T>(&mut self,
query: &T,
params: &[&(dyn postgres::types::ToSql + Sync)])
-> Result<Option<Row<E>>, Self::Error>
where T: ?Sized + postgres::ToStatement
{
let mut f = Client::<E>::default().query_opt;
std::mem::swap(&mut self.query_opt, &mut f);
let res = f(self, query.__convert(), params);
std::mem::swap(&mut self.query_opt, &mut f);
res
}
fn query_raw<T, P, I>(&mut self,
_: &T,
_: I)
-> Result<postgres::RowIter<'_>, Self::Error>
where T: ?Sized + postgres::ToStatement,
P: postgres::types::BorrowToSql,
I: IntoIterator<Item = P>,
I::IntoIter: ExactSizeIterator
{
panic!("query_raw cannot be mocked")
}
fn prepare(&mut self, _: &str) -> Result<postgres::Statement, Self::Error> {
panic!("prepared statements cannot be mocked")
}
fn prepare_typed(&mut self,
_: &str,
_: &[postgres::types::Type])
-> Result<postgres::Statement, Self::Error> {
panic!("prepared statements cannot be mocked")
}
fn copy_in<T>(&mut self, _: &T) -> Result<postgres::CopyInWriter<'_>, Self::Error>
where T: ?Sized + postgres::ToStatement
{
panic!("copy_in cannot be mocked")
}
fn copy_out<T>(&mut self, _: &T) -> Result<postgres::CopyOutReader<'_>, Self::Error>
where T: ?Sized + postgres::ToStatement
{
panic!("copy_out cannot be mocked")
}
fn simple_query(&mut self,
query: &str)
-> Result<Vec<postgres::SimpleQueryMessage>, Self::Error> {
let mut f = Client::<E>::default().simple_query;
std::mem::swap(&mut self.simple_query, &mut f);
let res = f(self, query);
std::mem::swap(&mut self.simple_query, &mut f);
res
}
fn batch_execute(&mut self, query: &str) -> Result<(), Self::Error> {
let mut f = Client::<E>::default().batch_execute;
std::mem::swap(&mut self.batch_execute, &mut f);
let res = f(self, query);
std::mem::swap(&mut self.batch_execute, &mut f);
res
}
fn transaction(&mut self) -> Result<Client<E>, Self::Error> {
let mut f = Client::<E>::default().transaction;
std::mem::swap(&mut self.transaction, &mut f);
let res = f(self);
std::mem::swap(&mut self.transaction, &mut f);
res
}
};
}
impl<'a, E> postgres::GenericTransaction<'a> for Client<E> where E: core::fmt::Debug
{
type Error = E;
type NestedTransaction<'b> = Client<E> where Client<E>: 'b;
type Row = Row<E>;
common!();
fn commit(mut self) -> Result<(), E> {
let mut f = Client::<E>::default().commit;
std::mem::swap(&mut self.commit, &mut f);
let res = f(self);
res
}
fn rollback(mut self) -> Result<(), E> {
let mut f = Client::<E>::default().rollback;
std::mem::swap(&mut self.commit, &mut f);
let res = f(self);
res
}
}
impl<E> postgres::GenericClient for Client<E> where E: core::fmt::Debug
{
type Error = E;
type Transaction<'a> = Client<E> where Client<E>: 'a;
type Row = Row<E>;
common!();
}
#[test]
fn postgres_impl_pooling() {
let client = || Client { query_one: Box::new(|_, _, _| {
Ok(Row::new(vec![("foo", Type::TEXT)]).value(Type::TEXT, "bar"))
}),
..Client::default() };
let pg = PostgresImpl::<Client<()>>::try_new(|| Ok(client()), 2).unwrap();
// try_lock locks the first available and returns None when all are locked
let a = pg.unused_lock().expect("there are unlocked clients");
let _b = pg.unused_lock().expect("there are unlocked clients");
assert!(pg.unused_lock().is_none());
drop(a);
assert!(pg.unused_lock().is_some());
// with_client does not block when there are available clients
let row = pg.with_client(|c| c.query_one("", &[])).unwrap();
assert_eq!(row.get::<_, String>("foo"), String::from("bar"));
}
}

22
src/repo.rs Normal file
View File

@ -0,0 +1,22 @@
pub trait Repo: Send + Sync {
type T;
type TPut;
type Error: core::fmt::Debug;
type Id: AsRef<str>;
fn get(&self, id: Self::Id) -> Result<Option<Self::T>, Self::Error>;
fn get_all(&self) -> Result<Vec<Self::T>, Self::Error>;
fn put(&self, id: Self::Id, state: Self::TPut) -> Result<(), Self::Error>;
fn del(&self, id: Self::Id) -> Result<(), Self::Error>;
}
/// An entity that has some operations which rely on
/// an external service
pub trait Ext: Send + Sync {
type Error: core::fmt::Debug;
/// Preparing statements would go here
fn init(&self) -> Result<(), Self::Error> {
Ok(())
}
}

25
src/user.rs Normal file
View File

@ -0,0 +1,25 @@
use crate::repo::Repo;
use crate::{Email, hashed_text::HashedText};
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct UserId(String);
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct UserTag(String);
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct User {
uid: UserId,
tag: UserTag,
password: HashedText,
email: Email,
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct UserPut {
tag: UserTag,
password: HashedText,
email: Email,
}
pub trait UserRepo: Repo<T = User, TPut = UserPut, Id = UserId> {}