change LogRecord.class to MessageLogRecord.class to solve the problem… #100
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Hi, guys, I found a problem, after you started delay server, you could see that messages could be written to message_log, but no messages were replayed to schedule_log. I checked the code and found that in LogIterateService#processLog(), then visitor.nextRecord(), readOneRecord(currentBuffer), then in DelayMessageLogVisitor#readOneRecord(SegmentBuffer segmentBuffer), at the last line you could see return LogVisitorRecord.data(new MessageLogRecord(header, recordBytes, wroteOffset, payloadSize, message)), the event.getClass() was MessageLogRecord.
However, in DefaultDelayLogFacade#DefaultDelayLogFacade(final StoreConfiguration config, final Function<ScheduleIndex, Boolean> func), you guys subscribed LogRecord in
bus.subscribe(LogRecord.class, e -> { AppendLogResult<ScheduleIndex> result = appendScheduleLog(e); int code = result.getCode(); if (MessageProducerCode.SUCCESS != code) { LOGGER.error("appendMessageLog schedule log error,log:{} {},code:{}", e.getSubject(), e.getMessageId(), code); throw new AppendException("appendScheduleLogError"); } func.apply(result.getAdditional()); }); bus.subscribe(Record.class, e -> { long checkpoint = e.getStartWroteOffset() + e.getRecordSize(); updateIterateOffset(checkpoint); });
After I fixed the bug and restarted the delay server, it could run correctly.
Please have a review and approve it, thanks.