[SCRAPER]: Websocket Integration
amadolid committed Jan 10, 2024
1 parent db89a40 commit a755bf2
Showing 10 changed files with 554 additions and 420 deletions.
@@ -1,5 +1,5 @@
from openai import OpenAI
from os import environ, unlink
from os import getenv, unlink
from datetime import datetime
from requests import get
from uuid import uuid4
@@ -12,21 +12,19 @@
ES_CLIENT = None
CONFIG = {
"elastic": {
"url": environ.get("ELASTICSEARCH_URL", "http://localhost:9200"),
"key": environ.get("ELASTICSEARCH_API_KEY"),
"url": getenv("ELASTICSEARCH_URL", "http://localhost:9200"),
"key": getenv("ELASTICSEARCH_API_KEY"),
"index_template": {
"name": environ.get("ELASTICSEARCH_INDEX_TEMPLATE") or "openai-embeddings",
"name": getenv("ELASTICSEARCH_INDEX_TEMPLATE") or "openai-embeddings",
"index_patterns": (
environ.get("ELASTICSEARCH_INDEX_PATTERNS") or "oai-emb-*"
getenv("ELASTICSEARCH_INDEX_PATTERNS") or "oai-emb-*"
).split(","),
"priority": 500,
"version": 1,
"template": {
"settings": {
"number_of_shards": int(environ.get("ELASTICSEARCH_SHARDS", "1")),
"number_of_replicas": int(
environ.get("ELASTICSEARCH_REPLICAS", "1")
),
"number_of_shards": int(getenv("ELASTICSEARCH_SHARDS", "1")),
"number_of_replicas": int(getenv("ELASTICSEARCH_REPLICAS", "1")),
"refresh_interval": "1s",
},
"mappings": {
@@ -35,31 +33,27 @@
"id": {"type": "keyword"},
"embedding": {
"type": "dense_vector",
"dims": int(
environ.get("ELASTICSEARCH_VECTOR_SIZE", "1536")
),
"dims": int(getenv("ELASTICSEARCH_VECTOR_SIZE", "1536")),
"index": True,
"similarity": environ.get(
"ELASTICSEARCH_SIMILARITY", "cosine"
),
"similarity": getenv("ELASTICSEARCH_SIMILARITY", "cosine"),
},
"version": {"type": "keyword"},
},
},
},
},
},
"openai": {"api_key": environ.get("OPENAI_API_KEY")},
"openai": {"api_key": getenv("OPENAI_API_KEY")},
"openai_embedding": {
"model": environ.get("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002"),
"model": getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002"),
},
"chunk_config": {
"chunk_size": int(environ.get("CHUNK_SIZE", "200")),
"min_chunk_size_chars": int(environ.get("MIN_CHUNK_SIZE_CHARS", "350")),
"min_chunk_length_to_embed": int(environ.get("MIN_CHUNK_LENGTH_TO_EMBED", "5")),
"max_num_chunks": int(environ.get("MAX_NUM_CHUNKS", "10000")),
"chunk_size": int(getenv("CHUNK_SIZE", "200")),
"min_chunk_size_chars": int(getenv("MIN_CHUNK_SIZE_CHARS", "350")),
"min_chunk_length_to_embed": int(getenv("MIN_CHUNK_LENGTH_TO_EMBED", "5")),
"max_num_chunks": int(getenv("MAX_NUM_CHUNKS", "10000")),
},
"batch_size": int(environ.get("BATCH_SIZE", "100")),
"batch_size": int(getenv("BATCH_SIZE", "100")),
}


28 changes: 11 additions & 17 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/README.md
@@ -1,26 +1,19 @@
# **SCRAPER (`Playwright Python`)**

## wbs.**`url_to_filename`**
> **`Arguments`:** \
> **url**: str
>
> **`Return`:** \
> str
>
> **`Usage`:** \
> To convert url to a valid file name
>
##### **`HOW TO TRIGGER`**
```js
wbs.url_to_filename("https://google.com")
## **`!IMPORTANT!`**
The remote action scraper needs to be run via `FastAPI` instead of `jaseci serv_action`. Remote actions run under uvicorn, which requires Playwright to be driven asynchronously; since remote actions currently use non-async functions, a dedicated async scrape API is provided. This supports both local and remote scraping.
To run remotely:
```bash
export SCRAPER_SOCKET_ENABLED=true
export SCRAPER_SOCKET_URL=ws://your-websocket/ws # defaults to ws://jaseci-socket/ws
uvicorn jac_misc.scraper:app --host 0.0.0.0 --port 80 --reload
```
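
With the socket enabled, `notify_client` (in `async_scraper.py` below) pushes progress updates for a given `target` client id through the websocket. A minimal listener sketch, assuming the `websockets` package and JSON-encoded payloads — the actual message shape and client registration are handled by `jac_misc.scraper.utils` and the jaseci-socket service, neither of which is part of this diff:

```python
import asyncio
import json
import os

import websockets  # assumed dependency for this sketch, not pinned by the diff


async def listen_for_progress():
    # Same default the scraper falls back to when SCRAPER_SOCKET_URL is unset.
    url = os.getenv("SCRAPER_SOCKET_URL", "ws://jaseci-socket/ws")
    async with websockets.connect(url) as ws:
        async for raw in ws:
            # Payload shape is an assumption; notify_client's format lives in
            # jac_misc.scraper.utils, which this commit does not show.
            print("scraper update:", json.loads(raw))


if __name__ == "__main__":
    asyncio.run(listen_for_progress())
```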

## wbs.**`scrape`**
> **`Arguments`:** \
> **pages**: list (structure below) \
> **pre_configs**: list (structure below)\
> **detailed**: bool = False
> **detailed**: bool = False\
> **target**: str = None
>
> **`Return`:** \
> str or dict
@@ -29,7 +22,8 @@ wbs.url_to_filename("https://google.com")
> To scrape specified url
>
> **`Remarks`:** \
> **detailed** true will return dict with scanned/scraped urls
> **detailed** true will return dict with scanned/scraped urls\
> **target** optional websocket client id that will receive progress notifications (see the sketch below)
>
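
As a rough illustration, calling the new async API directly with `detailed` and `target` might look like the sketch below — a minimal page spec carrying only a `goto`; see the STRUCTURE section that follows for the full format, and note that progress notifications also require the socket setup described above:

```python
import asyncio

from jac_misc.scraper.async_scraper import scrape

result = asyncio.run(
    scrape(
        pages=[{"goto": {"url": "https://example.com"}}],  # minimal page spec
        detailed=True,           # return dict with scanned/scraped urls
        target="my-client-id",   # hypothetical websocket client id for progress updates
    )
)
print(result["scraped"])
```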
##### **`STRUCTURE`**
```python
170 changes: 170 additions & 0 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
@@ -0,0 +1,170 @@
import asyncio

from re import search
from playwright.async_api import async_playwright, Page

from jac_misc.scraper.utils import (
    notify_client,
    add_url,
    add_crawl,
    get_script,
    get_hostname,
)


async def scrape(
    pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
):
    content = ""
    urls = {"scanned": [], "scanned_urls": set(), "scraped": set(), "crawled": set()}

    async with async_playwright() as aspw:
        browser = await aspw.chromium.launch()
        page = await browser.new_page()

        while pages:
            try:
                pg: dict = pages.pop(0)

                pg_goto = pg.get("goto") or {}
                url = pg_goto.get("url") or "N/A"

                notify_client(target, pages, urls, {"url": url, "status": "started"})

                await goto(page, pg_goto, urls)

                content += await getters(page, pg.get("getters") or [], urls)

                await crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

                notify_client(target, pages, urls, {"url": url, "status": "completed"})
            except Exception as e:
                add_url(page, urls, error=str(e))

        await browser.close()

    content = " ".join(content.split())

    if detailed:
        content = {
            "content": content,
            "scanned": urls["scanned"],
            "scraped": list(urls["scraped"]),
        }

    notify_client(target, pages, urls, None, content)

    return content


async def goto(page: Page, specs: dict, urls: dict):
    if specs:
        post = get_script(specs, "post")
        await run_scripts(page, get_script(specs, "pre"), urls)

        print(f'[goto]: loading {specs["url"]}')

        await page.goto(**specs)
        add_url(page, urls)

        await run_scripts(page, post, urls)


async def getters(page: Page, specss: list[dict], urls: dict):
    content = ""
    for specs in specss:
        if specs:
            post = get_script(specs, "post")
            await run_scripts(page, get_script(specs, "pre"), urls)

            exel_str = ""
            for exel in (
                specs.get("excluded_element", ["script", "style", "link", "noscript"])
                or []
            ):
                exel_str += (
                    f'clone.querySelectorAll("{exel}").forEach(d => d.remove());\n'
                )

            method = specs.get("method")
            if method == "selector":
                expression = f"""
                    Array.prototype.map.call(
                        document.querySelectorAll("{specs.get("expression")}"),
                        d => {{
                            clone = d.cloneNode(true);
                            {exel_str}
                            return clone.textContent;
                        }}).join("\\n");
                """
            elif method == "custom":
                expression = f'{{{specs.get("expression")}}}'
            elif method == "none":
                expression = '""'
            else:
                expression = f"""{{
                    clone = document.body.cloneNode(true);
                    {exel_str}
                    return clone.textContent;
                }}"""

            if expression:
                print(f"[getters]: getting content from {page.url}")
                content += await page.evaluate(f"() =>{expression}")
                add_url(page, urls, expression)

            await run_scripts(page, post, urls)

    return content


async def crawler(page: Page, specs: dict, urls: dict, pages: list, pre_configs: list):
    if specs:
        post = get_script(specs, "post")
        await run_scripts(page, get_script(specs, "pre"), urls)

        queries = specs.get("queries") or [{"selector": "a[href]", "attribute": "href"}]
        filters = specs.get("filters") or []
        depth = specs.get("depth", 1) or 0

        if depth > 0:
            for query in queries:
                for node in await page.query_selector_all(
                    query.get("selector") or "a[href]"
                ):
                    url = await node.get_attribute(query.get("attribute") or "href")
                    c_url = get_hostname(page)

                    if url.startswith("/"):
                        url = f"{c_url}{url}"

                    if url.startswith("http") and url not in urls["crawled"]:
                        included = not bool(filters)

                        for filter in filters:
                            if search(filter, url):
                                included = True
                                break

                        if included:
                            add_crawl(
                                pages,
                                pre_configs,
                                urls,
                                url,
                                {
                                    "queries": queries,
                                    "depth": depth - 1,
                                    "filters": filters,
                                },
                            )

        await run_scripts(page, post, urls)


async def run_scripts(page: Page, scripts: list[dict], urls: dict):
    for script in scripts:
        method = script.pop("method", "evaluate") or "evaluate"
        print(f"[script]: running method {method}\n{str(script)}")
        await getattr(page, method)(**script)
        add_url(page, urls)
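
For orientation, here is a hedged sketch of a fuller page spec, assembled only from the keys that `goto`, `getters`, and `crawler` above actually read; anything beyond those keys is illustrative, and the canonical format remains the README's STRUCTURE section:

```python
# Illustrative page spec, assembled from the keys consumed by goto/getters/crawler above.
example_page = {
    "goto": {
        "url": "https://example.com/docs",
        # pre/post scripts (if any) are extracted via get_script() before page.goto(**specs)
    },
    "getters": [
        {
            "method": "selector",                     # or "custom" / "none" / default (whole body)
            "expression": "article p",                # CSS selector passed to querySelectorAll
            "excluded_element": ["script", "style"],  # tags removed from each cloned node
        }
    ],
    "crawler": {
        "queries": [{"selector": "a[href]", "attribute": "href"}],
        "filters": [r"^https://example\.com/docs"],   # regexes applied with re.search
        "depth": 1,                                   # decremented on each crawled page
    },
}
```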