temporary
amadolid committed Dec 1, 2023
1 parent ebc7be2 commit fd8b373
Showing 1 changed file with 175 additions and 0 deletions.
175 changes: 175 additions & 0 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py
@@ -2,13 +2,188 @@
from playwright.sync_api import sync_playwright, Page
from typing import Union
from re import search
from copy import deepcopy


@jaseci_action(act_group=["wbs"], allow_remote=True)
def url_to_filename(url: str):
return "".join(c for c in url if c.isalnum())


@jaseci_action(act_group=["wbs"], allow_remote=True)
def scrape2(pages: list, pre_configs: list = [], detailed: bool = False):
content = ""
urls = {"scanned": set(), "scraped": set(), "crawled": set()}
with sync_playwright() as spw:
browser = spw.chromium.launch()
page = browser.new_page()

while pages:
pg: dict = pages.pop(0)

goto(page, pg.get("goto") or {}, urls)
content += getters(page, pg.get("getters") or [], urls)
crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

browser.close()

content = " ".join(content.split())

if detailed:
return {
"content": content,
"scanned": list(urls["scanned"]),
"scraped": list(urls["scraped"]),
}

return content
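
# Illustrative usage of scrape2 (a sketch; the URL, filter regex, and selector
# below are hypothetical, not part of this module). Each page entry mirrors the
# keys consumed by goto(), getters(), and crawler() below:
#
#   pages = [
#       {
#           "goto": {
#               "url": "https://example.com",
#               "wait_until": "networkidle",
#               "pre_scripts": [],
#               "post_scripts": [],
#           },
#           "getters": [{"method": "default"}],
#           "crawler": {
#               "queries": [{"selector": "a[href]", "attribute": "href"}],
#               "filters": ["^https://example\\.com"],
#               "depth": 1,
#           },
#       }
#   ]
#   result = scrape2(pages, pre_configs=[], detailed=True)
#   # -> {"content": "...", "scanned": [...], "scraped": [...]}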


def goto(page: Page, specs: dict, urls: dict):
if specs:
post = get_script(specs, "post")
run_scripts(page, get_script(specs, "pre"), urls)

page.goto(**specs)
add_url(page, urls)

run_scripts(page, post, urls)


def getters(page: Page, specs_list: list[dict], urls: dict):
    content = ""
    for specs in specs_list:
if specs:
post = get_script(specs, "post")
            run_scripts(page, get_script(specs, "pre"), urls)

exel_str = ""
for exel in (
specs.get("excluded_element", ["script", "style", "link", "noscript"])
or []
):
exel_str += (
f'clone.querySelectorAll("{exel}").forEach(d => d.remove());\n'
)

method = specs.get("method")
if method == "selector":
expression = f"""
Array.prototype.map.call(
document.querySelectorAll("{specs.get("expression")}"),
d => {{
clone = d.cloneNode(true);
{exel_str}
return clone.textContent;
                }}).join("\\n");
"""
elif method == "custom":
expression = f'{{{specs.get("expression")}}}'
elif method == "none":
expression = ""
else:
expression = f"""{{
clone = document.body.cloneNode(true);
{exel_str}
return clone.textContent;
}}"""

if expression:
                content += page.evaluate(f"() => {expression}")
                # content was extracted, so record this page as scraped
                add_url(page, urls, True)

            run_scripts(page, post, urls)
return content
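
# Illustrative getter spec (a sketch; the CSS selector is hypothetical): the
# "selector" method collects the text of every matching node after stripping
# the listed excluded elements from a clone of each match:
#
#   {"method": "selector", "expression": "p", "excluded_element": ["script", "style"]}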


def crawler(page: Page, specs: dict, urls: dict, pages: list, pre_configs: list):
if specs:
queries = specs.get("queries") or []
filters = specs.get("filters") or []
        depth = specs.get("depth", 1)
if depth:
for query in queries:
for node in page.query_selector_all(query.get("selector") or "a[href]"):
                    url = node.get_attribute(query.get("attribute") or "href") or ""
c_url = get_hostname(page)

if url.startswith("/"):
url = f"{c_url}{url}"

if url.startswith("http") and url not in urls["crawled"]:
included = False

                        for pattern in filters:
                            if search(pattern, url):
included = True
break

if included:
add_crawl(
pages,
pre_configs,
urls,
url,
{
"queries": queries,
"depth": depth - 1,
"filters": filters,
},
)


def get_script(specs: dict, name: str):
return specs.pop(f"{name}_scripts", []) or []


def run_scripts(page: Page, scripts: list[dict], urls: dict):
for script in scripts:
        # call the configured Page method (default: evaluate) with the remaining keys as kwargs
        getattr(page, script.pop("method", "evaluate") or "evaluate")(**script)
add_url(page, urls)


def add_url(page: Page, urls: dict, scraped: bool = False):
url = page.url
if url:
if url not in urls["scanned"]:
urls["scanned"].add(url)

if scraped and url not in urls["scraped"]:
urls["scraped"].add(url)


def add_crawl(pages: list, pre_configs: list, urls: dict, url: str, def_crawl: dict):
urls["crawled"].add(url)
scraper = {
"goto": {
"url": url,
"wait_until": "networkidle",
"pre_scripts": [],
"post_scripts": [],
},
"getters": [{"method": "default"}],
"crawler": def_crawl,
}
for pconf in pre_configs:
if search(pconf["regex"], url):
scraper = deepcopy(pconf["scraper"])
(scraper.get("goto") or {})["url"] = url
scraper["crawler"] = scraper.get("crawler") or def_crawl
break

pages.append(scraper)
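
# Illustrative pre_configs entry (a sketch; the regex is hypothetical): when a
# crawled URL matches "regex", its "scraper" config replaces the default one
# built above; the url is overwritten and "crawler" falls back to def_crawl:
#
#   {
#       "regex": "^https://example\\.com/docs",
#       "scraper": {
#           "goto": {"url": "", "wait_until": "networkidle"},
#           "getters": [{"method": "default"}],
#       },
#   }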


def get_hostname(page: Page):
url = page.url
if url:
splitter = url.split("//")
protocol = splitter[0]
hostname = splitter[1].split("/")[0]
return f"{protocol}//{hostname}"
return url


@jaseci_action(act_group=["wbs"], allow_remote=True)
def scrape(
urls: set,