implemented front matter parser, created task log abstraction fuctions

This commit is contained in:
Wyatt J. Miller 2024-10-04 07:21:10 -04:00
parent 59d8273498
commit 3c78ed5ae3
4 changed files with 67 additions and 19 deletions

View File

@ -1,9 +1,8 @@
use chrono::{prelude::*, Duration as ChronoDuration};
use job_scheduler::{Job, JobScheduler};
// use once_cell::sync::Lazy;
use chrono::Utc;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use tasks::import_posts::{self, import_posts};
use tasks::import_posts;
use std::env;
use std::sync::Arc;
use std::time::Duration;
//mod config;
@ -18,17 +17,20 @@ pub struct TaskManager<'a> {
scheduler: job_scheduler::JobScheduler<'a>,
}
#[derive(Debug, sqlx::FromRow)]
pub struct TaskLog {
task_log_id: u8,
log_id: u8,
task_id: u8,
created_at: chrono::DateTime<Utc>,
task_status: TaskStatus,
finished_at: Option<chrono::DateTime<Utc>>,
}
#[derive(Debug)]
enum TaskStatus {
Pending,
Completed,
Failed,
Pending(String),
Completed(String),
Failed(String),
}
#[derive(Debug, sqlx::FromRow, Clone)]
@ -83,15 +85,17 @@ impl<'a> TaskManager<'a> {
.unwrap();
let mut scheduler = job_scheduler::JobScheduler::new();
for result in results {
let r = result.clone();
scheduler.add(job_scheduler::Job::new(r.schedule.parse().unwrap(), move || {
println!("Registering job: {}", r.task_name);
results.iter().for_each(|r| {
println!("Registering job: {:?}", r.task_name);
let job: _ = job_scheduler::Job::new(r.schedule.parse().unwrap(), || {
match r.task_id {
1 => async { import_posts::import_posts("/app", &self.pool).await },
1 => import_posts::register(&Arc::new(&self.pool)),
_ => panic!(),
};
}));
}
}
});
scheduler.add(job);
});
}
}

View File

@ -1,9 +1,20 @@
use std::fs;
use std::path;
pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
use serde::Deserialize;
use serde::Deserializer;
pub fn register<'a>(pool: &'a sqlx::Pool<sqlx::Postgres>) {
let p = pool.clone();
tokio::spawn(async move {
import_posts("/app", &p).await;
});
}
async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
println!("hello from import_posts");
let entries = fs::read_dir(dir_path).unwrap();
let options = MarkdownOptions {
options: markdown::Constructs::gfm(),
};
@ -30,11 +41,13 @@ pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
);
let file_md_contents = process_read_file(file_path, &options);
let content = markdown::to_html(&file_md_contents);
let metadata = crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(&content).unwrap();
let title = metadata.metadata.title;
sqlx::query_as::<_, InsertPosts>(
"INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING (title, body, filename, author_id)",
)
.bind(String::from("Hello world from Postgres!"))
.bind(title)
.bind(content)
.bind(file_name_final)
.bind(1)
@ -67,3 +80,22 @@ struct InsertPosts {
struct MarkdownOptions {
options: markdown::Constructs,
}
#[derive(Deserialize)]
struct MarkdownMetadata {
layout: String,
title: String,
#[serde(deserialize_with = "deserialize_datetime")]
date: chrono::DateTime<chrono::Utc>,
published: bool,
}
fn deserialize_datetime<'de, D>(deserializer: D) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
chrono::DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&chrono::Utc))
.map_err(serde::de::Error::custom)
}

View File

@ -1 +1,2 @@
pub mod front_matter;
pub mod task_log;

View File

@ -0,0 +1,11 @@
use crate::TaskStatus;
pub async fn start(task_id: i32, pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
let _ = sqlx::query_as!(TaskLog, "INSERT INTO logs (task_id, created_at, task_status) VALUES ($1, now(), 'pending')", task_id).fetch_one(pool).await;
Ok(())
}
pub async fn update(task_id: i32, task_status: String, pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
let _ = sqlx::query_as!(TaskLog, "UPDATE logs SET task_status = $1 WHERE task_id = $2", task_status, task_id).fetch_one(pool).await;
Ok(())
}