Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extra gcs and s3 #339

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ Please See [Documentation](https://gokart.readthedocs.io/en/latest/) .

Have a good gokart life.

# Contributing to gokart

install

```bash
git clone https://github.com/m3dev/gokart.git # or your own fork

poetry install --extras=s3 --extras=gcs --with=dev-s3
```

test

```bash
poetry run pytest # see `tox.ini` for specific extras
```

# Achievements

Gokart is a proven product.
Expand Down
4 changes: 4 additions & 0 deletions docs/intro_to_gokart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Within the activated Python environment, use the following command to install go
.. code:: sh

pip install gokart
# or
pip install gokart[s3] # to use `s3://`
# or
pip install gokart[gcs] # to use `gs://`



Expand Down
1 change: 0 additions & 1 deletion gokart/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from logging import getLogger

import luigi
import luigi.contrib.s3
import luigi.format
import numpy as np
import pandas as pd
Expand Down
6 changes: 6 additions & 0 deletions gokart/gcs_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
try:
import googleapiclient # noqa: F401
except ImportError:
# sentinel: this file should not be imported if the [gcs] extra is not installed.
raise

import json
import os

Expand Down
50 changes: 43 additions & 7 deletions gokart/object_storage.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
from datetime import datetime

import luigi
import luigi.contrib.gcs
import luigi.contrib.s3
from luigi.format import Format

from gokart.gcs_config import GCSConfig
from gokart.gcs_zip_client import GCSZipClient
from gokart.s3_config import S3Config
from gokart.s3_zip_client import S3ZipClient
from gokart.zip_client import ZipClient

try:
from gokart.gcs_config import GCSConfig
from gokart.gcs_zip_client import GCSZipClient

# import here to avoid a warning; reaching this point means the gcs dependencies exist
import luigi.contrib.gcs # isort: skip
GCS_AVAILABLE = True
except ImportError:
GCS_AVAILABLE = False

try:
from gokart.s3_config import S3Config
from gokart.s3_zip_client import S3ZipClient

# import here to avoid a warning; reaching this point means the s3 dependencies exist
import luigi.contrib.s3 # isort: skip
S3_AVAILABLE = True
except ImportError:
S3_AVAILABLE = False

object_storage_path_prefix = ['s3://', 'gs://']


def assert_gcs_available():
    """Raise ImportError unless the GCS support modules were imported.

    Does nothing (returns None) when the module-level GCS_AVAILABLE flag
    is truthy.
    """
    if not GCS_AVAILABLE:
        raise ImportError('gs:// is not available. You may need `pip install gokart[gcs]`')


def assert_s3_available():
    """Raise ImportError unless the S3 support modules were imported.

    Does nothing (returns None) when the module-level S3_AVAILABLE flag
    is truthy.
    """
    if not S3_AVAILABLE:
        raise ImportError('s3:// is not available. You may need `pip install gokart[s3]`')


class ObjectStorage(object):

@staticmethod
Expand All @@ -26,26 +54,32 @@ def if_object_storage_path(path: str) -> bool:
@staticmethod
def get_object_storage_target(path: str, format: Format) -> luigi.Target:
    """Build the luigi target that corresponds to an object-storage path.

    Args:
        path: Location of the object; must start with ``s3://`` or ``gs://``.
        format: luigi format passed through to the created target.

    Returns:
        An ``S3Target`` for ``s3://`` paths or a ``GCSTarget`` for ``gs://``
        paths, each built with the client from the matching config class.

    Raises:
        ImportError: The backend for the path's scheme is not available
            (raised by ``assert_s3_available`` / ``assert_gcs_available``).
        RuntimeError: ``path`` uses neither supported scheme.
    """
    if path.startswith('s3://'):
        assert_s3_available()
        return luigi.contrib.s3.S3Target(path, client=S3Config().get_s3_client(), format=format)
    if path.startswith('gs://'):
        assert_gcs_available()
        return luigi.contrib.gcs.GCSTarget(path, client=GCSConfig().get_gcs_client(), format=format)
    # A bare `raise` here has no active exception and would only produce an
    # opaque "No active exception to reraise" RuntimeError; keep the same
    # exception type but say what went wrong.
    raise RuntimeError('Unsupported object storage path: {}'.format(path))

@staticmethod
def exists(path: str) -> bool:
    """Report whether an object exists at the given object-storage path.

    Args:
        path: Location of the object; must start with ``s3://`` or ``gs://``.

    Returns:
        The result of the matching storage client's ``exists(path)`` call.

    Raises:
        ImportError: The backend for the path's scheme is not available
            (raised by ``assert_s3_available`` / ``assert_gcs_available``).
        RuntimeError: ``path`` uses neither supported scheme.
    """
    if path.startswith('s3://'):
        assert_s3_available()
        return S3Config().get_s3_client().exists(path)
    if path.startswith('gs://'):
        assert_gcs_available()
        return GCSConfig().get_gcs_client().exists(path)
    # A bare `raise` here has no active exception and would only produce an
    # opaque "No active exception to reraise" RuntimeError; keep the same
    # exception type but say what went wrong.
    raise RuntimeError('Unsupported object storage path: {}'.format(path))

@staticmethod
def get_timestamp(path: str) -> datetime:
if path.startswith('s3://'):
assert_s3_available()
return S3Config().get_s3_client().get_key(path).last_modified
elif path.startswith('gs://'):
assert_gcs_available()
# for gcs object
# should PR to luigi
bucket, obj = GCSConfig().get_gcs_client()._path_to_bucket_and_key(path)
Expand All @@ -57,12 +91,14 @@ def get_timestamp(path: str) -> datetime:
@staticmethod
def get_zip_client(file_path: str, temporary_directory: str) -> ZipClient:
    """Create the zip client that matches an object-storage path.

    Args:
        file_path: Location of the archive; must start with ``s3://`` or
            ``gs://``.
        temporary_directory: Passed through unchanged to the created client.

    Returns:
        An ``S3ZipClient`` for ``s3://`` paths or a ``GCSZipClient`` for
        ``gs://`` paths.

    Raises:
        ImportError: The backend for the path's scheme is not available
            (raised by ``assert_s3_available`` / ``assert_gcs_available``).
        RuntimeError: ``file_path`` uses neither supported scheme.
    """
    if file_path.startswith('s3://'):
        assert_s3_available()
        return S3ZipClient(file_path=file_path, temporary_directory=temporary_directory)
    if file_path.startswith('gs://'):
        assert_gcs_available()
        return GCSZipClient(file_path=file_path, temporary_directory=temporary_directory)
    # A bare `raise` here has no active exception and would only produce an
    # opaque "No active exception to reraise" RuntimeError; keep the same
    # exception type but say what went wrong.
    raise RuntimeError('Unsupported object storage path: {}'.format(file_path))

@staticmethod
def is_buffered_reader(file: object) -> bool:
    """Return False only for luigi ``ReadableS3File`` objects, True otherwise.

    Args:
        file: Any object, typically an opened file-like handle.

    Returns:
        ``False`` when S3 support is available and ``file`` is a
        ``luigi.contrib.s3.ReadableS3File``; ``True`` in every other case.
    """
    # Test S3_AVAILABLE first so that luigi.contrib.s3 is never touched when
    # the guarded s3 imports failed; the short-circuit makes that case True.
    return not (S3_AVAILABLE and isinstance(file, luigi.contrib.s3.ReadableS3File))
6 changes: 6 additions & 0 deletions gokart/s3_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
try:
import boto3 # noqa: F401
except ImportError:
# sentinal: this file should not be imported if [s3] extra is not installed.
raise

import os

import luigi
Expand Down
93 changes: 72 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading