KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO #15634
Conversation
Just curious: does this happen only if remote storage is enabled? According to the description:
It looks like the issue exists even if we don't use remote storage.
For a normal topic, once the replica becomes leader, it is able to resolve/convert the high-watermark offset (log-start-offset) to metadata by reading the segment from disk, and then it updates the high watermark to either the current leader's log-end-offset or the lowest LEO of all the eligible ISR replicas. For a remote topic, the replica will fail to resolve the high-watermark offset (log-start-offset) to metadata since the segment won't be on local disk, and then it fails continuously.
Thanks @kamalcph for the PR, overall LGTM.
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   */
  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None

  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
nit: You can leave this field where it was originally declared, as moving it is not really needed for this change.
@volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None

@volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
@volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(_localLogStartOffset)
There won't be any effect from this change, as _localLogStartOffset is initialized with logStartOffset. But it is good to keep _localLogStartOffset here for consistency and the relevance of this field.
assertEquals(11L, log.logStartOffset)
assertEquals(31L, log.localLogStartOffset())

// Truncating the log to below the local-log-start-offset should update the high watermark
Good to see the truncation scenarios covered as well.
@kamalcph : Thanks for the PR. Left a question below.
 *
 * @param highWatermarkMetadata the suggested high watermark with offset metadata
 * @return the updated high watermark offset
 */
def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
  val endOffsetMetadata = localLog.logEndOffsetMetadata
  val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
    new LogOffsetMetadata(logStartOffset)
  val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {
Hmm, when will we set the HWM to be lower than _localLogStartOffset?
In UnifiedLog.deletableSegments(), we have the following code that bounds retention-based deletion by highWatermark. When updating highWatermark, the value typically increases.
val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
when will we set HWM to be lower than _localLogStartOffset?
This can happen when recovering the partition after an ungraceful shutdown and the replication-offset-checkpoint file is missing or corrupted. When the broker comes online, the HWM is set to localLogStartOffset in UnifiedLog#updateLocalLogStartOffset, and then we load the HWM from the checkpoint file in Partition#createLog.
If the HWM checkpoint file is missing or does not contain an entry for the partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), then the LSO is assumed as the HWM. Thus, a non-monotonic update of the high watermark from LLSO to LSO can happen.
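For concreteness, here is a minimal sketch of that sequence with hypothetical offsets (illustrative only, not the actual UnifiedLog/Partition code):

```scala
object HwmRecoverySketch extends App {
  val logStartOffset      = 100L // LSO: earliest offset, possibly present only in remote storage
  val localLogStartOffset = 500L // LLSO: earliest offset present on local disk

  // UnifiedLog#updateLocalLogStartOffset while loading the log:
  var highWatermark = localLogStartOffset                   // HWM = 500

  // Partition#createLog then applies the checkpointed HWM; a missing or corrupted
  // replication-offset-checkpoint entry yields 0, which is floored at logStartOffset:
  val checkpointedHwm = 0L
  highWatermark = math.max(checkpointedHwm, logStartOffset) // HWM = 100, below LLSO

  println(s"HWM=$highWatermark, LLSO=$localLogStartOffset") // non-monotonic move from 500 down to 100
}
```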
Yes, if replication-offset-checkpoint is corrupted, HWM could temporarily be set to below local-log-start-offset. I am still trying to understand the impact of that. In the common case, the restarted broker can't become the leader or serve reads until it's caught up. At that time, the HWM will be up to date. In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?
The jira also says:
If the high watermark is less than the local-log-start-offset, then the UnifiedLog#fetchHighWatermarkMetadata method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.
However, the follower read is bounded by logEndOffset, not HWM? Where does the follower read need to convert HWM to metadata?
In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?
Yes, we want to address this case too. And the issue can also happen during a clean preferred-leader-election:
Call stack: the replica (1002) has full data but an invalid HW, so the fetch-offset will be equal to LeaderLog(1001).highWatermark.
Leader (1001):
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLocalLog
Partition.fetchRecords
Partition.updateFollowerFetchState
Partition.maybeExpandIsr
Partition.submitAlterPartition
...
...
...
# If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
# parks the request in the DelayedFetchPurgatory
Another thread runs preferred-leader-election in the controller (1003). Since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
KafkaController.processReplicaLeaderElection
KafkaController.onReplicaElection
PartitionStateMachine.handleStateChanges
PartitionStateMachine.doHandleStateChanges
PartitionStateMachine.electLeaderForPartitions
ControllerChannelManager.sendRequestsToBrokers
Replica 1002 gets elected as leader and has an invalid highWatermark since it didn't process the fetch response from the previous leader 1001, so it throws the OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, if even one partition fails, the remaining partitions in that request won't be processed.
KafkaApis.handleLeaderAndIsrRequest
ReplicaManager.becomeLeaderOrFollower
ReplicaManager.makeLeaders
Partition.makeLeader
Partition.maybeIncrementLeaderHW
UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
UnifiedLog.fetchHighWatermarkMetadata
The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes leader for tp0. During this time, the producers won't be able to send messages, as node 1002 returns the NOT_LEADER_FOR_PARTITION error code to the producer.
During this time, if a follower sends a FETCH request to read from the current leader 1002, then the OFFSET_OUT_OF_RANGE error is returned by the leader:
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords
# readFromLocalLog
Partition.updateFollowerFetchState
Partition.maybeIncrementLeaderHW
LeaderLog.maybeIncrementHighWatermark
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read
# OffsetOutOfRangeException exception
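A rough sketch of why the conversion at the bottom of this stack fails (a simplification under assumed behavior, not the actual LocalLog code): resolving an offset to full metadata means reading the local segment that contains it, and an offset below the local-log-start-offset has no local segment to read from.

```scala
def convertToOffsetMetadataSketch(offset: Long,
                                  localLogStartOffset: Long,
                                  logEndOffset: Long): Long = {
  if (offset < localLogStartOffset || offset > logEndOffset)
    throw new IllegalArgumentException( // stand-in for Kafka's OffSET_OUT_OF_RANGE / OffsetOutOfRangeException
      s"Cannot resolve offset $offset to metadata: local segments cover [$localLogStartOffset, $logEndOffset)")
  offset // the real method returns full offset metadata (offset, segment base offset, position)
}

// With HWM below LLSO, e.g. convertToOffsetMetadataSketch(100L, 500L, 900L),
// the leader keeps hitting this error and cannot advance the high watermark.
```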
Thanks for the detailed explanation.
For the makeLeaders path, it will call UnifiedLog.convertToOffsetMetadataOrThrow. Within it, checkLogStartOffset(offset) shouldn't throw OFFSET_OUT_OF_RANGE since we are comparing the offset with logStartOffset. Do you know which part throws the OFFSET_OUT_OF_RANGE error?
For the follower fetch path, it's bounded by LogEndOffset, so it shouldn't need to call UnifiedLog.fetchHighWatermarkMetadata, right? The regular consumer will call UnifiedLog.fetchHighWatermarkMetadata.
@kamalcph : Thanks for the explanation. I understand the problem now.
As for the fix, it seems that it could work for HWM. However, I am not sure that we could always do the same thing for LastStableOffset. For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment, and moving it to localLogStartOffset immediately will be incorrect.
Here is another potential approach. Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. If occasionally OffsetMetadata is not available, we don't have to force an exception in convertToOffsetMetadataOrThrow(). Instead, we can leave the OffsetMetadata as empty and just use a conservative 1 byte for estimating the amount of available bytes. This approach will apply to both HWM and LSO. The inaccurate byte estimate will be ok as long as it's infrequent. What do you think?
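A hedged sketch of this estimation fallback; the names (OffsetMeta, estimateAvailableBytes) are simplified stand-ins for illustration, not the real DelayedFetch/LogOffsetMetadata API:

```scala
case class OffsetMeta(messageOffset: Long,
                      segmentBaseOffset: Long = -1L,
                      relativePositionInSegment: Int = -1) {
  // When only the message offset is known, the segment position fields stay unset.
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0L
}

def estimateAvailableBytes(fetchOffset: OffsetMeta, upperBound: OffsetMeta): Long = {
  if (upperBound.messageOffset <= fetchOffset.messageOffset) 0L
  else if (fetchOffset.messageOffsetOnly || upperBound.messageOffsetOnly)
    1L // position unknown: assume a conservative single byte instead of throwing
  else if (fetchOffset.segmentBaseOffset == upperBound.segmentBaseOffset)
    (upperBound.relativePositionInSegment - fetchOffset.relativePositionInSegment).toLong
  else
    1L // different segments; the real code estimates differently, but one byte is enough to complete the delayed fetch
}
```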
Thanks for suggesting the alternative approach. I'll check and come back on this.
For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect.
I'm not clear on this:
- Segments are eligible for upload to remote storage only when the lastStableOffset moves beyond the end offset of the segment to be uploaded.
- When all the replicas lose local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have a provision to serve the remote data.
- When firstUnstableOffsetMetadata is empty, we return highWatermark. With this patch, the highWatermark lower boundary is set to localLogStartOffset, so there won't be an issue.
Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes.
The LogOffsetMetadata#onOlderSegment method is used in the hot path of incrementing the high watermark and expects the full metadata, otherwise it throws an error. Is it OK to remove the throwable from the LogOffsetMetadata#onOlderSegment method and return false when only the message offset is available?
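A hypothetical sketch of the change being asked about, using a simplified stand-in for LogOffsetMetadata (not the real class):

```scala
case class OffsetMetaSketch(messageOffset: Long, segmentBaseOffset: Long = -1L) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0L

  // Return false instead of throwing when the segment information is unknown,
  // so the HWM-increment hot path treats "unknown position" as "not on an older segment".
  def onOlderSegment(that: OffsetMetaSketch): Boolean =
    if (messageOffsetOnly || that.messageOffsetOnly) false // previously: throw an exception
    else segmentBaseOffset < that.segmentBaseOffset
}
```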
Opened draft PR #15825 with the suggested approach. PTAL.
I'm not clear on this:
- Segments are eligible for upload to remote storage only when the lastStableOffset moves beyond the end offset of the segment to be uploaded.
- When all the replicas lose local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have a provision to serve the remote data.
- When firstUnstableOffsetMetadata is empty, we return highWatermark. With this patch, the highWatermark lower boundary is set to localLogStartOffset, so there won't be an issue.
That's true. It's just that that is yet another offset that we need to bound. I am also not sure if there are other side effects of adjusting HWM and LSO.
Left some comments on #15825.
Gentle bump to review the diff, thanks!
@kamalcph : Thanks for the reply. A couple of more comments.
// Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
log.updateHighWatermark(new LogOffsetMetadata(5L))
assertHighWatermark(31L)
// Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
This is moving HW below local-log-start-offset, not log-start-offset.
I'm still trying to understand this issue, so please feel free to correct me
If we make
yes, correct.
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
      s"but we only have log segments starting from offset: $logStartOffset.")
  }

  private def checkLocalLogStartOffset(offset: Long): Unit = {
It seems reading records between [logStartOffset, localLogStartOffset] is dangerous since the segment won't be on local disk. That is a bit chaotic to me, as UnifiedLog presents a unified view of local and tiered log segments ("A log which presents a unified view of local and tiered log segments.").
Agree on this. The checkLocalLogStartOffset is used only in the convertToOffsetMetadataOrThrow method, which reads from local disk.
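For reference, a hedged sketch of what such a guard could look like inside UnifiedLog, mirroring the existing checkLogStartOffset shown in the diff above (assumed shape, not the exact PR code; _localLogStartOffset and topicPartition are surrounding class members):

```scala
// Assumed sketch: reject offsets that fall below the first locally available offset.
private def checkLocalLogStartOffset(offset: Long): Unit = {
  if (offset < _localLogStartOffset)
    throw new org.apache.kafka.common.errors.OffsetOutOfRangeException(
      s"Received request for offset $offset for partition $topicPartition, " +
      s"but we only have local log segments starting from offset: ${_localLogStartOffset}.")
}
```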
@kamalcph : Thanks for the explanation. Added a followup comment.
@kamalcph : Thanks for the explanation. Left another comment.
Pardon me. I'm a bit confused about this. Please feel free to correct me to help me catch up 😄
case 0: the checkpoint file is missing and the remote storage is disabled
The LSO is initialized to LLSO, so I can't understand why the non-monotonic update happens. After all, LLSO and LSO are the same in this scenario.
case 1: the checkpoint file is missing and the remote storage is enabled
The LSO is initialized to …, and then the HWM will be updated to LLSO, which is larger than zero. This could be a problem when Partition#createLog gets called, since the HWM is changed from LLSO (non-zero) to LSO (zero). Also, the incorrect HWM causes errors in …. If I understand correctly, it seems the root cause is that when the checkpoint files are not working, we will initialize a …, so could we fix that by rebuilding …?
Sorry, the story I mentioned above seems to be another issue. Let me summarize my thoughts.
Thanks @chia7712 for the review!
Most of the time, when the follower joins the ISR, it updates the log-start-offset and high-watermark from the leader's FETCH response. The issue can happen only when the follower gets elected as leader before updating its state, as mentioned in the summary/comments. When the …
This is not an issue for a normal topic. But for a cluster with remote storage enabled, if the issue happens on even one partition, then it starts to affect a subset of topics. The controller batches the partitions in the LeaderAndIsr request. If the broker fails to process the LISR for one partition, then the remaining partitions in that batch won't be processed. The producers producing to those topics will start receiving the NOT_LEADER_FOR_PARTITION error.
Bound high-watermark offset between local-log-start-offset and log-end-offset:
The high watermark should not go below the local-log-start offset. If the high watermark is less than the local-log-start-offset, then the UnifiedLog#fetchHighWatermarkMetadata method will throw OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.
This issue can happen when the partition undergoes recovery due to corruption in the checkpoint file and it gets elected as leader before it gets a chance to update the HW from the previous leader.
The follower sends the first FETCH request to the leader; the leader checks isFollowerInSync, then expands the ISR and parks the request in the DelayedFetchPurgatory. If the replica gets elected as leader before the fetch response is processed, then the new leader will have a wrong high watermark.
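A minimal sketch of the bound this change describes (simplified, not the full UnifiedLog.updateHighWatermark): the proposed high watermark is clamped to the [local-log-start-offset, log-end-offset] range before being applied.

```scala
// Clamp a proposed high watermark into the locally resolvable range.
def boundHighWatermark(proposed: Long, localLogStartOffset: Long, logEndOffset: Long): Long =
  math.min(math.max(proposed, localLogStartOffset), logEndOffset)

// e.g. boundHighWatermark(5L, 31L, 100L) == 31L, matching the test expectation above
// where updating the HW to 5 resets it to the local-log-start-offset 31.
```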