diff --git a/backend/public/src/main.rs b/backend/public/src/main.rs index c516d01..68d407b 100644 --- a/backend/public/src/main.rs +++ b/backend/public/src/main.rs @@ -1,11 +1,13 @@ -use axum::{http::Method, Router}; +use axum::Router; use config::config; -use sqlx::{postgres::PgPoolOptions, PgPool}; +use fred::prelude::*; +use sqlx::postgres::PgPoolOptions; use std::fs::File; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; use tokio::signal; +use tokio::sync::Mutex; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{ cors::{Any, CorsLayer}, @@ -84,6 +86,11 @@ async fn main() { // grabbing the database url from our env variables let db_connection_str = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://postgres:password@localhost".to_string()); + let redis_url = match std::env::var("REDIS_URL").unwrap().as_str() { + // TODO: fix the unwrap ^ + "" => "redis://localhost:6379".to_string(), + x => x.to_string(), + }; // set up connection pool let pool = PgPoolOptions::new() @@ -93,7 +100,24 @@ async fn main() { .await .expect("Failed to connect to database"); - let app_state = AppState { db: pool.clone() }; + let pool_size = 8; + let config = Config::from_url(&redis_url).unwrap(); // TODO: fix the unwrap <<< + + let redis_pool = Builder::from_config(config) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(60); + }) + .set_policy(ReconnectPolicy::new_exponential(0, 100, 30_000, 2)) + .build_pool(pool_size) + .expect("Failed to create cache pool"); + + if std::env::var("REDIS_URL").unwrap() != "" { + // TODO: fix the unwrap ^ + redis_pool.init().await.expect("Failed to connect to cache"); + let _ = redis_pool.flushall::<String>(false).await; + } + + let app_state = Arc::new(Mutex::new(state::AppInternalState::new(pool, redis_pool))); // build our application with some routes let app = Router::new() diff --git a/backend/public/src/routes/authors.rs
b/backend/public/src/routes/authors.rs index 903211d..96dd1c1 100644 --- a/backend/public/src/routes/authors.rs +++ b/backend/public/src/routes/authors.rs @@ -5,14 +5,14 @@ use axum::{ routing::get, Json, }; +use fred::types::Expiration; use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Postgres}; -use crate::{datasources::authors::AuthorsDatasource, AppState}; +use crate::{datasources::authors::AuthorsDatasource, state::AppState}; use super::comments::Pagination; -#[derive(Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct Author { pub author_id: i32, pub first_name: String, @@ -21,7 +21,7 @@ pub struct Author { pub image: Option, } -#[derive(Deserialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct AuthorGetOneParams { pub id: i32, } @@ -33,34 +33,104 @@ impl AuthorsRoute { .route("/", get(AuthorsRoute::get_all)) .route("/:id", get(AuthorsRoute::get_one)) .route("/:id/posts", get(AuthorsRoute::get_authors_posts)) - .with_state(app_state.db.clone()) + .with_state(app_state.clone()) } async fn get_all( - State(pool): State>, + State(app_state): State, Json(pagination): Json, ) -> impl IntoResponse { - match AuthorsDatasource::get_all(&pool, pagination).await { - Ok(a) => Ok(Json(a)), + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("authors:all")) + .await + .unwrap_or(None); + + if let Some(authors) = cached { + tracing::info!("grabbing all authors from cache"); + return Ok(Json(authors)); + } + + match AuthorsDatasource::get_all(&state.database, pagination).await { + Ok(authors) => { + tracing::info!("grabbing all authors from the database"); + if let a = &authors { + let author_cloned = a.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("authors:all"), + &author_cloned, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); 
+ } + Ok(Json(authors)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } async fn get_one( - State(pool): State>, + State(app_state): State, Path(params): Path, ) -> impl IntoResponse { - match AuthorsDatasource::get_one(&pool, params.id).await { - Ok(a) => Ok(Json(a)), + let mut state = app_state.lock().await; + let cached: Option = state + .cache + .get(format!("authors:{}", params.id)) + .await + .unwrap_or(None); + + if let Some(author) = cached { + tracing::info!("grabbing one author from cache"); + return Ok(Json(author)); + } + + match AuthorsDatasource::get_one(&state.database, params.id).await { + Ok(author) => { + tracing::info!("grabbing all authors from the database"); + if let a = &author { + let author_cloned = a.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + format!("authors:{}", author_cloned.author_id), + &author_cloned, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); + } + Ok(Json(author)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } async fn get_authors_posts( - State(pool): State>, + State(app_state): State, Path(params): Path, ) -> impl IntoResponse { - match AuthorsDatasource::get_authors_posts(&pool, params.id).await { + let state = app_state.lock().await; + + match AuthorsDatasource::get_authors_posts(&state.database, params.id).await { Ok(p) => Ok(Json(p)), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } diff --git a/backend/public/src/routes/comments.rs b/backend/public/src/routes/comments.rs index 53b26df..4a71457 100644 --- a/backend/public/src/routes/comments.rs +++ b/backend/public/src/routes/comments.rs @@ -1,5 +1,5 @@ -use super::posts::serialize_datetime; -use crate::{datasources::comments::CommentsDatasource, AppState}; +use super::posts::{deserialize_datetime, serialize_datetime}; +use 
crate::{datasources::comments::CommentsDatasource, state::AppState}; use axum::{ extract::{Path, State}, http::StatusCode, @@ -8,33 +8,34 @@ use axum::{ Json, }; use chrono::Utc; +use fred::types::{Expiration, SetOptions}; use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Postgres}; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Serialize, Debug)] pub struct CommentInputPayload { pub name: String, pub body: String, pub post_id: i32, } -#[derive(Deserialize)] +#[derive(Deserialize, Serialize)] pub struct CommentPathParams { id: i32, } -#[derive(Deserialize)] +#[derive(Deserialize, Serialize)] pub struct Pagination { pub page_number: i64, pub page_size: i64, } -#[derive(sqlx::FromRow, Serialize, Debug)] +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug, Clone)] pub struct Comment { pub comment_id: i32, pub name: String, pub body: String, #[serde(serialize_with = "serialize_datetime")] + #[serde(deserialize_with = "deserialize_datetime")] pub created_at: Option>, } @@ -46,34 +47,61 @@ impl CommentsRoute { .route("/post/:id", get(CommentsRoute::get_post_comments)) .route("/add", post(CommentsRoute::insert_comment)) .route("/index", get(CommentsRoute::get_comments_index)) - .with_state(app_state.db.clone()) + .with_state(app_state.clone()) } async fn get_post_comments( - State(pool): State>, + State(app_state): State, Path(params): Path, ) -> impl IntoResponse { - match CommentsDatasource::get_posts_comments(&pool, params.id).await { + let state = app_state.lock().await; + + match CommentsDatasource::get_posts_comments(&state.database, params.id).await { Ok(c) => Ok(Json(c)), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } - // + async fn insert_comment( - State(pool): State>, + State(app_state): State, Json(input): Json, ) -> impl IntoResponse { - match CommentsDatasource::insert_comment(&pool, input).await { - Ok(c) => Ok((StatusCode::CREATED, Json(c))), + let state = app_state.lock().await; + + match 
CommentsDatasource::insert_comment(&state.database, input).await { + Ok(c) => { + if let co = &c { + let co = c.clone(); + let state = app_state.clone(); + + tokio::spawn(async move { + tracing::info!("update cache if key already exists!"); + let mut s = state.lock().await; + let _ = s + .cache + .set( + format!("comments:{}", co.comment_id), + &co, + Some(Expiration::EX(60 * 15)), + Some(SetOptions::XX), + false, + ) + .await; + }); + } + Ok((StatusCode::CREATED, Json(c))) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } async fn get_comments_index( - State(pool): State>, + State(app_state): State, Json(pagination): Json, ) -> impl IntoResponse { - match CommentsDatasource::get_index_comments(&pool, pagination).await { + let state = app_state.lock().await; + + match CommentsDatasource::get_index_comments(&state.database, pagination).await { Ok(c) => Ok(Json(c)), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } diff --git a/backend/public/src/routes/posts.rs b/backend/public/src/routes/posts.rs index f5678d6..5dbb94f 100644 --- a/backend/public/src/routes/posts.rs +++ b/backend/public/src/routes/posts.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use crate::utils::rss; -use crate::{datasources::posts::PostsDatasource, AppState}; +use crate::{datasources::posts::PostsDatasource, state::AppState}; use axum::http::{HeaderMap, HeaderValue}; use axum::{ extract::{Path, State}, @@ -11,10 +11,11 @@ use axum::{ Json, Router, }; use chrono::Utc; -use serde::{Deserialize, Serialize, Serializer}; -use sqlx::{Pool, Postgres}; +use fred::types::Expiration; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; -#[derive(sqlx::FromRow, Serialize, Debug, Clone)] +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug, Clone)] pub struct Post { pub post_id: i32, pub author_id: Option, @@ -23,10 +24,11 @@ pub struct Post { pub title: String, pub body: String, #[serde(serialize_with = "serialize_datetime")] + 
#[serde(deserialize_with = "deserialize_datetime")] pub created_at: Option>, } -#[derive(sqlx::FromRow, Serialize, Debug)] +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug, Clone)] pub struct PostFeaturedVariant { pub post_id: i32, pub author_id: Option, @@ -35,6 +37,7 @@ pub struct PostFeaturedVariant { pub title: String, pub body: String, #[serde(serialize_with = "serialize_datetime")] + #[serde(deserialize_with = "deserialize_datetime")] pub created_at: Option>, pub is_featured: Option, } @@ -56,63 +59,275 @@ impl PostsRoute { .route("/hot", get(PostsRoute::get_hot_posts)) .route("/featured", get(PostsRoute::get_featured_posts)) .route("/rss", get(PostsRoute::get_rss_posts)) - .with_state(app_state.db.clone()) + .with_state(app_state.clone()) } // get all posts - async fn get_all(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_all(&pool).await { - Ok(posts) => Ok(Json(posts)), + async fn get_all(State(app_state): State) -> impl IntoResponse { + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("posts:all")) + .await + .unwrap_or(None); + + if let Some(posts) = cached { + tracing::info!("grabbing all posts from cache"); + return Ok(Json(posts)); + }; + + match PostsDatasource::get_all(&state.database).await { + Ok(posts) => { + tracing::info!("grabbing all posts from database"); + if let p = &posts { + let posts = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("posts:all"), + &posts, + Some(Expiration::EX(10)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(posts)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get one post async fn get_one( - State(pool): State>, + State(app_state): State, Path(params): Path, ) -> impl IntoResponse { - match PostsDatasource::get_one(&pool, params.id).await { 
- Ok(post) => Ok(Json(post)), + let mut state = app_state.lock().await; + let cached: Option = state + .cache + .get(format!("posts:{}", params.id)) + .await + .unwrap_or(None); + + if let Some(post) = cached { + tracing::info!("grabbing one post from cache"); + return Ok(Json(post)); + }; + + match PostsDatasource::get_one(&state.database, params.id).await { + Ok(post) => { + tracing::info!("grabbing one post from database"); + if let p = &post { + let post = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + format!("posts:{}", params.id), + &post, + Some(Expiration::EX(10)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(post)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get recent posts - async fn get_recent_posts(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_recent(&pool).await { - Ok(posts) => Ok(Json(posts)), + async fn get_recent_posts(State(app_state): State) -> impl IntoResponse { + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("posts:recent")) + .await + .unwrap_or(None); + + if let Some(posts) = cached { + tracing::info!("grabbing recent posts from cache"); + return Ok(Json(posts)); + }; + + match PostsDatasource::get_recent(&state.database).await { + Ok(posts) => { + tracing::info!("grabbing recent posts from database"); + if let p = &posts { + let posts = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("posts:recent"), + &posts, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(posts)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get the top three posts with the highest view 
count - async fn get_popular_posts(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_popular(&pool).await { - Ok(posts) => Ok(Json(posts)), + async fn get_popular_posts(State(app_state): State) -> impl IntoResponse { + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("posts:popular")) + .await + .unwrap_or(None); + + if let Some(posts) = cached { + tracing::info!("grabbing popular posts from cache"); + return Ok(Json(posts)); + }; + + match PostsDatasource::get_popular(&state.database).await { + Ok(posts) => { + tracing::info!("grabbing popular posts from database"); + if let p = &posts { + let posts = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("posts:popular"), + &posts, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(posts)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get the top three posts with the most comments - async fn get_hot_posts(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_hot(&pool).await { - Ok(posts) => Ok(Json(posts)), + async fn get_hot_posts(State(app_state): State) -> impl IntoResponse { + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("posts:hot")) + .await + .unwrap_or(None); + + if let Some(posts) = cached { + tracing::info!("grabbing hot posts from cache"); + return Ok(Json(posts)); + }; + + match PostsDatasource::get_hot(&state.database).await { + Ok(posts) => { + tracing::info!("grabbing hot posts from database"); + if let p = &posts { + let posts = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("posts:hot"), + 
&posts, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(posts)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get posts that are featured - async fn get_featured_posts(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_featured(&pool).await { - Ok(posts) => Ok(Json(posts)), + async fn get_featured_posts(State(app_state): State) -> impl IntoResponse { + let mut state = app_state.lock().await; + let cached: Option> = state + .cache + .get(String::from("posts:featured")) + .await + .unwrap_or(None); + + if let Some(posts) = cached { + tracing::info!("grabbing featured posts from cache"); + return Ok(Json(posts)); + }; + + match PostsDatasource::get_featured(&state.database).await { + Ok(posts) => { + tracing::info!("grabbing featured posts from database"); + if let p = &posts { + let posts = p.clone(); + let state = app_state.clone(); + + tracing::info!("storing database data in cache"); + tokio::spawn(async move { + let mut s = state.lock().await; + let _ = s + .cache + .set( + String::from("posts:featured"), + &posts, + Some(Expiration::EX(5)), + None, + false, + ) + .await; + }); + }; + + Ok(Json(posts)) + } Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } // get rss posts - async fn get_rss_posts(State(pool): State>) -> impl IntoResponse { - match PostsDatasource::get_all(&pool).await { + async fn get_rss_posts(State(app_state): State) -> impl IntoResponse { + let state = app_state.lock().await; + + match PostsDatasource::get_all(&state.database).await { Ok(posts) => { let web_url = std::env::var("BASE_URI_WEB").expect("No environment variable found"); let mapped_posts: HashMap = posts @@ -154,3 +369,46 @@ where { serializer.serialize_str(&date.unwrap().to_rfc3339()) } + +pub fn deserialize_datetime<'de, D>( + deserializer: D, +) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + struct DateTimeVisitor; + + impl<'de> serde::de::Visitor<'de> for 
DateTimeVisitor { + type Value = Option<chrono::DateTime<Utc>>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("an ISO 8601 formatted date string or null") + } + + fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + match chrono::DateTime::parse_from_rfc3339(value) { + Ok(dt) => Ok(Some(dt.with_timezone(&Utc))), + Err(e) => Err(E::custom(format!("Failed to parse datetime: {}", e))), + } + } + + fn visit_none<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(None) + } + + fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: Deserializer<'de>, + { + deserializer.deserialize_str(self) + } + } + + deserializer.deserialize_option(DateTimeVisitor) +} diff --git a/backend/public/src/state.rs b/backend/public/src/state.rs index ab662a9..cb8140e 100644 --- a/backend/public/src/state.rs +++ b/backend/public/src/state.rs @@ -29,19 +29,21 @@ impl Cache { T: for<'de> serde::Deserialize<'de>, { if !self.inmem.is_connected() { - return Err(Box::new("Are you connected to the cache?".into())); + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Not connected to cache".to_string(), + ))); } - let value: Option<Value> = self.inmem.get(key).await?; + let value: Option<String> = self.inmem.get(&key).await?; - let result = match value { - Some(x) => match serde_json::from_value(x) { - Ok(x) => Some(x), - Err(_) => None, + match value { + Some(json_str) => match serde_json::from_str::<T>(&json_str) { + Ok(deserialized) => Ok(Some(deserialized)), + Err(_) => Ok(None), }, - None => None, - }; - Ok(result) + None => Ok(None), + } } pub async fn set( @@ -53,16 +55,20 @@ impl Cache { T: for<'de> serde::Deserialize<'de>, + T: for<'de> serde::Deserialize<'de> + serde::Serialize, { if !self.inmem.is_connected() { - return Err(Box::new("Are you connected to the cache?".into())); + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Not connected to
cache".to_string(), + ))); } - let value: Value = serde_json::to_value(contents)?; + let json_string = serde_json::to_string(contents)?; self.inmem - .set(key, value.to_string(), expiration, set_opts, get) + .set(key, json_string, expiration, set_opts, get) .await?; + Ok(()) }