Skip to content

Commit

Permalink
fix: snaptik
Browse files Browse the repository at this point in the history
  • Loading branch information
krypton-byte committed Aug 6, 2022
1 parent 476ce11 commit dfa809f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
8 changes: 7 additions & 1 deletion tiktok_downloader/scrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@
from .utils import info_videotiktok
import re

def extract_id(initf: Callable[[info_post, str], None])->Callable[[info_post, str], None]:

def extract_id(
initf: Callable[[info_post, str], None]
) -> Callable[[info_post, str], None]:
subdo_redirect = ['vt', 'vm']

def regex(url: str) -> str:
if not findall(r'[0-9]{19}', url):
raise InvalidUrl()
return findall(r'[0-9]{19}', url)[0]

def apply(cls: info_post, url: str):
subdo = re.findall(r'://(\w+)\.', url)
if subdo and subdo[0] in subdo_redirect:
Expand All @@ -23,6 +28,7 @@ def apply(cls: info_post, url: str):
initf(cls, regex(url))
return apply


class Author:
def __init__(self, ascp: dict):
self.username = ascp['unique_id']
Expand Down
16 changes: 7 additions & 9 deletions tiktok_downloader/snaptik.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ def get_media(self) -> list[info_videotiktok]:
i,
self
)
for i in set(
map(
lambda x:x[0].strip('\\'),
findall(
r'\"(https?://(tikcdn\.net|snapsave\.info).*?)\"',
dec
)
)
)
for i in set(['https://snaptik.app'+x.strip('\\') for x in findall(
r'(/file.php?.*?)\"',
dec
)] + [i.strip('\\') for i in findall(
r'\"(https?://snapxcdn.*?)\"',
dec
)])
]

def __iter__(self):
Expand Down
24 changes: 13 additions & 11 deletions tiktok_downloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
simplefilter('ignore')



class Odummy:
def __init__(self):
pass
Expand Down Expand Up @@ -61,16 +60,19 @@ def download(
open(out, 'wb') if isinstance(
out,
str) else BytesIO())
with tqdm(
total=int(request.headers['Content-Length']),
unit='iB',
unit_scale=True) if bar else Odummy() as pbar:
for i in request.iter_content(chunk_size):
stream.write(i)
pbar.update(i.__len__())
pbar.update(int(request.headers['Content-Length']))
if not isinstance(pbar, Odummy):
sleep(1)
if request.headers.get('content-length'):
with tqdm(
total=int(request.headers['Content-Length']),
unit='iB',
unit_scale=True) if bar else Odummy() as pbar:
for i in request.iter_content(chunk_size):
stream.write(i)
pbar.update(i.__len__())
pbar.update(int(request.headers['Content-Length']))
if not isinstance(pbar, Odummy):
sleep(1)
else:
stream.write(request.content)
return None if isinstance(out, (str, BufferedWriter)) else stream

def __str__(self) -> str:
Expand Down

0 comments on commit dfa809f

Please sign in to comment.