From d53f3da4c6dfaad77c45065047fe92f2a625bddd Mon Sep 17 00:00:00 2001 From: "Wyatt J. Miller" Date: Mon, 14 Jul 2025 23:30:29 -0400 Subject: [PATCH] added cache, s3 to taskmanager, ask cache if result is the same, among others --- backend/cache/src/lib.rs | 2 +- backend/public/src/main.rs | 10 +-- backend/public/src/utils/rss.rs | 23 +++--- backend/public/src/utils/sitemap.rs | 7 +- backend/storage/src/services/aws.rs | 5 +- backend/task/Cargo.lock | 100 +++++++++++++++++++++-- backend/task/Cargo.toml | 1 + backend/task/README.md | 5 ++ backend/task/src/main.rs | 48 ++++++++++- backend/task/src/tasks/upload_rss.rs | 49 ++++++++--- backend/task/src/tasks/upload_sitemap.rs | 44 +++++++--- 11 files changed, 241 insertions(+), 53 deletions(-) diff --git a/backend/cache/src/lib.rs b/backend/cache/src/lib.rs index 7e758bd..73bb6b9 100644 --- a/backend/cache/src/lib.rs +++ b/backend/cache/src/lib.rs @@ -1,6 +1,6 @@ pub use fred::{ clients::Pool, - interfaces::KeysInterface, + interfaces::{ClientLike, KeysInterface}, prelude::*, types::{Expiration, SetOptions}, }; diff --git a/backend/public/src/main.rs b/backend/public/src/main.rs index 694a68c..251a769 100644 --- a/backend/public/src/main.rs +++ b/backend/public/src/main.rs @@ -1,6 +1,6 @@ use axum::Router; +use cache::ClientLike; use config::config; -use fred::prelude::*; use sqlx::postgres::PgPoolOptions; use std::fs::File; use std::sync::Arc; @@ -8,7 +8,7 @@ use std::time::Duration; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::Mutex; -use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; +// use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{ cors::{Any, CorsLayer}, trace::{self, TraceLayer}, @@ -101,13 +101,13 @@ async fn main() { .expect("Failed to connect to database"); let pool_size = 8; - let config = Config::from_url(&redis_url).unwrap(); // TODO: fix the unwrap <<< + let config = cache::Config::from_url(&redis_url).unwrap(); // TODO: fix the unwrap <<< - let redis_pool = Builder::from_config(config) + let redis_pool = cache::Builder::from_config(config) .with_performance_config(|config| { config.default_command_timeout = Duration::from_secs(60); }) - .set_policy(ReconnectPolicy::new_exponential(0, 100, 30_000, 2)) + .set_policy(cache::ReconnectPolicy::new_exponential(0, 100, 30_000, 2)) .build_pool(pool_size) .expect("Failed to create cache pool"); diff --git a/backend/public/src/utils/rss.rs b/backend/public/src/utils/rss.rs index 851e100..133b57a 100644 --- a/backend/public/src/utils/rss.rs +++ b/backend/public/src/utils/rss.rs @@ -73,17 +73,18 @@ pub fn generate_rss( format!( r#" - - {safe_title} - {safe_description} - {link} - en-us - 60 - Kyouma 1.0.0-SE - - {} - - "#, + + {safe_title} + {safe_description} + {link} + en-us + 60 + Kyouma 1.0.0-SE + + {} + + + "#, rss_entries ) } diff --git a/backend/public/src/utils/sitemap.rs b/backend/public/src/utils/sitemap.rs index 00f735a..628b8b4 100644 --- a/backend/public/src/utils/sitemap.rs +++ b/backend/public/src/utils/sitemap.rs @@ -23,7 +23,6 @@ impl SitemapEntry { pub fn generate_sitemap(entries: &HashMap) -> String { let urls = entries .values() - .into_iter() .map(|entry| entry.to_item()) .collect::(); format!( @@ -39,21 +38,21 @@ pub fn generate_sitemap(entries: &HashMap) -> String { pub fn get_static_pages(entries: &mut HashMap, web_url: &String) { entries.insert( - (entries.len() + 1).to_string(), + "10000".to_string(), SitemapEntry { location: web_url.clone(), lastmod: chrono::Utc::now(), }, ); entries.insert( - (entries.len() + 1).to_string(), + "10001".to_string(), SitemapEntry { location: format!("{}/posts", web_url), lastmod: chrono::Utc::now(), }, ); entries.insert( - (entries.len() + 1).to_string(), + "10002".to_string(), SitemapEntry { location: format!("{}/projects", web_url), lastmod: chrono::Utc::now(), diff --git a/backend/storage/src/services/aws.rs b/backend/storage/src/services/aws.rs index 237b088..a87306c 100644 --- a/backend/storage/src/services/aws.rs +++ b/backend/storage/src/services/aws.rs @@ -4,7 +4,7 @@ use aws_config::{BehaviorVersion, Region}; use aws_sdk_s3::{Client, Config, config::Credentials}; use std::env; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct S3ClientConfig { pub access_key: String, secret_key: String, @@ -13,8 +13,10 @@ pub struct S3ClientConfig { region: String, } +#[derive(Clone)] pub struct S3Client { client: Client, + pub client_config: S3ClientConfig, } impl S3ClientConfig { @@ -68,6 +70,7 @@ impl S3Client { Self { client: Client::from_conf(s3_config), + client_config: config.clone(), } } } diff --git a/backend/task/Cargo.lock b/backend/task/Cargo.lock index 48ac821..4a55da8 100644 --- a/backend/task/Cargo.lock +++ b/backend/task/Cargo.lock @@ -71,6 +71,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-channel" version = "1.9.0" @@ -796,6 +802,15 @@ dependencies = [ "either", ] +[[package]] +name = "cache" +version = "0.1.0" +dependencies = [ + "fred", + "serde", + "serde_json", +] + [[package]] name = "cc" version = "1.1.21" @@ -877,6 +892,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.4" @@ -940,6 +961,12 @@ dependencies = [ "regex", ] +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.4.2" @@ -1227,6 +1254,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.0" @@ -1274,6 +1310,43 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fred" +version = "10.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a7b2fd0f08b23315c13b6156f971aeedb6f75fb16a29ac1872d2eabccc1490e" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "float-cmp", + "fred-macros", + "futures", + "log", + "parking_lot", + "rand 0.8.5", + "redis-protocol", + "semver", + "socket2", + "tokio", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + +[[package]] +name = "fred-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -2695,6 +2768,20 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redis-protocol" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdba59219406899220fc4cdfd17a95191ba9c9afb719b5fa5a083d63109a9f1" +dependencies = [ + "bytes", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom 7.1.3", +] + [[package]] name = "redox_syscall" version = "0.5.4" @@ -3085,18 +3172,18 @@ checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" [[package]] name = "serde" -version = "1.0.210" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -3105,9 +3192,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "itoa", "memchr", @@ -3580,6 +3667,7 @@ dependencies = [ name = "task" version = "0.1.0" dependencies = [ + "cache", "chrono", "dotenvy", "futures", diff --git a/backend/task/Cargo.toml b/backend/task/Cargo.toml index ca3c96d..099ea6c 100644 --- a/backend/task/Cargo.toml +++ b/backend/task/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] storage = { version = "0.1.0", path = "../storage" } +cache = { version = "0.1.0", path = "../cache" } tokio = { version = "1.19.2", features = ["full"] } reqwest = { version = "0.12.20", features = ["json", "rustls-tls"] } job_scheduler = "1.2.1" diff --git a/backend/task/README.md b/backend/task/README.md index dd34a6e..0baadc7 100644 --- a/backend/task/README.md +++ b/backend/task/README.md @@ -12,4 +12,9 @@ This is a task runner/scheduler programs that will fire off various tasks. These For `task` to work properly, please make sure to first create the `.env` file, then fill out the following environment variables: +- `BASE_URI_API` - needed for communicating with `public` - `DATABASE_URL` - needed for communicating to Postgres +- `REDIS_URL` - needed for communicating with the cache (Redis or Valkey) +- `S3_ACCESS_KEY` - needed for Amazon S3 (or compatible services) storage +- `S3_SECRET_KEY` - needed for Amazon S3 (or compatible services) storage +- `S3_BUCKET` - needed for Amazon S3 (or compatible services) storage diff --git a/backend/task/src/main.rs b/backend/task/src/main.rs index 89eea65..0fa7fbd 100644 --- a/backend/task/src/main.rs +++ b/backend/task/src/main.rs @@ -1,8 +1,10 @@ +use cache::ClientLike; use chrono::Utc; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use std::env; use std::sync::Arc; use std::time::Duration; +use storage::services::aws; use tasks::*; //mod config; @@ -11,6 +13,8 @@ mod utils; pub struct TaskManager<'a> { pool: Pool, + cache: cache::Pool, + s3_client: aws::S3Client, jobs: Vec, last_activated: Option>, last_job: Option, @@ -49,6 +53,8 @@ async fn main() { println!("Hello, world!"); dotenvy::dotenv().unwrap(); + + // setup database let database_url = env::var("DATABASE_URL").expect("Environment variable DATABASE_URL is not found"); let pool = PgPoolOptions::new() @@ -58,7 +64,35 @@ async fn main() { .await .expect("Failed to connect to the database"); - let mut manager = TaskManager::new(pool); + // setup redis/valkey + let redis_url = match std::env::var("REDIS_URL").unwrap().as_str() { + // TODO: fix the unwrap ^ + "" => "redis://localhost:6379".to_string(), + x => x.to_string(), + }; + + let pool_size = 8; + let config = cache::Config::from_url(&redis_url).unwrap(); // TODO: fix the unwrap <<< + + let redis_pool = cache::Builder::from_config(config) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(60); + }) + .set_policy(cache::ReconnectPolicy::new_exponential(0, 100, 30_000, 2)) + .build_pool(pool_size) + .expect("Failed to create cache pool"); + + if std::env::var("REDIS_URL").unwrap() != "" { + // TODO: fix the unwrap ^ + redis_pool.init().await.expect("Failed to connect to cache"); + let _ = redis_pool.flushall::(false).await; + } + + // setup storage + let s3_client_config = aws::S3ClientConfig::from_env().unwrap(); + let s3_client = aws::S3Client::new(&s3_client_config); + + let mut manager = TaskManager::new(pool, redis_pool, s3_client); manager.register_jobs().await.unwrap(); loop { @@ -68,9 +102,11 @@ async fn main() { } impl<'a> TaskManager<'a> { - fn new(pool: Pool) -> Self { + fn new(pool: Pool, cache: cache::Pool, s3_client: aws::S3Client) -> Self { TaskManager { pool, + cache, + s3_client, jobs: Vec::new(), last_activated: None, last_job: None, @@ -100,11 +136,15 @@ impl<'a> TaskManager<'a> { } 2 => { let pool = Arc::new(self.pool.clone()); - Box::new(move || upload_rss::register(&pool)) + let cache = Arc::new(self.cache.clone()); + let s3_client = Arc::new(self.s3_client.clone()); + Box::new(move || upload_rss::register(&pool, &cache, &s3_client)) } 3 => { let pool = Arc::new(self.pool.clone()); - Box::new(move || upload_sitemap::register(&pool)) + let cache = Arc::new(self.cache.clone()); + let s3_client = Arc::new(self.s3_client.clone()); + Box::new(move || upload_sitemap::register(&pool, &cache, &s3_client)) } id => return Err(format!("Unknown task_id: {}", id).into()), }; diff --git a/backend/task/src/tasks/upload_rss.rs b/backend/task/src/tasks/upload_rss.rs index af6c6aa..7affefd 100644 --- a/backend/task/src/tasks/upload_rss.rs +++ b/backend/task/src/tasks/upload_rss.rs @@ -2,19 +2,24 @@ use crate::utils::{ request::{Request, Response}, task_log, }; -use storage::services::{ - aws::{S3Client, S3ClientConfig}, - ObjectStorageClient, -}; +use cache::KeysInterface; +use storage::services::{aws::S3Client, ObjectStorageClient}; -pub fn register(pool: &sqlx::Pool) { +pub fn register(pool: &sqlx::Pool, cache: &cache::Pool, s3_client: &S3Client) { let p = pool.clone(); + let c = cache.clone(); + let s3 = s3_client.to_owned(); + tokio::spawn(async move { - let _ = upload_rss(&p).await; + let _ = upload_rss(&p, &c, s3).await; }); } -async fn upload_rss(pool: &sqlx::Pool) -> Result<(), Box> { +async fn upload_rss( + pool: &sqlx::Pool, + cache: &cache::Pool, + s3_client: S3Client, +) -> Result<(), Box> { // start task logging task_log::start(2, pool).await?; @@ -25,15 +30,37 @@ async fn upload_rss(pool: &sqlx::Pool) -> Result<(), Box = &cache.get(String::from("rss")).await.unwrap_or(None); + let cache_clone = cache.clone(); + if let Some(cached_value) = cached { + if *cached_value == rss { + println!("Response is the same in the cache, exiting"); + return Ok(()); + } + } + let r = rss.clone(); + let _ = s3_client .put_object( - client_config.bucket.as_str(), + s3_client.client_config.bucket.as_str(), "feed.xml", rss.as_bytes().to_vec(), ) - .await; + .await?; + + tokio::spawn(async move { + cache_clone + .set::( + String::from("rss"), + &r, + Some(cache::Expiration::EX(3600)), + None, + false, + ) + .await + .unwrap(); + }); + println!("Finished uploading RSS feed"); } diff --git a/backend/task/src/tasks/upload_sitemap.rs b/backend/task/src/tasks/upload_sitemap.rs index 6f5a760..8ceee5e 100644 --- a/backend/task/src/tasks/upload_sitemap.rs +++ b/backend/task/src/tasks/upload_sitemap.rs @@ -2,20 +2,23 @@ use crate::utils::{ request::{Request, Response}, task_log, }; -use storage::services::{ - aws::{S3Client, S3ClientConfig}, - ObjectStorageClient, -}; +use cache::KeysInterface; +use storage::services::{aws::S3Client, ObjectStorageClient}; -pub fn register(pool: &sqlx::Pool) { +pub fn register(pool: &sqlx::Pool, cache: &cache::Pool, s3_client: &S3Client) { let p = pool.clone(); + let c = cache.clone(); + let s3 = s3_client.to_owned(); + tokio::spawn(async move { - let _ = upload_sitemap(&p).await; + let _ = upload_sitemap(&p, &c, s3).await; }); } async fn upload_sitemap( pool: &sqlx::Pool, + cache: &cache::Pool, + s3_client: S3Client, ) -> Result<(), Box> { // start task logging task_log::start(3, pool).await?; @@ -27,15 +30,36 @@ async fn upload_sitemap( // upload the sucker to obj storage if let Response::Xml(sitemap) = sitemap_result { - let client_config = S3ClientConfig::from_env().unwrap(); - let s3_client = S3Client::new(&client_config); + let cached: &Option = &cache.get(String::from("sitemap")).await.unwrap_or(None); + let cache_clone = cache.clone(); + if let Some(cached_value) = cached { + if *cached_value == sitemap { + println!("Response is the same in the cache, exiting"); + return Ok(()); + } + } + let s = sitemap.clone(); + let _ = s3_client .put_object( - client_config.bucket.as_str(), + s3_client.client_config.bucket.as_str(), "sitemap.xml", sitemap.as_bytes().to_vec(), ) - .await; + .await?; + + tokio::spawn(async move { + cache_clone + .set::( + String::from("sitemap"), + &s, + Some(cache::Expiration::EX(3600)), + None, + false, + ) + .await + .unwrap(); + }); println!("Finished uploading sitemap!"); }