diff --git a/Cargo.lock b/Cargo.lock index 37c3ef6..ace108b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,18 +84,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-trait" version = "0.1.88" @@ -152,6 +140,12 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" +[[package]] +name = "bit-vec" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b4ff8b16e6076c3e14220b39fbc1fabb6737522281a388998046859400895f" + [[package]] name = "bitcoin" version = "0.30.2" @@ -217,6 +211,25 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bloom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d00ac8e5056d6d65376a3c1aa5c7c34850d6949ace17f0266953a254eb3d6fe8" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "sha2", + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.17.0" @@ -250,6 +263,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clap" version = "4.5.39" @@ -306,15 +325,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "config" version = "0.13.4" @@ -372,6 +382,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -390,31 +409,30 @@ dependencies = [ [[package]] name = "cryptohunter" -version = "0.1.7" +version = "0.2.0" dependencies = [ - "async-channel", - "async-trait", "bitcoin", - "bytes", + "bloom", + "bs58", "clap", "config", + "crossbeam-channel", "csv", - "deadpool-postgres", - "flume", - "futures", + "ctrlc", "hex", "indicatif", "log", "num-bigint", "num-traits", "num_cpus", + "once_cell", + "postgres", + "r2d2_postgres", "rand 0.8.5", "reqwest", "secp256k1 0.31.0", "serde", "simple_logger", - "tokio", - "tokio-postgres", ] [[package]] @@ -439,37 +457,13 @@ dependencies = [ ] [[package]] -name = "deadpool" -version = "0.12.2" +name = "ctrlc" +version = "3.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f" +checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73" dependencies = [ - "deadpool-runtime", - "num_cpus", - "tokio", -] - -[[package]] -name = "deadpool-postgres" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d697d376cbfa018c23eb4caab1fd1883dd9c906a8c034e8d9a3cb06a7e0bef9" -dependencies = [ - "async-trait", - "deadpool", - "getrandom 0.2.16", - "tokio", - "tokio-postgres", - "tracing", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" -dependencies = [ - "tokio", + "nix", + "windows-sys 0.59.0", ] [[package]] @@ -540,27 +534,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "event-listener" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" -dependencies = [ - "event-listener", - "pin-project-lite", -] - [[package]] name = "fallible-iterator" version = "0.2.0" @@ -573,19 +546,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "flume" -version = "0.10.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin", -] - [[package]] name = "fnv" version = "1.0.7" @@ -616,21 +576,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -647,17 +592,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.31" @@ -693,7 +627,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -722,10 +655,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", - "wasm-bindgen", ] [[package]] @@ -1156,15 +1087,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom 0.2.16", -] - [[package]] name = "native-tls" version = "0.2.14" @@ -1182,6 +1104,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1326,12 +1260,6 @@ dependencies = [ "hashbrown 0.12.3", ] -[[package]] -name = "parking" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" - [[package]] name = "parking_lot" version = "0.12.4" @@ -1430,26 +1358,6 @@ dependencies = [ "siphasher", ] -[[package]] -name = "pin-project" -version = "1.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1474,6 +1382,20 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +[[package]] +name = "postgres" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "363e6dfbdd780d3aa3597b6eb430db76bb315fa9bad7fae595bb8def808b8470" +dependencies = [ + "bytes", + "fallible-iterator", + "futures-util", + "log", + "tokio", + "tokio-postgres", +] + [[package]] name = "postgres-protocol" version = "0.6.8" @@ -1551,6 +1473,27 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_postgres" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd4b47636dbca581cd057e2f27a5d39be741ea4f85fd3c29e415c55f71c7595" +dependencies = [ + "postgres", + "r2d2", +] + [[package]] name = "rand" version = "0.8.5" @@ -1729,6 +1672,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1859,15 +1811,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook-registry" -version = "1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" -dependencies = [ - "libc", -] - [[package]] name = "simple_logger" version = "4.3.3" @@ -1911,15 +1854,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2099,25 +2033,11 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2", - "tokio-macros", "windows-sys 0.52.0", ] -[[package]] -name = "tokio-macros" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -2189,21 +2109,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1ffbcf9c6f6b99d386e7444eb608ba646ae452a36b39737deb9663b610f662" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.33" diff --git a/Cargo.toml b/Cargo.toml index 587719a..87ff2a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,27 +1,26 @@ [package] name = "cryptohunter" -version = "0.2.0" +version = "0.2.1" edition = "2024" [dependencies] -futures = "0.3.31" +crossbeam-channel = "0.5.15" num_cpus = "1.17.0" num-bigint = "0.4.6" num-traits = "0.2.19" -bytes = "1.0" clap = { version = "4.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] } config = { version = "0.13", features = ["yaml", "json"] } -async-channel = "2.1" -flume = "0.10" rand = "0.8" +once_cell = "1.21.3" +bloom = "0.3.2" +bs58 = { version = "0.5.1", features = ["check"] } bitcoin = { version = "0.30", features = ["rand"] } secp256k1 = { version = "0.31.0", features = ["rand"] } hex = "0.4.3" -async-trait = "0.1.88" -tokio = { version = "1", features = ["full"] } -tokio-postgres = "0.7.13" -deadpool-postgres = "0.14.1" +postgres = "0.19.10" +r2d2_postgres = "0.18.2" +ctrlc = "3.4" reqwest = { version = "0.11", features = ["blocking", "json"] } csv = "1.1" log = "0.4" diff --git a/docker-compose.yaml b/docker-compose.yaml index 817e3d8..ccae487 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -4,20 +4,12 @@ services: environment: - RUST_LOG=info volumes: - - ./settings.yaml:/app/settings.yaml + - ./config.yaml:/app/config.yaml - ./snapshots:/snapshots depends_on: database: condition: service_healthy command: search bitcoin run - deploy: - resources: - limits: - cpus: "10.0" - memory: 512M - reservations: - cpus: "10.0" - memory: 128M database: image: postgres:16-alpine diff --git a/src/blockchains/bitcoin.rs b/src/blockchains/bitcoin.rs index 3769dff..cdf291d 100644 --- a/src/blockchains/bitcoin.rs +++ b/src/blockchains/bitcoin.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use bitcoin::{ secp256k1::Secp256k1, secp256k1::SecretKey, @@ -7,26 +6,35 @@ use bitcoin::{ PrivateKey, PublicKey, }; -use bytes::Bytes; +use bloom::{BloomFilter, ASMS}; +use bs58; use config::{Config as ConfigBuilder, Value}; -use deadpool_postgres; -use futures::{SinkExt, pin_mut}; use hex; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use once_cell::sync::Lazy; +use postgres::NoTls; +use r2d2_postgres::PostgresConnectionManager; +use r2d2_postgres::r2d2::{Pool, PooledConnection}; use serde::Deserialize; use std::collections::HashMap; +use std::fs::File; +use std::hash::{Hash, Hasher}; +use std::io::{BufReader, Read, Write}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use tokio::fs::File; -use tokio::io::{AsyncReadExt, BufReader}; use crate::key_generators::KeyAlgorithm; use crate::utils::DynError; use super::enums::Blockchain; use super::traits::{SnapshotLoader, WalletChecker}; +static BLOOM_CACHE: Lazy>>> = + Lazy::new(|| Mutex::new(HashMap::new())); + pub struct Bitcoin { database_table: String, - pool: deadpool_postgres::Pool, + pool: Pool>, + address_bloom: Option>, } #[derive(Deserialize)] @@ -35,15 +43,27 @@ struct BitcoinParameters { #[serde(default = "default_database_table")] database_table: String, #[serde(default = "default_database_max_connections")] - database_max_connections: usize, + database_max_connections: u32, + #[serde(default = "default_use_in_memory")] + use_in_memory: bool, + #[serde(default = "default_bloom_precision")] + bloom_precision: f64, } fn default_database_table() -> String { "bitcoin".into() } -fn default_database_max_connections() -> usize { - 20 +fn default_database_max_connections() -> u32 { + 1 +} + +fn default_use_in_memory() -> bool { + false +} + +fn default_bloom_precision() -> f64 { + 0.01 } impl Bitcoin { @@ -54,27 +74,129 @@ impl Bitcoin { } let parameters: BitcoinParameters = builder.build()?.try_deserialize()?; - let pg_config = parameters.database_url.parse::()?; - let mgr_config = deadpool_postgres::ManagerConfig { - recycling_method: deadpool_postgres::RecyclingMethod::Fast, + let manager = PostgresConnectionManager::new( + parameters.database_url.parse()?, + NoTls, + ); + let pool = Pool::builder() + .max_size(parameters.database_max_connections) + .build(manager)?; + + let address_bloom: Option> = if parameters.use_in_memory { + // Проверяем глобальный кеш + let mut cache = BLOOM_CACHE.lock().unwrap(); + + if let Some(cached_bloom) = cache.get(¶meters.database_table) { + // Используем существующий Bloom filter + log::info!("Using cached Bloom filter for table: {}", parameters.database_table); + Some(cached_bloom.clone()) + } else { + // Загружаем адреса и создаем новый Bloom filter + log::info!("Creating new Bloom filter for table: {}", parameters.database_table); + let start_time = Instant::now(); + let bloom = Self::create_bloom_filter(&pool, ¶meters.database_table)?; + + log::info!( + "Bloom filter created in {:?} | False positive rate: {:.4}% | Memory: {:.2} MB", + start_time.elapsed(), + parameters.bloom_precision * 100.0, + bloom.num_bits() as f64 / (8.0 * 1024.0 * 1024.0) // Размер в MB + ); + + let bloom_arc = Arc::new(bloom); + cache.insert(parameters.database_table.clone(), bloom_arc.clone()); + Some(bloom_arc) + } + } else { + None }; - let mgr = deadpool_postgres::Manager::from_config(pg_config, tokio_postgres::NoTls, mgr_config); - let pool = deadpool_postgres::Pool::builder(mgr) - .config(deadpool_postgres::PoolConfig::new(parameters.database_max_connections)) - .build()?; Ok(Self { database_table: parameters.database_table, pool, + address_bloom, }) } - async fn get_client(&self) -> Result { - self.pool.get().await.map_err(Into::into) + fn create_bloom_filter( + pool: &Pool>, + table_name: &str, + ) -> Result { + let mut conn = pool.get()?; + + // Получаем общее количество адресов + let count_query = format!("SELECT COUNT(*) FROM {}", table_name); + let total: i64 = conn.query_one(&count_query, &[])?.get(0); + + // Создаем Bloom filter с ожидаемым количеством элементов + let mut bloom = BloomFilter::with_rate(0.01, total as u32); // 1% false positive rate + + // Прогресс-бар + let pb = ProgressBar::new(total as u64); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) | ETA: {eta} | {per_sec}") + .unwrap() + .progress_chars("#>-")); + pb.set_message("🌼 Building Bloom filter"); + pb.enable_steady_tick(Duration::from_millis(100)); + + let mut transaction = conn.transaction()?; + let query = format!("DECLARE cur CURSOR FOR SELECT address FROM {}", table_name); + transaction.execute(&query, &[])?; + + const BATCH_SIZE: i32 = 50_000; + let mut processed = 0; + + loop { + let fetch_query = format!("FETCH FORWARD {} FROM cur", BATCH_SIZE); + let rows = transaction.query(&fetch_query, &[])?; + + if rows.is_empty() { + break; + } + + for row in rows { + let addr_str: String = row.get(0); + if let Ok(decoded) = Self::address_to_hash(&addr_str) { + bloom.insert(&decoded); + } + processed += 1; + pb.set_position(processed as u64); + } + } + + // Закрываем курсор + transaction.execute("CLOSE cur", &[])?; + transaction.commit()?; + + pb.finish_with_message(format!( + "✅ Bloom filter created with {} items | Memory: {:.2} MB", + processed, + bloom.num_bits() as f64 / (8.0 * 1024.0 * 1024.0) + )); + + Ok(bloom) + } + + fn address_to_hash(address: &str) -> Result<[u8; 20], DynError> { + let decoded = bs58::decode(address) + .with_check(Some(0x00)) + .into_vec()?; + + if decoded.len() != 21 { + return Err(format!("Invalid address length: {}", decoded.len()).into()); + } + + let mut result = [0u8; 20]; + result.copy_from_slice(&decoded[1..21]); + Ok(result) + } + + fn get_client(&self) -> Result>, DynError> { + self.pool.get().map_err(Into::into) } } -#[async_trait] impl WalletChecker for Bitcoin { fn get_key_algorithm(&self) -> KeyAlgorithm { KeyAlgorithm::Secp256k1 @@ -84,7 +206,7 @@ impl WalletChecker for Bitcoin { Blockchain::Bitcoin } - async fn get_wallet_info(&self, key_hex: &str) -> Result, DynError> { + fn get_wallet_info(&self, key_hex: &str) -> Result, DynError> { let bytes = hex::decode(key_hex)?; let secret_key = SecretKey::from_slice(&bytes)?; let secp = Secp256k1::new(); @@ -92,12 +214,26 @@ impl WalletChecker for Bitcoin { let public_key = PublicKey::from_private_key(&secp, &private_key); let address = Address::p2pkh(&public_key, BitcoinNetwork::Bitcoin).to_string(); - let client = self.get_client().await?; + // Быстрая проверка в памяти + if let Some(ref bloom) = self.address_bloom { + let address_hash = match Self::address_to_hash(&address) { + Ok(hash) => hash, + Err(e) => { + log::warn!("Failed to convert address: {} - {}", address, e); + return Ok(None); + } + }; + if !bloom.contains(&address_hash) { + return Ok(None); + } + } + + let mut client = self.get_client()?; let row = client.query_opt( &format!("SELECT balance FROM {} WHERE address = $1", self.database_table), &[&address], - ).await?; - let balance = row.map(|r| r.get(0)).unwrap_or(0); + )?; + let balance: i64 = row.map(|r| r.get(0)).unwrap_or(0); if balance == 0 { return Ok(None); } @@ -110,17 +246,16 @@ impl WalletChecker for Bitcoin { } } -#[async_trait] impl SnapshotLoader for Bitcoin { fn get_blockchain(&self) -> Blockchain { Blockchain::Bitcoin } - async fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError> { + fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError> { let start_time = Instant::now(); - let client = self.get_client().await?; - let file = File::open(snapshot_path).await?; - let file_metadata = file.metadata().await?; + let mut client = self.get_client()?; + let file = File::open(snapshot_path)?; + let file_metadata = file.metadata()?; let file_size = file_metadata.len(); let index_name = format!("{}__address__ix", self.database_table); let multi_progress = MultiProgress::new(); @@ -135,24 +270,24 @@ impl SnapshotLoader for Bitcoin { prep_pb.set_message("⚙️ Preparing database..."); prep_pb.enable_steady_tick(Duration::from_millis(100)); - client.execute("SET synchronous_commit = off", &[]).await?; - client.execute("SET maintenance_work_mem = '4GB'", &[]).await?; - client.execute("SET work_mem = '2GB'", &[]).await?; + client.execute("SET synchronous_commit = off", &[])?; + client.execute("SET maintenance_work_mem = '4GB'", &[])?; + client.execute("SET work_mem = '2GB'", &[])?; client.execute( &format!("DROP TABLE IF EXISTS {}", self.database_table), &[], - ).await?; + )?; client.execute( &format!("DROP INDEX IF EXISTS {}", index_name), &[], - ).await?; + )?; client.execute( &format!( "CREATE TABLE {} (address TEXT, balance BIGINT)", self.database_table ), &[], - ).await?; + )?; prep_pb.finish_with_message("✅ Database prepared"); @@ -168,20 +303,17 @@ impl SnapshotLoader for Bitcoin { "COPY {} FROM STDIN WITH (FORMAT csv, DELIMITER E'\t', HEADER)", self.database_table, ); - let sink = client.copy_in(©_stmt).await?; - pin_mut!(sink); - let file = File::open(snapshot_path).await?; + let mut sink = client.copy_in(©_stmt)?; + let file = File::open(snapshot_path)?; let mut reader = BufReader::new(file); let mut buffer = vec![0u8; 65536]; - while let Ok(bytes_read) = reader.read(&mut buffer).await { + while let Ok(bytes_read) = reader.read(&mut buffer) { if bytes_read == 0 { break } - sink.as_mut() - .send(Bytes::copy_from_slice(&buffer[..bytes_read])) - .await?; + sink.write(&buffer[..bytes_read])?; copy_pb.inc(bytes_read as u64); } - sink.as_mut().close().await?; + sink.finish()?; copy_pb.set_position(file_size); copy_pb.finish_with_message("✅ Data copied to database"); @@ -197,7 +329,7 @@ impl SnapshotLoader for Bitcoin { self.database_table, ), &[], - ).await?; + )?; index_pb.finish_with_message("✅ Index created"); @@ -206,9 +338,9 @@ impl SnapshotLoader for Bitcoin { final_pb.set_message("🔁 Reset database settings"); final_pb.enable_steady_tick(Duration::from_millis(100)); - client.execute("RESET synchronous_commit", &[]).await?; - client.execute("RESET maintenance_work_mem", &[]).await?; - client.execute("RESET work_mem", &[]).await?; + client.execute("RESET synchronous_commit", &[])?; + client.execute("RESET maintenance_work_mem", &[])?; + client.execute("RESET work_mem", &[])?; final_pb.finish_with_message("✅ Database settings reseted"); @@ -221,9 +353,9 @@ impl SnapshotLoader for Bitcoin { self.database_table, ), &[], - ).await?.get(0); + )?.get(0); - let data_size: f64 = file_size as f64 / (1024 * 1024 * 1024) as f64; + let data_size: f64 = file_size as f64 / (1024.0 * 1024.0 * 1024.0); println!("\n🎉 Snapshot loaded successfully!"); println!("📊 Statistics:"); println!(" Total records processed: {}", records_count); diff --git a/src/blockchains/traits.rs b/src/blockchains/traits.rs index 9924c8d..5995e16 100644 --- a/src/blockchains/traits.rs +++ b/src/blockchains/traits.rs @@ -1,18 +1,14 @@ -use async_trait::async_trait; - use crate::key_generators::KeyAlgorithm; use crate::utils::DynError; use super::enums::Blockchain; -#[async_trait] pub trait WalletChecker: Send + Sync { fn get_blockchain(&self) -> Blockchain; fn get_key_algorithm(&self) -> KeyAlgorithm; - async fn get_wallet_info(&self, key_hex: &str) -> Result, DynError>; + fn get_wallet_info(&self, key_hex: &str) -> Result, DynError>; } -#[async_trait] pub trait SnapshotLoader: Send + Sync { fn get_blockchain(&self) -> Blockchain; - async fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError>; + fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError>; } \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index 1a4853d..7584826 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -4,8 +4,8 @@ use crate::blockchains::Blockchain; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct Cli { - #[arg(short, long, default_value = "settings.yaml")] - pub settings: String, + #[arg(short, long, default_value = "config.yaml")] + pub config: String, #[command(subcommand)] pub command: Commands, diff --git a/src/main.rs b/src/main.rs index ebde1ff..b9054ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,14 @@ mod notification; mod settings; mod utils; -use async_channel; use clap::Parser; use cli::Cli; -use flume; +use crossbeam_channel::{bounded, unbounded, select}; use num_bigint::BigUint; use num_traits::One; use std::collections::HashMap; +use std::sync::Arc; +use std::thread; use std::time::{Duration, Instant}; use crate::blockchains::{create_snapshot_loader, create_wallet_checker, Blockchain}; @@ -20,11 +21,10 @@ use crate::notification::send_telegram_message; use crate::settings::Settings; use crate::utils::DynError; -#[tokio::main] -async fn main() -> Result<(), DynError> { +fn main() -> Result<(), DynError> { simple_logger::init_with_level(log::Level::Info)?; let cli = Cli::parse(); - let settings = Settings::load(&cli.settings)?; + let settings = Settings::load(&cli.config)?; match &cli.command { cli::Commands::Snapshots { blockchain, command } => { @@ -36,28 +36,27 @@ async fn main() -> Result<(), DynError> { match command { cli::SnapshotSubcommand::Load { path } => { - snapshot_loader.load_snapshot(path).await?; + snapshot_loader.load_snapshot(path)?; log::info!("Snapshot loaded successfully for {}", blockchain); } } } cli::Commands::Search { blockchain, command } => match command { - cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?, + cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings)?, }, } Ok(()) } -async fn run_search( +fn run_search( blockchain: Option<&Blockchain>, settings: &Settings, ) -> Result<(), DynError> { - // 1. Создаем систему каналов с использованием flume let mut key_channels = HashMap::new(); for (algorithm, key_generator_settings) in &settings.key_generators { - let (tx, rx) = flume::bounded(key_generator_settings.buffer_size); + let (tx, rx) = bounded(key_generator_settings.buffer_size); key_channels.insert(algorithm.clone(), (tx, rx)); } @@ -70,7 +69,7 @@ async fn run_search( let algorithm = algorithm.clone(); let key_sender = key_sender.clone(); - let handle = tokio::spawn(async move { + let handle = thread::spawn(move || { let key_generator = create_key_generator(&algorithm, &key_generator_settings.data) .expect("Failed to create key generator"); @@ -78,15 +77,10 @@ async fn run_search( let mut generated_keys_count = BigUint::ZERO; loop { let key_hex = key_generator.generate_key_hex(); - // Отправляем ключ с ожиданием свободного места в буфере - match key_sender.send_async(key_hex).await { - Ok(_) => { - // Разрешение освобождается при обработке ключа воркером - } - Err(e) => { - log::error!("Key generator failed to send: {}", e); - break; - } + + // Отправляем ключ с возможностью прерывания + if key_sender.send(key_hex).is_err() { + continue; } generated_keys_count += BigUint::one(); @@ -103,18 +97,16 @@ async fn run_search( } } - let (notify_sender, notify_receiver) = async_channel::unbounded::(); + let (notify_sender, notify_receiver) = unbounded(); - // 4. Обработчики кошельков с гарантией обработки let mut wallet_checker_handles = vec![]; for (blockchain, blockchain_settings) in &settings.blockchains { let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?; let key_algorithm = wallet_checker.get_key_algorithm(); - // Получаем приемник для нужного алгоритма let (_, key_receiver) = key_channels.get(&key_algorithm).ok_or(format!( "No key channel for algorithm {} (blockchain {})", - key_algorithm, blockchain + &key_algorithm, &blockchain, ))?; for worker_index in 0..blockchain_settings.workers { @@ -124,14 +116,15 @@ async fn run_search( let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?; let notify_sender = notify_sender.clone(); - let handle = tokio::spawn(async move { + let handle = thread::spawn(move || { let mut start_time = Instant::now(); let mut checked_wallets_count = BigUint::ZERO; - while let Ok(key_hex) = key_receiver.recv_async().await { - match wallet_checker.get_wallet_info(&key_hex).await { + + while let Ok(key_hex) = key_receiver.recv() { + match wallet_checker.get_wallet_info(&key_hex) { Ok(Some(info)) => { - if let Err(e) = notify_sender.send(info).await { - log::error!("Failed to send notification: {}", e); + if notify_sender.send(info).is_err() { + break; } } Ok(None) => {} @@ -157,17 +150,40 @@ async fn run_search( // 5. Система нотификаций let bot_token = settings.notifications.telegram_bot_token.clone(); let user_id = settings.notifications.telegram_user_id.clone(); - - let _ = tokio::spawn(async move { - while let Ok(message) = notify_receiver.recv().await { - if let Err(e) = send_telegram_message(&bot_token, &user_id, &message).await { + + let notify_handle = thread::spawn(move || { + while let Ok(message) = notify_receiver.recv() { + if let Err(e) = send_telegram_message(&bot_token, &user_id, &message) { log::error!("Failed to send Telegram message: {}", e); } } }); - // 6. Управление завершением - tokio::signal::ctrl_c().await?; + // 6. Ожидаем Ctrl+C для завершения + let (shutdown_sender, shutdown_receiver) = bounded(0); + ctrlc::set_handler(move || { + log::info!("Received Ctrl+C, shutting down"); + let _ = shutdown_sender.send(()); + })?; + + // Ожидаем сигнал завершения + shutdown_receiver.recv()?; log::info!("Shutting down"); + + // Закрываем каналы для завершения потоков + for (_, (sender, _)) in key_channels { + drop(sender); + } + drop(notify_sender); + + // Дожидаемся завершения потоков + for handle in key_gen_handles { + let _ = handle.join(); + } + for handle in wallet_checker_handles { + let _ = handle.join(); + } + let _ = notify_handle.join(); + Ok(()) } diff --git a/src/notification.rs b/src/notification.rs index 4ad6a06..ab83591 100644 --- a/src/notification.rs +++ b/src/notification.rs @@ -2,7 +2,7 @@ use reqwest::blocking::Client; use crate::utils::DynError; -pub async fn send_telegram_message( +pub fn send_telegram_message( bot_token: &str, user_id: &str, message: &str,