Skip to content

Commit

Permalink
Create deleted-accounts records keyed by both e164 and PNI
Browse files Browse the repository at this point in the history
  • Loading branch information
jkt-signal authored Nov 25, 2024
1 parent 49d6a5e commit ffed19d
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.MigrateDeletedAccountsCommand;
import org.whispersystems.textsecuregcm.workers.MigrateRegistrationRecoveryPasswordsCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
Expand Down Expand Up @@ -331,6 +332,7 @@ public void initialize(final Bootstrap<WhisperServerConfiguration> bootstrap) {
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));

bootstrap.addCommand(new MigrateDeletedAccountsCommand());
bootstrap.addCommand(new MigrateRegistrationRecoveryPasswordsCommand());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand All @@ -57,6 +59,7 @@
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
Expand Down Expand Up @@ -439,8 +442,10 @@ public void changeNumber(final Account account,
writeItems.add(buildConstraintTablePut(phoneNumberIdentifierConstraintTableName, uuidAttr, ATTR_PNI_UUID, pniAttr));
writeItems.add(buildRemoveDeletedAccount(number));
writeItems.add(buildRemoveDeletedAccount(phoneNumberIdentifier));
maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier ->
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber)));
maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier -> {
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber));
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalPni));
});

// The `catch (TransactionCanceledException) block needs to check whether the cancellation reason is the account
// update write item
Expand Down Expand Up @@ -1163,7 +1168,19 @@ private TransactWriteItem buildPutDeletedAccount(final UUID uuid, final String e
.item(Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
.build())
.build();
}

private TransactWriteItem buildPutDeletedAccount(final UUID aci, final UUID pni) {
return TransactWriteItem.builder()
.put(Put.builder()
.tableName(deletedAccountsTableName)
.item(Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
.build())
.build();
}
Expand Down Expand Up @@ -1203,6 +1220,16 @@ public Optional<UUID> findRecentlyDeletedAccountIdentifier(final String e164) {
return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
}

public Optional<UUID> findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) {
final GetItemResponse response = db().getItem(GetItemRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumberIdentifier.toString())))
.build());

return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
}

public Optional<String> findRecentlyDeletedE164(final UUID uuid) {
final QueryResponse response = db().query(QueryRequest.builder()
.tableName(deletedAccountsTableName)
Expand Down Expand Up @@ -1232,7 +1259,8 @@ public CompletableFuture<Void> delete(final UUID uuid, final List<TransactWriteI
buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()),
buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid),
buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()),
buildPutDeletedAccount(uuid, account.getNumber())
buildPutDeletedAccount(uuid, account.getNumber()),
buildPutDeletedAccount(uuid, account.getPhoneNumberIdentifier())
));

account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add(
Expand Down Expand Up @@ -1268,6 +1296,68 @@ Flux<Account> getAll(final int segments, final Scheduler scheduler) {
.sequential();
}

public Flux<Tuple3<String, UUID, Long>> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}

return Flux.range(0, segments)
.parallel()
.runOn(scheduler)
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.segment(segment)
.totalSegments(segments)
.build())
.items())
.map(item ->
Tuples.of(
item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s(),
AttributeValues.getUUID(item, DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null),
AttributeValues.getLong(item, DELETED_ACCOUNTS_ATTR_EXPIRES, 0)))
.filter(item -> item.getT1().startsWith("+"))
.sequential();
}

public CompletableFuture<Boolean> insertPniDeletedAccount(final String e164, final UUID pni, final UUID aci, final long expiration) {
// This happens under a pessimistic lock, but that wasn't taken before we found the record we want to migrate,
// so make sure the e164 record is unchanged before updating the PNI record
return asyncClient.getItem(GetItemRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164.toString())))
.build())
.thenComposeAsync(getItemResponse ->
getItemResponse.hasItem()
&& AttributeValues.getString(
getItemResponse.item(), DELETED_ACCOUNTS_KEY_ACCOUNT_E164, "").equals(e164)
&& AttributeValues.getUUID(
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, UUID.randomUUID()).equals(aci)
&& AttributeValues.getLong(
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_EXPIRES, 0) == expiration
? asyncClient.putItem(
PutItemRequest.builder()
.tableName(deletedAccountsTableName)
.item(
Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expiration)))
.conditionExpression("attribute_not_exists(#key)")
.expressionAttributeNames(Map.of("#key", DELETED_ACCOUNTS_KEY_ACCOUNT_E164))
.build())
.thenApply(ignored -> true)
.exceptionally(throwable -> {
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
// there was already a PNI record; no problem, do nothing
return false;
}
throw ExceptionUtils.wrap(throwable);
})
: CompletableFuture.completedFuture(false));
}

@Nonnull
private Optional<Account> getByIndirectLookup(
final Timer timer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;

Expand Down Expand Up @@ -1216,6 +1217,19 @@ public Flux<Account> streamAllFromDynamo(final int segments, final Scheduler sch
return accounts.getAll(segments, scheduler);
}

public Flux<Tuple3<String, UUID, Long>> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) {
return accounts.getE164KeyedDeletedAccounts(segments, scheduler);
}

public CompletableFuture<Boolean> migrateDeletedAccount(final String e164, final UUID aci, final long expiration) {
return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164)
.thenCompose(
pni -> accountLockManager.withLockAsync(
List.of(pni),
() -> accounts.insertPniDeletedAccount(e164, pni, aci, expiration),
accountLockExecutor));
}

public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
final Timer.Sample sample = Timer.start();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.workers;

import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies {

private static final String RECORDS_INSPECTED_COUNTER_NAME =
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsInspected");

private static final String RECORDS_MIGRATED_COUNTER_NAME =
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsMigrated");

private static final String DRY_RUN_TAG = "dryRun";

private final Logger logger = LoggerFactory.getLogger(getClass());

private static final String SEGMENT_COUNT_ARGUMENT = "segments";
private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";

private static final int DEFAULT_SEGMENT_COUNT = 1;
private static final int DEFAULT_CONCURRENCY = 16;

public MigrateDeletedAccountsCommand() {
super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
}
}, "migrate-deleted-accounts", "Migrates recently-deleted account records from E164 to PNI-keyed schema");
}

@Override
public void configure(final Subparser subparser) {
super.configure(subparser);

subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENT_COUNT_ARGUMENT)
.required(false)
.setDefault(DEFAULT_SEGMENT_COUNT)
.help("The total number of segments for a DynamoDB scan");

subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.required(false)
.setDefault(DEFAULT_CONCURRENCY)
.help("Max concurrency for migrations.");

subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, don’t actually migrate any deleted accounts records");
}

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT);
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);

final String deletedAccountsTableName = configuration.getDynamoDbTables().getDeletedAccounts().getTableName();
logger.info("Crawling deleted accounts with {} segments and {} processors",
segments,
Runtime.getRuntime().availableProcessors());

final Counter recordsInspectedCounter =
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));

final Counter recordsMigratedCounter =
Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));

final AccountsManager accounts = commandDependencies.accountsManager();

accounts.getE164KeyedDeletedAccounts(segments, Schedulers.parallel())
.doOnNext(tuple -> recordsInspectedCounter.increment())
.flatMap(
tuple -> dryRun
? Mono.just(false)
: Mono.fromFuture(
accounts.migrateDeletedAccount(
tuple.getT1(), tuple.getT2(), tuple.getT3())),
concurrency)
.filter(migrated -> migrated)
.doOnNext(ignored -> recordsMigratedCounter.increment())
.then()
.block();
}
}
Loading

0 comments on commit ffed19d

Please sign in to comment.