-
Notifications
You must be signed in to change notification settings - Fork 869
feat(shard-distributor): add shard handover latency metrics #7442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(shard-distributor): add shard handover latency metrics #7442
Conversation
53a34b4 to
096cd69
Compare
| if len(newAssignedShardIDs) == 0 { | ||
| // no handovers happened, nothing to do | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if len(newAssignedShardIDs) == 0 { | |
| // no handovers happened, nothing to do | |
| return | |
| } |
Nit: This is not necessary, the loop below will just not run if the list is empty
096cd69 to
b49bcb7
Compare
b49bcb7 to
cb1371c
Compare
| package ptr | ||
|
|
||
| // ToPtr returns a pointer to the given value. | ||
| func ToPtr[T any](v T) *T { | ||
| return &v | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually already have this function in common/convert.go
| // emits metrics in background to not block the heartbeat response | ||
| h.emitShardAssignmentMetrics(request.Namespace, heartbeatTime, previousHeartbeat, assignedShards) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not done in the background I think. Did you mean to spawn a go-routine?
| // check if handover stats exist at all | ||
| isShardHandoverStatsExists := assignedState.ShardHandoverStats != nil | ||
|
|
||
| for _, shardID := range newAssignedShardIDs { | ||
| if !isShardHandoverStatsExists { | ||
| // no handover stats at all, means no handovers happened before | ||
| continue | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading from nil returns empty, false so we don't need this check
| // check if handover stats exist at all | |
| isShardHandoverStatsExists := assignedState.ShardHandoverStats != nil | |
| for _, shardID := range newAssignedShardIDs { | |
| if !isShardHandoverStatsExists { | |
| // no handover stats at all, means no handovers happened before | |
| continue | |
| } | |
| for _, shardID := range newAssignedShardIDs { |
| func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string { | ||
| // if assignedState is nil, no shards are assigned | ||
| if assignedState == nil || len(assignedState.AssignedShards) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // if previousHeartbeat is nil, all assigned shards are new | ||
| if previousHeartbeat == nil { | ||
| var newAssignedShardIDs = make([]string, len(assignedState.AssignedShards)) | ||
|
|
||
| var i int | ||
| for assignedShardID := range assignedState.AssignedShards { | ||
| newAssignedShardIDs[i] = assignedShardID | ||
| i++ | ||
| } | ||
|
|
||
| return newAssignedShardIDs | ||
| } | ||
|
|
||
| // find shards that are assigned now but were not reported in the previous heartbeat | ||
| var newAssignedShardIDs []string | ||
| for assignedShardID := range assignedState.AssignedShards { | ||
| if _, ok := previousHeartbeat.ReportedShards[assignedShardID]; !ok { | ||
| newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID) | ||
| } | ||
| } | ||
|
|
||
| return newAssignedShardIDs | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this function hard to follow. Maybe we can do something like this:
| func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string { | |
| // if assignedState is nil, no shards are assigned | |
| if assignedState == nil || len(assignedState.AssignedShards) == 0 { | |
| return nil | |
| } | |
| // if previousHeartbeat is nil, all assigned shards are new | |
| if previousHeartbeat == nil { | |
| var newAssignedShardIDs = make([]string, len(assignedState.AssignedShards)) | |
| var i int | |
| for assignedShardID := range assignedState.AssignedShards { | |
| newAssignedShardIDs[i] = assignedShardID | |
| i++ | |
| } | |
| return newAssignedShardIDs | |
| } | |
| // find shards that are assigned now but were not reported in the previous heartbeat | |
| var newAssignedShardIDs []string | |
| for assignedShardID := range assignedState.AssignedShards { | |
| if _, ok := previousHeartbeat.ReportedShards[assignedShardID]; !ok { | |
| newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID) | |
| } | |
| } | |
| return newAssignedShardIDs | |
| } | |
| func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string { | |
| if assignedState == nil || len(assignedState.AssignedShards) == 0 { | |
| return nil | |
| } | |
| newAssignedShardIDs := make([]string, 0) | |
| for assignedShardID := range assignedState.AssignedShards { | |
| if previousHeartbeat == nil || !shardInReportedShards(previousHeartbeat.ReportedShards, assignedShardID) { | |
| newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID) | |
| } | |
| } | |
| return newAssignedShardIDs | |
| } | |
| func shardInReportedShards(reportedShards map[string]*types.ShardStatusReport, shardID string) bool { | |
| _, ok := reportedShards[shardID] | |
| return ok | |
| } |
| // find newly assigned shards, if there are none, no handovers happened | ||
| newAssignedShardIDs := filterNewlyAssignedShardIDs(previousHeartbeat, assignedState) | ||
| if len(newAssignedShardIDs) == 0 { | ||
| // no handovers happened, nothing to do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i don't think we need this comment
| for name, tc := range map[string]struct { | ||
| previousHeartbeat *store.HeartbeatState | ||
| assignedState *store.AssignedState | ||
|
|
||
| expectedDistributionLatency *time.Duration | ||
| expectedHandoverLatencies []*time.Duration | ||
| }{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: the other tests in the file does a slightly different table test setup, lets keep it consistent.
| ctrl := gomock.NewController(t) | ||
| defer ctrl.Finish() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not use ctrl anywhere
| ctrl := gomock.NewController(t) | ||
| defer ctrl.Finish() | ||
|
|
||
| metricsClient := &metricmocks.Client{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets use tally.NewTestScope instead of the old testify mocks. E.g. like here: https://github.com/cadence-workflow/cadence/blob/master/service/sharddistributor/wrappers/metered/metered_test.go#L80
| } | ||
| } | ||
|
|
||
| func TestEmitShardAssignmentMetrics(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should consider having some helpers to generate the AssignedShards and ShardHandoverStats. e.g:
func makeAssignedShards(shardIDs ...string) map[string]*types.ShardAssignment {
m := make(map[string]*types.ShardAssignment, len(shardIDs))
for _, id := range shardIDs {
// Default to READY since that is the common case in your tests
m[id] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
}
return m
}| expectedHandoverLatencies: nil, | ||
| }, | ||
| "newly assigned shard with handover stats": { | ||
| previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previousHeartbeat is the same in every test. I think we should have a fixed value in the test code - we already tested the filter function in the other test
What changed?
Added metrics to track shard assignment distribution and handover latency in the shard distributor service. This includes:
ShardDistributorShardAssignmentDistributionLatencymetric measuring time from shard assignment to distributionShardDistributorShardHandoverLatencymetric measuring handover time between executorsHandoverTypeenum (GRACEFUL/EMERGENCY) to distinguish handover typesWhy?
To provide visibility into shard distribution performance and identify potential issues with shard handovers, particularly distinguishing between graceful and emergency handovers.
How did you test it?
Added comprehensive unit tests covering metric emission logic and shard statistics tracking.
Potential risks
Additional storage operations during heartbeat processing (mitigated by running metrics emission in background goroutine).
Release notes
Added shard handover latency metrics for monitoring distribution performance.
Documentation Changes
None required.