wip: impl scheduler, import posts task
This commit is contained in:
parent
4cb3983a9b
commit
80081b75d5
1
backend/task/.env
Normal file
1
backend/task/.env
Normal file
@ -0,0 +1 @@
|
||||
DATABASE_URL=postgres://wyatt:wyattisawesome@localhost:5432/postgres
|
@ -1,14 +1,16 @@
|
||||
use chrono::prelude::*;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use sqlx::{database, PgPool};
|
||||
use chrono::{prelude::*, Duration};
|
||||
// use once_cell::sync::Lazy;
|
||||
use sqlx::{postgres::PgPoolOptions, PgPool, Pool, Postgres};
|
||||
use std::env;
|
||||
|
||||
mod tasks;
|
||||
|
||||
pub struct TaskServer {
|
||||
jobs: Vec<Job>,
|
||||
pub struct TaskManager<'a> {
|
||||
pool: Pool<Postgres>,
|
||||
jobs: Vec<TaskJob>,
|
||||
last_activated: Option<chrono::DateTime<Utc>>,
|
||||
last_job: Option<Job>,
|
||||
last_job: Option<TaskJob>,
|
||||
scheduler: job_scheduler::JobScheduler<'a>,
|
||||
}
|
||||
|
||||
pub struct TaskLog {
|
||||
@ -24,39 +26,69 @@ enum TaskStatus {
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub struct Job {
|
||||
pub task_id: u8,
|
||||
pub task_name: u8,
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct TaskJob {
|
||||
pub task_id: i32,
|
||||
pub task_name: String,
|
||||
pub schedule: String,
|
||||
pub created_at: String,
|
||||
pub deleted_at: Option<String>,
|
||||
}
|
||||
|
||||
pub static G_DB: Lazy<DatabaseConfig> = Lazy::new(|| DatabaseConfig {
|
||||
pool: None,
|
||||
database_type: Some("postgres".to_string()),
|
||||
});
|
||||
|
||||
pub struct DatabaseConfig {
|
||||
pool: Option<PgPool>,
|
||||
pub database_type: Option<String>,
|
||||
pub is_active: bool,
|
||||
pub created_at: chrono::DateTime<Utc>,
|
||||
pub deleted_at: Option<chrono::DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
println!("Hello, world!");
|
||||
|
||||
dotenvy::dotenv().unwrap();
|
||||
let database_url =
|
||||
env::var("DATABASE_URL").expect("Environment variable DATABASE_URL is not found");
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(10)
|
||||
.connect(&database_url)
|
||||
.await
|
||||
.expect("Failed to connect to the database");
|
||||
|
||||
let mut manager = TaskManager::new(pool);
|
||||
manager.register_jobs().await;
|
||||
|
||||
loop {
|
||||
manager.scheduler.tick();
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskServer {
|
||||
fn new() -> Self {
|
||||
TaskServer {
|
||||
impl<'a> TaskManager<'a> {
|
||||
fn new(pool: Pool<Postgres>) -> Self {
|
||||
TaskManager {
|
||||
pool,
|
||||
jobs: Vec::new(),
|
||||
last_activated: None,
|
||||
last_job: None,
|
||||
scheduler: job_scheduler::JobScheduler::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn register_jobs() -> Vec<Job> {
|
||||
Vec::new()
|
||||
pub async fn register_jobs(&self) {
|
||||
// let jobs: Vec<Job> = Vec::new();
|
||||
let results = sqlx::query_as::<_, TaskJob>("SELECT task_id, task_name, schedule, is_active, created_at, deleted_at FROM tasks WHERE is_active = true AND deleted_at IS NULL")
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut scheduler = job_scheduler::JobScheduler::new();
|
||||
results
|
||||
.iter()
|
||||
.map(|j| {
|
||||
scheduler.add(job_scheduler::Job::new(j.schedule.parse().unwrap(), || {
|
||||
println!("Starting task name: {:?}", j.task_name);
|
||||
|
||||
match j.task_id {
|
||||
1 => tasks::import_posts::import_posts("/app", &self.pool),
|
||||
_ => panic!(),
|
||||
}
|
||||
}));
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
}
|
||||
}
|
||||
|
42
backend/task/src/tasks/import_posts.rs
Normal file
42
backend/task/src/tasks/import_posts.rs
Normal file
@ -0,0 +1,42 @@
|
||||
pub async fn import_posts(dir_path: &str, pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
println!("hello from import_posts");
|
||||
let entries = std::fs::read_dir(dir_path).unwrap();
|
||||
|
||||
for f in entries {
|
||||
let file = f.unwrap();
|
||||
let file_path = file.path();
|
||||
if file_path.is_file() {
|
||||
let file_name = file.file_name().to_str().unwrap();
|
||||
let exists = sqlx::query_as::<_, FilenameExists>(
|
||||
"SELECT EXISTS(SELECT 1 FROM posts WHERE filename = $1)",
|
||||
)
|
||||
.bind(file_name.clone())
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.filename;
|
||||
|
||||
if !exists.is_empty() {
|
||||
let file_md_contents = std::fs::read_to_string(file).unwrap().as_str();
|
||||
let content = markdown::to_html(file_md_contents);
|
||||
|
||||
sqlx::query_as::<_, InsertPosts>("INSERT INTO posts (title, body) VALUES ($1, $2)")
|
||||
.bind(file_name.clone())
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
struct FilenameExists {
|
||||
filename: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
struct InsertPosts {
|
||||
title: String,
|
||||
body: String,
|
||||
}
|
@ -1 +1 @@
|
||||
pub mod register;
|
||||
pub mod import_posts;
|
||||
|
@ -1,2 +0,0 @@
|
||||
pub struct Register;
|
||||
impl Register {}
|
Loading…
Reference in New Issue
Block a user