-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatastore.py
executable file
·394 lines (350 loc) · 14.3 KB
/
datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#!/bin/python3.9
import argparse
import asyncio
import json
import logging
import os
import shutil
import sys
import time
from io import BytesIO
from pathlib import Path
from tarfile import TarFile
from typing import Any, Callable, Optional
try:
# normally in a package
from forest import pghelp, utils
except ImportError:
# maybe we're local?
try:
import pghelp # type: ignore
import utils # type: ignore
except ImportError:
# i wasn't asking
sys.path.append("forest")
sys.path.append("..")
import pghelp # type: ignore # pylint: disable=ungrouped-imports
import utils # type: ignore # pylint: disable=ungrouped-imports
# Legacy rows may keep the account JSON in an `account` column; only select it
# while running a migration (MIGRATE secret set), otherwise fetch the tarball only.
if utils.get_secret("MIGRATE"):
    get_datastore = "SELECT account, datastore FROM {self.table} WHERE id=$1"
else:
    get_datastore = "SELECT datastore FROM {self.table} WHERE id=$1"
class DatastoreError(Exception):
    """Raised when a signal datastore cannot be fetched, claimed, or synced."""

    pass
AccountPGExpressions = pghelp.PGExpressions(
table="signal_accounts",
# rename="ALTAR TABLE IF EXISTS prod_users RENAME TO {self.table}",
migrate="ALTER TABLE IF EXISTS {self.table} ADD IF NOT EXISTS datastore BYTEA, ADD IF NOT EXISTS notes TEXT",
create_table="CREATE TABLE IF NOT EXISTS {self.table} \
(id TEXT PRIMARY KEY, \
datastore BYTEA, \
last_update_ms BIGINT, \
last_claim_ms BIGINT, \
active_node_name TEXT, \
notes TEXT);",
is_registered="SELECT datastore is not null as registered FROM {self.table} WHERE id=$1",
get_datastore=get_datastore,
get_claim="SELECT active_node_name FROM {self.table} WHERE id=$1",
mark_account_claimed="UPDATE {self.table} \
SET active_node_name = $2, \
last_claim_ms = (extract(epoch from now()) * 1000) \
WHERE id=$1;",
mark_account_freed="UPDATE {self.table} SET last_claim_ms = 0, \
active_node_name = NULL WHERE id=$1;",
get_free_account="SELECT id, datastore FROM {self.table} \
WHERE active_node_name IS NULL \
AND last_claim_ms = 0 \
LIMIT 1;",
upload="INSERT INTO {self.table} (id, datastore, last_update_ms) \
VALUES($1, $2, (extract(epoch from now()) * 1000)) \
ON CONFLICT (id) DO UPDATE SET \
datastore = $2, last_update_ms = EXCLUDED.last_update_ms;",
free_accounts_not_updated_in_the_last_hour="UPDATE {self.table} \
SET last_claim_ms = 0, active_node_name = NULL \
WHERE last_update_ms < ((extract(epoch from now())-3600) * 1000);",
get_timestamp="select last_update_ms from {self.table} where id=$1",
)
def get_account_interface() -> pghelp.PGInterface:
    """Build a PGInterface bound to the signal_accounts query set and the
    database named by the DATABASE_URL secret."""
    database_url = utils.get_secret("DATABASE_URL")
    return pghelp.PGInterface(
        query_strings=AccountPGExpressions,
        database=database_url,
    )
class SignalDatastore:
    """
    Download, claim, mount, and sync a signal datastore.

    The datastore (signal-cli/auxin-cli account files) is stored in postgres
    as a tarball; a node claims an account, extracts the files locally, and
    re-uploads them when it is done.
    """

    def __init__(self, number: str, tmpdir: bool = True) -> None:
        """Normalize *number*, bind the PG interface, and set local paths.

        Raises:
            Exception: if utils.signal_format can't normalize the number.
        """
        self.account_interface = get_account_interface()
        formatted_number = utils.signal_format(number)
        if isinstance(formatted_number, str):
            self.number: str = formatted_number
        else:
            raise Exception("not a valid number")
        logging.info("SignalDatastore number is %s", self.number)
        # NOTE(review): uses the raw `number`, not the formatted one, so the
        # on-disk filename may differ from self.number — confirm intentional
        self.filepath = "data/" + number
        # await self.account_interface.create_table()
        if tmpdir:
            setup_tmpdir()  # shouldn't do anything if not running locally

    def is_registered_locally(self) -> bool:
        """Return True when the local account file exists, parses as JSON,
        and has a truthy "registered" key; False (with a logged error) otherwise."""
        try:
            # use a context manager so the handle is closed (was leaked before)
            with open(self.filepath) as account_file:
                return json.load(account_file)["registered"]
        except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
            logging.error(e)
            return False

    async def is_claimed(self) -> Optional[str]:
        """Return the name of the node currently holding this account,
        or None when it is unclaimed.

        Raises:
            Exception: if no row exists for the number (with or without "+").
        """
        record = await self.account_interface.get_claim(self.number)
        if not record:
            # legacy rows may be keyed without the leading "+"
            logging.warning("checking claim without plus instead")
            record = await self.account_interface.get_claim(self.number[1:])
            if record:
                return record[0].get("active_node_name")
            raise Exception(f"no record in db for {self.number}")
        return record[0].get("active_node_name")

    async def download(self) -> None:
        """Fetch our account datastore from postgresql and mark it claimed"""
        logging.info("datastore download entered")
        await self.account_interface.free_accounts_not_updated_in_the_last_hour()
        # wait up to ~30s (5 x 6s) for whichever node holds the claim to let go
        for i in range(5):
            logging.info("checking claim")
            claim = await self.is_claimed()
            if not claim:
                logging.info("no account claim!")
                break
            # you can also try to kill the other process
            logging.info(
                "this account is claimed by %s, waiting",
                claim,
            )
            await asyncio.sleep(6)
            if i == 4:
                # proceed anyway after the last wait; the other node may be dead
                logging.info("time's up")
        logging.info("downloading")
        record = await self.account_interface.get_datastore(self.number)
        if not record and utils.get_secret("MIGRATE"):
            # migration-era rows may be keyed without the leading "+"
            logging.warning("trying without plus")
            record = await self.account_interface.get_datastore(
                self.number.removeprefix("+")
            )
        logging.info("got datastore from pg")
        if json_data := record[0].get("account"):
            # legacy json-only field: write the raw JSON straight to data/<username>
            loaded_data = json.loads(json_data)
            if "username" in loaded_data:
                try:
                    os.mkdir("data")
                except FileExistsError:
                    pass
                # use a context manager so the handle is closed (was leaked before)
                with open("data/" + loaded_data["username"], "w") as account_file:
                    account_file.write(json_data)
                return
        # modern path: the datastore column holds a tarball of the account files
        buffer = BytesIO(record[0].get("datastore"))
        tarball = TarFile(fileobj=buffer)
        fnames = [member.name for member in tarball.getmembers()]
        logging.debug(fnames[:2])
        logging.info(
            "expected file %s exists: %s",
            self.filepath,
            self.filepath in fnames,
        )
        tarball.extractall(utils.ROOT_DIR)
        # open("last_downloaded_checksum", "w").write(zlib.crc32(buffer.seek(0).read()))
        await self.account_interface.mark_account_claimed(self.number, utils.HOSTNAME)
        logging.debug("marked account as claimed, asserting that this is the case")
        assert await self.is_claimed()
        return

    def tarball_data(self) -> Optional[bytes]:
        """Tarball our data files.

        Returns the tar bytes, or None when the account isn't registered locally.
        """
        if not self.is_registered_locally():
            logging.error("datastore not registered. not uploading")
            return None
        # fixme: check if the last thing we downloaded/uploaded
        # is older than the last thing in the db
        buffer = BytesIO()
        tarball = TarFile(fileobj=buffer, mode="w")
        try:
            tarball.add(self.filepath)
            try:
                # signal-cli keeps auxiliary state in a sibling ".d" directory
                tarball.add(self.filepath + ".d")
            except FileNotFoundError:
                logging.info("ignoring no %s", self.filepath + ".d")
        except FileNotFoundError:
            # the main file itself is missing from cwd: fall back to the
            # whole data directory
            logging.warning(
                "couldn't find %s in %s, adding data instead",
                self.filepath + ".d",
                os.getcwd(),
            )
            tarball.add("data")
        fnames = [member.name for member in tarball.getmembers()]
        logging.debug(fnames[:2])
        tarball.close()
        buffer.seek(0)
        data = buffer.read()
        return data

    async def upload(self) -> float:
        """Puts account datastore in postgresql.

        Returns the uploaded size in KB, or 0.0 when there was nothing to upload.
        """
        data = self.tarball_data()
        if not data:
            return 0.0
        kb = round(len(data) / 1024, 1)
        # maybe something like:
        # upload and return registered timestamp. write timestamp locally. when uploading, check that the last_updated_ts in postgres matches the file
        # if it doesn't, you've probably diverged, but someone may have put an invalid ratchet more recently by mistake (e.g. restarting triggering upload despite crashing)
        # or:
        # open("last_uploaded_checksum", "w").write(zlib.crc32(buffer.seek(0).read()))
        await self.account_interface.upload(self.number, data)
        logging.debug("saved %s kb of tarballed datastore to supabase", kb)
        return kb

    async def mark_freed(self) -> list:
        """Marks account as freed in PG database."""
        return await self.account_interface.mark_account_freed(self.number)
def setup_tmpdir() -> None:
    """Prepare a local working directory when running outside production.

    No-op unless utils.LOCAL is set and ROOT_DIR points somewhere other than
    ".". Creates ROOT_DIR/data, symlinks the signal CLI binary and avatar into
    ROOT_DIR, then chdirs into it.
    """
    if not utils.LOCAL:
        return
    if utils.ROOT_DIR == ".":
        logging.warning("not setting up tmpdir")
        return
    logging.info("setting up tmpdir")  # typo fix: was "setitng up tmpdir"
    # if utils.ROOT_DIR == "/tmp/local-signal/":
    #     try:
    #         shutil.rmtree(utils.ROOT_DIR)
    #     except (FileNotFoundError, OSError) as e:
    #         logging.warning("couldn't remove rootdir: %s", e)
    (Path(utils.ROOT_DIR) / "data").mkdir(exist_ok=True, parents=True)
    # assume we're running in the repo
    sigcli = utils.get_secret("SIGNAL_CLI_PATH") or "auxin-cli"
    sigcli_path = Path(sigcli).absolute()
    try:
        logging.info("symlinking %s to %s", sigcli_path, utils.ROOT_DIR)
        os.symlink(sigcli_path, utils.ROOT_DIR + "/auxin-cli")
    except FileExistsError:
        logging.info("auxin-cli's already there")
    try:
        os.symlink(Path("avatar.png").absolute(), utils.ROOT_DIR + "/avatar.png")
    except FileExistsError:
        pass
    logging.info("chdir to %s", utils.ROOT_DIR)
    os.chdir(utils.ROOT_DIR)
    logging.info("not starting memfs because running locally")
    return
async def getFreeSignalDatastore() -> SignalDatastore:
    """Pick an unclaimed account from postgres and wrap it in a SignalDatastore.

    Raises a generic Exception when every account is claimed.
    """
    interface = get_account_interface()
    await interface.free_accounts_not_updated_in_the_last_hour()
    record = await interface.get_free_account()
    if not record:
        raise Exception("no free accounts")
    # alternatively, register an account...
    # could put some of register.py/signalcaptcha handler here...
    free_number = record[0].get("id")
    logging.info(free_number)
    assert free_number
    return SignalDatastore(free_number)
# this stuff needs to be cleaned up
# maybe a config about where we're running:
# MEMFS, DOWNLOAD, ROOT_DIR, HOSTNAME, etc
# is HCL overkill?
# CLI entry point: subcommands are registered below via the @subcommand decorator.
parser = argparse.ArgumentParser(
    description="manage the signal datastore. use ENV=... to use something other than dev"
)
subparser = parser.add_subparsers(dest="subparser")  # chosen subcommand name lands in args.subparser
# h/t https://gist.github.com/mivade/384c2c41c3a29c637cb6c603d4197f9f
def argument(*name_or_flags: Any, **kwargs: Any) -> tuple:
    """Bundle flag names and keyword options into the ``([flags], {options})``
    shape consumed by the ``subcommand`` decorator."""
    flags = [*name_or_flags]
    return flags, kwargs
def subcommand(
    _args: Optional[list] = None, parent: argparse._SubParsersAction = subparser
) -> Callable:
    """Decorator to define a new subcommand in a sanity-preserving way.
    The function will be stored in the ``func`` variable when the parser
    parses arguments so that it can be called directly like so::
        args = cli.parse_args()
        args.func(args)
    Usage example::
        @subcommand([argument("-d", help="Enable debug mode", action="store_true")])
        def subcommand(args):
            print(args)
    Then on the command line::
        $ python cli.py subcommand -d
    """

    def decorator(func: Callable) -> Callable:
        _parser = parent.add_parser(func.__name__, description=func.__doc__)
        for arg in _args if _args else []:
            _parser.add_argument(*arg[0], **arg[1])
        _parser.set_defaults(func=func)
        # BUGFIX: return the function so the decorated module-level name stays
        # bound to it (the original returned None, rebinding e.g. `sync` to None)
        return func

    return decorator
@subcommand()
async def list_accounts(_args: argparse.Namespace) -> None:
    "list available accounts in table format"
    cols = ["id", "last_update_ms", "last_claim_ms", "active_node_name"]
    interface = get_account_interface()
    # sorry — introspect the live schema to decide whether `notes` exists yet
    schema_rows = (
        await interface.execute(
            "select column_name from information_schema.columns where table_name='signal_accounts';"
        )
        or []  # don't error if the query fails
    )
    known_columns = [row.get("column_name") for row in schema_rows]
    if "notes" in known_columns:
        cols.append("notes")
    query = f"select {' ,'.join(cols)} from signal_accounts order by id"
    accounts = await get_account_interface().execute(query)
    if not isinstance(accounts, list):
        return
    # header row followed by one stringified row per account
    rows = [cols]
    for account in accounts:
        rows.append([str(value) for value in account.values()])
    # pad each column to the width of its widest cell
    widths = [max(len(row[index]) for row in rows) for index in range(len(cols))]
    template = " ".join("{:<" + str(width) + "}" for width in widths)
    for row in rows:
        print(template.format(*row).rstrip())
    return
@subcommand([argument("--number")])
async def free(ns: argparse.Namespace) -> None:
    "mark account freed"
    interface = get_account_interface()
    await interface.mark_account_freed(ns.number)
@subcommand([argument("--number"), argument("note", help="new note for number")])
async def set_note(ns: argparse.Namespace) -> None:
    "set the note field for a number"
    # SECURITY: CLI input was interpolated into SQL unescaped; a note containing
    # a single quote would break (or hijack) the statement. Double the quotes
    # per PostgreSQL's string-literal rules. A parameterized query via pghelp
    # would be better if PGInterface.execute accepts arguments — TODO confirm.
    note = ns.note.replace("'", "''")
    number = ns.number.replace("'", "''")
    await get_account_interface().execute(
        f"update signal_accounts set notes='{note}' where id='{number}'"
    )
@subcommand([argument("--number")])
async def sync(ns: argparse.Namespace) -> None:
    # Claim and download a datastore, hold it while this process runs, and
    # upload + free it again on Ctrl-C.
    # maybe worth running autosave after all?
    try:
        datastore = SignalDatastore(ns.number)
        await datastore.download()
    except (IndexError, DatastoreError):
        # no row for the requested number (or none given): claim any free account
        datastore = await getFreeSignalDatastore()
        await datastore.download()
    try:
        while 1:
            # NOTE(review): time.sleep blocks the event loop; harmless here since
            # nothing else is scheduled, but asyncio.sleep would be more correct
            time.sleep(3600)
    except KeyboardInterrupt:
        await datastore.upload()
        await datastore.mark_freed()
# "upload" is wired manually (not via @subcommand) so __main__ can prepare
# cwd/tmpdir before constructing the datastore.
upload_parser = subparser.add_parser("upload")
# store_false: passing -n/--no_tmpdir sets the flag to False, skipping setup_tmpdir
upload_parser.add_argument("--no_tmpdir", "-n", action="store_false")
upload_parser.add_argument("--path")
upload_parser.add_argument("--number")
# download_parser = subparser.add_parser("download")
# download_parser.add_argument("--number")
# migrate_parser = subparser.add_parser("migrate")
# migrate_parser.add_argument("--create")
if __name__ == "__main__":
    args = parser.parse_args()
    if hasattr(args, "func"):
        # a @subcommand-registered async handler: run it on a fresh event loop
        asyncio.run(args.func(args))
    elif args.subparser == "upload":
        if args.path:
            os.chdir(args.path)
        if args.number:
            num = args.number
        else:
            # default to the first account file found in the data directory
            num = os.listdir("data")[0]
        store = SignalDatastore(num, args.no_tmpdir)
        asyncio.run(store.upload())
    else:
        print("not implemented")