This commit is contained in:
2025-06-11 12:31:32 +03:00
parent f3264339ab
commit 10610b8ed1
8 changed files with 355 additions and 312 deletions

310
Cargo.lock generated
View File

@@ -84,18 +84,6 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-channel"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-trait"
version = "0.1.88"
@@ -152,6 +140,12 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
name = "bit-vec"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02b4ff8b16e6076c3e14220b39fbc1fabb6737522281a388998046859400895f"
[[package]]
name = "bitcoin"
version = "0.30.2"
@@ -217,6 +211,25 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bloom"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00ac8e5056d6d65376a3c1aa5c7c34850d6949ace17f0266953a254eb3d6fe8"
dependencies = [
"bit-vec",
]
[[package]]
name = "bs58"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4"
dependencies = [
"sha2",
"tinyvec",
]
[[package]]
name = "bumpalo"
version = "3.17.0"
@@ -250,6 +263,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "clap"
version = "4.5.39"
@@ -306,15 +325,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "config"
version = "0.13.4"
@@ -372,6 +382,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -390,31 +409,30 @@ dependencies = [
[[package]]
name = "cryptohunter"
version = "0.1.7"
version = "0.2.0"
dependencies = [
"async-channel",
"async-trait",
"bitcoin",
"bytes",
"bloom",
"bs58",
"clap",
"config",
"crossbeam-channel",
"csv",
"deadpool-postgres",
"flume",
"futures",
"ctrlc",
"hex",
"indicatif",
"log",
"num-bigint",
"num-traits",
"num_cpus",
"once_cell",
"postgres",
"r2d2_postgres",
"rand 0.8.5",
"reqwest",
"secp256k1 0.31.0",
"serde",
"simple_logger",
"tokio",
"tokio-postgres",
]
[[package]]
@@ -439,37 +457,13 @@ dependencies = [
]
[[package]]
name = "deadpool"
version = "0.12.2"
name = "ctrlc"
version = "3.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f"
checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73"
dependencies = [
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d697d376cbfa018c23eb4caab1fd1883dd9c906a8c034e8d9a3cb06a7e0bef9"
dependencies = [
"async-trait",
"deadpool",
"getrandom 0.2.16",
"tokio",
"tokio-postgres",
"tracing",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
dependencies = [
"tokio",
"nix",
"windows-sys 0.59.0",
]
[[package]]
@@ -540,27 +534,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "event-listener"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@@ -573,19 +546,6 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -616,21 +576,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -647,17 +592,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
@@ -693,7 +627,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -722,10 +655,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
@@ -1156,15 +1087,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.16",
]
[[package]]
name = "native-tls"
version = "0.2.14"
@@ -1182,6 +1104,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nom"
version = "7.1.3"
@@ -1326,12 +1260,6 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "parking"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -1430,26 +1358,6 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@@ -1474,6 +1382,20 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
[[package]]
name = "postgres"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "363e6dfbdd780d3aa3597b6eb430db76bb315fa9bad7fae595bb8def808b8470"
dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"tokio",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.8"
@@ -1551,6 +1473,27 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_postgres"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd4b47636dbca581cd057e2f27a5d39be741ea4f85fd3c29e415c55f71c7595"
dependencies = [
"postgres",
"r2d2",
]
[[package]]
name = "rand"
version = "0.8.5"
@@ -1729,6 +1672,15 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1859,15 +1811,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [
"libc",
]
[[package]]
name = "simple_logger"
version = "4.3.3"
@@ -1911,15 +1854,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@@ -2099,25 +2033,11 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
]
[[package]]
name = "tokio-macros"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
@@ -2189,21 +2109,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1ffbcf9c6f6b99d386e7444eb608ba646ae452a36b39737deb9663b610f662"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.33"

View File

@@ -1,27 +1,26 @@
[package]
name = "cryptohunter"
version = "0.2.0"
version = "0.2.1"
edition = "2024"
[dependencies]
futures = "0.3.31"
crossbeam-channel = "0.5.15"
num_cpus = "1.17.0"
num-bigint = "0.4.6"
num-traits = "0.2.19"
bytes = "1.0"
clap = { version = "4.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
config = { version = "0.13", features = ["yaml", "json"] }
async-channel = "2.1"
flume = "0.10"
rand = "0.8"
once_cell = "1.21.3"
bloom = "0.3.2"
bs58 = { version = "0.5.1", features = ["check"] }
bitcoin = { version = "0.30", features = ["rand"] }
secp256k1 = { version = "0.31.0", features = ["rand"] }
hex = "0.4.3"
async-trait = "0.1.88"
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7.13"
deadpool-postgres = "0.14.1"
postgres = "0.19.10"
r2d2_postgres = "0.18.2"
ctrlc = "3.4"
reqwest = { version = "0.11", features = ["blocking", "json"] }
csv = "1.1"
log = "0.4"

View File

@@ -4,20 +4,12 @@ services:
environment:
- RUST_LOG=info
volumes:
- ./settings.yaml:/app/settings.yaml
- ./config.yaml:/app/config.yaml
- ./snapshots:/snapshots
depends_on:
database:
condition: service_healthy
command: search bitcoin run
deploy:
resources:
limits:
cpus: "10.0"
memory: 512M
reservations:
cpus: "10.0"
memory: 128M
database:
image: postgres:16-alpine

View File

@@ -1,4 +1,3 @@
use async_trait::async_trait;
use bitcoin::{
secp256k1::Secp256k1,
secp256k1::SecretKey,
@@ -7,26 +6,35 @@ use bitcoin::{
PrivateKey,
PublicKey,
};
use bytes::Bytes;
use bloom::{BloomFilter, ASMS};
use bs58;
use config::{Config as ConfigBuilder, Value};
use deadpool_postgres;
use futures::{SinkExt, pin_mut};
use hex;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use once_cell::sync::Lazy;
use postgres::NoTls;
use r2d2_postgres::PostgresConnectionManager;
use r2d2_postgres::r2d2::{Pool, PooledConnection};
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{BufReader, Read, Write};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use crate::key_generators::KeyAlgorithm;
use crate::utils::DynError;
use super::enums::Blockchain;
use super::traits::{SnapshotLoader, WalletChecker};
static BLOOM_CACHE: Lazy<Mutex<HashMap<String, Arc<BloomFilter>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub struct Bitcoin {
database_table: String,
pool: deadpool_postgres::Pool,
pool: Pool<PostgresConnectionManager<NoTls>>,
address_bloom: Option<Arc<BloomFilter>>,
}
#[derive(Deserialize)]
@@ -35,15 +43,27 @@ struct BitcoinParameters {
#[serde(default = "default_database_table")]
database_table: String,
#[serde(default = "default_database_max_connections")]
database_max_connections: usize,
database_max_connections: u32,
#[serde(default = "default_use_in_memory")]
use_in_memory: bool,
#[serde(default = "default_bloom_precision")]
bloom_precision: f64,
}
fn default_database_table() -> String {
"bitcoin".into()
}
fn default_database_max_connections() -> usize {
20
fn default_database_max_connections() -> u32 {
1
}
fn default_use_in_memory() -> bool {
false
}
fn default_bloom_precision() -> f64 {
0.01
}
impl Bitcoin {
@@ -54,27 +74,129 @@ impl Bitcoin {
}
let parameters: BitcoinParameters = builder.build()?.try_deserialize()?;
let pg_config = parameters.database_url.parse::<tokio_postgres::Config>()?;
let mgr_config = deadpool_postgres::ManagerConfig {
recycling_method: deadpool_postgres::RecyclingMethod::Fast,
let manager = PostgresConnectionManager::new(
parameters.database_url.parse()?,
NoTls,
);
let pool = Pool::builder()
.max_size(parameters.database_max_connections)
.build(manager)?;
let address_bloom: Option<Arc<BloomFilter>> = if parameters.use_in_memory {
// Проверяем глобальный кеш
let mut cache = BLOOM_CACHE.lock().unwrap();
if let Some(cached_bloom) = cache.get(&parameters.database_table) {
// Используем существующий Bloom filter
log::info!("Using cached Bloom filter for table: {}", parameters.database_table);
Some(cached_bloom.clone())
} else {
// Загружаем адреса и создаем новый Bloom filter
log::info!("Creating new Bloom filter for table: {}", parameters.database_table);
let start_time = Instant::now();
let bloom = Self::create_bloom_filter(&pool, &parameters.database_table)?;
log::info!(
"Bloom filter created in {:?} | False positive rate: {:.4}% | Memory: {:.2} MB",
start_time.elapsed(),
parameters.bloom_precision * 100.0,
bloom.num_bits() as f64 / (8.0 * 1024.0 * 1024.0) // Размер в MB
);
let bloom_arc = Arc::new(bloom);
cache.insert(parameters.database_table.clone(), bloom_arc.clone());
Some(bloom_arc)
}
} else {
None
};
let mgr = deadpool_postgres::Manager::from_config(pg_config, tokio_postgres::NoTls, mgr_config);
let pool = deadpool_postgres::Pool::builder(mgr)
.config(deadpool_postgres::PoolConfig::new(parameters.database_max_connections))
.build()?;
Ok(Self {
database_table: parameters.database_table,
pool,
address_bloom,
})
}
async fn get_client(&self) -> Result<deadpool_postgres::Client, DynError> {
self.pool.get().await.map_err(Into::into)
fn create_bloom_filter(
pool: &Pool<PostgresConnectionManager<NoTls>>,
table_name: &str,
) -> Result<BloomFilter, DynError> {
let mut conn = pool.get()?;
// Получаем общее количество адресов
let count_query = format!("SELECT COUNT(*) FROM {}", table_name);
let total: i64 = conn.query_one(&count_query, &[])?.get(0);
// Создаем Bloom filter с ожидаемым количеством элементов
let mut bloom = BloomFilter::with_rate(0.01, total as u32); // 1% false positive rate
// Прогресс-бар
let pb = ProgressBar::new(total as u64);
pb.set_style(ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) | ETA: {eta} | {per_sec}")
.unwrap()
.progress_chars("#>-"));
pb.set_message("🌼 Building Bloom filter");
pb.enable_steady_tick(Duration::from_millis(100));
let mut transaction = conn.transaction()?;
let query = format!("DECLARE cur CURSOR FOR SELECT address FROM {}", table_name);
transaction.execute(&query, &[])?;
const BATCH_SIZE: i32 = 50_000;
let mut processed = 0;
loop {
let fetch_query = format!("FETCH FORWARD {} FROM cur", BATCH_SIZE);
let rows = transaction.query(&fetch_query, &[])?;
if rows.is_empty() {
break;
}
for row in rows {
let addr_str: String = row.get(0);
if let Ok(decoded) = Self::address_to_hash(&addr_str) {
bloom.insert(&decoded);
}
processed += 1;
pb.set_position(processed as u64);
}
}
// Закрываем курсор
transaction.execute("CLOSE cur", &[])?;
transaction.commit()?;
pb.finish_with_message(format!(
"✅ Bloom filter created with {} items | Memory: {:.2} MB",
processed,
bloom.num_bits() as f64 / (8.0 * 1024.0 * 1024.0)
));
Ok(bloom)
}
fn address_to_hash(address: &str) -> Result<[u8; 20], DynError> {
let decoded = bs58::decode(address)
.with_check(Some(0x00))
.into_vec()?;
if decoded.len() != 21 {
return Err(format!("Invalid address length: {}", decoded.len()).into());
}
let mut result = [0u8; 20];
result.copy_from_slice(&decoded[1..21]);
Ok(result)
}
fn get_client(&self) -> Result<PooledConnection<PostgresConnectionManager<NoTls>>, DynError> {
self.pool.get().map_err(Into::into)
}
}
#[async_trait]
impl WalletChecker for Bitcoin {
fn get_key_algorithm(&self) -> KeyAlgorithm {
KeyAlgorithm::Secp256k1
@@ -84,7 +206,7 @@ impl WalletChecker for Bitcoin {
Blockchain::Bitcoin
}
async fn get_wallet_info(&self, key_hex: &str) -> Result<Option<String>, DynError> {
fn get_wallet_info(&self, key_hex: &str) -> Result<Option<String>, DynError> {
let bytes = hex::decode(key_hex)?;
let secret_key = SecretKey::from_slice(&bytes)?;
let secp = Secp256k1::new();
@@ -92,12 +214,26 @@ impl WalletChecker for Bitcoin {
let public_key = PublicKey::from_private_key(&secp, &private_key);
let address = Address::p2pkh(&public_key, BitcoinNetwork::Bitcoin).to_string();
let client = self.get_client().await?;
// Быстрая проверка в памяти
if let Some(ref bloom) = self.address_bloom {
let address_hash = match Self::address_to_hash(&address) {
Ok(hash) => hash,
Err(e) => {
log::warn!("Failed to convert address: {} - {}", address, e);
return Ok(None);
}
};
if !bloom.contains(&address_hash) {
return Ok(None);
}
}
let mut client = self.get_client()?;
let row = client.query_opt(
&format!("SELECT balance FROM {} WHERE address = $1", self.database_table),
&[&address],
).await?;
let balance = row.map(|r| r.get(0)).unwrap_or(0);
)?;
let balance: i64 = row.map(|r| r.get(0)).unwrap_or(0);
if balance == 0 {
return Ok(None);
}
@@ -110,17 +246,16 @@ impl WalletChecker for Bitcoin {
}
}
#[async_trait]
impl SnapshotLoader for Bitcoin {
fn get_blockchain(&self) -> Blockchain {
Blockchain::Bitcoin
}
async fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError> {
fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError> {
let start_time = Instant::now();
let client = self.get_client().await?;
let file = File::open(snapshot_path).await?;
let file_metadata = file.metadata().await?;
let mut client = self.get_client()?;
let file = File::open(snapshot_path)?;
let file_metadata = file.metadata()?;
let file_size = file_metadata.len();
let index_name = format!("{}__address__ix", self.database_table);
let multi_progress = MultiProgress::new();
@@ -135,24 +270,24 @@ impl SnapshotLoader for Bitcoin {
prep_pb.set_message("⚙️ Preparing database...");
prep_pb.enable_steady_tick(Duration::from_millis(100));
client.execute("SET synchronous_commit = off", &[]).await?;
client.execute("SET maintenance_work_mem = '4GB'", &[]).await?;
client.execute("SET work_mem = '2GB'", &[]).await?;
client.execute("SET synchronous_commit = off", &[])?;
client.execute("SET maintenance_work_mem = '4GB'", &[])?;
client.execute("SET work_mem = '2GB'", &[])?;
client.execute(
&format!("DROP TABLE IF EXISTS {}", self.database_table),
&[],
).await?;
)?;
client.execute(
&format!("DROP INDEX IF EXISTS {}", index_name),
&[],
).await?;
)?;
client.execute(
&format!(
"CREATE TABLE {} (address TEXT, balance BIGINT)",
self.database_table
),
&[],
).await?;
)?;
prep_pb.finish_with_message("✅ Database prepared");
@@ -168,20 +303,17 @@ impl SnapshotLoader for Bitcoin {
"COPY {} FROM STDIN WITH (FORMAT csv, DELIMITER E'\t', HEADER)",
self.database_table,
);
let sink = client.copy_in(&copy_stmt).await?;
pin_mut!(sink);
let file = File::open(snapshot_path).await?;
let mut sink = client.copy_in(&copy_stmt)?;
let file = File::open(snapshot_path)?;
let mut reader = BufReader::new(file);
let mut buffer = vec![0u8; 65536];
while let Ok(bytes_read) = reader.read(&mut buffer).await {
while let Ok(bytes_read) = reader.read(&mut buffer) {
if bytes_read == 0 { break }
sink.as_mut()
.send(Bytes::copy_from_slice(&buffer[..bytes_read]))
.await?;
sink.write(&buffer[..bytes_read])?;
copy_pb.inc(bytes_read as u64);
}
sink.as_mut().close().await?;
sink.finish()?;
copy_pb.set_position(file_size);
copy_pb.finish_with_message("✅ Data copied to database");
@@ -197,7 +329,7 @@ impl SnapshotLoader for Bitcoin {
self.database_table,
),
&[],
).await?;
)?;
index_pb.finish_with_message("✅ Index created");
@@ -206,9 +338,9 @@ impl SnapshotLoader for Bitcoin {
final_pb.set_message("🔁 Reset database settings");
final_pb.enable_steady_tick(Duration::from_millis(100));
client.execute("RESET synchronous_commit", &[]).await?;
client.execute("RESET maintenance_work_mem", &[]).await?;
client.execute("RESET work_mem", &[]).await?;
client.execute("RESET synchronous_commit", &[])?;
client.execute("RESET maintenance_work_mem", &[])?;
client.execute("RESET work_mem", &[])?;
final_pb.finish_with_message("✅ Database settings reseted");
@@ -221,9 +353,9 @@ impl SnapshotLoader for Bitcoin {
self.database_table,
),
&[],
).await?.get(0);
)?.get(0);
let data_size: f64 = file_size as f64 / (1024 * 1024 * 1024) as f64;
let data_size: f64 = file_size as f64 / (1024.0 * 1024.0 * 1024.0);
println!("\n🎉 Snapshot loaded successfully!");
println!("📊 Statistics:");
println!(" Total records processed: {}", records_count);

View File

@@ -1,18 +1,14 @@
use async_trait::async_trait;
use crate::key_generators::KeyAlgorithm;
use crate::utils::DynError;
use super::enums::Blockchain;
#[async_trait]
pub trait WalletChecker: Send + Sync {
fn get_blockchain(&self) -> Blockchain;
fn get_key_algorithm(&self) -> KeyAlgorithm;
async fn get_wallet_info(&self, key_hex: &str) -> Result<Option<String>, DynError>;
fn get_wallet_info(&self, key_hex: &str) -> Result<Option<String>, DynError>;
}
#[async_trait]
pub trait SnapshotLoader: Send + Sync {
fn get_blockchain(&self) -> Blockchain;
async fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError>;
fn load_snapshot(&self, snapshot_path: &str) -> Result<(), DynError>;
}

View File

@@ -4,8 +4,8 @@ use crate::blockchains::Blockchain;
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Cli {
#[arg(short, long, default_value = "settings.yaml")]
pub settings: String,
#[arg(short, long, default_value = "config.yaml")]
pub config: String,
#[command(subcommand)]
pub command: Commands,

View File

@@ -5,13 +5,14 @@ mod notification;
mod settings;
mod utils;
use async_channel;
use clap::Parser;
use cli::Cli;
use flume;
use crossbeam_channel::{bounded, unbounded, select};
use num_bigint::BigUint;
use num_traits::One;
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::blockchains::{create_snapshot_loader, create_wallet_checker, Blockchain};
@@ -20,11 +21,10 @@ use crate::notification::send_telegram_message;
use crate::settings::Settings;
use crate::utils::DynError;
#[tokio::main]
async fn main() -> Result<(), DynError> {
fn main() -> Result<(), DynError> {
simple_logger::init_with_level(log::Level::Info)?;
let cli = Cli::parse();
let settings = Settings::load(&cli.settings)?;
let settings = Settings::load(&cli.config)?;
match &cli.command {
cli::Commands::Snapshots { blockchain, command } => {
@@ -36,28 +36,27 @@ async fn main() -> Result<(), DynError> {
match command {
cli::SnapshotSubcommand::Load { path } => {
snapshot_loader.load_snapshot(path).await?;
snapshot_loader.load_snapshot(path)?;
log::info!("Snapshot loaded successfully for {}", blockchain);
}
}
}
cli::Commands::Search { blockchain, command } => match command {
cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings).await?,
cli::SearchSubcommand::Run => run_search(blockchain.as_ref(), &settings)?,
},
}
Ok(())
}
async fn run_search(
fn run_search(
blockchain: Option<&Blockchain>,
settings: &Settings,
) -> Result<(), DynError> {
// 1. Создаем систему каналов с использованием flume
let mut key_channels = HashMap::new();
for (algorithm, key_generator_settings) in &settings.key_generators {
let (tx, rx) = flume::bounded(key_generator_settings.buffer_size);
let (tx, rx) = bounded(key_generator_settings.buffer_size);
key_channels.insert(algorithm.clone(), (tx, rx));
}
@@ -70,7 +69,7 @@ async fn run_search(
let algorithm = algorithm.clone();
let key_sender = key_sender.clone();
let handle = tokio::spawn(async move {
let handle = thread::spawn(move || {
let key_generator = create_key_generator(&algorithm, &key_generator_settings.data)
.expect("Failed to create key generator");
@@ -78,15 +77,10 @@ async fn run_search(
let mut generated_keys_count = BigUint::ZERO;
loop {
let key_hex = key_generator.generate_key_hex();
// Отправляем ключ с ожиданием свободного места в буфере
match key_sender.send_async(key_hex).await {
Ok(_) => {
// Разрешение освобождается при обработке ключа воркером
}
Err(e) => {
log::error!("Key generator failed to send: {}", e);
break;
}
// Отправляем ключ с возможностью прерывания
if key_sender.send(key_hex).is_err() {
continue;
}
generated_keys_count += BigUint::one();
@@ -103,18 +97,16 @@ async fn run_search(
}
}
let (notify_sender, notify_receiver) = async_channel::unbounded::<String>();
let (notify_sender, notify_receiver) = unbounded();
// 4. Обработчики кошельков с гарантией обработки
let mut wallet_checker_handles = vec![];
for (blockchain, blockchain_settings) in &settings.blockchains {
let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?;
let key_algorithm = wallet_checker.get_key_algorithm();
// Получаем приемник для нужного алгоритма
let (_, key_receiver) = key_channels.get(&key_algorithm).ok_or(format!(
"No key channel for algorithm {} (blockchain {})",
key_algorithm, blockchain
&key_algorithm, &blockchain,
))?;
for worker_index in 0..blockchain_settings.workers {
@@ -124,14 +116,15 @@ async fn run_search(
let wallet_checker = create_wallet_checker(&blockchain, &blockchain_settings.data)?;
let notify_sender = notify_sender.clone();
let handle = tokio::spawn(async move {
let handle = thread::spawn(move || {
let mut start_time = Instant::now();
let mut checked_wallets_count = BigUint::ZERO;
while let Ok(key_hex) = key_receiver.recv_async().await {
match wallet_checker.get_wallet_info(&key_hex).await {
while let Ok(key_hex) = key_receiver.recv() {
match wallet_checker.get_wallet_info(&key_hex) {
Ok(Some(info)) => {
if let Err(e) = notify_sender.send(info).await {
log::error!("Failed to send notification: {}", e);
if notify_sender.send(info).is_err() {
break;
}
}
Ok(None) => {}
@@ -158,16 +151,39 @@ async fn run_search(
let bot_token = settings.notifications.telegram_bot_token.clone();
let user_id = settings.notifications.telegram_user_id.clone();
let _ = tokio::spawn(async move {
while let Ok(message) = notify_receiver.recv().await {
if let Err(e) = send_telegram_message(&bot_token, &user_id, &message).await {
let notify_handle = thread::spawn(move || {
while let Ok(message) = notify_receiver.recv() {
if let Err(e) = send_telegram_message(&bot_token, &user_id, &message) {
log::error!("Failed to send Telegram message: {}", e);
}
}
});
// 6. Управление завершением
tokio::signal::ctrl_c().await?;
// 6. Ожидаем Ctrl+C для завершения
let (shutdown_sender, shutdown_receiver) = bounded(0);
ctrlc::set_handler(move || {
log::info!("Received Ctrl+C, shutting down");
let _ = shutdown_sender.send(());
})?;
// Ожидаем сигнал завершения
shutdown_receiver.recv()?;
log::info!("Shutting down");
// Закрываем каналы для завершения потоков
for (_, (sender, _)) in key_channels {
drop(sender);
}
drop(notify_sender);
// Дожидаемся завершения потоков
for handle in key_gen_handles {
let _ = handle.join();
}
for handle in wallet_checker_handles {
let _ = handle.join();
}
let _ = notify_handle.join();
Ok(())
}

View File

@@ -2,7 +2,7 @@ use reqwest::blocking::Client;
use crate::utils::DynError;
pub async fn send_telegram_message(
pub fn send_telegram_message(
bot_token: &str,
user_id: &str,
message: &str,