Skip to content

Commit

Permalink
Make nodetool import congruent with the documentation by not relying …
Browse files Browse the repository at this point in the history
…on the folder structure of the imported SSTable files

nodetool import requires keyspace and table on the command line to import SSTables to. This
was not always working as specifying keypace and table while having SSTables located in the
directory structure which differed on keyspace and table (dir and parent dir) just
ignored them.

patch by Stefan Miklosovic; reviewed by Brandon Williams for CASSANDRA-19401
  • Loading branch information
smiklosovic committed May 2, 2024
1 parent c14abb4 commit 334ca05
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.0.13
* Make nodetool import congruent with the documentation by not relying on the folder structure of the imported SSTable files (CASSANDRA-19401)
* IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
* Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
* Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
Expand Down
55 changes: 45 additions & 10 deletions src/java/org/apache/cassandra/db/Directories.java
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,21 @@ public SSTableLister snapshots(String sn)

public Map<Descriptor, Set<Component>> list()
{
filter();
return list(false);
}

/**
* This method is used upon SSTable importing (nodetool import) as there is no strict requirement to
* place SSTables in a directory structure where name of a directory equals to table name
* and parent of such directory equals to keyspace name. For such cases, we want to include such SSTables too,
* rendering the parameter to be set to true.
*
* @param includeForeignTables whether descriptors not matching metadata of this lister should be included
* @return found descriptors and related set of components
*/
public Map<Descriptor, Set<Component>> list(boolean includeForeignTables)
{
filter(includeForeignTables);
return ImmutableMap.copyOf(components);
}

Expand All @@ -895,7 +909,12 @@ public List<Map.Entry<Descriptor, Set<Component>>> sortedList()

public List<File> listFiles()
{
filter();
return listFiles(false);
}

public List<File> listFiles(boolean includeForeignTables)
{
filter(includeForeignTables);
List<File> l = new ArrayList<>(nbFiles);
for (Map.Entry<Descriptor, Set<Component>> entry : components.entrySet())
{
Expand All @@ -907,7 +926,7 @@ public List<File> listFiles()
return l;
}

private void filter()
private void filter(boolean includeForeignTables)
{
if (filtered)
return;
Expand All @@ -919,21 +938,21 @@ private void filter()

if (snapshotName != null)
{
LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(includeForeignTables), onTxnErr);
continue;
}

if (!onlyBackups)
LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(location.toPath(), getFilter(includeForeignTables), onTxnErr);

if (includeBackups)
LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(includeForeignTables), onTxnErr);
}

filtered = true;
}

private BiPredicate<File, FileType> getFilter()
private BiPredicate<File, FileType> getFilter(boolean includeForeignTables)
{
// This function always return false since it adds to the components map
return (file, type) ->
Expand All @@ -951,15 +970,31 @@ private BiPredicate<File, FileType> getFilter()
if (pair == null)
return false;

Descriptor descriptor = null;

// we are only interested in the SSTable files that belong to the specific ColumnFamily
if (!pair.left.ksname.equals(metadata.keyspace) || !pair.left.cfname.equals(metadata.name))
return false;
{
if (!includeForeignTables)
return false;

descriptor = new Descriptor(pair.left.version,
pair.left.directory,
metadata.keyspace,
metadata.name,
pair.left.generation,
pair.left.formatType);
}
else
{
descriptor = pair.left;
}

Set<Component> previous = components.get(pair.left);
Set<Component> previous = components.get(descriptor);
if (previous == null)
{
previous = new HashSet<>();
components.put(pair.left, previous);
components.put(descriptor, previous);
}
previous.add(pair.right);
nbFiles++;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ synchronized List<String> importNewSSTables(Options options)
{
Directories.SSTableLister lister = listerPair.left;
String dir = listerPair.right;
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
Descriptor descriptor = entry.getKey();
if (!currentDescriptors.contains(entry.getKey()))
Expand Down Expand Up @@ -124,7 +124,7 @@ synchronized List<String> importNewSSTables(Options options)

Set<MovedSSTable> movedSSTables = new HashSet<>();
Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
try
{
Expand Down
69 changes: 62 additions & 7 deletions test/unit/org/apache/cassandra/db/ImportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.cassandra.cache.RowCacheKey;
Expand All @@ -55,6 +55,12 @@

public class ImportTest extends CQLTester
{
@After
public void afterTest()
{
SSTableReader.resetTidying();
}

@Test
public void basicImportByMovingTest() throws Throwable
{
Expand Down Expand Up @@ -213,7 +219,7 @@ public void importClearRepairedTest() throws Throwable
assertTrue(sstable.isRepaired());

getCurrentColumnFamilyStore().clearUnsafe();
backupdir = moveToBackupDir(sstables);
backupdir = moveToBackupDir(sstables, KEYSPACE, currentTable());

options = SSTableImporter.Options.options(backupdir.toString()).clearRepaired(true).build();
importer.importNewSSTables(options);
Expand All @@ -224,6 +230,11 @@ public void importClearRepairedTest() throws Throwable
}

private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
{
return moveToBackupDir(sstables, KEYSPACE, currentTable());
}

private File moveToBackupDir(Set<SSTableReader> sstables, String keyspace, String table) throws IOException
{
Path temp = Files.createTempDirectory("importtest");
SSTableReader sst = sstables.iterator().next();
Expand Down Expand Up @@ -409,12 +420,12 @@ public void testImportOutOfRange() throws Throwable

TokenMetadata tmd = StorageService.instance.getTokenMetadata();

tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.1"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.2"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.3"));


File backupdir = moveToBackupDir(sstables);
File backupdir = moveToBackupDir(sstables, KEYSPACE, currentTable());
try
{
SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build();
Expand Down Expand Up @@ -534,6 +545,7 @@ public void testImportInvalidateCache() throws Throwable
beforeFirstImport.forEach(s -> s.selfRef().release());
options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build();
importer.importNewSSTables(options);
Thread.sleep(2000);
assertEquals(10, CacheService.instance.rowCache.size());
it = CacheService.instance.rowCache.keyIterator();
while (it.hasNext())
Expand Down Expand Up @@ -682,7 +694,7 @@ public void importExoticTableNamesTest() throws Throwable
Set<SSTableReader> sstables = cfs.getLiveSSTables();
cfs.clearUnsafe();

File backupDir = moveToBackupDir(sstables);
File backupDir = moveToBackupDir(sstables, KEYSPACE, table);

assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());

Expand All @@ -704,6 +716,49 @@ public void importExoticTableNamesTest() throws Throwable
}
}

/**
* This test verifies that we successfully import SSTables which are in a directory structure
* where table name and keyspace name (current dir name and parent dir name) do not match the keyspace and
* table arguments on the command line.
*
* @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19401">CASSANDRA-19401</a>
*/
@Test
public void importFromNonMatchingKeyspaceTableDir() throws Throwable
{
String table = "nonmatchingtable";
try
{
schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, table));

for (int i = 0; i < 10; i++)
execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, table), i, i);

ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, table);
cfs.forceBlockingFlush();

Set<SSTableReader> sstables = cfs.getLiveSSTables();
cfs.clearUnsafe();

File backupDir = moveToBackupDir(sstables, "randomdir1", "randomdir2");

assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());

SSTableImporter importer = new SSTableImporter(cfs);
SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()).copyData(true).build();
List<String> failedDirectories = importer.importNewSSTables(options);
assertTrue(failedDirectories.isEmpty());
assertEquals(10, execute(String.format("select * from %s.%s", KEYSPACE, table)).size());

// files are left there as they were just copied
Assert.assertNotEquals(0, countFiles(backupDir));
}
finally
{
execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
}
}

private static class MockCFS extends ColumnFamilyStore
{
public MockCFS(ColumnFamilyStore cfs, Directories dirs)
Expand Down

0 comments on commit 334ca05

Please sign in to comment.