From 3c78ed5ae3a50450c134720fa1ef3ef452ec9b2f Mon Sep 17 00:00:00 2001
From: "Wyatt J. Miller"
Date: Fri, 4 Oct 2024 07:21:10 -0400
Subject: [PATCH] implemented front matter parser, created task log abstraction
 functions

---
 backend/task/src/main.rs               | 38 ++++++++++++++------------
 backend/task/src/tasks/import_posts.rs | 36 ++++++++++++++++++++++--
 backend/task/src/utils/mod.rs          |  1 +
 backend/task/src/utils/task_log.rs     | 11 ++++++++
 4 files changed, 67 insertions(+), 19 deletions(-)
 create mode 100644 backend/task/src/utils/task_log.rs

diff --git a/backend/task/src/main.rs b/backend/task/src/main.rs
index c838981..f81423f 100644
--- a/backend/task/src/main.rs
+++ b/backend/task/src/main.rs
@@ -1,9 +1,8 @@
-use chrono::{prelude::*, Duration as ChronoDuration};
-use job_scheduler::{Job, JobScheduler};
-// use once_cell::sync::Lazy;
+use chrono::Utc;
 use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
-use tasks::import_posts::{self, import_posts};
+use tasks::import_posts;
 use std::env;
+use std::sync::Arc;
 use std::time::Duration;
 
 //mod config;
@@ -18,17 +17,20 @@ pub struct TaskManager<'a> {
     scheduler: job_scheduler::JobScheduler<'a>,
 }
 
+#[derive(Debug, sqlx::FromRow)]
 pub struct TaskLog {
-    task_log_id: u8,
+    log_id: u8,
     task_id: u8,
     created_at: chrono::DateTime<Utc>,
     task_status: TaskStatus,
+    finished_at: Option<chrono::DateTime<Utc>>,
 }
 
+#[derive(Debug)]
 enum TaskStatus {
-    Pending,
-    Completed,
-    Failed,
+    Pending(String),
+    Completed(String),
+    Failed(String),
 }
 
 #[derive(Debug, sqlx::FromRow, Clone)]
@@ -83,15 +85,17 @@ impl<'a> TaskManager<'a> {
             .unwrap();
 
         let mut scheduler = job_scheduler::JobScheduler::new();
-        for result in results {
-            let r = result.clone();
-            scheduler.add(job_scheduler::Job::new(r.schedule.parse().unwrap(), move || {
-                println!("Registering job: {}", r.task_name);
-                match r.task_id {
-                    1 => async { import_posts::import_posts("/app", &self.pool).await },
+        results.iter().for_each(|r| {
+            println!("Registering job: {:?}", r.task_name);
+
+            let job: _ = job_scheduler::Job::new(r.schedule.parse().unwrap(), || {
+                match r.task_id {
+                    1 => import_posts::register(&Arc::new(&self.pool)),
                     _ => panic!(),
-                };
-            }));
-        }
+                }
+            });
+
+            scheduler.add(job);
+        });
     }
 }
diff --git a/backend/task/src/tasks/import_posts.rs b/backend/task/src/tasks/import_posts.rs
index 3572174..e626856 100644
--- a/backend/task/src/tasks/import_posts.rs
+++ b/backend/task/src/tasks/import_posts.rs
@@ -1,9 +1,20 @@
 use std::fs;
 use std::path;
 
-pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
+use serde::Deserialize;
+use serde::Deserializer;
+
+pub fn register<'a>(pool: &'a sqlx::Pool<sqlx::Postgres>) {
+    let p = pool.clone();
+    tokio::spawn(async move {
+        import_posts("/app", &p).await;
+    });
+}
+
+async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
     println!("hello from import_posts");
     let entries = fs::read_dir(dir_path).unwrap();
+
     let options = MarkdownOptions {
         options: markdown::Constructs::gfm(),
     };
@@ -30,11 +41,13 @@ pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
             );
             let file_md_contents = process_read_file(file_path, &options);
             let content = markdown::to_html(&file_md_contents);
+            let metadata = crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(&content).unwrap();
+            let title = metadata.metadata.title;
 
             sqlx::query_as::<_, InsertPosts>(
                 "INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING (title, body, filename, author_id)",
             )
-            .bind(String::from("Hello world from Postgres!"))
+            .bind(title)
            .bind(content)
            .bind(file_name_final)
            .bind(1)
@@ -67,3 +80,22 @@ struct InsertPosts {
 struct MarkdownOptions {
     options: markdown::Constructs,
 }
+
+#[derive(Deserialize)]
+struct MarkdownMetadata {
+    layout: String,
+    title: String,
+    #[serde(deserialize_with = "deserialize_datetime")]
+    date: chrono::DateTime<chrono::Utc>,
+    published: bool,
+}
+
+fn deserialize_datetime<'de, D>(deserializer: D) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let s = String::deserialize(deserializer)?;
+    chrono::DateTime::parse_from_rfc3339(&s)
+        .map(|dt| dt.with_timezone(&chrono::Utc))
+        .map_err(serde::de::Error::custom)
+}
diff --git a/backend/task/src/utils/mod.rs b/backend/task/src/utils/mod.rs
index 30dc886..a545f3c 100644
--- a/backend/task/src/utils/mod.rs
+++ b/backend/task/src/utils/mod.rs
@@ -1 +1,2 @@
 pub mod front_matter;
+pub mod task_log;
diff --git a/backend/task/src/utils/task_log.rs b/backend/task/src/utils/task_log.rs
new file mode 100644
index 0000000..a9e58a8
--- /dev/null
+++ b/backend/task/src/utils/task_log.rs
@@ -0,0 +1,11 @@
+use crate::TaskStatus;
+
+pub async fn start(task_id: i32, pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
+    let _ = sqlx::query_as!(TaskLog, "INSERT INTO logs (task_id, created_at, task_status) VALUES ($1, now(), 'pending')", task_id).fetch_one(pool).await;
+    Ok(())
+}
+
+pub async fn update(task_id: i32, task_status: String, pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
+    let _ = sqlx::query_as!(TaskLog, "UPDATE logs SET task_status = $1 WHERE task_id = $2", task_status, task_id).fetch_one(pool).await;
+    Ok(())
+}
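
Note (outside the patch): the new task_log::start and task_log::update helpers are introduced here but nothing calls them yet. A minimal sketch of how import_posts::register might bracket a run with them, assuming the same Postgres pool and the hard-coded task_id 1 matched in main.rs; the "completed" status string is an assumption, since the logs table schema is not part of this commit:

    // Hypothetical wiring only; this commit does not include it.
    pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
        let p = pool.clone();
        tokio::spawn(async move {
            // Record the run as pending before doing any work
            // (task_id 1 is the id matched in TaskManager's scheduler loop).
            let _ = crate::utils::task_log::start(1, &p).await;
            import_posts("/app", &p).await;
            // Assumed status value; adjust to whatever the logs table expects.
            let _ = crate::utils::task_log::update(1, "completed".to_string(), &p).await;
        });
    }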