-
Notifications
You must be signed in to change notification settings - Fork 38
/
pynonymize.py
143 lines (117 loc) · 4.64 KB
/
pynonymize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from dataclasses import dataclass
import logging
from typing import Optional
from pynonymizer.database.mssql import MsSqlProvider
from pynonymizer.database.mysql import MySqlProvider
from pynonymizer.database.postgres import PostgreSqlProvider
from pynonymizer.strategy.parser import StrategyParser
from pynonymizer.strategy.config import read_config
from pynonymizer.exceptions import ArgumentValidationError
from pynonymizer.process_steps import ProcessSteps
from pynonymizer.strategy.database import DatabaseStrategy
import uuid
import os
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def get_temp_db_name(filename=None):
    """
    Build a unique database name derived from a file path.

    The result is the file's base name with its final extension removed,
    followed by an underscore and a random 32-character hex UUID, e.g.
    ``strategy_3f2a...`` for ``/path/to/strategy.yml``.

    :param filename: path (str or path-like) whose base name becomes the prefix.
        When omitted (None) the prefix is empty and the result is ``_<hex>``,
        the same shape an empty-string filename produces.
    :return: the generated name as a str; a fresh UUID is drawn on every call.
    """
    # os.path.basename(None) raises TypeError, so the advertised default would
    # crash; treat a missing filename exactly like an empty path instead.
    path = "" if filename is None else filename
    stem, _ = os.path.splitext(os.path.basename(path))
    return f"{stem}_{uuid.uuid4().hex}"
def pynonymize(
    progress,
    actions,
    db_type,
    db_workers,
    input_path=None,
    strategyfile_path=None,
    output_path=None,
    db_user=None,
    db_password=None,
    db_host=None,
    db_name=None,
    db_port=None,
    seed_rows=None,
    ignore_anonymization_errors=False,
    **kwargs,
):
    """
    Runs a pynonymize process as if the CLI had been invoked.

    The process runs, in order, the CREATE_DB, RESTORE_DB, ANONYMIZE_DB, DUMP_DB
    and DROP_DB steps, each only when ``actions`` does not report it as skipped.
    All arguments are validated up front so that every problem is reported in a
    single error before any database work starts.

    :param progress: forwarded unchanged to the database provider constructor.
    :param actions: step selection; must provide ``skipped(step)`` and ``summary(step)``.
    :param db_type: one of ``"mysql"``, ``"postgres"`` or ``"mssql"``.
    :param db_workers: forwarded to the provider's ``anonymize_database``.
    :param input_path: passed to ``restore_database``; required unless RESTORE_DB is skipped.
    :param strategyfile_path: strategy file to load; required unless ANONYMIZE_DB is skipped.
    :param output_path: passed to ``dump_database``; required unless DUMP_DB is skipped.
    :param db_user: forwarded to the provider (not validated here).
    :param db_password: forwarded to the provider as ``db_pass`` (not validated here).
    :param db_host: forwarded to the provider (not validated here).
    :param db_name: database to operate on. When None and anonymizing with a
        strategyfile, a temporary name is generated from the strategyfile path.
    :param db_port: forwarded to the provider (not validated here).
    :param seed_rows: forwarded to the provider.
    :param ignore_anonymization_errors: when true, an exception raised by the
        anonymize step is logged and the remaining steps still run.
    :param kwargs: options whose key starts with ``"<db_type>_"`` are forwarded to
        the provider constructor with that prefix removed
        (e.g. ``mssql_backup_option`` -> ``backup_option``); other keys are ignored.
    :raises:
        ArgumentValidationError: used when kwargs are missing or unable to be auto-resolved.
    """
    # Validate mandatory args (depends on step actions)
    validations = []

    if not actions.skipped(ProcessSteps.RESTORE_DB) and input_path is None:
        validations.append("Missing INPUT")

    if not actions.skipped(ProcessSteps.ANONYMIZE_DB):
        if strategyfile_path is None:
            validations.append("Missing STRATEGYFILE")
        elif db_name is None:
            # only auto-determine the db_name if we have a strategyfile AND we are anonymizing.
            db_name = get_temp_db_name(strategyfile_path)

    if not actions.skipped(ProcessSteps.DUMP_DB) and output_path is None:
        validations.append("Missing OUTPUT")

    # db_host/db_user/db_password/db_port are deliberately not validated; the providers manage them:
    # - mysql supports my.cnf files, so any of them could be configured there
    # - postgres supports an implicit password via the .pgpass file
    # - mssql could be using integrated security or a connection string
    if db_name is None:
        validations.append("Missing DB_NAME: Auto-resolve failed.")

    # Discover db-type kwargs
    # mssql_backup_option -> backup_option and pass these to the constructor
    db_arg_prefix = f"{db_type}_"
    db_kwargs = {
        key[len(db_arg_prefix) :]: value
        for key, value in kwargs.items()
        if key.startswith(db_arg_prefix)
    }

    logger.debug(
        "Database: (%s:%s)%s@%s name: %s", db_host, db_port, db_type, db_user, db_name
    )

    providers = {
        "mysql": MySqlProvider,
        "postgres": PostgreSqlProvider,
        "mssql": MsSqlProvider,
    }
    Provider = providers.get(db_type)
    if Provider is None:
        validations.append(f"{db_type} is not a known database type.")

    # Report every collected problem at once rather than failing on the first.
    if validations:
        raise ArgumentValidationError(validations)

    # init strategy as it relies on I/O - fail fast here preferred to after restore
    if not actions.skipped(ProcessSteps.ANONYMIZE_DB):
        strategy_parser = StrategyParser()
        logger.debug("loading strategyfile %s...", strategyfile_path)
        file_data = read_config(strategyfile_path)
        strategy = strategy_parser.parse_config(file_data)

    db_provider = Provider(
        db_host=db_host,
        db_user=db_user,
        db_pass=db_password,
        db_name=db_name,
        db_port=db_port,
        seed_rows=seed_rows,
        progress=progress,
        **db_kwargs,
    )

    # main process - no destructive/non-retryable actions should happen before this line ---
    logger.info(actions.summary(ProcessSteps.CREATE_DB))
    if not actions.skipped(ProcessSteps.CREATE_DB):
        db_provider.create_database()

    logger.info(actions.summary(ProcessSteps.RESTORE_DB))
    if not actions.skipped(ProcessSteps.RESTORE_DB):
        db_provider.restore_database(input_path)

    logger.info(actions.summary(ProcessSteps.ANONYMIZE_DB))
    if not actions.skipped(ProcessSteps.ANONYMIZE_DB):
        try:
            db_provider.anonymize_database(strategy, db_workers=db_workers)
        except Exception:
            if not ignore_anonymization_errors:
                raise
            # The caller asked to continue past anonymization failures, but the
            # failure must stay visible: the dump that follows may contain data
            # that was not (fully) anonymized.
            logger.warning(
                "Anonymization failed; continuing because ignore_anonymization_errors is set.",
                exc_info=True,
            )

    logger.info(actions.summary(ProcessSteps.DUMP_DB))
    if not actions.skipped(ProcessSteps.DUMP_DB):
        db_provider.dump_database(output_path)

    logger.info(actions.summary(ProcessSteps.DROP_DB))
    if not actions.skipped(ProcessSteps.DROP_DB):
        db_provider.drop_database()

    logger.info("Process complete!")