added cache, s3 to taskmanager, ask cache if result is the same, among others
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
use cache::ClientLike;
|
||||
use chrono::Utc;
|
||||
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage::services::aws;
|
||||
use tasks::*;
|
||||
|
||||
//mod config;
|
||||
@@ -11,6 +13,8 @@ mod utils;
|
||||
|
||||
pub struct TaskManager<'a> {
|
||||
pool: Pool<Postgres>,
|
||||
cache: cache::Pool,
|
||||
s3_client: aws::S3Client,
|
||||
jobs: Vec<TaskJob>,
|
||||
last_activated: Option<chrono::DateTime<Utc>>,
|
||||
last_job: Option<TaskJob>,
|
||||
@@ -49,6 +53,8 @@ async fn main() {
|
||||
println!("Hello, world!");
|
||||
|
||||
dotenvy::dotenv().unwrap();
|
||||
|
||||
// setup database
|
||||
let database_url =
|
||||
env::var("DATABASE_URL").expect("Environment variable DATABASE_URL is not found");
|
||||
let pool = PgPoolOptions::new()
|
||||
@@ -58,7 +64,35 @@ async fn main() {
|
||||
.await
|
||||
.expect("Failed to connect to the database");
|
||||
|
||||
let mut manager = TaskManager::new(pool);
|
||||
// setup redis/valkey
|
||||
let redis_url = match std::env::var("REDIS_URL").unwrap().as_str() {
|
||||
// TODO: fix the unwrap ^
|
||||
"" => "redis://localhost:6379".to_string(),
|
||||
x => x.to_string(),
|
||||
};
|
||||
|
||||
let pool_size = 8;
|
||||
let config = cache::Config::from_url(&redis_url).unwrap(); // TODO: fix the unwrap <<<
|
||||
|
||||
let redis_pool = cache::Builder::from_config(config)
|
||||
.with_performance_config(|config| {
|
||||
config.default_command_timeout = Duration::from_secs(60);
|
||||
})
|
||||
.set_policy(cache::ReconnectPolicy::new_exponential(0, 100, 30_000, 2))
|
||||
.build_pool(pool_size)
|
||||
.expect("Failed to create cache pool");
|
||||
|
||||
if std::env::var("REDIS_URL").unwrap() != "" {
|
||||
// TODO: fix the unwrap ^
|
||||
redis_pool.init().await.expect("Failed to connect to cache");
|
||||
let _ = redis_pool.flushall::<i32>(false).await;
|
||||
}
|
||||
|
||||
// setup storage
|
||||
let s3_client_config = aws::S3ClientConfig::from_env().unwrap();
|
||||
let s3_client = aws::S3Client::new(&s3_client_config);
|
||||
|
||||
let mut manager = TaskManager::new(pool, redis_pool, s3_client);
|
||||
manager.register_jobs().await.unwrap();
|
||||
|
||||
loop {
|
||||
@@ -68,9 +102,11 @@ async fn main() {
|
||||
}
|
||||
|
||||
impl<'a> TaskManager<'a> {
|
||||
fn new(pool: Pool<Postgres>) -> Self {
|
||||
fn new(pool: Pool<Postgres>, cache: cache::Pool, s3_client: aws::S3Client) -> Self {
|
||||
TaskManager {
|
||||
pool,
|
||||
cache,
|
||||
s3_client,
|
||||
jobs: Vec::new(),
|
||||
last_activated: None,
|
||||
last_job: None,
|
||||
@@ -100,11 +136,15 @@ impl<'a> TaskManager<'a> {
|
||||
}
|
||||
2 => {
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
Box::new(move || upload_rss::register(&pool))
|
||||
let cache = Arc::new(self.cache.clone());
|
||||
let s3_client = Arc::new(self.s3_client.clone());
|
||||
Box::new(move || upload_rss::register(&pool, &cache, &s3_client))
|
||||
}
|
||||
3 => {
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
Box::new(move || upload_sitemap::register(&pool))
|
||||
let cache = Arc::new(self.cache.clone());
|
||||
let s3_client = Arc::new(self.s3_client.clone());
|
||||
Box::new(move || upload_sitemap::register(&pool, &cache, &s3_client))
|
||||
}
|
||||
id => return Err(format!("Unknown task_id: {}", id).into()),
|
||||
};
|
||||
|
@@ -2,19 +2,24 @@ use crate::utils::{
|
||||
request::{Request, Response},
|
||||
task_log,
|
||||
};
|
||||
use storage::services::{
|
||||
aws::{S3Client, S3ClientConfig},
|
||||
ObjectStorageClient,
|
||||
};
|
||||
use cache::KeysInterface;
|
||||
use storage::services::{aws::S3Client, ObjectStorageClient};
|
||||
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>, cache: &cache::Pool, s3_client: &S3Client) {
|
||||
let p = pool.clone();
|
||||
let c = cache.clone();
|
||||
let s3 = s3_client.to_owned();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _ = upload_rss(&p).await;
|
||||
let _ = upload_rss(&p, &c, s3).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn upload_rss(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn upload_rss(
|
||||
pool: &sqlx::Pool<sqlx::Postgres>,
|
||||
cache: &cache::Pool,
|
||||
s3_client: S3Client,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// start task logging
|
||||
task_log::start(2, pool).await?;
|
||||
|
||||
@@ -25,15 +30,37 @@ async fn upload_rss(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std
|
||||
|
||||
// upload the sucker to obj storage
|
||||
if let Response::Xml(rss) = rss_result {
|
||||
let client_config = S3ClientConfig::from_env().unwrap();
|
||||
let s3_client = S3Client::new(&client_config);
|
||||
let cached: &Option<String> = &cache.get(String::from("rss")).await.unwrap_or(None);
|
||||
let cache_clone = cache.clone();
|
||||
if let Some(cached_value) = cached {
|
||||
if *cached_value == rss {
|
||||
println!("Response is the same in the cache, exiting");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let r = rss.clone();
|
||||
|
||||
let _ = s3_client
|
||||
.put_object(
|
||||
client_config.bucket.as_str(),
|
||||
s3_client.client_config.bucket.as_str(),
|
||||
"feed.xml",
|
||||
rss.as_bytes().to_vec(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
cache_clone
|
||||
.set::<String, String, &String>(
|
||||
String::from("rss"),
|
||||
&r,
|
||||
Some(cache::Expiration::EX(3600)),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
println!("Finished uploading RSS feed");
|
||||
}
|
||||
|
||||
|
@@ -2,20 +2,23 @@ use crate::utils::{
|
||||
request::{Request, Response},
|
||||
task_log,
|
||||
};
|
||||
use storage::services::{
|
||||
aws::{S3Client, S3ClientConfig},
|
||||
ObjectStorageClient,
|
||||
};
|
||||
use cache::KeysInterface;
|
||||
use storage::services::{aws::S3Client, ObjectStorageClient};
|
||||
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>, cache: &cache::Pool, s3_client: &S3Client) {
|
||||
let p = pool.clone();
|
||||
let c = cache.clone();
|
||||
let s3 = s3_client.to_owned();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _ = upload_sitemap(&p).await;
|
||||
let _ = upload_sitemap(&p, &c, s3).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn upload_sitemap(
|
||||
pool: &sqlx::Pool<sqlx::Postgres>,
|
||||
cache: &cache::Pool,
|
||||
s3_client: S3Client,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// start task logging
|
||||
task_log::start(3, pool).await?;
|
||||
@@ -27,15 +30,36 @@ async fn upload_sitemap(
|
||||
|
||||
// upload the sucker to obj storage
|
||||
if let Response::Xml(sitemap) = sitemap_result {
|
||||
let client_config = S3ClientConfig::from_env().unwrap();
|
||||
let s3_client = S3Client::new(&client_config);
|
||||
let cached: &Option<String> = &cache.get(String::from("sitemap")).await.unwrap_or(None);
|
||||
let cache_clone = cache.clone();
|
||||
if let Some(cached_value) = cached {
|
||||
if *cached_value == sitemap {
|
||||
println!("Response is the same in the cache, exiting");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let s = sitemap.clone();
|
||||
|
||||
let _ = s3_client
|
||||
.put_object(
|
||||
client_config.bucket.as_str(),
|
||||
s3_client.client_config.bucket.as_str(),
|
||||
"sitemap.xml",
|
||||
sitemap.as_bytes().to_vec(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
cache_clone
|
||||
.set::<String, String, &String>(
|
||||
String::from("sitemap"),
|
||||
&s,
|
||||
Some(cache::Expiration::EX(3600)),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
println!("Finished uploading sitemap!");
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user