Skip to content

Commit

Permalink
Merge pull request #13 from edsu/init-schema
Browse files Browse the repository at this point in the history
Add init command and migrations
  • Loading branch information
Florents-Tselai committed Oct 20, 2023
2 parents e5f7446 + 763582a commit 7da8f4d
Show file tree
Hide file tree
Showing 7 changed files with 607 additions and 45 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pip install warcdb

```shell

# Create the database `archive.warcdb`.
warcdb init archive.warcdb

# Load the `archive.warcdb` file with data.
warcdb import archive.warcdb ./tests/google.warc ./tests/frontpages.warc.gz "https://tselai.com/data/google.warc"

Expand All @@ -35,8 +38,16 @@ Individual `.warc` files are read and parsed and their data is inserted into an
## Schema
Here's the relational schema of the `.warcdb` file.
When a new major or minor version of warcdb is released, you may need to migrate existing databases to the new database schema (if the schema has changed). To do this, first upgrade warcdb and then migrate the database:
```shell
pip install --upgrade warcdb
warcdb migrate archive.warcdb
```
If there are no migrations to run, the `migrate` command will do nothing.
Here's the relational schema of the `.warcdb` file.
![WarcDB Schema](schema.png)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ click = "^8.1"
more-itertools = "^10.1"
tqdm = "^4.66"
requests = "^2.31"
sqlite-migrate = "^0"

[tool.poetry.group.test.dependencies]
pytest = "^7.4"
Expand Down
Binary file removed tests/apod.warc.gz
Binary file not shown.
418 changes: 418 additions & 0 deletions tests/no-warc-info.warc

Large diffs are not rendered by default.

49 changes: 37 additions & 12 deletions tests/test_warcdb.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,57 @@
from click.testing import CliRunner
from warcdb import warcdb_cli
import os
import re
import pathlib
import pytest
import sqlite_utils
from unittest import TestCase

# Name of the SQLite database file that the tests create.
db_file = "test_warc.db"
# Directory containing this test module; the WARC fixture files are resolved against it.
tests_dir = pathlib.Path(__file__).parent

# All of these WARC files were created with wget. (apod.warc.gz, which was
# created with browsertrix-crawler, is no longer part of the test data.)

@pytest.mark.parametrize(
    "warc_path",
    [
        str(tests_dir / "google.warc"),
        str(tests_dir / "google.warc.gz"),
        str(tests_dir / "no-warc-info.warc"),
        "https://tselai.com/data/google.warc",
        "https://tselai.com/data/google.warc.gz",
    ],
)
def test_import(warc_path):
    """Check that `init` followed by `import` succeeds for each WARC source.

    Args:
        warc_path: local fixture path or URL of the WARC file to import.
    """
    runner = CliRunner()

    # Run everything inside a throwaway working directory so the database is
    # discarded even when an assertion fails, instead of leaking into the next
    # parametrized run. The local warc_path values are built from tests_dir
    # rather than from the current working directory.
    with runner.isolated_filesystem():
        # initialize db
        result = runner.invoke(warcdb_cli, ['init', db_file])
        assert result.exit_code == 0

        result = runner.invoke(warcdb_cli, ["import", db_file, warc_path])
        assert result.exit_code == 0

        db = sqlite_utils.Database(db_file)
        assert set(db.table_names()) == {
            'metadata', 'request', 'resource', 'response', 'warcinfo', '_sqlite_migrations'
        }

        # Spot-check two known record ids from the google.warc fixture.
        if warc_path == str(tests_dir / "google.warc"):
            assert db.table('warcinfo').get('<urn:uuid:7ABED2CA-7CBD-48A0-92E5-0059EBFC111A>')
            assert db.table('request').get('<urn:uuid:524F62DD-D788-4085-B14D-22B0CDC0AC53>')


def test_column_names():
    """Check that every column in an imported database is lowercase with underscores."""
    runner = CliRunner()

    # Use a throwaway working directory so the database is always cleaned up,
    # and resolve the fixture from tests_dir so the test does not depend on the
    # directory pytest was started from.
    with runner.isolated_filesystem():
        result = runner.invoke(warcdb_cli, ['init', db_file])
        assert result.exit_code == 0

        result = runner.invoke(warcdb_cli, ["import", db_file, str(tests_dir / "google.warc")])
        assert result.exit_code == 0

        # make sure that the columns are named correctly (lowercase with underscores)
        db = sqlite_utils.Database(db_file)
        for table in db.tables:
            for col in table.columns:
                # fullmatch: the whole name must qualify, so a name such as
                # "warc-date" is rejected rather than matching its "warc" prefix.
                assert re.fullmatch(r'[a-z_]+', col.name), (
                    f'column {table.name}.{col.name} is not lowercase with underscores'
                )
74 changes: 42 additions & 32 deletions warcdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
import datetime
import sys
import warnings
import click
import sqlite_utils
from textwrap import dedent
import warcio
from json import JSONEncoder, dumps
from warcio import ArchiveIterator, StatusAndHeaders
from more_itertools import always_iterable
from http.client import HTTPMessage, HTTPResponse
from email.parser import Parser, HeaderParser
from collections.abc import MutableMapping
from warcio.recordloader import ArcWarcRecord, ArcWarcRecordLoader
from warcio.recordbuilder import RecordBuilder
from typing import Iterable
from functools import cache
from itertools import chain
from json import dumps

import click
import requests as req
import sqlite_utils
from more_itertools import always_iterable
from tqdm import tqdm
from warcio import ArchiveIterator, StatusAndHeaders
from warcio.recordloader import ArcWarcRecord

from warcdb.migrations import migration


def dict_union(*args):
Expand Down Expand Up @@ -51,8 +46,7 @@ def record_payload(self: ArcWarcRecord):
@cache
def record_as_dict(self: ArcWarcRecord):
    """Represent a record's WARC headers as a dict, to be fed into db_utils.Database.insert().

    Header names are lower-cased and their hyphens replaced with underscores
    (e.g. ``WARC-Record-ID`` becomes ``warc_record_id``) so that the keys match
    the snake_case column names used for the database tables. If two headers
    normalise to the same key, the later one wins.

    Returns:
        dict mapping normalised header name to header value.
    """
    # NOTE(review): functools.cache keys on the record instance, so each record
    # passed in stays referenced by the cache and callers share one dict object.
    return {k.lower().replace('-', '_'): v for k, v in self.rec_headers.headers}


# Attach record_as_dict to ArcWarcRecord so that records can be converted with r.as_dict().
setattr(ArcWarcRecord, 'as_dict', record_as_dict)
Expand Down Expand Up @@ -140,14 +134,14 @@ def __iadd__(self, r: ArcWarcRecord):
====
* For all rec_types: also store WARC/1.0 field (warc and version?)
* Todo pass conversions: {'Content-Length': int, WARC-Date: datet
* Todo pass conversions: {'Content-Length': int, warc-date: datet
* All 'response', 'resource', 'request', 'revisit', 'conversion' and 'continuation' records may have a payload.
All 'warcinfo' and 'metadata' records shall not have a payload.
"""
col_type_conversions = {
'Content-Length': int,
'content_length': int,
'payload': str,
'WARC-Date': datetime.datetime,
'warc_date': datetime.datetime,

}
record_dict = r.as_dict()
Expand All @@ -166,15 +160,15 @@ def __iadd__(self, r: ArcWarcRecord):
if r.rec_type == 'warcinfo':

self.db.table('warcinfo').insert(record_dict,
pk='WARC-Record-ID',
pk='warc_record_id',
alter=True,
ignore=True,
columns=col_type_conversions)
elif r.rec_type == 'request':
self.db.table('request').insert(record_dict,
pk='WARC-Record-ID',
pk='warc_record_id',
foreign_keys=[
("WARC-Warcinfo-ID", "warcinfo", "WARC-Record-ID")
("warc_warcinfo_id", "warcinfo", "warc-record-id")
],
alter=True,
ignore=True,
Expand All @@ -183,10 +177,10 @@ def __iadd__(self, r: ArcWarcRecord):

elif r.rec_type == 'response':
self.db.table('response').insert(record_dict,
pk='WARC-Record-ID',
pk='warc_record_id',
foreign_keys=[
("WARC-Warcinfo-ID", "warcinfo", "WARC-Record-ID"),
("WARC-Concurrent-To", "request", "WARC-Record-ID")
("warc_warcinfo_id", "warcinfo", "warc_record_id"),
("warc_concurrent_to", "request", "warc_record_id")
],
alter=True,
ignore=True,
Expand All @@ -195,10 +189,10 @@ def __iadd__(self, r: ArcWarcRecord):

elif r.rec_type == 'metadata':
self.db.table('metadata').insert(record_dict,
pk='WARC-Record-ID',
pk='warc_record_id',
foreign_keys=[
("WARC-Warcinfo-ID", "warcinfo", "WARC-Record-ID"),
("WARC-Concurrent-To", "response", "WARC-Record-ID")
("warc-warcinfo-id", "warcinfo", "warc_record_id"),
("warc_concurrent_to", "response", "warc_record_id")
],
alter=True,
ignore=True,
Expand All @@ -207,10 +201,10 @@ def __iadd__(self, r: ArcWarcRecord):

elif r.rec_type == 'resource':
self.db.table('resource').insert(record_dict,
pk='WARC-Record-ID',
pk='warc_record_id',
foreign_keys=[
("WARC-Warcinfo-ID", "warcinfo", "WARC-Record-ID"),
("WARC-Concurrent-To", "metadata", "WARC-Record-ID")
("warc-warcinfo-id", "warcinfo", "warc_record_id"),
("warc_concurrent_to", "metadata", "warc_record_id")
],
alter=True,
ignore=True,
Expand All @@ -230,10 +224,23 @@ def __iadd__(self, r: ArcWarcRecord):
"Commands for interacting with .warcdb files\n\nBased on SQLite-Utils"


@warcdb_cli.command('init')
@click.argument(
    "db_path",
    # exists=False: the database file does not have to exist yet.
    type=click.Path(file_okay=True, dir_okay=False, exists=False, allow_dash=False),
)
def init(db_path):
    """
    Initialize a new warcdb database
    """
    # Open the database at db_path, then run the warcdb migration set against
    # the handle that the WarcDB wrapper exposes as `.db`.
    warcdb = WarcDB(db_path)
    handle = warcdb.db
    migration.apply(handle)


@warcdb_cli.command('import')
@click.argument(
"db_path",
type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
type=click.Path(file_okay=True, dir_okay=False, exists=True, allow_dash=False),
)
@click.argument('warc_path',
type=click.STRING,
Expand All @@ -243,6 +250,9 @@ def __iadd__(self, r: ArcWarcRecord):
type=click.INT, default=1000,
help="Batch size for chunked INSERTs [Note: ignored for now]", )
def import_(db_path, warc_path, batch_size):
"""
Import a WARC file into the database
"""
db = WarcDB(db_path, batch_size=batch_size)

# if batch_size:
Expand Down
97 changes: 97 additions & 0 deletions warcdb/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from sqlite_migrate import Migrations

# Migration set for warcdb: functions decorated with @migration() in this
# module belong to it.
migration = Migrations("warcdb")


@migration()
def m001_initial(db):
    """Create the initial warcdb schema.

    Creates the ``warcinfo``, ``request``, ``response``, ``metadata`` and
    ``resource`` tables, in that order, each with ``warc_record_id`` as its
    primary key. Every column is declared as ``str`` except
    ``content_length``, which is declared as ``int``.

    Args:
        db: database handle supplied by the migration runner; indexing it by
            table name must yield an object with a ``create()`` method
            (presumably a ``sqlite_utils.Database`` -- confirm against
            sqlite-migrate).
    """
    record_id = "warc_record_id"

    def typed(*names):
        # Column order follows argument order; content_length is the only integer column.
        return {name: (int if name == "content_length" else str) for name in names}

    # Foreign keys are written as (column, referenced table, referenced column).
    to_warcinfo = ("warc_warcinfo_id", "warcinfo", record_id)

    db["warcinfo"].create(
        typed(
            "warc_type",
            "content_type",
            "warc_date",
            "warc_record_id",
            "warc_filename",
            "warc_block_digest",
            "content_length",
            "payload",
        ),
        pk=record_id,
    )

    db["request"].create(
        typed(
            "warc_type",
            "warc_target_uri",
            "content_type",
            "warc_date",
            "warc_record_id",
            "warc_ip_address",
            "warc_warcinfo_id",
            "warc_block_digest",
            "content_length",
            "payload",
            "http_headers",
        ),
        pk=record_id,
        foreign_keys=[to_warcinfo],
    )

    db["response"].create(
        typed(
            "warc_type",
            "warc_record_id",
            "warc_warcinfo_id",
            "warc_concurrent_to",
            "warc_target_uri",
            "warc_date",
            "warc_ip_address",
            "warc_block_digest",
            "warc_payload_digest",
            "content_type",
            "content_length",
            "payload",
            "http_headers",
        ),
        pk=record_id,
        foreign_keys=[
            to_warcinfo,
            ("warc_concurrent_to", "request", record_id),
        ],
    )

    db["metadata"].create(
        typed(
            "warc_type",
            "warc_record_id",
            "warc_warcinfo_id",
            "warc_target_uri",
            "warc_date",
            "warc_block_digest",
            "content_type",
            "content_length",
            "payload",
        ),
        pk=record_id,
        foreign_keys=[to_warcinfo],
    )

    db["resource"].create(
        typed(
            "warc_type",
            "warc_record_id",
            "warc_warcinfo_id",
            "warc_concurrent_to",
            "warc_target_uri",
            "warc_date",
            "warc_block_digest",
            "content_type",
            "content_length",
            "payload",
        ),
        pk=record_id,
        foreign_keys=[
            to_warcinfo,
            ("warc_concurrent_to", "metadata", record_id),
        ],
    )

0 comments on commit 7da8f4d

Please sign in to comment.