Skip to content

Commit

Permalink
Merge pull request #16583 from iterate-ch/bugfix/SDS-1621
Browse files Browse the repository at this point in the history
Always run FileKey generation in background
  • Loading branch information
dkocher authored Nov 26, 2024
2 parents ef0b879 + 6c4c2e2 commit 2790232
Show file tree
Hide file tree
Showing 20 changed files with 157 additions and 140 deletions.
17 changes: 14 additions & 3 deletions core/src/main/java/ch/cyberduck/core/features/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
*/

import ch.cyberduck.core.PasswordCallback;
import ch.cyberduck.core.pool.SessionPool;

import java.util.concurrent.Future;

@Optional
public interface Scheduler<R> {
R repeat(SessionPool pool, PasswordCallback callback);
/**
* Repeated execution on background thread
*/
Future<R> repeat(PasswordCallback callback);

void shutdown();
/**
* Single execution on background thread with no delay
*/
Future<R> execute(PasswordCallback callback);

/**
* Shutdown thread pool
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import ch.cyberduck.core.PasswordCallback;
import ch.cyberduck.core.features.Scheduler;
import ch.cyberduck.core.pool.SessionPool;

import java.util.concurrent.Future;

public class DelegatingSchedulerFeature implements Scheduler<Void> {

Expand All @@ -28,9 +29,17 @@ public DelegatingSchedulerFeature(final Scheduler... features) {
}

@Override
public Void repeat(final SessionPool pool, final PasswordCallback callback) {
public Future<Void> repeat(final PasswordCallback callback) {
for(Scheduler scheduler : features) {
scheduler.repeat(callback);
}
return null;
}

@Override
public Future<Void> execute(final PasswordCallback callback) {
for(Scheduler scheduler : features) {
scheduler.repeat(pool, callback);
scheduler.execute(callback);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,18 @@
*/

import ch.cyberduck.core.PasswordCallback;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.Scheduler;
import ch.cyberduck.core.pool.SessionPool;
import ch.cyberduck.core.threading.ThreadPool;
import ch.cyberduck.core.threading.ThreadPoolFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

public abstract class OneTimeSchedulerFeature<R> implements Scheduler<Future<R>> {
public abstract class OneTimeSchedulerFeature<R> extends ThreadPoolSchedulerFeature<R> {

private final Path file;

private final ThreadPool scheduler = ThreadPoolFactory.get("scheduler", 1);

public OneTimeSchedulerFeature(final Path file) {
this.file = file;
}

protected abstract R operate(PasswordCallback callback, Path file) throws BackgroundException;

@Override
public Future<R> repeat(final SessionPool pool, final PasswordCallback callback) {
return scheduler.execute(new Callable<R>() {
@Override
public R call() throws Exception {
return operate(callback, file);
}
});
public OneTimeSchedulerFeature() {
super(Long.MAX_VALUE);
}

@Override
public void shutdown() {
scheduler.shutdown(false);
public Future<R> repeat(final PasswordCallback callback) {
// No repeat
return this.execute(callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,63 +16,64 @@
*/

import ch.cyberduck.core.PasswordCallback;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.Session;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.exception.ConnectionCanceledException;
import ch.cyberduck.core.exception.LoginFailureException;
import ch.cyberduck.core.features.Scheduler;
import ch.cyberduck.core.pool.SessionPool;
import ch.cyberduck.core.threading.BackgroundActionState;
import ch.cyberduck.core.threading.ScheduledThreadPool;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public abstract class AbstractSchedulerFeature<R> implements Scheduler<Void> {
private static final Logger log = LogManager.getLogger(AbstractSchedulerFeature.class);
public abstract class ThreadPoolSchedulerFeature<R> implements Scheduler<R> {
private static final Logger log = LogManager.getLogger(ThreadPoolSchedulerFeature.class);

private final long period;
private final ScheduledThreadPool scheduler = new ScheduledThreadPool();

public AbstractSchedulerFeature(final long period) {
public ThreadPoolSchedulerFeature(final long period) {
this.period = period;
}

@Override
public Void repeat(final SessionPool pool, final PasswordCallback callback) {
scheduler.repeat(() -> {
public Future<R> repeat(final PasswordCallback callback) {
return (ScheduledFuture<R>) scheduler.repeat(new FailureAwareRunnable(callback), period, TimeUnit.MILLISECONDS);
}

@Override
public Future<R> execute(final PasswordCallback callback) {
return (ScheduledFuture<R>) scheduler.schedule(new FailureAwareRunnable(callback), 0L, TimeUnit.MILLISECONDS);
}

protected abstract R operate(PasswordCallback callback) throws BackgroundException;

@Override
public void shutdown() {
log.debug("Shutting down scheduler thread pool {}", this);
scheduler.shutdown();
}

private final class FailureAwareRunnable implements Runnable {
private final PasswordCallback callback;

public FailureAwareRunnable(final PasswordCallback callback) {
this.callback = callback;
}

@Override
public void run() {
try {
final Session<?> session = pool.borrow(BackgroundActionState.running);
try {
this.operate(session, callback, null);
}
finally {
pool.release(session, null);
}
}
catch(LoginFailureException | ConnectionCanceledException e) {
log.warn("Cancel processing scheduled task after failure {}", e.getMessage());
this.shutdown();
ThreadPoolSchedulerFeature.this.operate(callback);
}
catch(BackgroundException e) {
log.warn("Failure processing scheduled task. {}", e.getMessage(), e);
}
catch(Exception e) {
log.error("Failure processing scheduled task {}", e.getMessage(), e);
this.shutdown();
ThreadPoolSchedulerFeature.this.shutdown();
}
}, period, TimeUnit.MILLISECONDS);
return null;
}

protected abstract R operate(Session<?> session, PasswordCallback callback, Path file) throws BackgroundException;

@Override
public void shutdown() {
log.debug("Shutting down scheduler thread pool");
scheduler.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
private static final Logger log = LogManager.getLogger(ScheduledThreadPool.class);

private final ScheduledExecutorService pool;
private final ScheduledThreadPoolExecutor pool;

/**
* With FIFO (first-in-first-out) ordered wait queue.
Expand All @@ -52,7 +51,12 @@ public ScheduledThreadPool(final String threadNamePrefix) {
}

public ScheduledThreadPool(final Thread.UncaughtExceptionHandler handler, final String threadNamePrefix) {
this.pool = Executors.newScheduledThreadPool(1, new NamedThreadFactory(threadNamePrefix, handler));
this.pool = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(threadNamePrefix, handler),
new DefaultThreadPool.CustomCallerPolicy());
// no execute after shutdown
this.pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
// cancel periodic tasks on shutdown
this.pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
}

/**
Expand Down Expand Up @@ -95,5 +99,13 @@ public ScheduledFuture<?> schedule(final Runnable runnable, final Long delay, fi
public void shutdown() {
log.info("Shutdown pool {}", pool);
pool.shutdown();
try {
while(!pool.awaitTermination(1L, TimeUnit.SECONDS)) {
log.warn("Await termination for pool {}", pool);
}
}
catch(InterruptedException e) {
log.error("Failure awaiting pool termination. {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.Bulk;
import ch.cyberduck.core.features.Delete;
import ch.cyberduck.core.features.Scheduler;
import ch.cyberduck.core.preferences.HostPreferences;
import ch.cyberduck.core.transfer.Transfer;
import ch.cyberduck.core.transfer.TransferItem;
Expand Down Expand Up @@ -91,15 +92,15 @@ public void post(final Transfer.Type type, final Map<TransferItem, TransferStatu
default:
if(new HostPreferences(session.getHost()).getBoolean("sds.encryption.missingkeys.upload")) {
if(session.userAccount().isEncryptionEnabled()) {
final SDSMissingFileKeysSchedulerFeature background = new SDSMissingFileKeysSchedulerFeature();
final Scheduler scheduler = session.getFeature(Scheduler.class);
final Map<Path, Boolean> rooms = this.getRoomEncryptionStatus(files);
for(Map.Entry<TransferItem, TransferStatus> entry : files.entrySet()) {
final Path file = entry.getKey().remote;
if(file.isFile()) {
final Path container = containerService.getContainer(file);
if(rooms.get(container)) {
log.debug("Run missing file keys for {}", file);
background.operate(session, callback, file);
scheduler.execute(callback);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import ch.cyberduck.core.Credentials;
import ch.cyberduck.core.PasswordCallback;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.Session;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.VersionIdProvider;
import ch.cyberduck.core.preferences.HostPreferences;
import ch.cyberduck.core.preferences.PreferencesFactory;
import ch.cyberduck.core.sds.io.swagger.client.ApiException;
Expand All @@ -36,7 +34,7 @@
import ch.cyberduck.core.sds.triplecrypt.TripleCryptConverter;
import ch.cyberduck.core.sds.triplecrypt.TripleCryptExceptionMappingService;
import ch.cyberduck.core.sds.triplecrypt.TripleCryptKeyPair;
import ch.cyberduck.core.shared.AbstractSchedulerFeature;
import ch.cyberduck.core.shared.ThreadPoolSchedulerFeature;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -61,21 +59,30 @@

import static java.util.stream.Collectors.groupingBy;

public class SDSMissingFileKeysSchedulerFeature extends AbstractSchedulerFeature<List<UserFileKeySetRequest>> {
public class SDSMissingFileKeysSchedulerFeature extends ThreadPoolSchedulerFeature<List<UserFileKeySetRequest>> {
private static final Logger log = LogManager.getLogger(SDSMissingFileKeysSchedulerFeature.class);

public SDSMissingFileKeysSchedulerFeature() {
this(PreferencesFactory.get().getLong("sds.encryption.missingkeys.scheduler.period"));
private final SDSSession session;
private final SDSNodeIdProvider nodeid;
private final Path file;

public SDSMissingFileKeysSchedulerFeature(final SDSSession session, final SDSNodeIdProvider nodeid) {
this(session, nodeid, null);
}

public SDSMissingFileKeysSchedulerFeature(final SDSSession session, final SDSNodeIdProvider nodeid, final Path file) {
this(session, nodeid, file, PreferencesFactory.get().getLong("sds.encryption.missingkeys.scheduler.period"));
}

public SDSMissingFileKeysSchedulerFeature(final long period) {
public SDSMissingFileKeysSchedulerFeature(final SDSSession session, final SDSNodeIdProvider nodeid, final Path file, final long period) {
super(period);
this.file = file;
this.session = session;
this.nodeid = nodeid;
}

@Override
public List<UserFileKeySetRequest> operate(final Session<?> client, final PasswordCallback callback, final Path file) throws BackgroundException {
final SDSSession session = (SDSSession) client;
final SDSNodeIdProvider nodeid = (SDSNodeIdProvider) session._getFeature(VersionIdProvider.class);
protected List<UserFileKeySetRequest> operate(final PasswordCallback callback) throws BackgroundException {
try {
final UserAccountWrapper account = session.userAccount();
if(!account.isEncryptionEnabled()) {
Expand All @@ -85,16 +92,16 @@ public List<UserFileKeySetRequest> operate(final Session<?> client, final Passwo
final List<UserFileKeySetRequest> processed = new ArrayList<>();
// Null when operating from scheduler. File reference is set for post upload.
final Long fileId = file != null ? Long.parseLong(nodeid.getVersionId(file)) : null;
final HashMap<UserKeyPairContainer, Credentials> passphrases = new HashMap<>();
UserFileKeySetBatchRequest request;
do {
log.debug("Request a list of missing file keys for file {}", file);
log.debug("Request a list of missing file keys limited to {}", fileId);
request = new UserFileKeySetBatchRequest();
final MissingKeysResponse missingKeys = new NodesApi(session.getClient()).requestMissingFileKeys(
null, null, null, fileId, null, null, null);
final Map<Long, List<UserUserPublicKey>> userPublicKeys = missingKeys.getUsers().stream().collect(groupingBy(UserUserPublicKey::getId));
final Map<Long, List<FileFileKeys>> files = missingKeys.getFiles().stream().collect(groupingBy(FileFileKeys::getId));
for(UserIdFileIdItem item : missingKeys.getItems()) {
final HashMap<UserKeyPairContainer, Credentials> passphrases = new HashMap<>();
for(FileFileKeys fileKey : files.get(item.getFileId())) {
final EncryptedFileKey encryptedFileKey = TripleCryptConverter.toCryptoEncryptedFileKey(fileKey.getFileKeyContainer());
final UserKeyPairContainer keyPairForDecryption = session.getKeyPairForFileKey(encryptedFileKey.getVersion());
Expand Down
4 changes: 0 additions & 4 deletions dracoon/src/main/java/ch/cyberduck/core/sds/SDSProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import ch.cyberduck.core.Protocol;
import ch.cyberduck.core.Scheme;
import ch.cyberduck.core.features.Pairing;
import ch.cyberduck.core.features.Scheduler;
import ch.cyberduck.core.sds.triplecrypt.TripleCryptCleanupFeature;
import ch.cyberduck.core.shared.CredentialsCleanupService;
import ch.cyberduck.core.shared.DelegatingPairingFeature;
Expand Down Expand Up @@ -123,9 +122,6 @@ public enum Authorization {
@Override
@SuppressWarnings("unchecked")
public <T> T getFeature(final Class<T> type) {
if(type == Scheduler.class) {
return (T) new SDSMissingFileKeysSchedulerFeature();
}
if(type == Pairing.class) {
return (T) new DelegatingPairingFeature(new CredentialsCleanupService(), new TripleCryptCleanupFeature());
}
Expand Down
5 changes: 5 additions & 0 deletions dracoon/src/main/java/ch/cyberduck/core/sds/SDSSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public class SDSSession extends HttpSession<SDSApiClient> {
private UserKeyPair.Version requiredKeyPairVersion;

private final SDSNodeIdProvider nodeid = new SDSNodeIdProvider(this);
private final SDSMissingFileKeysSchedulerFeature scheduler = new SDSMissingFileKeysSchedulerFeature(this, nodeid);

public SDSSession(final Host host, final X509TrustManager trust, final X509KeyManager key) {
super(host, trust, key);
Expand Down Expand Up @@ -521,6 +522,7 @@ public UserKeyPair.Version requiredKeyPairVersion() {

@Override
protected void logout() {
scheduler.shutdown();
client.getHttpClient().close();
nodeid.clear();
}
Expand Down Expand Up @@ -594,6 +596,9 @@ public <T> T _getFeature(final Class<T> type) {
if(type == Encryptor.class) {
return (T) new SDSTripleCryptEncryptorFeature(this, nodeid);
}
if(type == Scheduler.class) {
return (T) scheduler;
}
return super._getFeature(type);
}
}
Loading

0 comments on commit 2790232

Please sign in to comment.