Compare commits

...

1 Commits

Author SHA1 Message Date
f0c2f6f3e3 wip: task scheduler works, import task does not work
lol
2025-03-23 12:15:35 -04:00
6 changed files with 279 additions and 74 deletions

View File

@ -1 +1 @@
DATABASE_URL=postgres://wyatt:wyattisawesome@localhost:5432/postgres
DATABASE_URL=postgres://wyatt:wyattisawesome@192.168.100.253:5432/postgres

171
backend/task/Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "addr2line"
@ -29,6 +29,15 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.18"
@ -1262,6 +1271,15 @@ dependencies = [
"unicode-id",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "md-5"
version = "0.10.6"
@ -1324,6 +1342,16 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@ -1404,6 +1432,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@ -1586,12 +1620,56 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-lite"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rfc6979"
version = "0.3.1"
@ -1903,6 +1981,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -2249,6 +2336,8 @@ dependencies = [
"serde_yaml",
"sqlx",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -2284,6 +2373,16 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.37"
@ -2400,9 +2499,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.40"
version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"log",
"pin-project-lite",
@ -2412,9 +2511,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.27"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
dependencies = [
"proc-macro2",
"quote",
@ -2423,11 +2522,41 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.32"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2519,6 +2648,12 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587"
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -2632,6 +2767,28 @@ dependencies = [
"wasite",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"

View File

@ -18,6 +18,8 @@ once_cell = "1.19.0"
dotenvy = "0.15.7"
futures = "0.3.30"
markdown = "1.0.0-alpha.20"
serde = {version = "*", features = ["derive"]}
serde = { version = "*", features = ["derive"] }
serde_yaml = "*"
aws-sdk-s3 = "1.77.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

View File

@ -58,7 +58,7 @@ async fn main() {
.expect("Failed to connect to the database");
let mut manager = TaskManager::new(pool);
manager.register_jobs().await;
manager.register_jobs().await.unwrap();
loop {
manager.scheduler.tick();
@ -77,23 +77,30 @@ impl<'a> TaskManager<'a> {
}
}
pub async fn register_jobs(&self) {
// let jobs: Vec<Job> = Vec::new();
pub async fn register_jobs(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let results = sqlx::query_as::<_, TaskJob>("SELECT task_id, task_name, schedule, is_active, created_at, deleted_at FROM tasks WHERE is_active = true AND deleted_at IS NULL")
.fetch_all(&self.pool)
.await
.unwrap();
.fetch_all(&self.pool)
.await?;
let mut scheduler = job_scheduler::JobScheduler::new();
results.iter().for_each(|r| {
println!("Registering job: {:?}", r.task_name);
tracing::info!("Found {} active jobs to register", results.len());
let job: _ = job_scheduler::Job::new(r.schedule.parse().unwrap(), || match r.task_id {
1 => import_posts::register(&Arc::new(&self.pool)),
_ => panic!(),
});
for job in &results {
tracing::info!("Registering job: {}", job.task_name);
scheduler.add(job);
});
let pool = Arc::new(self.pool.clone());
let schedule = job
.schedule
.parse()
.map_err(|e| format!("Failed to parse schedule '{}': {}", job.schedule, e))?;
let task = match job.task_id {
1 => Box::new(move || import_posts::register(&pool)),
id => return Err(format!("Unknown task_id: {}", id).into()),
};
self.scheduler.add(job_scheduler::Job::new(schedule, task));
}
Ok(())
}
}

View File

@ -1,5 +1,5 @@
use std::fs;
use std::path;
use std::io::Read;
use crate::utils::task_log;
use serde::{Deserialize, Deserializer};
@ -7,75 +7,112 @@ use serde::{Deserialize, Deserializer};
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
let p = pool.clone();
tokio::spawn(async move {
import_posts("/app", &p).await;
let _ = import_posts("app/", &p).await;
});
}
async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
println!("hello from import_posts");
let task = task_log::start(1, pool).await.unwrap();
let entries = fs::read_dir(dir_path).unwrap();
async fn import_posts(
dir_path: &str,
pool: &sqlx::Pool<sqlx::Postgres>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Beginning post import process");
// Start task logging
let task = task_log::start(1, pool).await?;
// Setup markdown options
let options = MarkdownOptions {
options: markdown::Constructs::gfm(),
};
for f in entries {
let file = f.unwrap();
// Read directory contents
let entries = fs::read_dir(dir_path)?;
// Process each file
for entry_result in entries {
let file = entry_result?;
let file_path = file.path();
if file_path.is_file() {
let file_name = file.file_name();
let file_name_final = &file_name.to_str().unwrap();
let exists = sqlx::query_as::<_, FilenameExists>(
"SELECT EXISTS(SELECT 1 FROM posts WHERE filename = $1)",
)
.bind(file_name_final)
.fetch_one(pool)
.await
.unwrap()
.filename;
if !exists.is_empty() {
println!(
"File does not exist! Inserting: {:?}",
file_path.file_name()
);
let file_md_contents = process_read_file(file_path, &options);
let content = markdown::to_html(&file_md_contents);
let metadata =
crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(
&content,
)
.unwrap();
let title = metadata.metadata.title;
// Skip non-file entries
if !file_path.is_file() {
continue;
}
sqlx::query_as::<_, InsertPosts>(
"INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING (title, body, filename, author_id)",
)
.bind(title)
.bind(content)
.bind(file_name_final)
.bind(1)
.fetch_one(pool)
.await
.unwrap();
let file_name = file.file_name();
let file_name_str = match file_name.to_str() {
Some(name) => name,
None => {
eprintln!("Skipping file with non-UTF8 filename: {:?}", file_path);
continue;
}
};
println!("Processing file: {}", file_name_str);
// Check if file already exists in database
let exists_query = sqlx::query_as!(
FilenameExists,
"SELECT EXISTS(SELECT 1 FROM posts p WHERE p.filename = $1) as filename",
file_name_str
)
.fetch_one(pool)
.await?;
// Skip if file already exists in database
if !exists_query.filename.unwrap_or(false) {
println!("Importing new file: {}", file_name_str);
// Process file contents
let file_md_contents = process_read_file(&file_path, &options)?;
// println!("{:?}", file_md_contents);
let content = markdown::to_html(&file_md_contents);
// println!("{:?}", content);
// Extract metadata
let metadata = crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(
&file_md_contents,
)?;
// println!("{:?}", metadata);
let title = metadata.metadata.title;
let content_final = metadata.content;
// println!("{:?}", title);
// Insert into database
let results = sqlx::query_as::<_, InsertPosts>(
"INSERT INTO posts (title, body, filename, author_id) VALUES ($1, $2, $3, $4) RETURNING title, body, filename, author_id"
)
.bind(title)
.bind(content_final)
.bind(file_name_str)
.bind(1) // Consider making author_id a parameter
.fetch_one(pool)
.await?;
println!("{:?}", results);
println!("Successfully imported: {}", file_name_str);
} else {
println!("Skipping existing file: {}", file_name_str);
}
}
task_log::update(task.task_id, String::from("Completed"), pool)
.await
.unwrap();
// Mark task as completed
task_log::update(task.task_id, String::from("Completed"), pool).await?;
Ok(())
}
fn process_read_file<P: AsRef<path::Path>>(path: P, md_opts: &MarkdownOptions) -> String {
let file_contents = fs::read_to_string(path).unwrap();
markdown::to_html(file_contents.as_str())
/// Reads the file at `file_path` and returns its raw text contents.
///
/// `options` is kept in the signature so existing call sites continue to
/// compile, but it is not consulted here: this function only loads the file.
///
/// # Errors
///
/// Returns the underlying [`std::io::Error`] when the file cannot be opened or
/// read, or when its contents are not valid UTF-8.
fn process_read_file(
    file_path: &std::path::Path,
    options: &MarkdownOptions,
) -> Result<String, std::io::Error> {
    // Explicitly discard the unused parameter so the signature stays stable
    // without triggering an unused-variable warning.
    let _ = options;
    // `read_to_string` already yields `Result<String, io::Error>`; no mutable
    // temporary or re-wrapping in `Ok` is needed.
    std::fs::read_to_string(file_path)
}
#[derive(Debug, sqlx::FromRow)]
struct FilenameExists {
filename: String,
filename: Option<bool>,
}
#[derive(Debug, sqlx::FromRow)]
@ -90,7 +127,7 @@ struct MarkdownOptions {
options: markdown::Constructs,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct MarkdownMetadata {
layout: String,
title: String,

View File

@ -1,6 +1,7 @@
// derived from https://github.com/EstebanBorai/yaml-front-matter
use serde::de::DeserializeOwned;
#[derive(Debug)]
pub struct Document<T: DeserializeOwned> {
pub metadata: T,
pub content: String,
@ -12,6 +13,7 @@ impl YamlFrontMatter {
markdown: &str,
) -> Result<Document<T>, Box<dyn std::error::Error>> {
let yaml = YamlFrontMatter::extract(markdown)?;
println!("{:?}", yaml);
let metadata = serde_yaml::from_str::<T>(yaml.0.as_str())?;
Ok(Document {