Skip to content

Conversation

@arzonus
Copy link
Contributor

@arzonus arzonus commented Nov 14, 2025

What changed?
Added metrics to track shard assignment distribution and handover latency in the shard distributor service. This includes:

  • New ShardDistributorShardAssignmentDistributionLatency metric measuring time from shard assignment to distribution
  • New ShardDistributorShardHandoverLatency metric measuring handover time between executors
  • HandoverType enum (GRACEFUL/EMERGENCY) to distinguish handover types

Why?
To provide visibility into shard distribution performance and identify potential issues with shard handovers, particularly distinguishing between graceful and emergency handovers.

How did you test it?
Added comprehensive unit tests covering metric emission logic and shard statistics tracking.

Potential risks
Additional storage operations during heartbeat processing (mitigated by running metrics emission in background goroutine).

Release notes
Added shard handover latency metrics for monitoring distribution performance.

Documentation Changes
None required.

@arzonus arzonus changed the title feat(sharddistributor): add shard handover latency metrics feat(shard-distributor): add shard handover latency metrics Nov 14, 2025
@arzonus arzonus marked this pull request as draft November 18, 2025 08:47
@arzonus arzonus force-pushed the add-shard-handover-latency-metric branch 5 times, most recently from 53a34b4 to 096cd69 Compare November 24, 2025 12:18
@arzonus arzonus marked this pull request as ready for review November 24, 2025 12:21
Comment on lines +123 to +125
if len(newAssignedShardIDs) == 0 {
// no handovers happened, nothing to do
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(newAssignedShardIDs) == 0 {
// no handovers happened, nothing to do
return
}

Nit: This is not necessary, the loop below will just not run if the list is empty

@arzonus arzonus force-pushed the add-shard-handover-latency-metric branch from 096cd69 to b49bcb7 Compare November 27, 2025 13:44
@arzonus arzonus force-pushed the add-shard-handover-latency-metric branch from b49bcb7 to cb1371c Compare November 28, 2025 10:07
Comment on lines +1 to +6
package ptr

// ToPtr returns a pointer to the given value.
func ToPtr[T any](v T) *T {
return &v
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually already have this function in common/convert.go

Comment on lines +109 to +110
// emits metrics in background to not block the heartbeat response
h.emitShardAssignmentMetrics(request.Namespace, heartbeatTime, previousHeartbeat, assignedShards)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not done in the background I think. Did you mean to spawn a go-routine?

Comment on lines +133 to +140
// check if handover stats exist at all
isShardHandoverStatsExists := assignedState.ShardHandoverStats != nil

for _, shardID := range newAssignedShardIDs {
if !isShardHandoverStatsExists {
// no handover stats at all, means no handovers happened before
continue
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading from nil returns empty, false so we don't need this check

Suggested change
// check if handover stats exist at all
isShardHandoverStatsExists := assignedState.ShardHandoverStats != nil
for _, shardID := range newAssignedShardIDs {
if !isShardHandoverStatsExists {
// no handover stats at all, means no handovers happened before
continue
}
for _, shardID := range newAssignedShardIDs {

Comment on lines +214 to +242
func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string {
// if assignedState is nil, no shards are assigned
if assignedState == nil || len(assignedState.AssignedShards) == 0 {
return nil
}

// if previousHeartbeat is nil, all assigned shards are new
if previousHeartbeat == nil {
var newAssignedShardIDs = make([]string, len(assignedState.AssignedShards))

var i int
for assignedShardID := range assignedState.AssignedShards {
newAssignedShardIDs[i] = assignedShardID
i++
}

return newAssignedShardIDs
}

// find shards that are assigned now but were not reported in the previous heartbeat
var newAssignedShardIDs []string
for assignedShardID := range assignedState.AssignedShards {
if _, ok := previousHeartbeat.ReportedShards[assignedShardID]; !ok {
newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID)
}
}

return newAssignedShardIDs
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this function hard to follow. Maybe we can do something like this:

Suggested change
func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string {
// if assignedState is nil, no shards are assigned
if assignedState == nil || len(assignedState.AssignedShards) == 0 {
return nil
}
// if previousHeartbeat is nil, all assigned shards are new
if previousHeartbeat == nil {
var newAssignedShardIDs = make([]string, len(assignedState.AssignedShards))
var i int
for assignedShardID := range assignedState.AssignedShards {
newAssignedShardIDs[i] = assignedShardID
i++
}
return newAssignedShardIDs
}
// find shards that are assigned now but were not reported in the previous heartbeat
var newAssignedShardIDs []string
for assignedShardID := range assignedState.AssignedShards {
if _, ok := previousHeartbeat.ReportedShards[assignedShardID]; !ok {
newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID)
}
}
return newAssignedShardIDs
}
func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string {
if assignedState == nil || len(assignedState.AssignedShards) == 0 {
return nil
}
newAssignedShardIDs := make([]string, 0)
for assignedShardID := range assignedState.AssignedShards {
if previousHeartbeat == nil || !shardInReportedShards(previousHeartbeat.ReportedShards, assignedShardID) {
newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID)
}
}
return newAssignedShardIDs
}
func shardInReportedShards(reportedShards map[string]*types.ShardStatusReport, shardID string) bool {
_, ok := reportedShards[shardID]
return ok
}

// find newly assigned shards, if there are none, no handovers happened
newAssignedShardIDs := filterNewlyAssignedShardIDs(previousHeartbeat, assignedState)
if len(newAssignedShardIDs) == 0 {
// no handovers happened, nothing to do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i don't think we need this comment

Comment on lines +701 to +707
for name, tc := range map[string]struct {
previousHeartbeat *store.HeartbeatState
assignedState *store.AssignedState

expectedDistributionLatency *time.Duration
expectedHandoverLatencies []*time.Duration
}{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the other tests in the file does a slightly different table test setup, lets keep it consistent.

Comment on lines +823 to +824
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not use ctrl anywhere

ctrl := gomock.NewController(t)
defer ctrl.Finish()

metricsClient := &metricmocks.Client{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets use tally.NewTestScope instead of the old testify mocks. E.g. like here: https://github.com/cadence-workflow/cadence/blob/master/service/sharddistributor/wrappers/metered/metered_test.go#L80

}
}

func TestEmitShardAssignmentMetrics(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider having some helpers to generate the AssignedShards and ShardHandoverStats. e.g:

func makeAssignedShards(shardIDs ...string) map[string]*types.ShardAssignment {
    m := make(map[string]*types.ShardAssignment, len(shardIDs))
    for _, id := range shardIDs {
        // Default to READY since that is the common case in your tests
        m[id] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
    }
    return m
}

expectedHandoverLatencies: nil,
},
"newly assigned shard with handover stats": {
previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previousHeartbeat is the same in every test. I think we should have a fixed value in the test code - we already tested the filter function in the other test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants