wip: task scheduler works, import task does not work
lol
This commit is contained in:
@ -58,7 +58,7 @@ async fn main() {
|
||||
.expect("Failed to connect to the database");
|
||||
|
||||
let mut manager = TaskManager::new(pool);
|
||||
manager.register_jobs().await;
|
||||
manager.register_jobs().await.unwrap();
|
||||
|
||||
loop {
|
||||
manager.scheduler.tick();
|
||||
@ -77,23 +77,30 @@ impl<'a> TaskManager<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn register_jobs(&self) {
|
||||
// let jobs: Vec<Job> = Vec::new();
|
||||
pub async fn register_jobs(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let results = sqlx::query_as::<_, TaskJob>("SELECT task_id, task_name, schedule, is_active, created_at, deleted_at FROM tasks WHERE is_active = true AND deleted_at IS NULL")
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.unwrap();
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let mut scheduler = job_scheduler::JobScheduler::new();
|
||||
results.iter().for_each(|r| {
|
||||
println!("Registering job: {:?}", r.task_name);
|
||||
tracing::info!("Found {} active jobs to register", results.len());
|
||||
|
||||
let job: _ = job_scheduler::Job::new(r.schedule.parse().unwrap(), || match r.task_id {
|
||||
1 => import_posts::register(&Arc::new(&self.pool)),
|
||||
_ => panic!(),
|
||||
});
|
||||
for job in &results {
|
||||
tracing::info!("Registering job: {}", job.task_name);
|
||||
|
||||
scheduler.add(job);
|
||||
});
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
let schedule = job
|
||||
.schedule
|
||||
.parse()
|
||||
.map_err(|e| format!("Failed to parse schedule '{}': {}", job.schedule, e))?;
|
||||
|
||||
let task = match job.task_id {
|
||||
1 => Box::new(move || import_posts::register(&pool)),
|
||||
id => return Err(format!("Unknown task_id: {}", id).into()),
|
||||
};
|
||||
|
||||
self.scheduler.add(job_scheduler::Job::new(schedule, task));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
use std::fs;
|
||||
use std::path;
|
||||
use std::io::Read;
|
||||
|
||||
use crate::utils::task_log;
|
||||
use serde::{Deserialize, Deserializer};
|
||||
@ -7,75 +7,112 @@ use serde::{Deserialize, Deserializer};
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
let p = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
import_posts("/app", &p).await;
|
||||
let _ = import_posts("app/", &p).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
println!("hello from import_posts");
|
||||
let task = task_log::start(1, pool).await.unwrap();
|
||||
let entries = fs::read_dir(dir_path).unwrap();
|
||||
async fn import_posts(
|
||||
dir_path: &str,
|
||||
pool: &sqlx::Pool<sqlx::Postgres>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Beginning post import process");
|
||||
|
||||
// Start task logging
|
||||
let task = task_log::start(1, pool).await?;
|
||||
|
||||
// Setup markdown options
|
||||
let options = MarkdownOptions {
|
||||
options: markdown::Constructs::gfm(),
|
||||
};
|
||||
|
||||
for f in entries {
|
||||
let file = f.unwrap();
|
||||
// Read directory contents
|
||||
let entries = fs::read_dir(dir_path)?;
|
||||
|
||||
// Process each file
|
||||
for entry_result in entries {
|
||||
let file = entry_result?;
|
||||
let file_path = file.path();
|
||||
if file_path.is_file() {
|
||||
let file_name = file.file_name();
|
||||
let file_name_final = &file_name.to_str().unwrap();
|
||||
let exists = sqlx::query_as::<_, FilenameExists>(
|
||||
"SELECT EXISTS(SELECT 1 FROM posts WHERE filename = $1)",
|
||||
)
|
||||
.bind(file_name_final)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.filename;
|
||||
|
||||
if !exists.is_empty() {
|
||||
println!(
|
||||
"File does not exist! Inserting: {:?}",
|
||||
file_path.file_name()
|
||||
);
|
||||
let file_md_contents = process_read_file(file_path, &options);
|
||||
let content = markdown::to_html(&file_md_contents);
|
||||
let metadata =
|
||||
crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(
|
||||
&content,
|
||||
)
|
||||
.unwrap();
|
||||
let title = metadata.metadata.title;
|
||||
// Skip non-file entries
|
||||
if !file_path.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
sqlx::query_as::<_, InsertPosts>(
|
||||
"INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING (title, body, filename, author_id)",
|
||||
)
|
||||
.bind(title)
|
||||
.bind(content)
|
||||
.bind(file_name_final)
|
||||
.bind(1)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let file_name = file.file_name();
|
||||
let file_name_str = match file_name.to_str() {
|
||||
Some(name) => name,
|
||||
None => {
|
||||
eprintln!("Skipping file with non-UTF8 filename: {:?}", file_path);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
println!("Processing file: {}", file_name_str);
|
||||
|
||||
// Check if file already exists in database
|
||||
let exists_query = sqlx::query_as!(
|
||||
FilenameExists,
|
||||
"SELECT EXISTS(SELECT 1 FROM posts p WHERE p.filename = $1) as filename",
|
||||
file_name_str
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
// Skip if file already exists in database
|
||||
if !exists_query.filename.unwrap_or(false) {
|
||||
println!("Importing new file: {}", file_name_str);
|
||||
|
||||
// Process file contents
|
||||
let file_md_contents = process_read_file(&file_path, &options)?;
|
||||
// println!("{:?}", file_md_contents);
|
||||
let content = markdown::to_html(&file_md_contents);
|
||||
// println!("{:?}", content);
|
||||
|
||||
// Extract metadata
|
||||
let metadata = crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(
|
||||
&file_md_contents,
|
||||
)?;
|
||||
// println!("{:?}", metadata);
|
||||
let title = metadata.metadata.title;
|
||||
let content_final = metadata.content;
|
||||
// println!("{:?}", title);
|
||||
|
||||
// Insert into database
|
||||
let results = sqlx::query_as::<_, InsertPosts>(
|
||||
"INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING title, body, filename, author_id"
|
||||
)
|
||||
.bind(title)
|
||||
.bind(content_final)
|
||||
.bind(file_name_str)
|
||||
.bind(1) // Consider making author_id a parameter
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
println!("{:?}", results);
|
||||
|
||||
println!("Successfully imported: {}", file_name_str);
|
||||
} else {
|
||||
println!("Skipping existing file: {}", file_name_str);
|
||||
}
|
||||
}
|
||||
|
||||
task_log::update(task.task_id, String::from("Completed"), pool)
|
||||
.await
|
||||
.unwrap();
|
||||
// Mark task as completed
|
||||
task_log::update(task.task_id, String::from("Completed"), pool).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_read_file<P: AsRef<path::Path>>(path: P, md_opts: &MarkdownOptions) -> String {
|
||||
let file_contents = fs::read_to_string(path).unwrap();
|
||||
markdown::to_html(file_contents.as_str())
|
||||
fn process_read_file(
|
||||
file_path: &std::path::Path,
|
||||
options: &MarkdownOptions,
|
||||
) -> Result<String, std::io::Error> {
|
||||
let mut file = std::fs::read_to_string(file_path)?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
struct FilenameExists {
|
||||
filename: String,
|
||||
filename: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
@ -90,7 +127,7 @@ struct MarkdownOptions {
|
||||
options: markdown::Constructs,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct MarkdownMetadata {
|
||||
layout: String,
|
||||
title: String,
|
||||
|
@ -1,6 +1,7 @@
|
||||
// derived from https://github.com/EstebanBorai/yaml-front-matter
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Document<T: DeserializeOwned> {
|
||||
pub metadata: T,
|
||||
pub content: String,
|
||||
@ -12,6 +13,7 @@ impl YamlFrontMatter {
|
||||
markdown: &str,
|
||||
) -> Result<Document<T>, Box<dyn std::error::Error>> {
|
||||
let yaml = YamlFrontMatter::extract(markdown)?;
|
||||
println!("{:?}", yaml);
|
||||
let metadata = serde_yaml::from_str::<T>(yaml.0.as_str())?;
|
||||
|
||||
Ok(Document {
|
||||
|
Reference in New Issue
Block a user