[SCRAPER]: Websocket improved reconnection
amadolid committed Jan 11, 2024
1 parent 5430b14 commit 57ce98c
Showing 3 changed files with 35 additions and 20 deletions.
36 changes: 23 additions & 13 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
@@ -1,10 +1,12 @@
 import asyncio
 import websocket
 
+from uuid import uuid4
 from os import getenv
 from re import search
 from orjson import dumps
-from logging import exception
+from json import loads
+from logging import exception, error
 from playwright.async_api import async_playwright, Page
 from websocket import create_connection
 
@@ -19,17 +21,17 @@
 class Client:
     def __init__(self) -> None:
         self.enabled = getenv("SCRAPER_SOCKET_ENABLED") == "true"
+        self.url = getenv("SCRAPER_SOCKET_URL", "ws://jaseci-socket/ws")
+        self.timeout = int(getenv("SCRAPER_SOCKET_TIMEOUT") or "2")
+        self.header = {"auth": getenv("SCRAPER_SOCKET_AUTH", "12345678")}
 
         self.socket = None
         if self.enabled:
             self.create_connection()
 
     def create_connection(self) -> websocket:
         self.close()
-        self.socket = create_connection(
-            getenv("SCRAPER_SOCKET_URL", "ws://jaseci-socket/ws"),
-            header={"auth": getenv("SCRAPER_SOCKET_AUTH", "12345678")},
-        )
+        self.socket = create_connection(self.url, self.timeout, header=self.header)
 
     def close(self):
         if self.socket:
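
With the endpoint, timeout, and auth header now cached on the instance, every reconnect reuses the exact settings from construction time instead of re-reading the environment. A minimal sketch of exercising the new reconnect path, assuming a jaseci-socket server is reachable (the URL and timeout values below are illustrative, not part of this commit):

    from os import environ

    from jac_misc.scraper.async_scraper import Client

    environ["SCRAPER_SOCKET_ENABLED"] = "true"
    environ["SCRAPER_SOCKET_URL"] = "ws://localhost:8001/ws"  # illustrative endpoint
    environ["SCRAPER_SOCKET_TIMEOUT"] = "5"  # seconds before recv()/connect gives up

    ws = Client()           # __init__ dials once because the socket is enabled
    ws.create_connection()  # closes any stale socket, then redials the same URL
    ws.close()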
@@ -46,22 +48,30 @@ def notify_client(
             "processing": processing,
             "pending": [p["goto"]["url"] for p in pages],
             "scanned": urls["scanned"],
+            "scraped": urls["scraped"],
         }
         if content:
-            data["response"] = content
+            data["content"] = content
 
         try:
+            callback = uuid4()
             self.socket.send(
                 dumps(
                     {
                         "type": "notify_client",
                         "data": {
                             "target": target,
+                            "callback": callback,
                             "data": {"type": "scraper", "data": data},
                         },
                     }
                 )
             )
+            notif: dict = loads(self.socket.recv())
+            if "callback" != notif.get("type") or str(callback) != notif.get(
+                "data"
+            ):
+                raise Exception("Callback notification not valid!")
         except Exception:
             exception("Error sending notification!")
             self.create_connection()
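
Each notification now carries a fresh UUID, and the sender blocks on recv() until the server echoes that id back; a mismatch or a recv timeout lands in the except branch, which logs the error and rebuilds the connection. A sketch of the handshake from the client's side, with the payload shapes taken from the diff and the server's echo behavior assumed (endpoint, credentials, and target id are illustrative):

    from json import loads
    from uuid import uuid4

    from orjson import dumps
    from websocket import create_connection

    socket = create_connection("ws://localhost:8001/ws", 5, header={"auth": "12345678"})

    callback = uuid4()
    socket.send(
        dumps(  # orjson serializes UUID values out of the box
            {
                "type": "notify_client",
                "data": {
                    "target": "client-123",  # hypothetical client id
                    "callback": callback,
                    "data": {
                        "type": "scraper",
                        "data": {"url": "https://example.com", "status": "started"},
                    },
                },
            }
        )
    )

    # The ack must be {"type": "callback", "data": "<same uuid as a string>"};
    # anything else (or a timeout) is treated as a failed delivery.
    notif: dict = loads(socket.recv())
    if notif.get("type") != "callback" or notif.get("data") != str(callback):
        raise Exception("Callback notification not valid!")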
@@ -72,7 +82,7 @@ async def scrape(
     pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": set(), "crawled": set()}
+    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
 
     ws = Client()
@@ -86,6 +96,7 @@
 
         pg_goto = pg.get("goto") or {}
         url = pg_goto.get("url") or "N/A"
+        page.source = url
 
         ws.notify_client(target, pages, urls, {"url": url, "status": "started"})
@@ -101,23 +112,22 @@
             except Exception as e:
                 add_url(page, urls, error=str(e))
 
-                ws.create_connection()
                 ws.notify_client(target, pages, urls, {"url": url, "status": "failed"})
 
         await browser.close()
 
     content = " ".join(content.split())
 
+    ws.notify_client(target, pages, urls, None, content)
+    ws.close()
+
     if detailed:
-        content = {
+        return {
             "content": content,
             "scanned": urls["scanned"],
-            "scraped": list(urls["scraped"]),
+            "scraped": urls["scraped"],
         }
 
-    ws.notify_client(target, pages, urls, None, content)
-    ws.close()
-
     return content
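
The final notify_client/close pair now runs before either return, so a detailed run also reports its finished state before the result dict is returned. A minimal sketch of driving the coroutine (the page spec and target id are illustrative):

    import asyncio

    from jac_misc.scraper.async_scraper import scrape

    result = asyncio.run(
        scrape(
            pages=[{"goto": {"url": "https://example.com"}}],
            detailed=True,
            target="client-123",
        )
    )

    # With detailed=True the dict is returned directly, and "scraped" is
    # already an ordered list (no set-to-list conversion needed):
    # {"content": "...", "scanned": [...], "scraped": [...]}
    print(result["scraped"])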
14 changes: 8 additions & 6 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py
@@ -18,6 +18,7 @@ def notify_client(target: str, pages: list, urls: dict, processing: dict, conten
         "processing": processing,
         "pending": [p["goto"]["url"] for p in pages],
         "scanned": urls["scanned"],
+        "scraped": urls["scraped"],
     }
     if content:
         data["response"] = content
@@ -29,7 +30,7 @@ def scrape(
     pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": set(), "crawled": set()}
+    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
 
     with sync_playwright() as spw:
         browser = spw.chromium.launch()
@@ -54,19 +55,20 @@
             except Exception as e:
                 add_url(page, urls, error=str(e))
 
+                notify_client(target, pages, urls, {"url": url, "status": "failed"})
+
         browser.close()
 
     content = " ".join(content.split())
 
+    notify_client(target, pages, urls, None, content)
+
     if detailed:
-        content = {
+        return {
             "content": content,
             "scanned": urls["scanned"],
-            "scraped": list(urls["scraped"]),
+            "scraped": urls["scraped"],
         }
 
-    notify_client(target, pages, urls, None, content)
-
     return content
5 changes: 4 additions & 1 deletion jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py
@@ -4,17 +4,20 @@
 
 def add_url(page, urls: dict, scraped: bool = False, error: str = None):
     url = page.url
+    source = page.source
     if url:
         if url not in urls["scanned_urls"]:
             urls["scanned_urls"].add(url)
 
             scan = {"url": url}
             if error:
                 scan["error"] = error
+            if url != source:
+                scan["source"] = source
             urls["scanned"].append(scan)
 
         if scraped and url not in urls["scraped"]:
-            urls["scraped"].add(url)
+            urls["scraped"].append(url)
 
 
 def add_crawl(pages: list, pre_configs: list, urls: dict, url: str, def_crawl: dict):
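
add_url now distinguishes the URL scrape() asked for (page.source) from where the browser actually landed (page.url), recording the former only when a redirect made them differ, and "scraped" keeps insertion order as a list. A sketch with a stand-in page object (the real one is a Playwright Page that scrape() tagged with a source attribute):

    from jac_misc.scraper.utils import add_url


    class FakePage:  # stand-in for a Playwright Page tagged by scrape()
        url = "https://example.com/landing"   # where the browser ended up
        source = "https://example.com/start"  # what scrape() requested


    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
    add_url(FakePage(), urls, scraped=True)

    # urls["scanned"] == [{"url": "https://example.com/landing",
    #                      "source": "https://example.com/start"}]
    # urls["scraped"] == ["https://example.com/landing"]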
