-
Notifications
You must be signed in to change notification settings - Fork 0
/
populate.py
44 lines (36 loc) · 1.48 KB
/
populate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pg_conn import PG
class Schema:
    """Create a table and load a dataframe into it through a ``PG`` wrapper.

    Attributes:
        pg: ``PG`` instance built from the database name given to the
            constructor; every database call in this class goes through it.
    """

    def __init__(self, dbname) -> None:
        """Build the ``PG`` wrapper used by all other methods.

        Args:
            dbname: Database name, handed unchanged to ``PG``.
        """
        self.pg = PG(dbname)

    def build_table(self, table_name, columns):
        """Issue a ``CREATE TABLE`` statement and commit it.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name to column type text; each pair is
                emitted as ``"<name> <type>"`` in the mapping's iteration
                order.
        """
        # NOTE(review): the table name, column names and types are interpolated
        # into the SQL text verbatim, so they must come from trusted input.
        columns_sql = ",\n".join(
            f"{col_name} {data_type}" for col_name, data_type in columns.items()
        )
        create_table_sql = f"CREATE TABLE {table_name} (\n{columns_sql}\n);"

        self.pg.execute_sql(create_table_sql)
        # Commit before reporting success so the message is never printed for
        # a statement whose commit raised.
        self.pg.conn.commit()
        print(f"Created {table_name}")

    def populate_table_from_csv(self, table_name, df):
        """Insert ``df`` into ``table_name`` when their column names agree.

        The comparison ignores column order. On a mismatch nothing is
        inserted and both column collections are printed instead.

        Args:
            table_name: Name of the destination table.
            df: Object exposing an iterable ``columns`` attribute; it is
                passed unchanged to ``self.pg.to_psql``.
        """
        csv_columns = set(df.columns)
        reported_columns = self.pg.get_table_columns(table_name)
        # Normalise to a set: a set never compares equal to a list or tuple,
        # so a sequence return value would otherwise always look mismatched.
        # None is kept as-is and therefore still counts as a mismatch.
        # NOTE(review): assumes the helper yields plain column names - confirm.
        table_columns = None if reported_columns is None else set(reported_columns)

        if csv_columns == table_columns:
            self.pg.to_psql(df, table_name)
            print(f"Data has been inserted into {table_name}")
        else:
            print("Column mismatch between CSV and table schema.")
            print(f"CSV columns: {csv_columns}")
            print(f"Table columns: {table_columns}")

    def clean_up(self):
        """Delegate to ``self.pg.clean_up()``."""
        self.pg.clean_up()