This commit is contained in:
2025-06-10 18:17:12 +03:00
parent 74b1cc6e6f
commit f3264339ab
5 changed files with 180 additions and 60 deletions

83
Cargo.lock generated
View File

@@ -400,10 +400,14 @@ dependencies = [
"config", "config",
"csv", "csv",
"deadpool-postgres", "deadpool-postgres",
"flume",
"futures", "futures",
"hex", "hex",
"indicatif", "indicatif",
"log", "log",
"num-bigint",
"num-traits",
"num_cpus",
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"secp256k1 0.31.0", "secp256k1 0.31.0",
@@ -569,6 +573,19 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@@ -1139,6 +1156,15 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.16",
]
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.14" version = "0.2.14"
@@ -1166,12 +1192,40 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]] [[package]]
name = "num-conv" name = "num-conv"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.17.0" version = "1.17.0"
@@ -1376,6 +1430,26 @@ dependencies = [
"siphasher", "siphasher",
] ]
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.16" version = "0.2.16"
@@ -1837,6 +1911,15 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "stable_deref_trait" name = "stable_deref_trait"
version = "1.2.0" version = "1.2.0"

View File

@@ -1,15 +1,19 @@
[package] [package]
name = "cryptohunter" name = "cryptohunter"
version = "0.1.7" version = "0.2.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
futures = "0.3.31" futures = "0.3.31"
num_cpus = "1.17.0"
num-bigint = "0.4.6"
num-traits = "0.2.19"
bytes = "1.0" bytes = "1.0"
clap = { version = "4.0", features = ["derive"] } clap = { version = "4.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
config = { version = "0.13", features = ["yaml", "json"] } config = { version = "0.13", features = ["yaml", "json"] }
async-channel = "2.1" async-channel = "2.1"
flume = "0.10"
rand = "0.8" rand = "0.8"
bitcoin = { version = "0.30", features = ["rand"] } bitcoin = { version = "0.30", features = ["rand"] }
secp256k1 = { version = "0.31.0", features = ["rand"] } secp256k1 = { version = "0.31.0", features = ["rand"] }

View File

@@ -10,6 +10,14 @@ services:
database: database:
condition: service_healthy condition: service_healthy
command: search bitcoin run command: search bitcoin run
deploy:
resources:
limits:
cpus: "10.0"
memory: 512M
reservations:
cpus: "10.0"
memory: 128M
database: database:
image: postgres:16-alpine image: postgres:16-alpine

View File

@@ -8,15 +8,13 @@ mod utils;
use async_channel; use async_channel;
use clap::Parser; use clap::Parser;
use cli::Cli; use cli::Cli;
use flume;
use num_bigint::BigUint;
use num_traits::One;
use std::collections::HashMap; use std::collections::HashMap;
use std::thread; use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use crate::blockchains::{ use crate::blockchains::{create_snapshot_loader, create_wallet_checker, Blockchain};
create_snapshot_loader,
create_wallet_checker,
Blockchain,
};
use crate::key_generators::{create_key_generator, KeyAlgorithm}; use crate::key_generators::{create_key_generator, KeyAlgorithm};
use crate::notification::send_telegram_message; use crate::notification::send_telegram_message;
use crate::settings::Settings; use crate::settings::Settings;
@@ -43,11 +41,9 @@ async fn main() -> Result<(), DynError> {
} }
} }
} }
cli::Commands::Search { blockchain, command } => { cli::Commands::Search { blockchain, command } => match command {
match command {
cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?, cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?,
} },
}
} }
Ok(()) Ok(())
@@ -57,27 +53,49 @@ async fn run_search(
blockchain: Option<&Blockchain>, blockchain: Option<&Blockchain>,
settings: &Settings, settings: &Settings,
) -> Result<(), DynError> { ) -> Result<(), DynError> {
let mut key_senders: HashMap<KeyAlgorithm, broadcast::Sender<String>> = HashMap::new(); // 1. Создаем систему каналов с использованием flume
for algorithm in settings.key_generators.keys() { let mut key_channels = HashMap::new();
let (sender, _) = broadcast::channel(1_000_000);
key_senders.insert(algorithm.clone(), sender); for (algorithm, key_generator_settings) in &settings.key_generators {
let (tx, rx) = flume::bounded(key_generator_settings.buffer_size);
key_channels.insert(algorithm.clone(), (tx, rx));
} }
let mut key_gen_handles = vec![]; let mut key_gen_handles = vec![];
for (algorithm, algo_settings) in &settings.key_generators { for (algorithm, key_generator_settings) in &settings.key_generators {
let sender = key_senders.get(algorithm).unwrap().clone(); let (key_sender, _) = key_channels.get(algorithm).unwrap();
for _ in 0..algo_settings.workers { for worker_index in 0..key_generator_settings.workers {
let algo_settings = algo_settings.clone(); let key_generator_settings = key_generator_settings.clone();
let algorithm = algorithm.clone(); let algorithm = algorithm.clone();
let sender = sender.clone(); // Клонируем sender для каждого потока let key_sender = key_sender.clone();
let handle = thread::spawn(move || {
let key_generator = create_key_generator(&algorithm, &algo_settings.data) let handle = tokio::spawn(async move {
let key_generator = create_key_generator(&algorithm, &key_generator_settings.data)
.expect("Failed to create key generator"); .expect("Failed to create key generator");
let mut start_time = Instant::now();
let mut generated_keys_count = BigUint::ZERO;
loop { loop {
let key_hex = key_generator.generate_key_hex(); let key_hex = key_generator.generate_key_hex();
if let Err(e) = sender.send(key_hex) { // Отправляем ключ с ожиданием свободного места в буфере
log::error!("Key generator for {:?} failed to send: {}", algorithm, e); match key_sender.send_async(key_hex).await {
Ok(_) => {
// Разрешение освобождается при обработке ключа воркером
}
Err(e) => {
log::error!("Key generator failed to send: {}", e);
break;
}
}
generated_keys_count += BigUint::one();
if start_time.elapsed() > Duration::from_secs(30) {
log::info!(
"[{}][{}] Keys generated: {}",
&algorithm, &worker_index, &generated_keys_count,
);
start_time = Instant::now();
} }
} }
}); });
@@ -87,25 +105,29 @@ async fn run_search(
let (notify_sender, notify_receiver) = async_channel::unbounded::<String>(); let (notify_sender, notify_receiver) = async_channel::unbounded::<String>();
// 4. Обработчики кошельков с гарантией обработки
let mut wallet_checker_handles = vec![]; let mut wallet_checker_handles = vec![];
for (blockchain, blockchain_settings) in &settings.blockchains { for (blockchain, blockchain_settings) in &settings.blockchains {
let wallet_checker = create_wallet_checker(blockchain, &blockchain_settings.data)?; let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?;
let key_algorithm = wallet_checker.get_key_algorithm(); let key_algorithm = wallet_checker.get_key_algorithm();
let key_sender = key_senders.get(&key_algorithm).ok_or(format!(
"No key sender for algorithm {:?} (blockchain {:?})", // Получаем приемник для нужного алгоритма
let (_, key_receiver) = key_channels.get(&key_algorithm).ok_or(format!(
"No key channel for algorithm {} (blockchain {})",
key_algorithm, blockchain key_algorithm, blockchain
))?; ))?;
let key_receiver = key_sender.subscribe();
for _ in 0..blockchain_settings.workers { for worker_index in 0..blockchain_settings.workers {
let mut key_receiver = key_receiver.resubscribe(); let blockchain = blockchain.clone();
let wallet_checker = create_wallet_checker(blockchain, &blockchain_settings.data)?; let blockchain_settings = blockchain_settings.clone();
let key_receiver = key_receiver.clone();
let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?;
let notify_sender = notify_sender.clone(); let notify_sender = notify_sender.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
loop { let mut start_time = Instant::now();
match key_receiver.recv().await { let mut checked_wallets_count = BigUint::ZERO;
Ok(key_hex) => { while let Ok(key_hex) = key_receiver.recv_async().await {
match wallet_checker.get_wallet_info(&key_hex).await { match wallet_checker.get_wallet_info(&key_hex).await {
Ok(Some(info)) => { Ok(Some(info)) => {
if let Err(e) = notify_sender.send(info).await { if let Err(e) = notify_sender.send(info).await {
@@ -114,20 +136,17 @@ async fn run_search(
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err(e) => {
log::error!( log::error!("Error getting wallet info: {}", e);
"Error getting wallet info: {}", }
e }
checked_wallets_count += BigUint::one();
if start_time.elapsed() > Duration::from_secs(30) {
log::info!(
"[{}][{}] Wallets checked: {}",
&blockchain, &worker_index, &checked_wallets_count,
); );
} start_time = Instant::now();
}
}
Err(broadcast::error::RecvError::Closed) => {
log::info!("Key channel closed");
break;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!("Skipped {} keys", skipped);
}
} }
} }
}); });
@@ -135,7 +154,7 @@ async fn run_search(
} }
} }
// Клонируем необходимые данные для нотификации // 5. Система нотификаций
let bot_token = settings.notifications.telegram_bot_token.clone(); let bot_token = settings.notifications.telegram_bot_token.clone();
let user_id = settings.notifications.telegram_user_id.clone(); let user_id = settings.notifications.telegram_user_id.clone();
@@ -147,7 +166,7 @@ async fn run_search(
} }
}); });
// Ждем сигнала Ctrl+C для завершения // 6. Управление завершением
tokio::signal::ctrl_c().await?; tokio::signal::ctrl_c().await?;
log::info!("Shutting down"); log::info!("Shutting down");
Ok(()) Ok(())

View File

@@ -1,7 +1,7 @@
use config::{Config, File, Value}; use config::{Config, File, Value};
use num_cpus;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::thread::available_parallelism;
use crate::blockchains::Blockchain; use crate::blockchains::Blockchain;
use crate::key_generators::KeyAlgorithm; use crate::key_generators::KeyAlgorithm;
@@ -18,10 +18,16 @@ pub struct Settings {
pub struct KeyGeneratorSettings { pub struct KeyGeneratorSettings {
#[serde(default = "default_workers")] #[serde(default = "default_workers")]
pub workers: usize, pub workers: usize,
#[serde(default = "default_buffer_size")]
pub buffer_size: usize,
#[serde(flatten)] #[serde(flatten)]
pub data: HashMap<String, Value>, pub data: HashMap<String, Value>,
} }
fn default_buffer_size() -> usize {
10_000
}
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
pub struct BlockchainSettings { pub struct BlockchainSettings {
#[serde(default = "default_workers")] #[serde(default = "default_workers")]
@@ -32,7 +38,7 @@ pub struct BlockchainSettings {
} }
fn default_workers() -> usize { fn default_workers() -> usize {
available_parallelism().unwrap().get() num_cpus::get()
} }
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]