From e45a03056b4d9de89a9f252949d1092867f55f46 Mon Sep 17 00:00:00 2001 From: syr Date: Wed, 17 Jul 2024 22:57:01 +0900 Subject: [PATCH] move files to specified directory --- Cargo.toml | 5 + src/config.rs | 3 +- src/main.rs | 311 +++++++++++++++++++++++++++++++++++--------------- src/rule.rs | 13 +++ 4 files changed, 237 insertions(+), 95 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ad6eac9..38f2f3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] dotenv = "0.15.0" +futures = "0.3.30" reqwest = "0.12.5" rss = { version = "2.0.8", features = ["serde"] } serde = "1.0.203" @@ -13,3 +14,7 @@ serde_yaml = "0.9.34" thiserror = "1.0.61" tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros"] } transmission-rpc = "0.4.2" +url = "2.5.2" + +# trname = { path = "../renamer" } +trname = { git = "https://github.com/syrflover/trname", rev = "65b0d5c" } diff --git a/src/config.rs b/src/config.rs index e6c2d66..1ccf636 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use std::{env, fmt::Debug, str::FromStr}; +use std::{env, fmt::Debug, path::PathBuf, str::FromStr}; use serde::Deserialize; @@ -57,5 +57,6 @@ impl Config { #[derive(Debug, Deserialize)] pub struct ChannelConfig { pub url: String, + pub directory: PathBuf, pub rules: Vec, } diff --git a/src/main.rs b/src/main.rs index 966c1cd..b467145 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,8 @@ -use rss::Channel; +use std::{path::PathBuf, time::Duration}; + +use futures::{stream, StreamExt}; +use rss::{Channel, Item}; +use tokio::time::sleep; use transmission_rpc::{ types::{ Id, SessionSetArgs, Torrent, TorrentAction, TorrentAddArgs, TorrentAddedOrDuplicate, @@ -6,7 +10,12 @@ use transmission_rpc::{ }, TransClient, }; -use transmission_rss::config::{ChannelConfig, Config}; +use transmission_rss::{ + config::{ChannelConfig, Config}, + rule::Rule, +}; +use trname::trname; +use url::Url; #[derive(Debug, thiserror::Error)] pub enum ChannelParseError { @@ -94,6 +103,7 @@ async fn get_torrent( TorrentGetField::HashString, TorrentGetField::Status, TorrentGetField::Labels, + TorrentGetField::FileCount, ]), Some(vec![Id::Hash(hash.to_owned())]), ) @@ -121,10 +131,14 @@ async fn test_get_torrent() { TorrentGetField::Error, TorrentGetField::ErrorString, TorrentGetField::DoneDate, + TorrentGetField::FileStats, + TorrentGetField::FileCount, + TorrentGetField::Files, ]), - Some(vec![Id::Hash( - "457be58a312d7a3881783b014cbf766e370c0598".to_owned(), - )]), + // Some(vec![Id::Hash( + // "457be58a312d7a3881783b014cbf766e370c0598".to_owned(), + // )]), + None, ) .await .unwrap(); @@ -141,11 +155,14 @@ fn has_label(labels: Option<&[String]>, x: &str) -> bool { async fn add_torrent( transmission: &mut TransClient, link: &str, + download_dir: &PathBuf, ) -> transmission_rpc::types::Result { let mut res = transmission .torrent_add(TorrentAddArgs { filename: Some(link.to_owned()), labels: Some(vec![BOT_LABEL.to_owned()]), + paused: Some(false), + download_dir: download_dir.to_str().map(|x| x.to_owned()), ..Default::default() }) .await?; @@ -156,7 +173,11 @@ async fn add_torrent( .await? .unwrap(); } - TorrentAddedOrDuplicate::TorrentAdded(_torrent) => {} + TorrentAddedOrDuplicate::TorrentAdded(torrent) => { + *torrent = get_torrent(transmission, torrent.hash_string.as_deref().unwrap()) + .await? + .unwrap(); + } } Ok(res.arguments) @@ -165,7 +186,7 @@ async fn add_torrent( #[tokio::test] #[ignore] async fn test_add_torrent() { - let link = "magnet:?xt=urn:btih:HTMN5OOCMXUKUQEG3XIXAQYYXGMRMPGG&dn=%5BSubsPlease%5D%20Oshi%20no%20Ko%20-%2012%20%281080p%29%20%5BEC3811BA%5D.mkv&xl=1033669775&tr=http%3A%2F%2Fnyaa.tracker.wf%3A7777%2Fannounce&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337%2Fannounce&tr=udp%3A%2F%2F9.rarbg.to%3A2710%2Fannounce&tr=udp%3A%2F%2F9.rarbg.me%3A2710%2Fannounce&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.internetwarriors.net%3A1337%2Fannounce&tr=udp%3A%2F%2Ftracker.cyberia.is%3A6969%2Fannounce&tr=udp%3A%2F%2Fexodus.desync.com%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker3.itzmx.com%3A6961%2Fannounce&tr=udp%3A%2F%2Ftracker.torrent.eu.org%3A451%2Fannounce&tr=udp%3A%2F%2Ftracker.tiny-vps.com%3A6969%2Fannounce&tr=udp%3A%2F%2Fretracker.lanta-net.ru%3A2710%2Fannounce&tr=http%3A%2F%2Fopen.acgnxtracker.com%3A80%2Fannounce&tr=wss%3A%2F%2Ftracker.openwebtorrent.com"; + let link = "magnet:?xt=urn:btih:SEFD6A5N67C2CJ3NDJI74AP6CZXTCFSZ&dn=%5BSubsPlease%5D%20Katsute%20Mahou%20Shoujo%20to%20Aku%20wa%20Tekitai%20shiteita%20-%2002%20%281080p%29%20%5BC2A5EFC3%5D.mkv&xl=767183596&tr=http%3A%2F%2Fnyaa.tracker.wf%3A7777%2Fannounce&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337%2Fannounce&tr=udp%3A%2F%2F9.rarbg.to%3A2710%2Fannounce&tr=udp%3A%2F%2F9.rarbg.me%3A2710%2Fannounce&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.internetwarriors.net%3A1337%2Fannounce&tr=udp%3A%2F%2Ftracker.cyberia.is%3A6969%2Fannounce&tr=udp%3A%2F%2Fexodus.desync.com%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker3.itzmx.com%3A6961%2Fannounce&tr=udp%3A%2F%2Ftracker.torrent.eu.org%3A451%2Fannounce&tr=udp%3A%2F%2Ftracker.tiny-vps.com%3A6969%2Fannounce&tr=udp%3A%2F%2Fretracker.lanta-net.ru%3A2710%2Fannounce&tr=http%3A%2F%2Fopen.acgnxtracker.com%3A80%2Fannounce&tr=wss%3A%2F%2Ftracker.openwebtorrent.com"; let mut transmission = TransClient::new( "http://192.168.1.21:32091/transmission/rpc" @@ -173,11 +194,50 @@ async fn test_add_torrent() { .expect("can't parse transmission url"), ); - let res = add_torrent(&mut transmission, link).await.unwrap(); + let res = add_torrent( + &mut transmission, + link, + &PathBuf::from( + "/downloads/Shows (current)/Katsute Mahou Shoujo to Aku wa Tekitai shiteita/Season 01", + ), + ) + .await + .unwrap(); println!("{res:#?}"); } +async fn rename_torrent( + transmission: &mut TransClient, + hash: &str, + download_dir: &PathBuf, + starts_episode_at: isize, +) -> transmission_rpc::types::Result> { + let Some(torrent) = get_torrent(transmission, hash).await? else { + return Ok(None); + }; + + if torrent.file_count.unwrap() == 1 { + let old_file_name = torrent.name.clone().unwrap(); + + if let Some(new_file_name) = trname(&download_dir, &old_file_name, starts_episode_at) { + let res = transmission + .torrent_rename_path( + vec![Id::Hash(hash.to_owned())], + old_file_name, + new_file_name.clone(), + ) + .await?; + + if res.result == "success" { + return Ok(Some(new_file_name)); + } + } + } + + Ok(None) +} + async fn run() { let config = Config::new(); let channels_config: Vec = serde_yaml::from_slice( @@ -190,12 +250,12 @@ async fn run() { ) .expect("can't deserialize channels configuration"); - let mut transmission = TransClient::new( - config - .transmission_url - .parse() - .expect("can't parse transmission url"), - ); + let transmission_url = config + .transmission_url + .parse::() + .expect("can't parse transmission url"); + + let mut transmission = TransClient::new(transmission_url.clone()); let transmission_config = SessionSetArgs { download_dir: config.download_dir, @@ -217,99 +277,162 @@ async fn run() { .await .expect("can't set transmission configuration"); - let mut channels = Vec::new(); + pub fn collect_items<'a>( + channels: impl Iterator, + ) -> Vec<(PathBuf, &'a Rule, &'a mut Item)> { + let mut items = Vec::new(); + + for (channel, channel_config) in channels { + for item in channel.items_mut() { + let matched = channel_config + .rules + .iter() + .find(|rule| rule.test(item.title().unwrap_or_default())); + + let Some(matched) = matched else { + continue; + }; + + items.push((channel_config.directory.clone(), matched, item)); + + println!("Matched {}", matched.r#match); - for channel_config in &channels_config { - let mut channel = match parse_channel(channel_config).await { - Ok(r) => r, - Err(err) => { - eprintln!("{err}"); - continue; + // return items; } - }; - - println!("Parsed {}", channel.link()); - - for item in channel.items_mut() { - let matched = channel_config - .rules - .iter() - .find(|rule| rule.test(item.title().unwrap_or_default())); - - let Some(matched) = matched else { - continue; - }; - - println!(); - println!("Matched {}", matched.r#match); - - let link = item.link().unwrap_or_default(); - - let torrent = match add_torrent(&mut transmission, link).await { - Ok(r) => match r { - TorrentAddedOrDuplicate::TorrentDuplicate(torrent) => { - let hash = torrent.hash_string.as_deref().unwrap(); - - match torrent.status.unwrap() { - TorrentStatus::QueuedToSeed | TorrentStatus::Seeding - if has_label(torrent.labels.as_deref(), BOT_LABEL) => - { - // pause_torrent - transmission - .torrent_action( - TorrentAction::Stop, - vec![Id::Hash(hash.to_owned())], - ) - .await - .inspect_err(|err| eprintln!("{err}")) - .ok(); // FIXME: error handle - - println!( - "Stopped {} | {}", - torrent.name.as_deref().unwrap(), - torrent.hash_string.as_deref().unwrap() - ); + } + + return items; + } + + let mut channels = stream::iter(channels_config.into_iter()) + .map(|channel_config| async { + ( + parse_channel(&channel_config) + .await + .inspect(|channel| println!("Parsed {}", channel.link())), + channel_config, + ) + }) + .buffered(5) + .collect::>() + .await + .into_iter() + .filter_map(|(res, channel_config)| { + res.inspect_err(|err| println!("{err}")) + .ok() + .map(|channel| (channel, channel_config)) + }) + .collect::>(); + + let matched_items = collect_items(channels.iter_mut()); + + stream::iter(matched_items.into_iter()) + .for_each_concurrent(100, |(base_directory, matched, item)| { + let transmission_url = transmission_url.clone(); + + async move { + println!(); + + let mut transmission = TransClient::new(transmission_url); + + let link = item.link().unwrap_or_default(); + + let torrent = + match add_torrent(&mut transmission, link, &matched.directory(&base_directory)) + .await + { + Ok(r) => match r { + TorrentAddedOrDuplicate::TorrentDuplicate(torrent) => { + let hash = torrent.hash_string.as_deref().unwrap(); + + match torrent.status.unwrap() { + TorrentStatus::QueuedToSeed | TorrentStatus::Seeding + if has_label(torrent.labels.as_deref(), BOT_LABEL) => + { + // pause_torrent + transmission + .torrent_action( + TorrentAction::Stop, + vec![Id::Hash(hash.to_owned())], + ) + .await + .inspect_err(|err| eprintln!("{err}")) + .ok(); // FIXME: error handle + + println!( + "Stopped {} | {}", + torrent.name.as_deref().unwrap(), + torrent.hash_string.as_deref().unwrap() + ); + } + _ => { + println!( + "Already {} | {}", + torrent.name.as_deref().unwrap(), + torrent.hash_string.as_deref().unwrap() + ); + } + } + + torrent } - _ => { - println!( - "Already {} | {}", - torrent.name.as_deref().unwrap(), - torrent.hash_string.as_deref().unwrap() - ); + TorrentAddedOrDuplicate::TorrentAdded(torrent) => { + let hash = torrent.hash_string.as_deref().unwrap(); + let name = torrent.name.as_deref().unwrap(); + + println!("Added {} | {}", name, hash); + + torrent } + }, + Err(err) => { + eprintln!("{err}"); + return; } + }; - torrent - } - TorrentAddedOrDuplicate::TorrentAdded(torrent) => { - println!( - "Added {} | {}", - torrent.name.as_deref().unwrap(), - torrent.hash_string.as_deref().unwrap() - ); - - torrent - } - }, - Err(err) => { - eprintln!("{err}"); - continue; - } - }; + let hash = torrent.hash_string.unwrap(); - // set torrent hash - item.set_description(torrent.hash_string.unwrap()); - } + // rename + { + let mut i = 0; - channels.push(channel); - } + loop { + sleep(Duration::from_secs(1)).await; + + let res = rename_torrent( + &mut transmission, + &hash, + &matched.directory(&base_directory), + matched.starts_episode_at, + ) + .await + .inspect_err(|err| println!("{err}")); + + match res { + Ok(Some(_name)) => break, + _ => { + if i > 15 { + break; + } + i += 1; + } + }; + } + }; + + // set torrent hash + item.set_description(hash); + } + }) + .await; - // remove oldest torrents match get_torrents(&mut transmission).await { Ok(torrents) => { + // remove oldest torrents let items = channels .into_iter() - .flat_map(|channel| channel.items) + .flat_map(|(channel, _)| channel.items) .collect::>(); let oldest_torrents = torrents diff --git a/src/rule.rs b/src/rule.rs index 7614c74..d581291 100644 --- a/src/rule.rs +++ b/src/rule.rs @@ -1,5 +1,11 @@ +use std::path::{Path, PathBuf}; + use serde::Deserialize; +const fn default_starts_episode_at() -> isize { + 1 +} + #[derive(Debug, Deserialize)] pub struct Rule { #[serde(default)] @@ -8,6 +14,9 @@ pub struct Rule { pub case_insensitive: bool, #[serde(rename = "match")] pub r#match: String, + #[serde(rename = "episode", default = "default_starts_episode_at")] + pub starts_episode_at: isize, + pub(crate) directory: PathBuf, } impl Rule { @@ -22,4 +31,8 @@ impl Rule { } } } + + pub fn directory(&self, base: impl AsRef) -> PathBuf { + base.as_ref().join(&self.directory) + } }