Skip to content

Commit

Permalink
Merge pull request #24 from aoki-h-jp/feature/1.1.0/spot-filter
Browse files Browse the repository at this point in the history
Feature/1.1.0/spot filter
  • Loading branch information
aoki-h-jp authored Jan 4, 2025
2 parents 01999ee + 5b99589 commit 665248c
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 76 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pytest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.9 # 使用するPythonのバージョンを指定してください
python-version: 3.9

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pipenv
pip install git+https://github.com/aoki-h-jp/binance-bulk-downloader
pipenv install --dev # Pipenvを使用して依存関係をインストール
pipenv install --dev
- name: Run pytest
run: |
pipenv run pytest -v -s # pytestを実行するコマンドを指定
pipenv run pytest -v -s
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ downloader = BinanceBulkDownloader(data_frequency='1h', asset='spot')
downloader.run_download()
```

### Download specific symbols only

```python
from binance_bulk_downloader.downloader import BinanceBulkDownloader

# Download single symbol
downloader = BinanceBulkDownloader(
data_type="klines",
data_frequency="1h",
asset="spot",
timeperiod_per_file="daily",
symbols="BTCUSDT",
)
downloader.run_download()

# Download multiple symbols
downloader = BinanceBulkDownloader(
data_type="trades",
asset="spot",
timeperiod_per_file="daily",
symbols=["BTCUSDT", "ETHUSDT"],
)
downloader.run_download()
```

### Download all aggTrades data (USDT-M futures)

```python
Expand Down
228 changes: 156 additions & 72 deletions binance_bulk_downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
from concurrent.futures import ThreadPoolExecutor
from xml.etree import ElementTree
from zipfile import BadZipfile
from typing import Optional, List, Union

# import third-party libraries
import requests
from rich.console import Console
from rich.progress import track
from rich.panel import Panel
from rich.live import Live
from rich.text import Text

# import my libraries
from binance_bulk_downloader.exceptions import (
Expand All @@ -23,6 +25,11 @@


class BinanceBulkDownloader:
"""
Binance Bulk Downloader class for downloading historical data from Binance Vision.
Supports all asset types (spot, USDT-M, COIN-M, options) and all data frequencies.
"""

_CHUNK_SIZE = 100
_BINANCE_DATA_S3_BUCKET_URL = (
"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
Expand Down Expand Up @@ -118,19 +125,25 @@ def __init__(
data_frequency="1m",
asset="um",
timeperiod_per_file="daily",
symbols: Optional[Union[str, List[str]]] = None,
) -> None:
"""
Initialize BinanceBulkDownloader
:param destination_dir: Destination directory for downloaded files
:param data_type: Type of data to download (klines, aggTrades, etc.)
:param data_frequency: Frequency of data to download (1m, 1h, 1d, etc.)
:param asset: Type of asset to download (um, cm, spot, option)
:param timeperiod_per_file: Time period per file (daily, monthly)
:param symbols: Optional. Symbol or list of symbols to download (e.g., "BTCUSDT" or ["BTCUSDT", "ETHUSDT"]).
If None or empty list is provided, all available symbols will be downloaded.
"""
self._destination_dir = destination_dir
self._data_type = data_type
self._data_frequency = data_frequency
self._asset = asset
self._timeperiod_per_file = timeperiod_per_file
self._symbols = [symbols] if isinstance(symbols, str) else symbols
self.marker = None
self.is_truncated = True
self.downloaded_list: list[str] = []
Expand Down Expand Up @@ -186,37 +199,70 @@ def _check_params(self) -> None:
f"data_frequency 1s is not supported for {self._asset}."
)

def _get_file_list_from_s3_bucket(self, prefix):
    """
    Get file list from s3 bucket

    Requests the bucket listing for ``prefix`` page by page (at most 1000
    keys per request) until the response no longer reports ``IsTruncated``
    as true, and collects every key ending in ``.zip``. When more than one
    symbol is configured, only keys containing one of the upper-cased
    symbols are kept. Progress is shown in a live panel.

    :param prefix: s3 bucket prefix
    :return: list of files
    :raises requests.exceptions.HTTPError: if a listing request returns an
        error status
    """
    s3_ns = "{http://s3.amazonaws.com/doc/2006-03-01/}"
    max_display_files = 5
    files = []
    marker = None
    is_truncated = True

    # The substring filter only applies when several symbols are configured;
    # upper-case them once instead of once per key.
    symbols = self._symbols
    if isinstance(symbols, list) and len(symbols) > 1:
        wanted_symbols = [symbol.upper() for symbol in symbols]
    else:
        wanted_symbols = None

    with Live(refresh_per_second=4) as live:
        status_text = Text(f"Getting file list: {prefix}")
        live.update(Panel(status_text, style="blue"))

        def show(headline, style):
            # Rebuild the panel text: headline, running total, latest files.
            status_text.plain = (
                f"{headline}: {prefix}\nTotal files found: {len(files)}"
            )
            if files:
                status_text.append("\n\nLatest files:")
                for recent_file in files[-max_display_files:]:
                    status_text.append(f"\n{recent_file}")
            live.update(Panel(status_text, style=style))

        while is_truncated:
            params = {"prefix": prefix, "max-keys": 1000}
            if marker:
                params["marker"] = marker

            response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
            # Fail loudly on an error response instead of parsing the error
            # document as an empty listing.
            response.raise_for_status()
            tree = ElementTree.fromstring(response.content)

            last_key = None
            for content in tree.findall(f"{s3_ns}Contents"):
                key = content.find(f"{s3_ns}Key").text
                # Remember every key, not only accepted ones, so the next
                # request resumes after the end of this page.
                last_key = key
                if not key.endswith(".zip"):
                    continue
                if wanted_symbols is None or any(
                    symbol in key for symbol in wanted_symbols
                ):
                    files.append(key)

            # Refresh the display once per page rather than once per key.
            show("Getting file list", "blue")

            is_truncated_element = tree.find(f"{s3_ns}IsTruncated")
            is_truncated = (
                is_truncated_element is not None
                and is_truncated_element.text.lower() == "true"
            )

            if last_key is None:
                # A page without keys gives nothing to resume from; stop
                # rather than requesting the same page again.
                break
            marker = last_key

        show("File list complete", "green")

    return files

def _make_asset_type(self) -> str:
"""
Expand Down Expand Up @@ -256,8 +302,30 @@ def _build_prefix(self) -> str:
self._timeperiod_per_file,
self._data_type,
]
prefix = "/".join(url_parts)
return prefix

# If single symbol is specified, add it to the prefix
if isinstance(self._symbols, list) and len(self._symbols) == 1:
symbol = self._symbols[0].upper()
url_parts.append(symbol)
# For trades and aggTrades, add symbol directory
if self._data_type in ["trades", "aggTrades"]:
url_parts.append(symbol)
elif isinstance(self._symbols, str):
symbol = self._symbols.upper()
url_parts.append(symbol)
# For trades and aggTrades, add symbol directory
if self._data_type in ["trades", "aggTrades"]:
url_parts.append(symbol)

# If data frequency is required and specified, add it to the prefix
if (
self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE
and self._data_frequency
):
if isinstance(self._symbols, (str, list)):
url_parts.append(self._data_frequency)

return "/".join(url_parts)

def _download(self, prefix) -> None:
"""
Expand All @@ -277,41 +345,33 @@ def _download(self, prefix) -> None:
try:
os.makedirs(os.path.dirname(zip_destination_path))
except (PermissionError, OSError) as e:
self.console.print(
f"Directory creation error: {str(e)}", style="red"
raise BinanceBulkDownloaderDownloadError(
f"Directory creation error: {str(e)}"
)
raise BinanceBulkDownloaderDownloadError from e

# Don't download if already exists
if os.path.exists(csv_destination_path):
self.console.print(
f"Already exists: {csv_destination_path}", style="yellow"
)
return

url = f"{self._BINANCE_DATA_DOWNLOAD_BASE_URL}/{prefix}"
self.console.print(Panel(f"Downloading: {url}", style="blue"))

try:
response = requests.get(url)
response.raise_for_status()
self.console.print(f"Downloaded: {url}", style="green")
except (
requests.exceptions.RequestException,
requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
) as e:
self.console.print(f"Download error: {str(e)}", style="red")
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(f"Download error: {str(e)}")

try:
with open(zip_destination_path, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
except OSError as e:
self.console.print(f"File write error: {str(e)}", style="red")
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(f"File write error: {str(e)}")

try:
unzipped_path = "/".join(zip_destination_path.split("/")[:-1])
Expand All @@ -321,38 +381,28 @@ def _download(self, prefix) -> None:
csv_destination_path, unzipped_path
)
)
self.console.print(
f"Unzipped: {zip_destination_path}", style="green"
)
except BadZipfile as e:
self.console.print(f"Bad Zip File: {zip_destination_path}", style="red")
if os.path.exists(zip_destination_path):
os.remove(zip_destination_path)
self.console.print(
f"Removed: {zip_destination_path}", style="green"
)
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(
f"Bad Zip File: {zip_destination_path}"
)
except OSError as e:
self.console.print(f"Unzip error: {str(e)}", style="red")
if os.path.exists(zip_destination_path):
os.remove(zip_destination_path)
self.console.print(
f"Removed: {zip_destination_path}", style="green"
)
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(f"Unzip error: {str(e)}")

# Delete zip file
try:
os.remove(zip_destination_path)
self.console.print(f"Removed: {zip_destination_path}", style="green")
except OSError as e:
self.console.print(f"File removal error: {str(e)}", style="red")
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(
f"File removal error: {str(e)}"
)

except Exception as e:
if not isinstance(e, BinanceBulkDownloaderDownloadError):
self.console.print(f"Unexpected error: {str(e)}", style="red")
raise BinanceBulkDownloaderDownloadError from e
raise BinanceBulkDownloaderDownloadError(f"Unexpected error: {str(e)}")
raise

@staticmethod
Expand All @@ -374,20 +424,54 @@ def run_download(self):
Panel(f"Starting download for {self._data_type}", style="blue bold")
)

while self.is_truncated:
file_list_generator = self._get_file_list_from_s3_bucket(
self._build_prefix(), self.marker, self.is_truncated
)
if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
file_list_generator = [
prefix
for prefix in file_list_generator
if prefix.count(self._data_frequency) == 2
]
for prefix_chunk in track(
self.make_chunks(file_list_generator, self._CHUNK_SIZE),
description="Downloading",
):
file_list = []
# Handle multiple symbols by getting each symbol's files separately
if isinstance(self._symbols, list) and len(self._symbols) > 1:
original_symbols = self._symbols
for symbol in original_symbols:
self._symbols = symbol # Temporarily set to single symbol
symbol_files = self._get_file_list_from_s3_bucket(self._build_prefix())
file_list.extend(symbol_files)
self._symbols = original_symbols # Restore original symbols
else:
file_list = self._get_file_list_from_s3_bucket(self._build_prefix())

# Filter by data frequency only if not already filtered by prefix
if (
self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE
and not isinstance(self._symbols, (str, list))
):
file_list = [
prefix
for prefix in file_list
if prefix.count(self._data_frequency) == 2
]

# Create progress display
with Live(refresh_per_second=4) as live:
status = Text()
chunks = self.make_chunks(file_list, self._CHUNK_SIZE)
total_chunks = len(chunks)

# Download files in chunks
for chunk_index, prefix_chunk in enumerate(chunks, 1):
with ThreadPoolExecutor() as executor:
executor.map(self._download, prefix_chunk)
futures = []
for prefix in prefix_chunk:
future = executor.submit(self._download, prefix)
futures.append((future, prefix))

# Update status as files complete
for future, prefix in futures:
try:
future.result()
progress = (
(len(self.downloaded_list) + 1) / len(file_list) * 100
)
status.plain = f"[{chunk_index}/{total_chunks}] Progress: {progress:.1f}% | Latest: {os.path.basename(prefix)}"
live.update(status)
except Exception as e:
status.plain = f"Error: {str(e)}"
live.update(status)

self.downloaded_list.extend(prefix_chunk)
Loading

0 comments on commit 665248c

Please sign in to comment.