This patch addresses two cache issues #295

Status: Open · wants to merge 1 commit into base: branch-0.8
19 changes: 16 additions & 3 deletions src/main/scala/shark/execution/SparkLoadTask.scala
@@ -304,8 +304,15 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHelper
  if (work.cacheMode != CacheType.TACHYON) {
    val memoryTable = getOrCreateMemoryTable(hiveTable)
    work.commandType match {
-     case (SparkLoadWork.CommandTypes.OVERWRITE | SparkLoadWork.CommandTypes.NEW_ENTRY) =>
-       memoryTable.put(tablePartitionRDD, tableStats.toMap)
+     case (SparkLoadWork.CommandTypes.OVERWRITE | SparkLoadWork.CommandTypes.NEW_ENTRY) => {
+       val prevRDDandStatsOpt = memoryTable.put(tablePartitionRDD, tableStats.toMap)
+       if (prevRDDandStatsOpt.isDefined) {
+         // Prevent memory leaks when partition is overwritten
+         val (prevRdd, prevStats) = (prevRDDandStatsOpt.get._1, prevRDDandStatsOpt.get._2)
+         RDDUtils.unpersistRDD(prevRdd)
+       }
+
+     }
      case SparkLoadWork.CommandTypes.INSERT => {
        memoryTable.update(tablePartitionRDD, tableStats)
      }
@@ -398,7 +405,13 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHelper
        (work.commandType == SparkLoadWork.CommandTypes.INSERT)) {
      partitionedTable.updatePartition(partitionKey, tablePartitionRDD, tableStats)
    } else {
-     partitionedTable.putPartition(partitionKey, tablePartitionRDD, tableStats.toMap)
+     val prevRDDandStatsOpt = partitionedTable.putPartition(partitionKey, tablePartitionRDD, tableStats.toMap)
+     if (prevRDDandStatsOpt.isDefined) {
+       // Prevent memory leaks when partition is overwritten
+       val (prevRdd, prevStats) = (prevRDDandStatsOpt.get._1, prevRDDandStatsOpt.get._2)
+       RDDUtils.unpersistRDD(prevRdd)
+     }
+
    }
  }
}
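Both hunks above apply the same pattern: put/putPartition now return the entry they displace, and the caller unpersists it. A minimal, self-contained sketch of that pattern follows; the Handle and Store types are toy stand-ins invented for illustration, not Shark's API.

```scala
import scala.collection.mutable

// Toy stand-ins for a cached RDD and a table's entry map (illustration only).
class Handle(val name: String) {
  var persisted = true
  def unpersist(): Unit = { persisted = false } // plays the role of RDDUtils.unpersistRDD
}

class Store {
  private val entries = mutable.Map.empty[String, (Handle, Map[String, Long])]

  // Like the patched put()/putPartition(): mutable.Map.put returns the previous
  // value for the key, so the displaced entry can be released by the caller.
  def put(key: String, handle: Handle, stats: Map[String, Long]): Option[(Handle, Map[String, Long])] =
    entries.put(key, (handle, stats))
}

object OverwriteDemo extends App {
  val store = new Store
  store.put("db.table", new Handle("rdd-1"), Map.empty)
  // OVERWRITE / NEW_ENTRY path: release whatever the new RDD replaced.
  store.put("db.table", new Handle("rdd-2"), Map.empty).foreach {
    case (prevRdd, _) => prevRdd.unpersist()
  }
}
```

Without that release step, the replaced entry stays pinned in cache with no remaining reference in the metadata map, which is exactly the leak the patch closes.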
39 changes: 25 additions & 14 deletions src/main/scala/shark/memstore2/MemoryMetadataManager.scala
@@ -54,10 +54,16 @@ class MemoryMetadataManager extends LogHelper {
      databaseName: String,
      tableName: String,
      cacheMode: CacheType.CacheType): MemoryTable = {
-   val tableKey = MemoryMetadataManager.makeTableKey(databaseName, tableName)
-   val newTable = new MemoryTable(databaseName, tableName, cacheMode)
-   _tables.put(tableKey, newTable)
-   newTable
+   val tableKey = MemoryMetadataManager.makeTableKey(databaseName, tableName)
+   // Clear out any existing tables with the same key; prevent memory leak
+   if (containsTable(databaseName, tableName)) {
+     logInfo("Attempt to create new table when one already exists - " + tableKey)

Member:
These should be checked by Hive's DDL code. If the problem is caused by CACHE <table> handling, then we should add a check in SharkDDLSemanticAnalyzer that throws an exception if the user tries to CACHE a table already stored at MEMORY level
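
For illustration, a sketch of the kind of analyzer-time guard being proposed; checkCacheCommand and the lookup function are assumed names, and SharkDDLSemanticAnalyzer's real hook points are not shown.

```scala
object CacheDdlGuard {
  // Hypothetical check (assumed names, not Shark's actual API): reject a CACHE
  // command if the table is already registered at MEMORY level.
  def checkCacheCommand(
      databaseName: String,
      tableName: String,
      lookupCacheMode: (String, String) => Option[String]): Unit = {
    if (lookupCacheMode(databaseName, tableName).exists(_ == "MEMORY")) {
      throw new IllegalStateException(
        s"Table $databaseName.$tableName is already cached at MEMORY level")
    }
  }
}
```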

Contributor Author:
This is a problem when we deal with tables that mutate, i.e. folks using cache on partitioned tables or on tables that get data inserted. That creates a need to call cache a second time. I think we need to support repeated calls to cache without an exception.

Member:
We could silently ignore repeated cache calls, or return a more descriptive error message saying that the first cache call ensures that successive inserts on a MEMORY table will be pinned in memory.

Contributor Author:
That feature does not work. Here is what fails for me:

1. Create external partitioned table
2. Add partition
3. Cache table
4. Add partition

The second partition is not in memory, and when I query the table I get partial, incorrect results: I only get back results from the first partition, with no error to the user. It's more than an exception/messaging issue.
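
To make the sequence concrete, here is a toy Scala model of the failure mode just described, under the assumption that caching takes a snapshot of the partitions present at CACHE time; this is an illustration, not Shark's code.

```scala
import scala.collection.mutable

// Toy model (assumed semantics, not Shark's code): CACHE snapshots the
// partitions that exist at that moment, so a later ADD PARTITION is
// silently absent from cached query results.
object StaleCacheDemo extends App {
  val onDisk = mutable.Map.empty[String, Seq[Int]] // create external partitioned table
  onDisk("p1") = Seq(1, 2, 3)                      // add partition
  val cached = onDisk.toMap                        // cache table: snapshot of p1 only
  onDisk("p2") = Seq(4, 5, 6)                      // add partition after caching
  println(cached.values.flatten.toList)            // List(1, 2, 3): p2 missing, no error
}
```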

Member:
Sorry, I wasn't too clear above. That issue of corrupted partitioned tables has been addressed in the PR linked in the comments below. These inline comments about the MemoryMetadataManager workaround address the concern that reloading entire tables on repeated CACHE calls is redundant if the first call works correctly (given the fix for partitioned tables).

Contributor Author:
I see. I have not looked at the 0.9 code yet, but in 0.8.1 the new partition does not get automatically cached without issuing another 'cache tablename' directive. I agree that we should avoid reloading the entire table for each cache directive.

Member:
Yeah, the commits will have to be back-ported to 0.8.2.

+     _tables.get(tableKey).get.asInstanceOf[MemoryTable]
+   } else {
+     val newTable = new MemoryTable(databaseName, tableName, cacheMode)
+     _tables.put(tableKey, newTable)
+     newTable
+   }
  }

  def createPartitionedMemoryTable(
@@ -67,16 +73,21 @@ class MemoryMetadataManager extends LogHelper {
      tblProps: JavaMap[String, String]
    ): PartitionedMemoryTable = {
    val tableKey = MemoryMetadataManager.makeTableKey(databaseName, tableName)
-   val newTable = new PartitionedMemoryTable(databaseName, tableName, cacheMode)
-   // Determine the cache policy to use and read any user-specified cache settings.
-   val cachePolicyStr = tblProps.getOrElse(SharkTblProperties.CACHE_POLICY.varname,
-     SharkTblProperties.CACHE_POLICY.defaultVal)
-   val maxCacheSize = tblProps.getOrElse(SharkTblProperties.MAX_PARTITION_CACHE_SIZE.varname,
-     SharkTblProperties.MAX_PARTITION_CACHE_SIZE.defaultVal).toInt
-   newTable.setPartitionCachePolicy(cachePolicyStr, maxCacheSize)
-
-   _tables.put(tableKey, newTable)
-   newTable
+   // Clear out any existing tables with the same key; prevent memory leak
+   if (containsTable(databaseName, tableName)) {
+     logInfo("Attempt to create new table when one already exists - " + tableKey)
+     _tables.get(tableKey).get.asInstanceOf[PartitionedMemoryTable]
+   } else {
+     val newTable = new PartitionedMemoryTable(databaseName, tableName, cacheMode)
+     // Determine the cache policy to use and read any user-specified cache settings.
+     val cachePolicyStr = tblProps.getOrElse(SharkTblProperties.CACHE_POLICY.varname,
+       SharkTblProperties.CACHE_POLICY.defaultVal)
+     val maxCacheSize = tblProps.getOrElse(SharkTblProperties.MAX_PARTITION_CACHE_SIZE.varname,
+       SharkTblProperties.MAX_PARTITION_CACHE_SIZE.defaultVal).toInt
+     newTable.setPartitionCachePolicy(cachePolicyStr, maxCacheSize)
+     _tables.put(tableKey, newTable)
+     newTable
+   }
  }

  def getTable(databaseName: String, tableName: String): Option[Table] = {
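
Both factory methods in this file now share the same guarded get-or-create shape. A condensed sketch of that shape, with a toy Table type in place of Shark's memory tables (the real methods also wire up cache mode, stats, and the partition cache policy):

```scala
import scala.collection.mutable

// Toy Table type for illustration; Shark's tables carry much more state.
class Table(val key: String)

class Manager {
  private val tables = mutable.Map.empty[String, Table]

  // Instead of overwriting (and leaking) an entry already registered under
  // this key, log and return the existing table, so repeated CACHE calls
  // are idempotent rather than fatal.
  def createTable(key: String): Table = tables.get(key) match {
    case Some(existing) =>
      println("Attempt to create new table when one already exists - " + key)
      existing
    case None =>
      val newTable = new Table(key)
      tables.put(key, newTable)
      newTable
  }
}
```

Calling createTable twice with the same key now returns the same instance instead of orphaning the first one, which is the leak the second half of this patch targets.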