Skip to content

Commit

Permalink
Merge branch 'cassandra-4.0' into cassandra-4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed May 2, 2024
2 parents 7c79d91 + 334ca05 commit 85039aa
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Do not go to disk for reading hints file sizes (CASSANDRA-19477)
* Fix system_views.settings to handle array types (CASSANDRA-19475)
Merged from 4.0:
* Make nodetool import congruent with the documentation by not relying on the folder structure of the imported SSTable files (CASSANDRA-19401)
* IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
* Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
* Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
Expand Down
55 changes: 45 additions & 10 deletions src/java/org/apache/cassandra/db/Directories.java
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,21 @@ public SSTableLister snapshots(String sn)

public Map<Descriptor, Set<Component>> list()
{
filter();
return list(false);
}

/**
* This method is used upon SSTable importing (nodetool import) as there is no strict requirement to
* place SSTables in a directory structure where name of a directory equals to table name
* and parent of such directory equals to keyspace name. For such cases, we want to include such SSTables too,
* rendering the parameter to be set to true.
*
* @param includeForeignTables whether descriptors not matching metadata of this lister should be included
* @return found descriptors and related set of components
*/
public Map<Descriptor, Set<Component>> list(boolean includeForeignTables)
{
filter(includeForeignTables);
return ImmutableMap.copyOf(components);
}

Expand All @@ -922,7 +936,12 @@ public List<Map.Entry<Descriptor, Set<Component>>> sortedList()

public List<File> listFiles()
{
filter();
return listFiles(false);
}

public List<File> listFiles(boolean includeForeignTables)
{
filter(includeForeignTables);
List<File> l = new ArrayList<>(nbFiles);
for (Map.Entry<Descriptor, Set<Component>> entry : components.entrySet())
{
Expand All @@ -934,7 +953,7 @@ public List<File> listFiles()
return l;
}

private void filter()
private void filter(boolean includeForeignTables)
{
if (filtered)
return;
Expand All @@ -946,21 +965,21 @@ private void filter()

if (snapshotName != null)
{
LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(includeForeignTables), onTxnErr);
continue;
}

if (!onlyBackups)
LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(location.toPath(), getFilter(includeForeignTables), onTxnErr);

if (includeBackups)
LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(includeForeignTables), onTxnErr);
}

filtered = true;
}

private BiPredicate<File, FileType> getFilter()
private BiPredicate<File, FileType> getFilter(boolean includeForeignTables)
{
// This function always return false since it adds to the components map
return (file, type) ->
Expand All @@ -978,15 +997,31 @@ private BiPredicate<File, FileType> getFilter()
if (pair == null)
return false;

Descriptor descriptor = null;

// we are only interested in the SSTable files that belong to the specific ColumnFamily
if (!pair.left.ksname.equals(metadata.keyspace) || !pair.left.cfname.equals(metadata.name))
return false;
{
if (!includeForeignTables)
return false;

descriptor = new Descriptor(pair.left.version,
pair.left.directory,
metadata.keyspace,
metadata.name,
pair.left.id,
pair.left.formatType);
}
else
{
descriptor = pair.left;
}

Set<Component> previous = components.get(pair.left);
Set<Component> previous = components.get(descriptor);
if (previous == null)
{
previous = new HashSet<>();
components.put(pair.left, previous);
components.put(descriptor, previous);
}
previous.add(pair.right);
nbFiles++;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ synchronized List<String> importNewSSTables(Options options)
{
Directories.SSTableLister lister = listerPair.left;
String dir = listerPair.right;
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
Descriptor descriptor = entry.getKey();
if (!currentDescriptors.contains(entry.getKey()))
Expand Down Expand Up @@ -124,7 +124,7 @@ synchronized List<String> importNewSSTables(Options options)

Set<MovedSSTable> movedSSTables = new HashSet<>();
Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
try
{
Expand Down
71 changes: 63 additions & 8 deletions test/unit/org/apache/cassandra/db/ImportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -56,6 +57,11 @@

public class ImportTest extends CQLTester
{
@After
public void afterTest()
{
SSTableReader.resetTidying();
}

@Test
public void basicImportByMovingTest() throws Throwable
Expand Down Expand Up @@ -224,13 +230,10 @@ public void importClearRepairedTest() throws Throwable
assertFalse(sstable.isRepaired());
}

private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
private File moveToBackupDir(Set<SSTableReader> sstables, String keyspace, String table) throws IOException
{
Path temp = Files.createTempDirectory("importtest");
SSTableReader sst = sstables.iterator().next();
String tabledir = sst.descriptor.directory.name();
String ksdir = sst.descriptor.directory.parent().name();
Path backupdir = createDirectories(temp.toString(), ksdir, tabledir);
Path backupdir = createDirectories(temp.toString(), keyspace, table);
for (SSTableReader sstable : sstables)
{
sstable.selfRef().release();
Expand All @@ -248,6 +251,14 @@ private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
return new File(backupdir);
}

private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
{
SSTableReader sst = sstables.iterator().next();
String tabledir = sst.descriptor.directory.name();
String ksdir = sst.descriptor.directory.parent().name();
return moveToBackupDir(sstables, ksdir, tabledir);
}

private Path createDirectories(String base, String ... subdirs)
{
File b = new File(base);
Expand Down Expand Up @@ -411,9 +422,9 @@ public void testImportOutOfRange() throws Throwable

TokenMetadata tmd = StorageService.instance.getTokenMetadata();

tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.1"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.2"));
tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.3"));


File backupdir = moveToBackupDir(sstables);
Expand Down Expand Up @@ -536,6 +547,7 @@ public void testImportInvalidateCache() throws Throwable
beforeFirstImport.forEach(s -> s.selfRef().release());
options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build();
importer.importNewSSTables(options);
Thread.sleep(2000);
assertEquals(10, CacheService.instance.rowCache.size());
it = CacheService.instance.rowCache.keyIterator();
while (it.hasNext())
Expand Down Expand Up @@ -707,6 +719,49 @@ public void importExoticTableNamesTest() throws Throwable
}
}

/**
* This test verifies that we successfully import SSTables which are in a directory structure
* where table name and keyspace name (current dir name and parent dir name) do not match the keyspace and
* table arguments on the command line.
*
* @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19401">CASSANDRA-19401</a>
*/
@Test
public void importFromNonMatchingKeyspaceTableDir() throws Throwable
{
String table = "nonmatchingtable";
try
{
schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, table));

for (int i = 0; i < 10; i++)
execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, table), i, i);

ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, table);
Util.flush(cfs);

Set<SSTableReader> sstables = cfs.getLiveSSTables();
cfs.clearUnsafe();

File backupDir = moveToBackupDir(sstables, "randomdir1", "randomdir2");

assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());

SSTableImporter importer = new SSTableImporter(cfs);
SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()).copyData(true).build();
List<String> failedDirectories = importer.importNewSSTables(options);
assertTrue(failedDirectories.isEmpty());
assertEquals(10, execute(String.format("select * from %s.%s", KEYSPACE, table)).size());

// files are left there as they were just copied
Assert.assertNotEquals(0, countFiles(backupDir));
}
finally
{
execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
}
}

private static class MockCFS extends ColumnFamilyStore
{
public MockCFS(ColumnFamilyStore cfs, Directories dirs)
Expand Down

0 comments on commit 85039aa

Please sign in to comment.