
Commit

format
wangpatrick57 committed Sep 3, 2024
1 parent 10cbfe1 commit c509893
Showing 3 changed files with 69 additions and 29 deletions.
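
The changes below are purely stylistic: long single-line statements are wrapped across multiple lines and import order is adjusted, which is consistent with an automated formatter such as black together with isort. That tooling is an assumption; the commit message says only "format", so the exact command used is not recorded here. A minimal sketch of applying that kind of formatting to the touched files, assuming black and isort are installed:

# Hypothetical sketch: reproduce black/isort-style formatting on the files
# touched by this commit. The repository's actual format script (if any) is
# not shown here.
import subprocess

files = [
    "tune/protox/embedding/datagen.py",
    "tune/protox/embedding/select.py",
    "tune/protox/embedding/train_all.py",
]

for tool in ("isort", "black"):
    subprocess.run([tool, *files], check=True)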
71 changes: 51 additions & 20 deletions tune/protox/embedding/datagen.py
@@ -39,7 +39,12 @@
)
from tune.protox.embedding.loss import COST_COLUMNS
from tune.protox.env.space.primitive_space.index_space import IndexSpace
from tune.protox.env.types import QuerySpec, QueryType, TableAttrAccessSetsMap, TableAttrListMap
from tune.protox.env.types import (
QuerySpec,
QueryType,
TableAttrAccessSetsMap,
TableAttrListMap,
)
from tune.protox.env.workload import Workload
from util.pg import create_psycopg_conn
from util.shell import subprocess_run
@@ -52,7 +57,9 @@
# pass


QueryBatches = NewType("QueryBatches", list[tuple[str, list[tuple[QueryType, str]], Any]])
QueryBatches = NewType(
"QueryBatches", list[tuple[str, list[tuple[QueryType, str]], Any]]
)


# click steup
@@ -254,7 +261,9 @@ def datagen(
assert False

# Process the "data structure" args
leading_col_tbls_parsed: list[str] = [] if leading_col_tbls is None else leading_col_tbls.split(",")
leading_col_tbls_parsed: list[str] = (
[] if leading_col_tbls is None else leading_col_tbls.split(",")
)
# I chose to only use the "," delimiter in override_sample_limits_str, so the dictionary is encoded as [key],[value],[key],[value]
# I felt this was better than introducing a new delimiter which might conflict with the name of a table
override_sample_limits_parsed: dict[str, int] = dict()
@@ -378,7 +387,9 @@ def __init__(
class EmbeddingFileGenArgs:
"""Same comment as EmbeddingDatagenGenericArgs"""

def __init__(self, table_shape: bool, dual_class: bool, pad_min: int, rebias: float):
def __init__(
self, table_shape: bool, dual_class: bool, pad_min: int, rebias: float
):
self.table_shape = table_shape
self.dual_class = dual_class
self.pad_min = pad_min
@@ -389,7 +400,11 @@ def get_traindata_dir(dbgym_cfg: DBGymConfig) -> Path:
return dbgym_cfg.dbgym_this_run_path / "traindata_dir"


def _gen_traindata_dir(dbgym_cfg: DBGymConfig, generic_args: EmbeddingDatagenGenericArgs, dir_gen_args: EmbeddingDirGenArgs) -> None:
def _gen_traindata_dir(
dbgym_cfg: DBGymConfig,
generic_args: EmbeddingDatagenGenericArgs,
dir_gen_args: EmbeddingDirGenArgs,
) -> None:
with open_and_save(dbgym_cfg, generic_args.benchmark_config_path, "r") as f:
benchmark_config = yaml.safe_load(f)

@@ -408,7 +423,11 @@ def _gen_traindata_dir(dbgym_cfg: DBGymConfig, generic_args: EmbeddingDatagenGen
results = []
job_id = 0
for tbl in tables:
cols: list[Optional[str]] = [None] if tbl not in dir_gen_args.leading_col_tbls else cast(list[Optional[str]], modified_attrs[tbl])
cols: list[Optional[str]] = (
[None]
if tbl not in dir_gen_args.leading_col_tbls
else cast(list[Optional[str]], modified_attrs[tbl])
)
for colidx, col in enumerate(cols):
if col is None:
output = traindata_dir / tbl
@@ -607,7 +626,9 @@ def _augment_query_data(workload: Workload, data: dict[str, float]) -> dict[str,
return data


def _execute_explains(cursor: psycopg.Cursor[Any], batches: QueryBatches, models: Optional[dict[Any, Any]]) -> dict[str, float]:
def _execute_explains(
cursor: psycopg.Cursor[Any], batches: QueryBatches, models: Optional[dict[Any, Any]]
) -> dict[str, float]:
data: dict[str, float] = {}
ou_model_data: dict[str, list[Any]] = {}

@@ -697,15 +718,23 @@ def acquire_model_data(q: str, plan: dict[str, Any]) -> None:
return data


def _extract_refs(generate_costs: bool, target: Optional[str], cursor: psycopg.Cursor[Any], workload: Workload, models: Optional[dict[Any, Any]]) -> tuple[dict[str, float], dict[str, float]]:
def _extract_refs(
generate_costs: bool,
target: Optional[str],
cursor: psycopg.Cursor[Any],
workload: Workload,
models: Optional[dict[Any, Any]],
) -> tuple[dict[str, float], dict[str, float]]:
ref_qs = {}
table_ref_qs = {}
if generate_costs:
# Get reference costs.
batches = QueryBatches([
(q, workload.queries[q], workload.query_aliases[q])
for q in workload.queries.keys()
])
batches = QueryBatches(
[
(q, workload.queries[q], workload.query_aliases[q])
for q in workload.queries.keys()
]
)
ref_qs = _execute_explains(cursor, batches, models)
ref_qs = _augment_query_data(workload, ref_qs)

@@ -714,7 +743,9 @@ def _extract_refs(generate_costs: bool, target: Optional[str], cursor: psycopg.C
table_ref_qs = ref_qs
else:
qs = workload.queries_for_table(target)
batches = QueryBatches([(q, workload.queries[q], workload.query_aliases[q]) for q in qs])
batches = QueryBatches(
[(q, workload.queries[q], workload.query_aliases[q]) for q in qs]
)
table_ref_qs = _execute_explains(cursor, batches, models)
table_ref_qs = _augment_query_data(workload, table_ref_qs)
return ref_qs, table_ref_qs
@@ -743,9 +774,7 @@ def _produce_index_data(
# models = load_ou_models(model_dir)

# Construct workload.
workload = Workload(
dbgym_cfg, tables, attributes, query_spec, workload_path, pid=p
)
workload = Workload(dbgym_cfg, tables, attributes, query_spec, workload_path, pid=p)
modified_attrs = workload.column_usages()

np.random.seed(seed)
@@ -843,10 +872,12 @@ def _produce_index_data(
else:
qs_for_tbl = workload.queries_for_table(ia.tbl_name)

batches = QueryBatches([
(q, workload.queries[q], workload.query_aliases[q])
for q in qs_for_tbl
])
batches = QueryBatches(
[
(q, workload.queries[q], workload.query_aliases[q])
for q in qs_for_tbl
]
)
data = _execute_explains(cursor, batches, models)
data = _augment_query_data(workload, data)
if models is None:
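Aside: the comment in datagen.py above describes encoding a dictionary as [key],[value],[key],[value] using "," as the only delimiter for the override sample limits. The actual parsing code is collapsed in this diff view; the sketch below is a hypothetical illustration of decoding such a string, with the function name and example table names invented for illustration.

# Hypothetical sketch of the [key],[value],[key],[value] encoding described in
# the comment above; the real parsing in datagen.py is not shown in this diff.
def parse_override_sample_limits(s: str) -> dict[str, int]:
    parts = s.split(",") if s else []
    assert len(parts) % 2 == 0, "expected alternating key,value pairs"
    return {parts[i]: int(parts[i + 1]) for i in range(0, len(parts), 2)}

# Example: parse_override_sample_limits("lineitem,1000,orders,500")
# returns {"lineitem": 1000, "orders": 500}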
17 changes: 12 additions & 5 deletions tune/protox/embedding/select.py
@@ -6,8 +6,8 @@

import numpy as np
import pandas as pd
from pandas import DataFrame
import tqdm
from pandas import DataFrame

from misc.utils import DBGymConfig, default_embedder_dname, link_result
from tune.protox.embedding.analyze import RANGES_FNAME, STATS_FNAME
@@ -159,15 +159,17 @@ def recurse_set(source: dict[Any, Any], target: dict[Any, Any]) -> None:
return data


def _attach(data: DataFrame, raw_data: DataFrame, num_limit: int=0) -> DataFrame:
def _attach(data: DataFrame, raw_data: DataFrame, num_limit: int = 0) -> DataFrame:
# As the group index goes up, the perf should go up (i.e., bounds should tighten)
filtered_data: dict[tuple[float, float], DataFrame] = {}
new_data = []
for tup in tqdm.tqdm(data.itertuples(), total=data.shape[0]):
tup_dict = {k: getattr(tup, k) for k in data.columns}
if raw_data is not None and Path(tup_dict["ranges_file"]).exists():

def compute_dist_score(current_dists: dict[str, float], base: float, upper: float) -> float:
def compute_dist_score(
current_dists: dict[str, float], base: float, upper: float
) -> float:
nonlocal filtered_data
key = (base, upper)
if key not in filtered_data:
@@ -219,7 +221,10 @@ def compute_dist_score(current_dists: dict[str, float], base: float, upper: floa
if drange[0] is None:
drange = (1.0 - tup_dict["bias_separation"], 1.01)
else:
drange = (drange[0] - tup_dict["bias_separation"], drange[0])
drange = (
drange[0] - tup_dict["bias_separation"],
drange[0],
)
current_dists = {}

else:
@@ -230,7 +235,9 @@ def compute_dist_score(current_dists: dict[str, float], base: float, upper: floa
if len(current_dists) > 0:
# Put the error in.
errors.append(
compute_dist_score(current_dists, 0.0, tup_dict["bias_separation"])
compute_dist_score(
current_dists, 0.0, tup_dict["bias_separation"]
)
)

tup_dict["idx_class_errors"] = ",".join(
10 changes: 6 additions & 4 deletions tune/protox/embedding/train_all.py
@@ -15,7 +15,6 @@
import ray
import torch
import torch.nn as nn
from torch.optim import Adam # type: ignore[attr-defined]
import tqdm
import yaml
from pytorch_metric_learning.utils import logging_presets
@@ -26,6 +25,7 @@
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.hyperopt import HyperOptSearch
from sklearn.model_selection import train_test_split
from torch.optim import Adam # type: ignore[attr-defined]
from torch.utils.data import TensorDataset
from typing_extensions import ParamSpec

@@ -227,7 +227,9 @@ def train_all_embeddings(
sync_config=SyncConfig(),
verbose=2,
log_to_file=True,
storage_path=str(dbgym_cfg.cur_task_runs_path("embedding_ray_results", mkdir=True)),
storage_path=str(
dbgym_cfg.cur_task_runs_path("embedding_ray_results", mkdir=True)
),
)

resources = {"cpu": 1}
@@ -355,8 +357,8 @@ def _build_trainer(
benchmark_config_path: Path,
train_size: float,
workload_path: Path,
dataloader_num_workers: int=0,
disable_tqdm: bool=False,
dataloader_num_workers: int = 0,
disable_tqdm: bool = False,
) -> tuple[VAETrainer, Callable[..., Optional[dict[str, Any]]]]:
max_cat_features = 0
max_attrs = 0
