diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..b5d4567 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,231 @@ +name: Release +on: + push: + tags: + - 'v[0-9]+.[0-9]+.[0-9]+' + +env: + PROJECT_NAME: kun_peng + REPO_NAME: ${{ github.repository }} + BREW_TAP: eric9n/homebrew-tap + DESC: "An ultra-fast, low-memory footprint and accurate taxonomy classifier for all" + +jobs: + dist: + permissions: + contents: write + name: Dist + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + build: [x86_64-linux, x86_64-macos, x86_64-windows, aarch64-macos] + include: + - build: x86_64-linux + os: ubuntu-latest + rust: stable + target: x86_64-unknown-linux-gnu + - build: x86_64-macos + os: macos-latest + rust: stable + target: x86_64-apple-darwin + - build: x86_64-windows + os: windows-latest + rust: stable + target: x86_64-pc-windows-msvc + - build: aarch64-macos + os: macos-latest + rust: stable + target: aarch64-apple-darwin + + steps: + - name: Checkout sources + uses: actions/checkout@v2 + with: + submodules: true + + - name: Install ${{ matrix.rust }} toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + target: ${{ matrix.target }} + override: true + + - name: Run cargo test + uses: actions-rs/cargo@v1 + with: + use-cross: ${{ matrix.cross }} + command: test + args: --release --locked --target ${{ matrix.target }} + + - name: Build release binary + uses: actions-rs/cargo@v1 + with: + use-cross: ${{ matrix.cross }} + command: build + args: --release --locked --target ${{ matrix.target }} + + - name: Build archive + shell: bash + run: | + mkdir dist + if [ "${{ matrix.os }}" = "windows-latest" ]; then + cp "target/${{ matrix.target }}/release/${{ env.PROJECT_NAME }}.exe" "dist/${{ env.PROJECT_NAME }}-${{ github.ref_name }}-${{ matrix.target }}.exe" + else + cp "target/${{ matrix.target }}/release/${{ env.PROJECT_NAME }}" "dist/${{ env.PROJECT_NAME }}-${{ github.ref_name }}-${{ matrix.target }}" + fi + + # Set up the GitHub CLI + - name: Install GitHub CLI (macOS) + run: | + brew install gh + if: matrix.os == 'macos-latest' + + - name: Install GitHub CLI (Ubuntu) + run: | + sudo apt install -y gh + if: matrix.os == 'ubuntu-latest' + + - name: Install GitHub CLI (Windows) + run: | + choco install gh + if: matrix.os == 'windows-latest' + + # Log in to the GitHub CLI + - name: Login to GitHub CLI + run: echo "${{ secrets.GITHUB_TOKEN }}" | gh auth login --with-token + + - name: Upload Release Asset + run: | + if [ "${{ matrix.os }}" = "windows-latest" ]; then + ASSET_NAME="${{ env.PROJECT_NAME }}-${{ github.ref_name }}-${{ matrix.target }}.exe" + else + ASSET_NAME="${{ env.PROJECT_NAME }}-${{ github.ref_name }}-${{ matrix.target }}" + fi + gh release upload ${{ github.ref_name }} \ + "./dist/$ASSET_NAME" \ + --clobber + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + shell: bash + + - name: Set macOS artifact name + if: matrix.os == 'macos-latest' && matrix.target == 'x86_64-apple-darwin' + run: echo "macos_artifact=${{ env.PROJECT_NAME }}-${{ github.ref_name }}-${{ matrix.target }}" >> $GITHUB_OUTPUT + id: artifact_name + + - name: Build on CentOS 7 + if: matrix.os == 'ubuntu-latest' + run: | + docker run --name centos7-container -v $GITHUB_WORKSPACE:/github/workspace -w /github/workspace centos:7 \ + /bin/bash -c "echo '[base]' > /etc/yum.repos.d/CentOS-Base.repo; \ + echo 'name=CentOS-7 - Base' >> /etc/yum.repos.d/CentOS-Base.repo; \ + echo 
'baseurl=http://vault.centos.org/centos/7/os/x86_64/' >> /etc/yum.repos.d/CentOS-Base.repo; \ + echo 'gpgcheck=1' >> /etc/yum.repos.d/CentOS-Base.repo; \ + echo 'enabled=1' >> /etc/yum.repos.d/CentOS-Base.repo; \ + echo 'gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7' >> /etc/yum.repos.d/CentOS-Base.repo; \ + yum update -y && yum install -y gcc make openssl openssl-devel && \ + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y && export PATH=\$HOME/.cargo/bin:\$PATH && cd /github/workspace && cargo build --release" + + docker cp centos7-container:/github/workspace/target/release/${{ env.PROJECT_NAME }} ./dist/${{ env.PROJECT_NAME }}-${{ github.ref_name }}-centos7 + docker rm centos7-container + + - name: Upload CentOS 7 Release Asset + if: matrix.os == 'ubuntu-latest' + run: | + gh release upload ${{ github.ref_name }} \ + "./dist/${{ env.PROJECT_NAME }}-${{ github.ref_name }}-centos7" \ + --clobber + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + update-formula: + needs: dist + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Extract version + id: extract-version + run: echo "tag-name=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT + + - name: Set environment variables + run: | + echo "PROJECT_NAME=${{ env.PROJECT_NAME }}" >> $GITHUB_ENV + echo "REPO_NAME=${{ env.REPO_NAME }}" >> $GITHUB_ENV + + - name: Verify release assets + run: | + VERSION=${{ steps.extract-version.outputs.tag-name }} + REPO=${{ env.REPO_NAME }} + PROJECT=${{ env.PROJECT_NAME }} + X86_64_URL="https://github.com/${REPO}/releases/download/${VERSION}/${PROJECT}-${VERSION}-x86_64-apple-darwin" + AARCH64_URL="https://github.com/${REPO}/releases/download/${VERSION}/${PROJECT}-${VERSION}-aarch64-apple-darwin" + + if curl --output /dev/null --silent --head --fail "$X86_64_URL"; then + echo "x86_64 binary exists" + else + echo "x86_64 binary does not exist" + exit 1 + fi + + if curl --output /dev/null --silent --head --fail "$AARCH64_URL"; then + echo "aarch64 binary exists" + else + echo "aarch64 binary does not exist" + exit 1 + fi + + - name: Update Homebrew formula + env: + COMMITTER_TOKEN: ${{ secrets.COMMITTER_TOKEN }} + run: | + VERSION=${{ steps.extract-version.outputs.tag-name }} + REPO=${{ env.REPO_NAME }} + PROJECT=${{ env.PROJECT_NAME }} + DESC="${{ env.DESC }}" + X86_64_URL="https://github.com/${REPO}/releases/download/${VERSION}/${PROJECT}-${VERSION}-x86_64-apple-darwin" + AARCH64_URL="https://github.com/${REPO}/releases/download/${VERSION}/${PROJECT}-${VERSION}-aarch64-apple-darwin" + + # 下载并更新formula + git clone https://github.com/${{ env.BREW_TAP }}.git homebrew-tap + cd homebrew-tap + + cat > Formula/${PROJECT}.rb < "${PROJECT}" + else + bin.install "${PROJECT}-#{version}-aarch64-apple-darwin" => "${PROJECT}" + end + end + + test do + system "#{bin}/${PROJECT}", "--version" + end + end + + EOL + + git config user.name github-actions + git config user.email github-actions@github.com + git add Formula/${PROJECT}.rb + git commit -m "Updating formula for ${PROJECT} to ${VERSION}" + git push https://${{ secrets.COMMITTER_TOKEN }}@github.com/${{ env.BREW_TAP }}.git main diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ad9d73d..2c4c741 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,15 +1,17 @@ name: "publish" -on: - create: - tags: - - '*' +# Commenting out the current trigger +# on: +# create: +# tags: +# - '*' + # This is the example from the readme. 
# On each push to the `release` branch it will create or update a GitHub release, build your app, and upload the artifacts to the release. env: CARGO_TERM_COLOR: always - BINARIES_LIST: 'ncbi_dl kun_peng' + BINARIES_LIST: 'kun_peng' PROJECT_PREFIX: 'Kun-peng-' jobs: diff --git a/Cargo.toml b/Cargo.toml index cc6dbc9..dc3daef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["ncbi", "kr2r", "seqkmer"] +members = ["kr2r", "seqkmer"] resolver = "2" [profile.release] diff --git a/README.md b/README.md index 800db8b..466d962 100644 --- a/README.md +++ b/README.md @@ -41,14 +41,56 @@ Follow these steps to install Kun-peng and run the examples. If you prefer not to build from source, you can download the pre-built binaries for your platform from the GitHub [releases page](https://github.com/eric9n/Kun-peng/releases). +For Linux users (CentOS 7 compatible): + ```bash -mkdir kun_peng_v0.6.15 -tar -xvf Kun-peng-v0.6.15-centos7.tar.gz -C kun_peng_v0.6.15 -# Add environment variable -echo 'export PATH=$PATH:~/biosoft/kun_peng_v0.6.15' >> ~/.bashrc +# Replace X.Y.Z with the latest version number +VERSION=vX.Y.Z +mkdir kun_peng_$VERSION +wget https://github.com/eric9n/Kun-peng/releases/download/$VERSION/kun_peng-$VERSION-centos7 +mv kun_peng-$VERSION-centos7 kun_peng_$VERSION/kun_peng +chmod +x kun_peng_$VERSION/kun_peng +# Add to PATH +echo "export PATH=\$PATH:$PWD/kun_peng_$VERSION" >> ~/.bashrc source ~/.bashrc ``` +For macOS users: + +```bash +# Replace X.Y.Z with the latest version number +VERSION=vX.Y.Z +mkdir kun_peng_$VERSION +# For Intel Macs +wget https://github.com/eric9n/Kun-peng/releases/download/$VERSION/kun_peng-$VERSION-x86_64-apple-darwin +mv kun_peng-$VERSION-x86_64-apple-darwin kun_peng_$VERSION/kun_peng +# For Apple Silicon Macs +# wget https://github.com/eric9n/Kun-peng/releases/download/$VERSION/kun_peng-$VERSION-aarch64-apple-darwin +# mv kun_peng-$VERSION-aarch64-apple-darwin kun_peng_$VERSION/kun_peng +chmod +x kun_peng_$VERSION/kun_peng +# Add to PATH +echo "export PATH=\$PATH:$PWD/kun_peng_$VERSION" >> ~/.zshrc # or ~/.bash_profile for Bash +source ~/.zshrc # or source ~/.bash_profile for Bash +``` + +For Windows users: + +```powershell +# Replace X.Y.Z with the latest version number +$VERSION = "vX.Y.Z" +New-Item -ItemType Directory -Force -Path kun_peng_$VERSION +Invoke-WebRequest -Uri "https://github.com/eric9n/Kun-peng/releases/download/$VERSION/kun_peng-$VERSION-x86_64-pc-windows-msvc.exe" -OutFile "kun_peng_$VERSION\kun_peng.exe" +# Add to PATH +$env:Path += ";$PWD\kun_peng_$VERSION" +[Environment]::SetEnvironmentVariable("Path", $env:Path, [EnvironmentVariableTarget]::User) +``` + +After installation, you can verify the installation by running: + +```bash +kun_peng --version +``` + #### Run the `kun_peng` example We will use a very small virus database on the GitHub homepage as an example: @@ -148,54 +190,31 @@ This output confirms that the `kun_peng` commands were executed successfully and ## ncbi_dl tool +For detailed information and usage instructions for the ncbi_dl tool, please refer to the [ncbi_dl repository](https://github.com/eric9n/ncbi_dl.git). -#### Run the `ncbi` Example - -Run the example script in the ncbi project to download the necessary files. Execute the following command from the root of the workspace: +The ncbi_dl tool is used to download resources from the NCBI website, including taxonomy files and genome data. It provides a convenient way to obtain the necessary data for building Kun-peng databases. 
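+For example, to fetch the NCBI taxonomy files needed for database building, point ncbi_dl at a download directory and use the `tax` subcommand (a minimal sketch based on the tool's help output; `./downloads` is a placeholder path):
+
+```sh
+# Downloads and unpacks the taxdump files (names.dmp, nodes.dmp) under ./downloads/taxonomy
+ncbi_dl -d ./downloads tax
+```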
-``` sh -cargo run --release --example run_download --package ncbi_dl -``` - -This will run the run_download.rs example located in the ncbi project's examples directory. The script will: - -1. Ensure the necessary directories exist. -2. Download the required files using the ncbi binary with the following commands: +### Downloading Genome Databases -- ./target/release/ncbi_dl -d downloads gen -g archaea -- ./target/release/ncbi_dl -d downloads tax +To download genome databases using ncbi_dl, you can use the `genomes` (or `gen`) command. Here's a basic example: -Example Output You should see output similar to the following: - -``` txt -Executing command: /path/to/workspace/target/release/ncbi_dl -d /path/to/workspace/downloads gen -g archaea -NCBI binary output: [download output here] - -Executing command: /path/to/workspace/target/release/ncbi_dl -d /path/to/workspace/downloads tax -NCBI binary output: [download output here] +```sh +ncbi_dl -d /path/to/download/directory gen -g bacteria ``` -The ncbi_dl binary is used to download resources from the NCBI website. Here is the help manual for the ncbi_dl binary: +This command will download bacterial genomes to the specified directory. You can replace `bacteria` with other genome groups like `archaea`, `fungi`, `protozoa`, or `viral` depending on your needs. -``` sh -./target/release/ncbi_dl -h -ncbi_dl download resource +Some key options for the `genomes` command include: -Usage: ncbi_dl [OPTIONS] +- `-g, --groups `: Specify which genome groups to download (e.g., bacteria, archaea, viral) +- `-f, --file-types `: Choose which file types to download (default is genomic.fna.gz) +- `-l, --assembly-level `: Set the assembly level (e.g., complete, chromosome, scaffold, contig) -Commands: - taxonomy Download taxonomy files from NCBI (alias: tax) - genomes Download genomes data from NCBI (alias: gen) - help Print this message or the help of the given subcommand(s) +For a full list of options and more detailed usage instructions, please refer to the ncbi_dl repository documentation. -Options: - -d, --download-dir Directory to store downloaded files [default: lib] - -n, --num-threads Number of threads to use for downloading [default: 20] - -h, --help Print help (see more with '--help') - -V, --version Print version -``` +For installation, additional usage examples, and more detailed documentation, please visit the ncbi_dl repository linked above. 
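+Putting these options together, a combined invocation might look like the following (a sketch with placeholder paths; the `gen`/`tax` subcommands and the `-d`/`-g` flags are taken from the ncbi_dl help output, and groups may be given as a comma-separated list):
+
+```sh
+# Download archaeal and viral genomes, then the taxonomy dump, into one library directory
+ncbi_dl -d ./kun_peng_db gen -g archaea,viral
+ncbi_dl -d ./kun_peng_db tax
+```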
-## kun_peng tool +## kun_peng ``` sh Usage: kun_peng diff --git a/ncbi/Cargo.toml b/ncbi/Cargo.toml deleted file mode 100644 index 436727f..0000000 --- a/ncbi/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "ncbi_dl" -version = "0.1.8" -edition = "2021" -authors = ["eric9n@gmail.com"] -license = "MIT" -repository = "https://github.com/eric9n/Kun-peng" -keywords = ["bioinformatics", "metagenomics", "microbiome", "ncbi"] -description = "download ncbi genome file" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -reqwest = { version = "0.12.4", features = ["stream", "multipart", "gzip"] } -tokio = { version = "1", features = ["full"] } -anyhow = "1.0" -futures = "0.3" -regex = "1" -clap = { version = "4.4.10", features = ["derive"] } -futures-util = "0.3.30" -reqwest-retry = "0.6.1" -reqwest-middleware = "0.3" -lazy_static = "1.4" -log = "0.4" -env_logger = "0.11.0" -md-5 = "0.10.6" -async-compression = "0.4.5" -tar = "0.4" -num_cpus = "1.13.1" diff --git a/ncbi/README.md b/ncbi/README.md deleted file mode 100644 index f7a2ab1..0000000 --- a/ncbi/README.md +++ /dev/null @@ -1,21 +0,0 @@ -## ncbi_dl tool - -The ncbi_dl binary is used to download resources from the NCBI website. Here is the help manual for the ncbi_dl binary: - -``` sh -./target/release/ncbi_dl -h -ncbi_dl download resource - -Usage: ncbi_dl [OPTIONS] - -Commands: - taxonomy Download taxonomy files from NCBI (alias: tax) - genomes Download genomes data from NCBI (alias: gen) - help Print this message or the help of the given subcommand(s) - -Options: - -d, --download-dir Directory to store downloaded files [default: lib] - -n, --num-threads Number of threads to use for downloading [default: 20] - -h, --help Print help (see more with '--help') - -V, --version Print version -``` diff --git a/ncbi/examples/run_download.rs b/ncbi/examples/run_download.rs deleted file mode 100644 index 4591759..0000000 --- a/ncbi/examples/run_download.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::fs; -use std::path::PathBuf; -use std::process::Command; - -fn main() { - let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .parent() - .unwrap() - .to_path_buf(); - - // Run the NCBI binary to download files - let ncbi_binary = workspace_root.join("target/release/ncbi_dl"); - let download_dir = workspace_root.join("downloads"); - // Ensure the download directory exists - fs::create_dir_all(&download_dir).expect("Failed to create download directory"); - - let args = vec![ - "-d".to_string(), - download_dir.to_string_lossy().to_string(), - "gen".to_string(), - "-g".to_string(), - "archaea".to_string(), - ]; - - let command_str = format!("{} {}", ncbi_binary.to_string_lossy(), args.join(" ")); - println!("Executing command: {}", command_str); - - // Run the NCBI binary to download files - let output = Command::new(&ncbi_binary) - .args(&args) - .output() - .expect("Failed to run NCBI binary"); - println!( - "NCBI binary output: {}", - String::from_utf8_lossy(&output.stdout) - ); - - let args = vec![ - "-d".to_string(), - download_dir.to_string_lossy().to_string(), - "tax".to_string(), - ]; - - let command_str = format!("{} {}", ncbi_binary.to_string_lossy(), args.join(" ")); - println!("Executing command: {}", command_str); - - // Run the NCBI binary to download files - let output = Command::new(&ncbi_binary) - .args(&args) - .output() - .expect("Failed to run NCBI binary"); - println!( - "NCBI binary output: {}", - String::from_utf8_lossy(&output.stdout) - ); -} 
diff --git a/ncbi/src/client.rs b/ncbi/src/client.rs deleted file mode 100644 index 6e2545a..0000000 --- a/ncbi/src/client.rs +++ /dev/null @@ -1,26 +0,0 @@ -use lazy_static::lazy_static; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; -use tokio::time::Duration; - -lazy_static! { - static ref CLIENT: ClientWithMiddleware = { - let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); - let client = reqwest::Client::builder() - .gzip(true) - .tcp_nodelay(true) - // .timeout(Duration::from_secs(300)) - .connect_timeout(Duration::from_secs(20)) - // .tcp_keepalive(Duration::from_secs(300)) - .build() - .expect("reqwest::Client::new()"); - - ClientBuilder::new(client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .build() - }; -} - -pub fn retry_client() -> &'static ClientWithMiddleware { - &CLIENT -} diff --git a/ncbi/src/down.rs b/ncbi/src/down.rs deleted file mode 100644 index 9cb05e5..0000000 --- a/ncbi/src/down.rs +++ /dev/null @@ -1,97 +0,0 @@ -use crate::client::retry_client; -use anyhow::Result; -use futures_util::StreamExt; -use reqwest::{self, header, StatusCode}; -use std::path::PathBuf; -use std::str::FromStr; -use tokio::fs::{self, OpenOptions}; -use tokio::io::AsyncWriteExt; -use tokio::time::{sleep, timeout, Duration}; - -fn get_etag(response: &reqwest::Response) -> String { - response - .headers() - .get(header::ETAG) - .map(|tag| tag.to_str().unwrap_or("")) - .unwrap_or("") - .to_string() -} - -fn get_content_length(response: &reqwest::Response) -> u64 { - // 如果解析不到这个字段,设置成 1,为了让下载继续 - response - .headers() - .get(header::CONTENT_LENGTH) - .and_then(|value| value.to_str().ok()) - .and_then(|value_str| u64::from_str(value_str).ok()) - .unwrap_or(1) -} - -/// 断点续传 -async fn download_resume(url: &str, file_name: &PathBuf, etag: &str) -> Result { - let client = retry_client(); - - // 检查本地文件大小 - let file_size = if file_name.exists() { - fs::metadata(file_name).await?.len() - } else { - 0 - }; - - let head_resp: reqwest::Response = client.head(url).send().await?; - let latest_etag = get_etag(&head_resp); - let content_length = get_content_length(&head_resp); - - // 如果文件下载完成,并且 etag 没有更新,则不下载 - if file_size == content_length && latest_etag != "" && latest_etag == etag { - return Ok(etag.into()); - } - - // 设置 Range 头部 - let mut headers = header::HeaderMap::new(); - headers.insert(header::RANGE, format!("bytes={}-", file_size).parse()?); - - // 发送带 Range 头部的 GET 请求 - let response = client.get(url).headers(headers).send().await?; - let status = response.status(); - - // 仅当服务器响应 206 Partial Content 时处理响应体 - if status == StatusCode::PARTIAL_CONTENT { - // 打开文件用于追加 - let mut file = OpenOptions::new() - .create(true) - .append(true) - .open(&file_name) - .await?; - - // 读取响应内容并写入文件 - let mut stream = response.bytes_stream(); - while let Some(chunk) = timeout(Duration::from_secs(30), stream.next()).await? 
{ - let chunk = chunk?; - file.write_all(&chunk).await?; - } - } - - Ok(latest_etag) -} - -/// 下载文件 -pub async fn retry_download( - url: &str, - file_name: &PathBuf, - etag: &str, - retry: i32, -) -> Result { - let mut count = retry; - let mut err = anyhow::anyhow!("download failed"); - while count > 0 { - let result = download_resume(url, file_name, etag).await; - match result { - Ok(retrieved_etag) => return Ok(retrieved_etag), - Err(e) => err = e, - } - count -= 1; - sleep(Duration::from_secs(3)).await; - } - Err(err) -} diff --git a/ncbi/src/fna.rs b/ncbi/src/fna.rs deleted file mode 100644 index b96c7b8..0000000 --- a/ncbi/src/fna.rs +++ /dev/null @@ -1,192 +0,0 @@ -use async_compression::tokio::bufread::GzipDecoder; -use regex::Regex; -use std::collections::HashMap; -use std::fs; -use std::path::{Path, PathBuf}; -use tokio::fs::OpenOptions; -use tokio::{ - fs::File, - io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, -}; - -use anyhow::Result; -use tar::Archive; - -pub async fn decompress_and_extract_tar_gz( - gz_path: &PathBuf, - out_path: &PathBuf, - files_to_extract: Vec, -) -> std::io::Result<()> { - // Open the .tar.gz file - let file = File::open(gz_path).await?; - let buf_reader = BufReader::new(file); - - // Use GzipDecoder for decompression - let gzip_decoder = GzipDecoder::new(buf_reader); - - // Create an async reader - let mut async_reader = BufReader::new(gzip_decoder); - - // Read all decompressed data into memory - let mut decompressed_data = Vec::new(); - async_reader.read_to_end(&mut decompressed_data).await?; // Works after importing AsyncReadExt - - // Use the tar crate to decompress the TAR archive - let mut archive = Archive::new(&decompressed_data[..]); - // archive.unpack(out_path)?; - - // 遍历 TAR 归档中的每个条目 - for entry in archive.entries()? { - let mut entry = entry?; - let path = entry.path()?.to_string_lossy().to_string(); - - // 检查是否为需要提取的文件 - if files_to_extract.contains(&path) { - let out_file_path = out_path.join(&path); - - // 创建输出文件夹 - if let Some(parent) = out_file_path.parent() { - tokio::fs::create_dir_all(parent).await?; - } - - // 解压缩并写入文件 - entry.unpack(out_file_path)?; - } - } - - Ok(()) -} - -async fn delete_hllp_json_files(out_dir: &Path) -> Result<()> { - let entries = fs::read_dir(out_dir)?; - - for entry in entries { - let entry = entry?; - let path = entry.path(); - if path.is_file() { - if let Some(file_name) = path.file_name() { - if let Some(file_name_str) = file_name.to_str() { - if file_name_str.starts_with("hllp_") && file_name_str.ends_with(".json") { - tokio::fs::remove_file(&path).await?; - println!("Deleted: {:?}", path); - } - } - } - } - } - - Ok(()) -} - -pub async fn parse_assembly_fna( - site: &str, - data_dir: &PathBuf, - asm_levels: &Vec<&str>, -) -> Result> { - let mut gz_files: HashMap = HashMap::new(); - let file_name = format!("assembly_summary_{}.txt", site); - let file_path = data_dir.join(file_name); - let file = File::open(&file_path).await?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - while let Some(line) = lines.next_line().await? 
{ - if line.starts_with('#') { - continue; - } - - let fields: Vec<&str> = line.split('\t').collect(); - if fields.len() > 19 { - let (taxid, asm_level, ftp_path) = (fields[5], fields[11], fields[19]); - - if ftp_path == "na" { - continue; - } - - if !asm_levels.contains(&asm_level) { - continue; - } - - let fna_file_name = format!( - "{}/{}_genomic.fna.gz", - site, - ftp_path.split('/').last().unwrap_or_default() - ); - gz_files.insert(fna_file_name, taxid.into()); - } - } - Ok(gz_files) -} - -pub async fn write_to_fna( - site: &str, - group: &str, - asm_levels: &Vec<&str>, - data_dir: &PathBuf, - out_dir: &PathBuf, -) -> Result<()> { - log::info!("{} {} write to fna...", group, site); - - let gz_files = if site == "all" { - let mut gz_files = parse_assembly_fna("genbank", data_dir, asm_levels).await?; - let ref_gz_files = parse_assembly_fna("refseq", data_dir, asm_levels).await?; - gz_files.extend(ref_gz_files); - gz_files - } else { - parse_assembly_fna(site, data_dir, asm_levels).await? - }; - let library_fna_path = out_dir.join("library.fna"); - let prelim_map_path = out_dir.join("prelim_map.txt"); - - let mut fna_writer = BufWriter::new( - OpenOptions::new() - .create(true) - .write(true) - .open(&library_fna_path) - .await?, - ); - delete_hllp_json_files(&out_dir).await?; - let mut map_writer = BufWriter::new( - OpenOptions::new() - .create(true) - .write(true) - .open(&prelim_map_path) - .await?, - ); - - let re: Regex = Regex::new(r"^>(\S+)").unwrap(); - - for (gz_path, taxid) in gz_files { - let gz_file = data_dir.join(gz_path); - if !gz_file.exists() { - continue; - } - - let file = File::open(gz_file).await?; - let decompressor = GzipDecoder::new(BufReader::new(file)); - let mut reader = BufReader::new(decompressor); - - let mut line = String::new(); - while reader.read_line(&mut line).await? 
!= 0 { - if let Some(caps) = re.captures(&line) { - let seqid = &caps[1]; - let full_tax_id = format!("taxid|{}", taxid); - map_writer - .write_all(format!("TAXID\t{}|{}\t{}\n", full_tax_id, seqid, taxid).as_bytes()) - .await?; - fna_writer - .write_all(format!(">{}|{}", full_tax_id, &line[1..]).as_bytes()) - .await?; - } else { - fna_writer.write_all(line.as_bytes()).await?; - } - line.clear(); - } - } - - fna_writer.flush().await?; - map_writer.flush().await?; - - log::info!("{} {} write to fna finished", group, site); - Ok(()) -} diff --git a/ncbi/src/lib.rs b/ncbi/src/lib.rs deleted file mode 100644 index 74a5a7d..0000000 --- a/ncbi/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -mod client; -pub mod down; -pub mod fna; -pub mod load; -pub mod md5sum; -pub mod meta; -// pub mod site; -pub mod plas; -pub mod task; -pub mod utils; diff --git a/ncbi/src/load.rs b/ncbi/src/load.rs deleted file mode 100644 index 4656cca..0000000 --- a/ncbi/src/load.rs +++ /dev/null @@ -1,247 +0,0 @@ -use crate::down::retry_download; -use crate::fna::decompress_and_extract_tar_gz; -use crate::md5sum::check_md5sum_file; -use crate::meta::get_local_etag; -use crate::meta::insert_local_etag; -use anyhow::{anyhow, Result}; -use futures::Future; -use std::path::PathBuf; -use tokio::fs::{remove_file, File}; -use tokio::io::{AsyncBufReadExt, BufReader}; - -pub const NCBI_GEN_URL: &'static str = "https://ftp.ncbi.nlm.nih.gov/genomes/"; -pub const NCBI_TAXO_URL: &'static str = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy"; - -#[derive(Debug)] -pub struct DownTuple { - url: String, - etag: String, - pub file: PathBuf, -} - -impl DownTuple { - pub fn new(url: String, file: PathBuf, etag: String) -> Self { - Self { url, file, etag } - } - - pub async fn new_taxo(url_path: String, taxo_dir: &PathBuf) -> Self { - let taxdump_url = format!("{}/{}", NCBI_TAXO_URL, url_path); - let file_name = if url_path.contains("/") { - url_path.split("/").last().unwrap().to_string() - } else { - url_path - }; - let output_path = taxo_dir.join(file_name); - let etag = get_local_etag(&taxdump_url).await; - DownTuple::new(taxdump_url, output_path, etag.unwrap_or_default()) - } - - pub fn file_exists(&self) -> bool { - self.file.exists() - } - - pub async fn clear(&self) { - let _ = remove_file(&self.file).await; - } - - pub async fn run(&self) -> Result<()> { - let result = retry_download(&self.url, &self.file, &self.etag, 3).await?; - if result != self.etag { - insert_local_etag(self.url.clone(), result).await; - } - Ok(()) - } -} - -#[derive(Debug)] -pub enum NcbiFile { - Summary(DownTuple), - Genomic(DownTuple, DownTuple), - Taxonomy(DownTuple, DownTuple), -} - -impl NcbiFile { - pub async fn from_group(group: &str, data_dir: &PathBuf, site: &str) -> Self { - let url = format!("{}{}/{}/assembly_summary.txt", NCBI_GEN_URL, site, group); - let output_path = data_dir.join(format!("assembly_summary_{}.txt", site)); - - let etag = get_local_etag(&url).await; - NcbiFile::Summary(DownTuple::new( - url.clone(), - output_path, - etag.unwrap_or("".into()), - )) - } - - pub async fn from_file(site: &str, data_dir: &PathBuf, fna_url: &str) -> Self { - let fna_file_name = fna_url.split("/").last().unwrap_or_default(); - let ftp_path = fna_url.trim_end_matches(fna_file_name); - let fna_file = data_dir.join(site).join(&fna_file_name); - let fna_etag = get_local_etag(&fna_url).await.unwrap_or_default(); - let genomic_dt = DownTuple::new(fna_url.to_string(), fna_file, fna_etag); - let md5_url = format!("{}/md5checksums.txt", ftp_path); - let md5_file = data_dir 
- .join(site) - .join(format!("{}_md5checksums.txt", &fna_file_name)); - let md5_etag = get_local_etag(&md5_url).await.unwrap_or_default(); - let md5_dt = DownTuple::new(md5_url, md5_file, md5_etag); - NcbiFile::Genomic(genomic_dt, md5_dt) - } - - pub async fn new_taxo(taxo_dir: &PathBuf, url_path: &str) -> Self { - let taxo = DownTuple::new_taxo(url_path.to_string(), taxo_dir).await; - let md5_file = format!("{}.md5", url_path); - let taxo_md5 = DownTuple::new_taxo(md5_file, taxo_dir).await; - - NcbiFile::Taxonomy(taxo, taxo_md5) - } - - pub async fn decompress(&self, data_dir: &PathBuf) -> Result<()> { - match self { - NcbiFile::Summary(_) => {} - NcbiFile::Genomic(_, _) => {} - NcbiFile::Taxonomy(dt1, _) => { - let taxo_files: Vec = - vec!["names.dmp".to_string(), "nodes.dmp".to_string()]; - decompress_and_extract_tar_gz(&dt1.file, &data_dir, taxo_files).await?; - } - } - Ok(()) - } - - pub fn file_exists(&self) -> bool { - match self { - NcbiFile::Summary(s) => s.file_exists(), - NcbiFile::Genomic(dt1, dt2) => dt1.file_exists() && dt2.file_exists(), - NcbiFile::Taxonomy(dt1, dt2) => dt1.file_exists() && dt2.file_exists(), - } - } - - pub async fn clear(&self) { - match self { - NcbiFile::Summary(s) => s.clear().await, - NcbiFile::Genomic(dt1, dt2) => { - dt1.clear().await; - dt2.clear().await; - } - NcbiFile::Taxonomy(dt1, dt2) => { - dt1.clear().await; - dt2.clear().await; - } - } - } - - pub async fn run(&self) -> Result<()> { - if self.file_exists() && self.check().await.is_ok() { - // 如果 check 成功,直接返回 - return Ok(()); - } - - match self { - NcbiFile::Summary(dt) => dt.run().await?, - NcbiFile::Genomic(dt1, dt2) => { - dt1.run().await?; - dt2.run().await?; - } - NcbiFile::Taxonomy(dt1, dt2) => { - dt1.run().await?; - dt2.run().await?; - } - } - Ok(()) - } - - pub async fn check(&self) -> Result<()> { - match self { - NcbiFile::Summary(_) => return Err(anyhow!("不用校验")), - NcbiFile::Genomic(dt1, dt2) => { - let result = check_md5sum_file(&dt1.file, &dt2.file).await?; - if !result { - return Err(anyhow::anyhow!( - "{:?} {:?} {:?} mismatch", - dt1.url, - dt1.file, - dt2.file - )); - } - Ok(()) - } - NcbiFile::Taxonomy(dt1, dt2) => { - let result = check_md5sum_file(&dt1.file, &dt2.file).await?; - if !result { - return Err(anyhow::anyhow!( - "{:?} {:?} {:?} mismatch", - dt1.url, - dt1.file, - dt2.file - )); - } - Ok(()) - } - } - } -} - -impl NcbiFile { - pub async fn process_summary_and_apply( - &self, - site: &str, - data_dir: &PathBuf, - asm_levels: &Vec<&str>, - callback: F, - ) -> Result<()> - where - F: Fn(NcbiFile) -> Fut, - Fut: Future + Send + 'static, - { - match self { - NcbiFile::Genomic(_, _) => {} - NcbiFile::Taxonomy(_, _) => {} - NcbiFile::Summary(ncbi) => { - let file = File::open(&ncbi.file).await?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - while let Some(line) = lines.next_line().await? 
{ - if line.starts_with('#') { - continue; - } - - let fields: Vec<&str> = line.split('\t').collect(); - if fields.len() > 19 { - let (_, asm_level, ftp_path) = (fields[5], fields[11], fields[19]); - - if ftp_path == "na" { - continue; - } - if !asm_levels.contains(&asm_level) { - continue; - } - // if !["Complete Genome", "Chromosome"].contains(&asm_level) { - // continue; - // } - - let fna_file_name = format!( - "{}_genomic.fna.gz", - ftp_path.split('/').last().unwrap_or_default() - ); - - let fna_url = format!("{}/{}", ftp_path, fna_file_name); - let fna_file = data_dir.join(site).join(&fna_file_name); - let fna_etag = get_local_etag(&fna_url).await.unwrap_or_default(); - let genomic_dt = DownTuple::new(fna_url, fna_file, fna_etag); - let md5_url = format!("{}/md5checksums.txt", ftp_path); - let md5_file = data_dir - .join(site) - .join(format!("{}_md5checksums.txt", &fna_file_name)); - let md5_etag = get_local_etag(&md5_url).await.unwrap_or_default(); - let md5_dt = DownTuple::new(md5_url, md5_file, md5_etag); - let new_file = NcbiFile::Genomic(genomic_dt, md5_dt); - let _ = callback(new_file).await; - } - } - } - } - Ok(()) - } -} diff --git a/ncbi/src/main.rs b/ncbi/src/main.rs deleted file mode 100644 index 3b13a08..0000000 --- a/ncbi/src/main.rs +++ /dev/null @@ -1,346 +0,0 @@ -use anyhow::Result; -use clap::{Parser, Subcommand, ValueEnum}; -use lazy_static::lazy_static; -use ncbi_dl::fna::write_to_fna; -use ncbi_dl::meta::{init_meta, save_meta}; -use ncbi_dl::plas::download_plas_files; -use ncbi_dl::task; -use ncbi_dl::utils; -use std::collections::HashMap; -use std::fmt; -use std::path::PathBuf; -use tokio::runtime::Builder; - -const NCBI_LIBRARY: &'static [&str] = &[ - "archaea", - "bacteria", - "viral", - "fungi", - "plant", - "human", - "protozoa", - "vertebrate_mammalian", - "vertebrate_other", - "invertebrate", - "plasmid", -]; - -lazy_static! 
{ - static ref NCBI_ASM_LEVELS: HashMap> = { - let mut m = HashMap::new(); - m.insert("complete_genome".to_string(), vec!["Complete Genome"]); - m.insert("chromosome".to_string(), vec!["Chromosome"]); - m.insert("scaffold".to_string(), vec!["Scaffold"]); - m.insert("contig".into(), vec!["Contig"]); - m.insert("basic".into(), vec!["Complete Genome", "Chromosome"]); - m.insert("uncomplete".into(), vec!["Scaffold", "Contig"]); - m.insert( - "all".into(), - vec!["Complete Genome", "Chromosome", "Scaffold", "Contig"], - ); - m - }; -} - -fn validate_group(group: &str) -> Result { - let groups = utils::parse_comma_separated_list(&group); - for grp in &groups { - if !NCBI_LIBRARY.contains(&grp.as_str()) { - return Err(format!("group not in ncbi library")); - } - } - Ok(group.to_string()) -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -enum Site { - /// Download genbank resources - Genbank, - /// Download refseq resources - Refseq, - /// Both genbank and refseq - All, -} - -impl fmt::Display for Site { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}", - match self { - Site::Genbank => "genbank", - Site::Refseq => "refseq", - Site::All => "all", - } - ) - } -} - -#[derive(Subcommand, Debug, ValueEnum, Clone)] -enum Plas { - Plasmid, - Plastid, -} - -impl fmt::Display for Plas { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}", - match self { - Plas::Plasmid => "plasmid", - Plas::Plastid => "plastid", - } - ) - } -} - -#[derive(Subcommand, Debug)] -enum Mode { - /// Check the md5 of files only - Md5, - /// Parse genomic files and generate a library fna file - /// Also concatenate individual fna files into a group for building a database - Fna { - /// Directory to store the library fna file to avoid mixing with original files - #[clap(value_parser)] - out_dir: Option, - }, - /// Download and parse assembly files only - Assembly, - /// Download genomic files separately by specifying a URL - Url { - #[clap(value_parser)] - url: String, - }, -} - -#[derive(Parser, Debug)] -#[clap( - version, - about = "ncbi download resource", - long_about = "Download genomes resources from the NCBI website" -)] -struct Args { - /// Directory to store downloaded files - #[arg(short, long, default_value = "lib")] - download_dir: PathBuf, - - /// Number of threads to use for downloading - #[arg(short, long, default_value_t = num_cpus::get() * 2)] - num_threads: usize, - - #[command(subcommand)] - command: Commands, -} - -#[derive(Subcommand, Debug)] -enum Commands { - /// plasmid or plastid - Plas { - #[command(subcommand)] - mode: Plas, - }, - /// Download taxonomy files from NCBI (alias: tax) - #[command(alias = "tax")] - Taxonomy, - - /// Download genomes data from NCBI (alias: gen) - #[command(alias = "gen")] - Genomes { - /// Site directory to download from NCBI (RefSeq or GenBank) - #[arg(long, value_enum, default_value_t = Site::Refseq)] - site: Site, - - /// Assembly level: the highest level of assembly for any object in the genome - /// all, complete_genome, chromosome, scaffold, contig. 
basic: [complete_genome, chromosome] - #[arg(long, default_value = "basic")] - asm_level: String, - - /// Type of data to download from NCBI site, can be multiple comma-separated values - /// e.g., archaea, bacteria, viral, fungi, plant, human, protozoa, vertebrate_mammalian, vertebrate_other, invertebrate, plasmid - #[arg(short, long, value_parser = validate_group)] - group: String, - - /// Subcommand to generate fna files using md5 checksum - #[command(subcommand)] - mode: Option, - }, -} - -async fn async_run(args: Args) -> Result<()> { - let db_path = utils::create_data_dir(&args.download_dir).unwrap(); - init_meta(&db_path).await; - - match args.command { - Commands::Plas { mode } => { - let data_dir: PathBuf = db_path - .join("library") - .join(mode.to_string()) - .join("refseq"); - utils::create_dir(&data_dir)?; - download_plas_files(data_dir, &mode.to_string()).await? - } - Commands::Taxonomy => { - let data_dir: PathBuf = db_path.join("taxonomy"); - utils::create_dir(&data_dir)?; - let _ = task::run_taxo(&data_dir).await; - } - Commands::Genomes { - site, - group, - asm_level, - mode, - } => { - // let site_str = site.to_string(); - let groups = utils::parse_comma_separated_list(&group); - for grp in groups { - let data_dir: PathBuf = db_path.join("library").join(grp.clone()); - match site { - Site::All => { - for s in [Site::Genbank, Site::Refseq].iter() { - utils::create_dir(&data_dir.join(&s.to_string()))?; - } - } - _ => { - utils::create_dir(&data_dir.join(&site.to_string()))?; - } - } - - let trans_group = if &grp == "human" { - "vertebrate_mammalian/Homo_sapiens".to_string() - } else { - grp.to_string() - }; - - let levels = NCBI_ASM_LEVELS.get(&asm_level).unwrap(); - - match &mode { - Some(Mode::Md5) => match site { - Site::All => { - for site in [Site::Genbank, Site::Refseq].iter() { - let _ = task::run_check( - &site.to_string(), - &trans_group, - &data_dir, - &levels, - args.num_threads, - ) - .await; - } - } - _ => { - let _ = task::run_check( - &site.to_string(), - &trans_group, - &data_dir, - &levels, - args.num_threads, - ) - .await; - } - }, - Some(Mode::Fna { out_dir }) => { - let fna_out_dir = out_dir - .clone() - .unwrap_or(db_path.clone()) - .join("library") - .join(grp.clone()); - utils::create_dir(&fna_out_dir)?; - write_to_fna( - &site.to_string(), - &trans_group, - &levels, - &data_dir, - &fna_out_dir, - ) - .await?; - } - Some(Mode::Assembly) => match site { - Site::All => { - for s in [Site::Genbank, Site::Refseq].iter() { - let _ = task::run_assembly( - &s.to_string(), - &trans_group, - &levels, - &data_dir, - ) - .await; - } - } - _ => { - let _ = task::run_assembly( - &site.to_string(), - &trans_group, - &levels, - &data_dir, - ) - .await; - } - }, - Some(Mode::Url { url }) => { - if site == Site::All { - log::error!("Must specify a suitable site"); - } else { - let result = - task::run_download_file(&site.to_string(), &data_dir, &url).await; - if result.is_err() { - log::error!("download error... 
{:?}", result); - } - } - } - None => match site { - Site::All => { - for s in [Site::Genbank, Site::Refseq].iter() { - let _ = task::run_task( - &s.to_string(), - &trans_group, - &data_dir, - &&levels, - args.num_threads, - ) - .await; - } - } - _ => { - let _ = task::run_task( - &site.to_string(), - &trans_group, - &data_dir, - &&levels, - args.num_threads, - ) - .await; - } - }, - } - } - } - } - - save_meta(&db_path).await?; - Ok(()) -} - -fn main() -> Result<()> { - env_logger::Builder::new() - .filter_level(log::LevelFilter::Info) - .filter_module("reqwest_retry::middleware", log::LevelFilter::Error) - .init(); - - let args = Args::parse(); - let num_thread = args.num_threads.clone(); - // 创建一个 Runtime 实例,并配置线程数 - let runtime = Builder::new_multi_thread() - .enable_all() - .thread_name("ncbi") - // .max_blocking_threads(100) - .worker_threads(num_thread) // 设置所需的工作线程数 - .build() - .expect("Failed to create runtime"); - - // 使用 Runtime 运行异步代码 - runtime.block_on(async_run(args))?; - - Ok(()) -} diff --git a/ncbi/src/md5sum.rs b/ncbi/src/md5sum.rs deleted file mode 100644 index 41c41e8..0000000 --- a/ncbi/src/md5sum.rs +++ /dev/null @@ -1,69 +0,0 @@ -use anyhow::{anyhow, Result}; -use md5::{Digest, Md5}; -use std::path::PathBuf; -use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; - -async fn get_md5_for_file(target_file: &PathBuf, md5_file: &PathBuf) -> Result { - let file = File::open(md5_file).await.map_err(|e| { - tokio::io::Error::new( - tokio::io::ErrorKind::NotFound, - format!("File operation failed: {:?}-{:?}", md5_file, e), - ) - })?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - let file_name = target_file - .file_name() - .unwrap() - .to_string_lossy() - .to_string(); - while let Some(line) = lines.next_line().await? 
{ - if line.starts_with('#') || line.trim().is_empty() { - continue; // 跳过注释和空行 - } - - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 2 { - let hash_key = parts.last().unwrap(); - if hash_key.ends_with(&file_name) { - let md5 = parts[0].trim().to_string(); - return Ok(md5); - } - } - } - - Err(anyhow!("MD5 checksum not found for file {}", file_name)) -} - -pub async fn check_md5sum_file(target_file: &PathBuf, md5_file: &PathBuf) -> Result { - let md5_value = get_md5_for_file(target_file, md5_file).await?; - - // 异步读取目标文件并计算其 MD5 散列值 - let mut hasher = Md5::new(); - let mut fna = File::open(target_file).await?; - let mut buffer = vec![0; 8192]; // 使用缓冲区以减少内存占用 - - loop { - let n = fna.read(&mut buffer).await?; - if n == 0 { - break; - } - hasher.update(&buffer[..n]); - } - - let fna_md5 = hasher.finalize(); - Ok(format!("{:x}", fna_md5) == md5_value) -} - -// fn write_failed_to_file(file_path: PathBuf, items: Vec) -> Result<()> {#[derive(Clone)] - -// let mut file = File::create(file_path)?; - -// for item in items { -// writeln!(file, "{}", item)?; -// } - -// Ok(()) -// } diff --git a/ncbi/src/meta.rs b/ncbi/src/meta.rs deleted file mode 100644 index 48757a0..0000000 --- a/ncbi/src/meta.rs +++ /dev/null @@ -1,96 +0,0 @@ -use anyhow::Result; -use lazy_static::lazy_static; -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::Mutex; - -const META_FILE_NAME: &'static str = ".metadata"; - -async fn parse_metadata(filename: &PathBuf) -> Result> { - let mut meta: HashMap = HashMap::new(); - if !filename.exists() { - File::create(filename).await?; - } - - let file = File::open(filename).await?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - while let Some(line) = lines.next_line().await? { - if line.starts_with('#') || line.trim().is_empty() { - continue; - } - - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() < 2 { - continue; - } - let url = parts[0].to_string(); - let etag = parts[1].to_string(); - meta.insert(url, etag); - } - - Ok(meta) -} - -pub struct Meta { - inner: HashMap, -} - -impl Meta { - pub fn new() -> Self { - Self { - inner: HashMap::new(), - } - } - - pub async fn init(&mut self, db_path: &PathBuf) { - let meta_file: PathBuf = db_path.clone().join(META_FILE_NAME); - if let Ok(meta) = parse_metadata(&meta_file).await { - self.inner = meta; - } - } - - pub fn get_etag(&self, key: &str) -> Option { - self.inner.get(key).map(|s| s.to_string()) - } - - pub fn insert_or_update(&mut self, key: String, new_etag: String) { - self.inner.insert(key, new_etag); - } -} - -pub async fn init_meta(db_path: &PathBuf) { - let mut meta = META.lock().await; - meta.init(db_path).await; -} - -pub async fn get_local_etag(key: &str) -> Option { - let meta = META.lock().await; - meta.get_etag(key) -} - -pub async fn insert_local_etag(key: String, new_etag: String) { - let mut meta = META.lock().await; - meta.insert_or_update(key, new_etag); -} - -pub async fn save_meta(db_path: &PathBuf) -> Result<()> { - let meta = META.lock().await; - let meta_file: PathBuf = db_path.join(META_FILE_NAME); - let mut file = File::create(meta_file).await?; - - for (url, etag) in meta.inner.iter() { - file.write_all(format!("{},{}\n", url, etag).as_bytes()) - .await?; - } - - Ok(()) -} - -lazy_static! 
{ - pub static ref META: Arc> = Arc::new(Mutex::new(Meta::new())); -} diff --git a/ncbi/src/plas.rs b/ncbi/src/plas.rs deleted file mode 100644 index 7d27c5f..0000000 --- a/ncbi/src/plas.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::path::PathBuf; - -use crate::client::retry_client; -use crate::load::{DownTuple, NcbiFile, NCBI_GEN_URL}; -use crate::meta::get_local_etag; -use anyhow::Result; -use regex::Regex; - -pub async fn download_plas_files(data_dir: PathBuf, plas_type: &str) -> Result<()> { - let plas_url = format!("{}refseq/{}/", NCBI_GEN_URL, plas_type); - let client = retry_client(); - let response: reqwest::Response = client.get(&plas_url).send().await?; - if response.status().is_success() { - let contents = response.text().await?; - // println!("contents {:?}", contents); - // 正则表达式匹配所有 href 属性 - let re = Regex::new(r#"href="([^"]+\.genomic\.fna\.gz)""#)?; - - // 查找并打印所有匹配的 href - for cap in re.captures_iter(&contents) { - let filename = &cap[1]; - let url = format!("{}{}", plas_url, filename); - println!("url {:?}", url); - let etag = get_local_etag(&url).await; - let output_path = data_dir.join(filename); - let ncbi_file = NcbiFile::Summary(DownTuple::new( - url.clone(), - output_path, - etag.unwrap_or("".into()), - )); - ncbi_file.run().await?; - } - } else { - log::error!("Failed to fetch the webpage."); - } - Ok(()) -} diff --git a/ncbi/src/site.rs b/ncbi/src/site.rs deleted file mode 100644 index e72d863..0000000 --- a/ncbi/src/site.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::collections::HashMap; - -use crate::client::retry_client; -use anyhow::Result; -use regex::Regex; -use std::fmt; - -// #[derive(Debug)] -pub struct Item { - name: String, - date: String, -} - -impl fmt::Debug for Item { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "({}, {})", self.name, self.date) - } -} - -pub const NCBI_SITES: &[&str] = &["refseq"]; -pub const NCBI_GEN_URL: &'static str = "https://ftp.ncbi.nlm.nih.gov/genomes/"; - -// 从https://ftp.ncbi.nlm.nih.gov/genomes/下的 genbank或者 refreq 目录获取解析 assembly_summary的信息。 -pub fn extract_info_from_html(html: &str) -> Result> { - let re = Regex::new(r#"([^<]*)\s*([\d-]{10} \d{2}:\d{2})\s*([-\dKM]*)"#) - .unwrap(); - let mut items = Vec::new(); - - for cap in re.captures_iter(html) { - let item = Item { - name: cap[1].trim_end_matches("/").to_string(), - date: cap[2].to_string(), - }; - if item.name.starts_with("README") || item.name.contains("assembly") { - continue; - } else { - items.push(item); - } - } - - Ok(items) -} - -pub fn site_display(map: &HashMap>) { - for (key, item) in map { - log::info!("{}: {:?}", key, item); - } -} - -pub fn check_group(group: &str, map: &HashMap>) -> bool { - for (_, items) in map { - for item in items { - if item.name == group { - return true; - } - } - } - false -} - -pub async fn open_ncbi_site(display: bool) -> Result>> { - let mut map: HashMap> = HashMap::new(); - let client = retry_client(); - - for site in NCBI_SITES { - let url: String = format!("{}{}/", NCBI_GEN_URL, site); - - let response: reqwest::Response = client.get(&url).send().await?; - - if response.status().is_success() { - let contents = response.text().await?; - let item_vec = extract_info_from_html(&contents).unwrap(); - map.insert(site.to_string(), item_vec); - } else { - log::error!("Failed to fetch the webpage."); - } - } - - if display { - site_display(&map); - } - Ok(map) -} diff --git a/ncbi/src/task.rs b/ncbi/src/task.rs deleted file mode 100644 index 97538b2..0000000 --- a/ncbi/src/task.rs +++ /dev/null @@ -1,222 
+0,0 @@ -use crate::load::NcbiFile; -use anyhow::Result; -use futures::stream::StreamExt; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use tokio::sync::{mpsc, Semaphore}; - -async fn process_tasks( - task_type: String, - mut receiver: mpsc::Receiver<(NcbiFile, tokio::sync::OwnedSemaphorePermit)>, - // semaphore: Arc, -) -> Result { - // let semaphore = Arc::new(Semaphore::new(num_threads)); - // let mut futures = vec![]; - let counter: Arc = Arc::new(AtomicUsize::new(0)); - - let mut stream = futures::stream::FuturesUnordered::new(); - - while let Some((task, permit)) = receiver.recv().await { - // let permit = semaphore.clone().acquire_owned().await?; - // let next_tx_clone = next_tx.clone(); - let task_type_clone = task_type.clone(); - let counter_clone = counter.clone(); - - let task_future = tokio::spawn(async move { - let result = match task_type_clone.as_str() { - "run" => { - let result = task.run().await; - if result.is_ok() { - task.check().await - } else { - result - } - } - "check" => task.check().await, - _ => unreachable!(), - }; - drop(permit); - if result.is_ok() { - counter_clone.fetch_add(1, Ordering::SeqCst); - } - result - }); - stream.push(task_future); - } - - // let mut stream = futures::stream::FuturesUnordered::new(); - // for future in futures { - // stream.push(future); - // } - - // let mut stream = futures::stream::iter(futures).buffer_unordered(semaphore.available_permits()); - - while let Some(result) = stream.next().await { - match result { - Ok(Ok(_)) => {} // 任务成功完成 - Ok(Err(e)) => log::error!("Task failed: {}", e), - Err(e) => log::error!("Task panicked or could not be joined: {}", e), - } - } - - Ok(counter.load(Ordering::SeqCst)) -} - -/// 处理 assembly 文件 -async fn process_assembly_tasks( - site: &str, - group: &str, - data_dir: &PathBuf, - asm_levels: &Vec<&str>, - tx: mpsc::Sender<(NcbiFile, tokio::sync::OwnedSemaphorePermit)>, - semaphore: Arc, -) -> Result { - let counter = Arc::new(AtomicUsize::new(0)); - let assembly = NcbiFile::from_group(group, data_dir, site).await; - match assembly.run().await { - Ok(_) => { - let result = assembly - .process_summary_and_apply(site, data_dir, asm_levels, |file: NcbiFile| { - let tx_clone = tx.clone(); - let counter_clone = counter.clone(); - let semaphore_clone = semaphore.clone(); - async move { - let permit = semaphore_clone.acquire_owned().await.unwrap(); - let _ = tx_clone.send((file, permit)).await; - counter_clone.fetch_add(1, Ordering::SeqCst); - } - }) - .await; - if result.is_err() { - log::error!("Error parsing assembly file: {:?}", result); - } - } - Err(e) => { - log::info!("{}", e); - } - } - - Ok(counter.load(Ordering::SeqCst)) -} - -pub async fn run_task( - site: &str, - group: &str, - data_dir: &PathBuf, - asm_levels: &Vec<&str>, - num_threads: usize, -) -> Result<()> { - log::info!("{} {} download file start...", group, site); - let (tx, rx) = mpsc::channel(4096); // 通道大小可以根据需要调整 - // let (tx1, rx1) = mpsc::channel(4096); // 通道大小可以根据需要调整 - let semaphore = Arc::new(Semaphore::new(num_threads)); - let assembly_tasks = - process_assembly_tasks(site, group, data_dir, asm_levels, tx, semaphore.clone()); - let download_handle = process_tasks("run".to_string(), rx); - // let md5_handle = process_tasks("check".to_string(), rx1, None, semaphore.clone()); - // // 等待处理任务完成 - let (ably_res, down_res) = tokio::join!(assembly_tasks, download_handle); - log::info!( - "{} {} file total count: {}, downloaded: {}", - group, - site, - ably_res?, - down_res?, - ); 
- log::info!("{} {} file finished...", group, site); - Ok(()) -} - -pub async fn run_check( - site: &str, - group: &str, - data_dir: &PathBuf, - asm_levels: &Vec<&str>, - num_threads: usize, -) -> Result<()> { - log::info!("{} {} check md5 start...", group, site); - let (tx, rx) = mpsc::channel(4096); // 通道大小可以根据需要调整 - let semaphore = Arc::new(Semaphore::new(num_threads)); - let assembly_tasks = - process_assembly_tasks(site, group, data_dir, asm_levels, tx, semaphore.clone()); - let md5_handle = process_tasks("check".to_string(), rx); - // // 等待处理任务完成 - let (ably_res, md5_res) = tokio::join!(assembly_tasks, md5_handle); - log::info!( - "{} {} file total count: {}, md5match: {}", - group, - site, - ably_res?, - md5_res? - ); - Ok(()) -} - -pub async fn run_taxo(taxo_dir: &PathBuf) -> Result<()> { - log::info!("download taxonomy..."); - let files = [ - "taxdump.tar.gz", - // "accession2taxid/nucl_gb.accession2taxid.gz", - // "accession2taxid/nucl_wgs.accession2taxid.gz", - ]; - for url_path in files.iter() { - let ncbi_file = NcbiFile::new_taxo(taxo_dir, &url_path).await; - let result = ncbi_file.run().await; - if result.is_ok() && url_path.to_string() == "taxdump.tar.gz" { - ncbi_file.decompress(taxo_dir).await?; - } - } - log::info!("download taxonomy finished..."); - Ok(()) -} - -pub async fn run_download_file(site: &str, data_dir: &PathBuf, fna_url: &str) -> Result<()> { - let ncbi_file = NcbiFile::from_file(site, data_dir, fna_url).await; - log::info!("{} download file start...", fna_url); - ncbi_file.clear().await; - ncbi_file.run().await?; - ncbi_file.check().await?; - log::info!("{} download file end...", fna_url); - Ok(()) -} - -pub async fn run_assembly( - site: &str, - group: &str, - asm_levels: &Vec<&str>, - data_dir: &PathBuf, -) -> Result<()> { - let assembly = NcbiFile::from_group(group, data_dir, site).await; - if !assembly.file_exists() { - let _ = assembly.run().await; - } - let total_counter = Arc::new(AtomicUsize::new(0)); - let counter = Arc::new(AtomicUsize::new(0)); - let result = assembly - .process_summary_and_apply(site, data_dir, asm_levels, |file: NcbiFile| { - let counter_clone = counter.clone(); - let total_counter_clone = total_counter.clone(); - async move { - total_counter_clone.fetch_add(1, Ordering::SeqCst); - if file.file_exists() { - counter_clone.fetch_add(1, Ordering::SeqCst); - } - } - }) - .await; - if result.is_err() { - log::error!("Error parsing assembly file: {:?}", result); - } - - let total_count = total_counter.load(Ordering::SeqCst); - let count = counter.load(Ordering::SeqCst); - log::info!( - "{} {} 总文件数: {}, 本地文件数: {}", - group, - site, - total_count, - count - ); - Ok(()) -} diff --git a/ncbi/src/utils.rs b/ncbi/src/utils.rs deleted file mode 100644 index 0c3f5cb..0000000 --- a/ncbi/src/utils.rs +++ /dev/null @@ -1,44 +0,0 @@ -use anyhow::Result; -use std::fs::{create_dir_all, File}; -use std::path::PathBuf; -use std::str::FromStr; - -// 获取 url 地址的最后一层目录 -pub fn get_last_segment_of_url(url: &str) -> &str { - url.trim_end_matches('/').split('/').last().unwrap_or("") -} - -// 创建文件,并检查文件所在目录是否存在,不存在则创建 -pub fn create_file_in_dir(filename: &str) -> Result { - let path = PathBuf::from_str(filename).unwrap(); - - if let Some(parent) = path.parent() { - create_dir_all(parent)?; - } - - let file: File = File::create(filename)?; - - Ok(file) -} - -pub fn create_data_dir(dirname: &PathBuf) -> Result { - // let path: PathBuf = PathBuf::from_str(dirname).unwrap(); - if !dirname.exists() { - create_dir_all(&dirname)?; - } - Ok(dirname.clone()) -} - 
-pub fn create_dir(dirname: &PathBuf) -> Result<(), anyhow::Error> { - if !dirname.exists() { - create_dir_all(&dirname)?; - } - Ok(()) -} - -pub fn parse_comma_separated_list(s: &str) -> Vec<String> { - s.split(',') - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect() -}