stuff happened
This commit is contained in:
1428
backend/task/Cargo.lock
generated
1428
backend/task/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -7,6 +7,7 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.19.2", features = ["full"] }
|
||||
reqwest = { version = "0.12.20", features = ["json", "rustls-tls"] }
|
||||
job_scheduler = "1.2.1"
|
||||
sqlx = { version = "0.8.2", features = [
|
||||
"postgres",
|
||||
@ -20,6 +21,7 @@ futures = "0.3.30"
|
||||
markdown = "1.0.0-alpha.20"
|
||||
serde = { version = "*", features = ["derive"] }
|
||||
serde_yml = "*"
|
||||
aws-sdk-s3 = "1.77.0"
|
||||
aws-sdk-s3 = "1.94.0"
|
||||
aws-config = "1.8"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||
|
@ -4,4 +4,12 @@ also known as `task`
|
||||
|
||||
## What is this?
|
||||
|
||||
I don't know yet - hopefully this will be filled out soon.
|
||||
This is a task runner/scheduler programs that will fire off various tasks. These tasks can be anything from an blog post import task to a RSS generator task. Additionally, there is task logs inside the database so that you can keep track of tasks when something goes wrong.
|
||||
|
||||
## Things you should know
|
||||
|
||||
`task` uses a `.env` file at the root of the project. The file takes standard environment variables (like enviroment variables you would put into a `.bashrc` or ad-hoc into your shell).
|
||||
|
||||
For `task` to work properly, please make sure to first create the `.env` file, then fill out the following environment variables:
|
||||
|
||||
- `DATABASE_URL` - needed for communicating to Postgres
|
||||
|
@ -3,7 +3,7 @@ use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tasks::import_posts;
|
||||
use tasks::*;
|
||||
|
||||
//mod config;
|
||||
mod tasks;
|
||||
@ -87,14 +87,24 @@ impl<'a> TaskManager<'a> {
|
||||
for job in &results {
|
||||
tracing::info!("Registering job: {}", job.task_name);
|
||||
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
let schedule = job
|
||||
.schedule
|
||||
.parse()
|
||||
.map_err(|e| format!("Failed to parse schedule '{}': {}", job.schedule, e))?;
|
||||
|
||||
let task = match job.task_id {
|
||||
1 => Box::new(move || import_posts::register(&pool)),
|
||||
let task: Box<dyn Fn() + Send + Sync> = match job.task_id {
|
||||
1 => {
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
Box::new(move || import_posts::register(&pool))
|
||||
}
|
||||
2 => {
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
Box::new(move || upload_rss::register(&pool))
|
||||
}
|
||||
3 => {
|
||||
let pool = Arc::new(self.pool.clone());
|
||||
Box::new(move || upload_sitemap::register(&pool))
|
||||
}
|
||||
id => return Err(format!("Unknown task_id: {}", id).into()),
|
||||
};
|
||||
|
||||
|
@ -64,7 +64,6 @@ async fn import_posts(
|
||||
|
||||
// Process file contents
|
||||
let file_md_contents = process_read_file(&file_path)?;
|
||||
// println!("{:?}", file_md_contents);
|
||||
// Extract metadata
|
||||
let document = crate::utils::front_matter::YamlFrontMatter::parse::<MarkdownMetadata>(
|
||||
&file_md_contents,
|
||||
@ -74,10 +73,8 @@ async fn import_posts(
|
||||
markdown::to_html_with_options(&document.content, &markdown::Options::default());
|
||||
println!("{:?}", content);
|
||||
|
||||
// println!("{:?}", document);
|
||||
let title = document.metadata.title;
|
||||
let content_final = content.unwrap();
|
||||
// println!("{:?}", title);
|
||||
|
||||
// Insert into database
|
||||
let results = sqlx::query_as::<_, InsertPosts>(
|
||||
|
@ -1 +1,3 @@
|
||||
pub mod import_posts;
|
||||
pub mod upload_rss;
|
||||
pub mod upload_sitemap;
|
||||
|
40
backend/task/src/tasks/upload_rss.rs
Normal file
40
backend/task/src/tasks/upload_rss.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use sqlx::{Pool, Postgres};
|
||||
|
||||
use crate::utils::{
|
||||
request::{Request, Response},
|
||||
task_log,
|
||||
{upload::S3ClientConfig, *},
|
||||
};
|
||||
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
let p = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = upload_rss(&p).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn upload_rss(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// start task logging
|
||||
task_log::start(2, pool).await?;
|
||||
|
||||
// get request and request the things
|
||||
let request = Request::new();
|
||||
let rss_url = format!("{}/posts/rss", request.base_url);
|
||||
let rss_result = request.request_url::<String>(&rss_url).await.unwrap();
|
||||
|
||||
// upload the sucker to obj storage
|
||||
if let Response::Xml(rss) = rss_result {
|
||||
let client_config = S3ClientConfig::from_env().unwrap();
|
||||
let s3_client = upload::create_s3_client(&client_config).await.unwrap();
|
||||
let _ = upload::upload(
|
||||
&s3_client,
|
||||
client_config.bucket.as_str(),
|
||||
"feed.xml",
|
||||
rss.as_str(),
|
||||
)
|
||||
.await;
|
||||
println!("Finished uploading RSS feed");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
40
backend/task/src/tasks/upload_sitemap.rs
Normal file
40
backend/task/src/tasks/upload_sitemap.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use crate::utils::{
|
||||
request::{Request, Response},
|
||||
task_log,
|
||||
{upload::S3ClientConfig, *},
|
||||
};
|
||||
|
||||
pub fn register(pool: &sqlx::Pool<sqlx::Postgres>) {
|
||||
let p = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = upload_sitemap(&p).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn upload_sitemap(
|
||||
pool: &sqlx::Pool<sqlx::Postgres>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// TODO:: get sitemap and upload it to bucket??
|
||||
task_log::start(3, pool).await?;
|
||||
|
||||
// get request and request the things
|
||||
let request = Request::new();
|
||||
let sitemap_url = format!("{}/posts/sitemap", request.base_url);
|
||||
let sitemap_result = request.request_url::<String>(&sitemap_url).await;
|
||||
|
||||
// upload the sucker to obj storage
|
||||
if let Response::Xml(sitemap) = sitemap_result {
|
||||
let client_config = S3ClientConfig::from_env().unwrap();
|
||||
let s3_client = upload::create_s3_client(&client_config).await.unwrap();
|
||||
let _ = upload::upload(
|
||||
&s3_client,
|
||||
client_config.bucket.as_str(),
|
||||
"sitemap.xml",
|
||||
sitemap.as_str(),
|
||||
)
|
||||
.await;
|
||||
println!("Finished uploading sitemap!");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -21,10 +21,7 @@ impl YamlFrontMatter {
|
||||
markdown: &str,
|
||||
) -> Result<Document, Box<dyn std::error::Error>> {
|
||||
let yaml = YamlFrontMatter::extract(markdown)?;
|
||||
println!("File front matter metadata (raw): {:?}", yaml.0);
|
||||
// println!("File content: {:?}", yaml.1);
|
||||
let clean_yaml = YamlFrontMatter::unescape_str(&yaml.0);
|
||||
println!("File front matter metadata (clean): {:?}", clean_yaml);
|
||||
let metadata = match YamlFrontMatter::from_yaml_str(clean_yaml.as_str()) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
|
@ -1,2 +1,4 @@
|
||||
pub mod front_matter;
|
||||
pub mod request;
|
||||
pub mod task_log;
|
||||
pub mod upload;
|
||||
|
85
backend/task/src/utils/request.rs
Normal file
85
backend/task/src/utils/request.rs
Normal file
@ -0,0 +1,85 @@
|
||||
use reqwest::StatusCode;
|
||||
use std::env;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Request<'a> {
|
||||
pub client: reqwest::Client,
|
||||
pub base_url: Box<str>,
|
||||
pub full_url: Option<&'a str>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Response<T> {
|
||||
Json(T),
|
||||
Xml(String),
|
||||
Text(String),
|
||||
Bytes(Vec<u8>),
|
||||
}
|
||||
|
||||
impl<'a> Request<'a> {
|
||||
pub fn new() -> Self {
|
||||
Request {
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.use_rustls_tls()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap(),
|
||||
base_url: env::var("BASE_URI_API")
|
||||
.expect("Environment variable BASE_URI_API is not found")
|
||||
.into_boxed_str(),
|
||||
full_url: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request_url<T>(
|
||||
&self,
|
||||
url: &String,
|
||||
) -> Result<Response<T>, Box<dyn std::error::Error>>
|
||||
where
|
||||
T: for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
println!("{}", url);
|
||||
let api_result = match self.client.get(url).send().await {
|
||||
Ok(r) => r,
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
};
|
||||
|
||||
match api_result.status() {
|
||||
StatusCode::OK => {
|
||||
// TODO: handle errors here
|
||||
let content_type = api_result
|
||||
.headers()
|
||||
.get("content-type")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap();
|
||||
|
||||
if content_type.contains("application/json") {
|
||||
match api_result.json::<T>().await {
|
||||
Ok(j) => Ok(Response::Json(j)),
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
}
|
||||
} else if content_type.contains("application/xml") {
|
||||
match api_result.text().await {
|
||||
Ok(x) => Ok(Response::Xml(x)),
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
}
|
||||
} else if content_type.starts_with("text/") {
|
||||
match api_result.text().await {
|
||||
Ok(t) => Ok(Response::Text(t)),
|
||||
Err(e) => return Err(Box::new(e)),
|
||||
}
|
||||
} else {
|
||||
match api_result.bytes().await {
|
||||
Ok(b) => Ok(Response::Bytes(b.to_vec())),
|
||||
Err(e) => Err(Box::new(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
status => Err(Box::new(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Unexpected status code: {}", status),
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
73
backend/task/src/utils/upload.rs
Normal file
73
backend/task/src/utils/upload.rs
Normal file
@ -0,0 +1,73 @@
|
||||
use aws_config::{BehaviorVersion, Region};
|
||||
use aws_sdk_s3::{config::Credentials, Client, Config};
|
||||
use std::env;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct S3ClientConfig {
|
||||
pub access_key: String,
|
||||
secret_key: String,
|
||||
endpoint: String,
|
||||
pub bucket: String,
|
||||
region: String,
|
||||
}
|
||||
|
||||
impl S3ClientConfig {
|
||||
pub fn from_env() -> Result<Self, Box<dyn std::error::Error>> {
|
||||
Ok(S3ClientConfig {
|
||||
access_key: env::var("LINODE_ACCESS_KEY")
|
||||
.map_err(|_| "LINODE_ACCESS_KEY environment variable not set")?,
|
||||
secret_key: env::var("LINODE_SECRET_KEY")
|
||||
.map_err(|_| "LINODE_SECRET_KEY environment variable not set")?,
|
||||
endpoint: env::var("LINODE_ENDPOINT")
|
||||
.unwrap_or_else(|_| "us-ord-1.linodeobjects.com".to_string()),
|
||||
bucket: env::var("LINODE_BUCKET")
|
||||
.map_err(|_| "LINODE_BUCKET environment variable not set")?,
|
||||
region: env::var("LINODE_REGION").unwrap_or_else(|_| "us-ord".to_string()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_s3_client(
|
||||
config: &S3ClientConfig,
|
||||
) -> Result<Client, Box<dyn std::error::Error>> {
|
||||
let credentials = Credentials::new(
|
||||
&config.access_key,
|
||||
&config.secret_key,
|
||||
None,
|
||||
None,
|
||||
"linode-object-storage",
|
||||
);
|
||||
|
||||
let s3_config = Config::builder()
|
||||
.behavior_version(BehaviorVersion::latest())
|
||||
.region(Region::new(config.region.clone()))
|
||||
.endpoint_url(format!("https://{}", config.endpoint))
|
||||
.credentials_provider(credentials)
|
||||
.force_path_style(false)
|
||||
.build();
|
||||
|
||||
Ok(Client::from_conf(s3_config))
|
||||
}
|
||||
|
||||
pub async fn upload(
|
||||
client: &Client,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
content: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Uploading to Linode Object Storage...");
|
||||
println!("Bucket: {}", bucket);
|
||||
|
||||
let put_object_req = client
|
||||
.put_object()
|
||||
.bucket(bucket)
|
||||
.key(key)
|
||||
.body(content.as_bytes().to_vec().into())
|
||||
.acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
|
||||
.content_type("application/rss+xml")
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
println!("Upload successful! ETag: {:?}", put_object_req.e_tag());
|
||||
Ok(())
|
||||
}
|
Reference in New Issue
Block a user