From 80081b75d5f8ebeeaac0905e7e4e87363549f01d Mon Sep 17 00:00:00 2001 From: "Wyatt J. Miller" Date: Sun, 22 Sep 2024 04:10:29 -0400 Subject: [PATCH] wip: impl scheduler, import posts task --- backend/task/.env | 1 + backend/task/src/main.rs | 84 ++++++++++++++++++-------- backend/task/src/tasks/import_posts.rs | 42 +++++++++++++ backend/task/src/tasks/mod.rs | 2 +- backend/task/src/tasks/register.rs | 2 - 5 files changed, 102 insertions(+), 29 deletions(-) create mode 100644 backend/task/.env create mode 100644 backend/task/src/tasks/import_posts.rs delete mode 100644 backend/task/src/tasks/register.rs diff --git a/backend/task/.env b/backend/task/.env new file mode 100644 index 0000000..a370ac9 --- /dev/null +++ b/backend/task/.env @@ -0,0 +1 @@ +DATABASE_URL=postgres://wyatt:wyattisawesome@localhost:5432/postgres diff --git a/backend/task/src/main.rs b/backend/task/src/main.rs index 7f192d3..d4de67d 100644 --- a/backend/task/src/main.rs +++ b/backend/task/src/main.rs @@ -1,14 +1,16 @@ -use chrono::prelude::*; -use once_cell::sync::{Lazy, OnceCell}; -use sqlx::{database, PgPool}; +use chrono::{prelude::*, Duration}; +// use once_cell::sync::Lazy; +use sqlx::{postgres::PgPoolOptions, PgPool, Pool, Postgres}; use std::env; mod tasks; -pub struct TaskServer { - jobs: Vec, +pub struct TaskManager<'a> { + pool: Pool, + jobs: Vec, last_activated: Option>, - last_job: Option, + last_job: Option, + scheduler: job_scheduler::JobScheduler<'a>, } pub struct TaskLog { @@ -24,39 +26,69 @@ enum TaskStatus { Failed, } -pub struct Job { - pub task_id: u8, - pub task_name: u8, +#[derive(Debug, sqlx::FromRow)] +pub struct TaskJob { + pub task_id: i32, + pub task_name: String, pub schedule: String, - pub created_at: String, - pub deleted_at: Option, -} - -pub static G_DB: Lazy = Lazy::new(|| DatabaseConfig { - pool: None, - database_type: Some("postgres".to_string()), -}); - -pub struct DatabaseConfig { - pool: Option, - pub database_type: Option, + pub is_active: bool, + pub created_at: chrono::DateTime, + pub deleted_at: Option>, } #[tokio::main] async fn main() { println!("Hello, world!"); + + dotenvy::dotenv().unwrap(); + let database_url = + env::var("DATABASE_URL").expect("Environment variable DATABASE_URL is not found"); + let pool = PgPoolOptions::new() + .max_connections(10) + .connect(&database_url) + .await + .expect("Failed to connect to the database"); + + let mut manager = TaskManager::new(pool); + manager.register_jobs().await; + + loop { + manager.scheduler.tick(); + std::thread::sleep(std::time::Duration::from_millis(1000)); + } } -impl TaskServer { - fn new() -> Self { - TaskServer { +impl<'a> TaskManager<'a> { + fn new(pool: Pool) -> Self { + TaskManager { + pool, jobs: Vec::new(), last_activated: None, last_job: None, + scheduler: job_scheduler::JobScheduler::new(), } } - fn register_jobs() -> Vec { - Vec::new() + pub async fn register_jobs(&self) { + // let jobs: Vec = Vec::new(); + let results = sqlx::query_as::<_, TaskJob>("SELECT task_id, task_name, schedule, is_active, created_at, deleted_at FROM tasks WHERE is_active = true AND deleted_at IS NULL") + .fetch_all(&self.pool) + .await + .unwrap(); + + let mut scheduler = job_scheduler::JobScheduler::new(); + results + .iter() + .map(|j| { + scheduler.add(job_scheduler::Job::new(j.schedule.parse().unwrap(), || { + println!("Starting task name: {:?}", j.task_name); + + match j.task_id { + 1 => tasks::import_posts::import_posts("/app", &self.pool), + _ => panic!(), + } + })); + }) + .collect::>(); } } diff --git a/backend/task/src/tasks/import_posts.rs b/backend/task/src/tasks/import_posts.rs new file mode 100644 index 0000000..d180365 --- /dev/null +++ b/backend/task/src/tasks/import_posts.rs @@ -0,0 +1,42 @@ +pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool) { + println!("hello from import_posts"); + let entries = std::fs::read_dir(dir_path).unwrap(); + + for f in entries { + let file = f.unwrap(); + let file_path = file.path(); + if file_path.is_file() { + let file_name = file.file_name().to_str().unwrap(); + let exists = sqlx::query_as::<_, FilenameExists>( + "SELECT EXISTS(SELECT 1 FROM posts WHERE filename = $1)", + ) + .bind(file_name.clone()) + .fetch_one(pool) + .await + .unwrap() + .filename; + + if !exists.is_empty() { + let file_md_contents = std::fs::read_to_string(file).unwrap().as_str(); + let content = markdown::to_html(file_md_contents); + + sqlx::query_as::<_, InsertPosts>("INSERT INTO posts (title, body) VALUES ($1, $2)") + .bind(file_name.clone()) + .fetch_one(pool) + .await + .unwrap(); + } + } + } +} + +#[derive(Debug, sqlx::FromRow)] +struct FilenameExists { + filename: String, +} + +#[derive(Debug, sqlx::FromRow)] +struct InsertPosts { + title: String, + body: String, +} diff --git a/backend/task/src/tasks/mod.rs b/backend/task/src/tasks/mod.rs index f862bee..5b96f31 100644 --- a/backend/task/src/tasks/mod.rs +++ b/backend/task/src/tasks/mod.rs @@ -1 +1 @@ -pub mod register; +pub mod import_posts; diff --git a/backend/task/src/tasks/register.rs b/backend/task/src/tasks/register.rs deleted file mode 100644 index 660d6cf..0000000 --- a/backend/task/src/tasks/register.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub struct Register; -impl Register {}