From f3264339ab82a3816c0aaa8c1af1fbff17df33f3 Mon Sep 17 00:00:00 2001 From: Oleg Yurchik Date: Tue, 10 Jun 2025 18:17:12 +0300 Subject: [PATCH] v0.2.0 --- Cargo.lock | 83 +++++++++++++++++++++++++++ Cargo.toml | 6 +- docker-compose.yaml | 8 +++ src/main.rs | 133 +++++++++++++++++++++++++------------------- src/settings.rs | 10 +++- 5 files changed, 180 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a725402..37c3ef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,10 +400,14 @@ dependencies = [ "config", "csv", "deadpool-postgres", + "flume", "futures", "hex", "indicatif", "log", + "num-bigint", + "num-traits", + "num_cpus", "rand 0.8.5", "reqwest", "secp256k1 0.31.0", @@ -569,6 +573,19 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1139,6 +1156,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -1166,12 +1192,40 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.17.0" @@ -1376,6 +1430,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1837,6 +1911,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 5442b3f..587719a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,15 +1,19 @@ [package] name = "cryptohunter" -version = "0.1.7" +version = "0.2.0" edition = "2024" [dependencies] futures = "0.3.31" +num_cpus = "1.17.0" +num-bigint = "0.4.6" +num-traits = "0.2.19" bytes = "1.0" clap = { version = "4.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] } config = { version = "0.13", features = ["yaml", "json"] } async-channel = "2.1" +flume = "0.10" rand = "0.8" bitcoin = { version = "0.30", features = ["rand"] } secp256k1 = { version = "0.31.0", features = ["rand"] } diff --git a/docker-compose.yaml b/docker-compose.yaml index e45f393..817e3d8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -10,6 +10,14 @@ services: database: condition: service_healthy command: search bitcoin run + deploy: + resources: + limits: + cpus: "10.0" + memory: 512M + reservations: + cpus: "10.0" + memory: 128M database: image: postgres:16-alpine diff --git a/src/main.rs b/src/main.rs index 91c6c28..ebde1ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,15 +8,13 @@ mod utils; use async_channel; use clap::Parser; use cli::Cli; +use flume; +use num_bigint::BigUint; +use num_traits::One; use std::collections::HashMap; -use std::thread; -use tokio::sync::broadcast; +use std::time::{Duration, Instant}; -use crate::blockchains::{ - create_snapshot_loader, - create_wallet_checker, - Blockchain, -}; +use crate::blockchains::{create_snapshot_loader, create_wallet_checker, Blockchain}; use crate::key_generators::{create_key_generator, KeyAlgorithm}; use crate::notification::send_telegram_message; use crate::settings::Settings; @@ -43,11 +41,9 @@ async fn main() -> Result<(), DynError> { } } } - cli::Commands::Search { blockchain, command } => { - match command { - cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?, - } - } + cli::Commands::Search { blockchain, command } => match command { + cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?, + }, } Ok(()) @@ -57,27 +53,49 @@ async fn run_search( blockchain: Option<&Blockchain>, settings: &Settings, ) -> Result<(), DynError> { - let mut key_senders: HashMap> = HashMap::new(); - for algorithm in settings.key_generators.keys() { - let (sender, _) = broadcast::channel(1_000_000); - key_senders.insert(algorithm.clone(), sender); + // 1. Создаем систему каналов с использованием flume + let mut key_channels = HashMap::new(); + + for (algorithm, key_generator_settings) in &settings.key_generators { + let (tx, rx) = flume::bounded(key_generator_settings.buffer_size); + key_channels.insert(algorithm.clone(), (tx, rx)); } let mut key_gen_handles = vec![]; - for (algorithm, algo_settings) in &settings.key_generators { - let sender = key_senders.get(algorithm).unwrap().clone(); + for (algorithm, key_generator_settings) in &settings.key_generators { + let (key_sender, _) = key_channels.get(algorithm).unwrap(); - for _ in 0..algo_settings.workers { - let algo_settings = algo_settings.clone(); + for worker_index in 0..key_generator_settings.workers { + let key_generator_settings = key_generator_settings.clone(); let algorithm = algorithm.clone(); - let sender = sender.clone(); // Клонируем sender для каждого потока - let handle = thread::spawn(move || { - let key_generator = create_key_generator(&algorithm, &algo_settings.data) + let key_sender = key_sender.clone(); + + let handle = tokio::spawn(async move { + let key_generator = create_key_generator(&algorithm, &key_generator_settings.data) .expect("Failed to create key generator"); + + let mut start_time = Instant::now(); + let mut generated_keys_count = BigUint::ZERO; loop { let key_hex = key_generator.generate_key_hex(); - if let Err(e) = sender.send(key_hex) { - log::error!("Key generator for {:?} failed to send: {}", algorithm, e); + // Отправляем ключ с ожиданием свободного места в буфере + match key_sender.send_async(key_hex).await { + Ok(_) => { + // Разрешение освобождается при обработке ключа воркером + } + Err(e) => { + log::error!("Key generator failed to send: {}", e); + break; + } + } + + generated_keys_count += BigUint::one(); + if start_time.elapsed() > Duration::from_secs(30) { + log::info!( + "[{}][{}] Keys generated: {}", + &algorithm, &worker_index, &generated_keys_count, + ); + start_time = Instant::now(); } } }); @@ -87,55 +105,56 @@ async fn run_search( let (notify_sender, notify_receiver) = async_channel::unbounded::(); + // 4. Обработчики кошельков с гарантией обработки let mut wallet_checker_handles = vec![]; for (blockchain, blockchain_settings) in &settings.blockchains { - let wallet_checker = create_wallet_checker(blockchain, &blockchain_settings.data)?; + let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?; let key_algorithm = wallet_checker.get_key_algorithm(); - let key_sender = key_senders.get(&key_algorithm).ok_or(format!( - "No key sender for algorithm {:?} (blockchain {:?})", + + // Получаем приемник для нужного алгоритма + let (_, key_receiver) = key_channels.get(&key_algorithm).ok_or(format!( + "No key channel for algorithm {} (blockchain {})", key_algorithm, blockchain ))?; - let key_receiver = key_sender.subscribe(); - for _ in 0..blockchain_settings.workers { - let mut key_receiver = key_receiver.resubscribe(); - let wallet_checker = create_wallet_checker(blockchain, &blockchain_settings.data)?; + for worker_index in 0..blockchain_settings.workers { + let blockchain = blockchain.clone(); + let blockchain_settings = blockchain_settings.clone(); + let key_receiver = key_receiver.clone(); + let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?; let notify_sender = notify_sender.clone(); let handle = tokio::spawn(async move { - loop { - match key_receiver.recv().await { - Ok(key_hex) => { - match wallet_checker.get_wallet_info(&key_hex).await { - Ok(Some(info)) => { - if let Err(e) = notify_sender.send(info).await { - log::error!("Failed to send notification: {}", e); - } - } - Ok(None) => {} - Err(e) => { - log::error!( - "Error getting wallet info: {}", - e - ); - } + let mut start_time = Instant::now(); + let mut checked_wallets_count = BigUint::ZERO; + while let Ok(key_hex) = key_receiver.recv_async().await { + match wallet_checker.get_wallet_info(&key_hex).await { + Ok(Some(info)) => { + if let Err(e) = notify_sender.send(info).await { + log::error!("Failed to send notification: {}", e); } } - Err(broadcast::error::RecvError::Closed) => { - log::info!("Key channel closed"); - break; - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - log::warn!("Skipped {} keys", skipped); + Ok(None) => {} + Err(e) => { + log::error!("Error getting wallet info: {}", e); } } + + checked_wallets_count += BigUint::one(); + if start_time.elapsed() > Duration::from_secs(30) { + log::info!( + "[{}][{}] Wallets checked: {}", + &blockchain, &worker_index, &checked_wallets_count, + ); + start_time = Instant::now(); + } } }); wallet_checker_handles.push(handle); } } - // Клонируем необходимые данные для нотификации + // 5. Система нотификаций let bot_token = settings.notifications.telegram_bot_token.clone(); let user_id = settings.notifications.telegram_user_id.clone(); @@ -147,8 +166,8 @@ async fn run_search( } }); - // Ждем сигнала Ctrl+C для завершения + // 6. Управление завершением tokio::signal::ctrl_c().await?; log::info!("Shutting down"); Ok(()) -} \ No newline at end of file +} diff --git a/src/settings.rs b/src/settings.rs index bfdca83..38f170a 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,7 +1,7 @@ use config::{Config, File, Value}; +use num_cpus; use serde::Deserialize; use std::collections::HashMap; -use std::thread::available_parallelism; use crate::blockchains::Blockchain; use crate::key_generators::KeyAlgorithm; @@ -18,10 +18,16 @@ pub struct Settings { pub struct KeyGeneratorSettings { #[serde(default = "default_workers")] pub workers: usize, + #[serde(default = "default_buffer_size")] + pub buffer_size: usize, #[serde(flatten)] pub data: HashMap, } +fn default_buffer_size() -> usize { + 10_000 +} + #[derive(Debug, Deserialize, Clone)] pub struct BlockchainSettings { #[serde(default = "default_workers")] @@ -32,7 +38,7 @@ pub struct BlockchainSettings { } fn default_workers() -> usize { - available_parallelism().unwrap().get() + num_cpus::get() } #[derive(Debug, Deserialize, Clone)]