-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-7117 Prevent OOM in Debezium Redis Sink #48
DBZ-7117 Prevent OOM in Debezium Redis Sink #48
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eizners Thanks for the PR, could you please come back and address my two comments?
@@ -31,6 +31,7 @@ public class RedisMemoryThreshold { | |||
|
|||
private RedisClient client; | |||
private long memoryLimit; | |||
private static long maxMemory = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mistake, probably remained from previous code I wrote. This is fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eizners Is it really fixed? I still see the static
. Maybe you forgot to push the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was fixed
debezium-server-redis/src/main/java/io/debezium/server/redis/RedisMemoryThreshold.java
Outdated
Show resolved
Hide resolved
if (!redisMemoryThreshold.checkMemory(getObjectSize(recordsMap.get(0)), recordsMap.size(), | ||
config.getBufferFillRate())) { | ||
LOGGER.info("Stopped consuming records!\n"); | ||
Thread.sleep(500); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If sleep is needed please use DelayStrategy
. Also the pause should configurable or derived from another configuration value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
oos.writeObject(record); | ||
oos.flush(); | ||
long size = boas.size() + 50; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the magical 50 constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created constant. 50 is redis stream memory overhead based on my observation.,
|
||
private long getObjectSize(SimpleEntry<String, Map<String, String>> record) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this will be a bit slow. We have io.debezium.util.ApproximateStructSizeCalculator
. Maybe something similar can be introduced?
private void disable() { | ||
isMemoryOk = MEMORY_OK; | ||
LOGGER.warn("Memory threshold percentage check is disabled!"); | ||
maximumMemory = configuredMemory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also set in https://github.com/debezium/debezium-server/pull/48/files#diff-8750121457077fe6a5f2ec0683ba43484e3a9592259bdecf64e2763066d45192R63
IMHO you should either return values or set them on the attributes but you should not mix both approaches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
List<SimpleEntry<String, Map<String, String>>> recordsMap = new ArrayList<>(clonedBatch.size()); | ||
for (ChangeEvent<Object, Object> record : clonedBatch) { | ||
String destination = streamNameMapper.map(record.destination()); | ||
Map<String, String> recordMap = recordMapFunction.apply(record); | ||
recordsMap.add(new SimpleEntry<>(destination, recordMap)); | ||
} | ||
if (!redisMemoryThreshold.checkMemory(getObjectSize(recordsMap.get(0)), recordsMap.size(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should guard against empty batch, would throw an excaption in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added check for empty batch
long usedMemory = memoryTuple.getItem1(); | ||
long prevAccumulatedMemory = accumulatedMemory; | ||
long diff = usedMemory - previouslyUsedMemory; | ||
if (diff == 0L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand reason for this. You are doing the query every time. What is the reasoning about caching of used memory in previouslyUsedMemory
? You could always operate on the most resent used memory. Also in that case accumulatedMemory
is probably also unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explained in a chat
String memory = client.info(INFO_MEMORY); | ||
LOGGER.trace(memory); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better logging message needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
…o DBZ-7117-prevent-oom-in-debezium-redis-sink
private String humanReadableSize(long size) { | ||
if (size < 1024) { | ||
return size + " B"; | ||
} | ||
else if (size < 1024 * 1024) { | ||
return String.format("%.2f KB", size / 1024.0); | ||
} | ||
else if (size < 1024 * 1024 * 1024) { | ||
return String.format("%.2f MB", size / (1024.0 * 1024.0)); | ||
} | ||
else { | ||
return String.format("%.2f GB", size / (1024.0 * 1024.0 * 1024.0)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider this instead:
/**
* Formats a raw file size value into a human-readable string with appropriate units (B, KB, MB, GB).
*
* @param size The size value to be formatted.
* @return A human-readable string representing the file size with units (B, KB, MB, GB).
*/
private String formatFileSize(long size) {
final String[] units = {"B", "KB", "MB", "GB"};
int unitIndex = 0;
double sizeInUnit = size;
while (sizeInUnit >= 1024 && unitIndex < units.length - 1) {
sizeInUnit /= 1024.0;
unitIndex++;
}
return String.format("%.2f %s", sizeInUnit, units[unitIndex]);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considered, tested, replaced
@eizners I think we are almost done. Could you please remove the merge commit and/or squash the commits to a single one? |
No description provided.