Skip to content

Commit 5dc1df9

Browse files
committed
the pinging for fixed namespaces now works
Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent ae1cf2b commit 5dc1df9

File tree

8 files changed

+74
-21
lines changed

8 files changed

+74
-21
lines changed

cmd/sharddistributor-canary/main.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
package main
22

33
import (
4+
"fmt"
5+
"net"
46
"os"
57
"time"
68

79
"github.com/uber-go/tally"
810
"github.com/urfave/cli/v2"
911
"go.uber.org/fx"
1012
"go.uber.org/yarpc"
13+
"go.uber.org/yarpc/api/peer"
1114
"go.uber.org/yarpc/transport/grpc"
1215
"go.uber.org/zap"
1316

17+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
1418
"github.com/uber/cadence/common/clock"
1519
"github.com/uber/cadence/common/log"
1620
"github.com/uber/cadence/service/sharddistributor/canary"
1721
"github.com/uber/cadence/service/sharddistributor/canary/executors"
1822
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
23+
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
24+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1925
"github.com/uber/cadence/service/sharddistributor/config"
2026
"github.com/uber/cadence/tools/common/commoncli"
2127
)
@@ -25,6 +31,7 @@ const (
2531
defaultShardDistributorEndpoint = "127.0.0.1:7943"
2632
defaultFixedNamespace = "shard-distributor-canary"
2733
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
34+
defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests
2835

2936
shardDistributorServiceName = "cadence-shard-distributor"
3037
)
@@ -33,11 +40,12 @@ func runApp(c *cli.Context) {
3340
endpoint := c.String("endpoint")
3441
fixedNamespace := c.String("fixed-namespace")
3542
ephemeralNamespace := c.String("ephemeral-namespace")
43+
canaryGRPCPort := c.Int("canary-grpc-port")
3644

37-
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run()
45+
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run()
3846
}
3947

40-
func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
48+
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option {
4149
configuration := clientcommon.Config{
4250
Namespaces: []clientcommon.NamespaceConfig{
4351
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
@@ -49,22 +57,51 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
4957
},
5058
}
5159

60+
canaryGRPCAddress := fmt.Sprintf("127.0.0.1:%d", canaryGRPCPort)
61+
62+
// Create listener for GRPC inbound
63+
listener, err := net.Listen("tcp", canaryGRPCAddress)
64+
if err != nil {
65+
panic(err)
66+
}
67+
5268
transport := grpc.NewTransport()
53-
yarpcConfig := yarpc.Config{
54-
Name: "shard-distributor-canary",
55-
Outbounds: yarpc.Outbounds{
56-
shardDistributorServiceName: {
57-
Unary: transport.NewSingleOutbound(endpoint),
58-
},
59-
},
69+
70+
executorMetadata := executorclient.ExecutorMetadata{
71+
clientcommon.GrpcAddressMetadataKey: canaryGRPCAddress,
6072
}
6173

6274
return fx.Options(
6375
fx.Supply(
6476
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
6577
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
66-
yarpcConfig,
6778
configuration,
79+
transport,
80+
executorMetadata,
81+
),
82+
83+
fx.Provide(func(peerChooser spectatorclient.SpectatorPeerChooserInterface) yarpc.Config {
84+
return yarpc.Config{
85+
Name: "shard-distributor-canary",
86+
Inbounds: yarpc.Inbounds{
87+
transport.NewInbound(listener), // Listen for incoming ping requests
88+
},
89+
Outbounds: yarpc.Outbounds{
90+
shardDistributorServiceName: {
91+
Unary: transport.NewSingleOutbound(endpoint),
92+
Stream: transport.NewSingleOutbound(endpoint),
93+
},
94+
// canary-to-canary outbound uses peer chooser to route to other canary instances
95+
"shard-distributor-canary": {
96+
Unary: transport.NewOutbound(peerChooser),
97+
Stream: transport.NewOutbound(peerChooser),
98+
},
99+
},
100+
}
101+
}),
102+
103+
fx.Provide(
104+
func(t *grpc.Transport) peer.Transport { return t },
68105
),
69106
fx.Provide(
70107
yarpc.NewDispatcher,
@@ -73,12 +110,17 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
73110
fx.Provide(zap.NewDevelopment),
74111
fx.Provide(log.NewLogger),
75112

113+
// Register canary procedures with dispatcher
114+
fx.Invoke(func(dispatcher *yarpc.Dispatcher, server sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCServer) {
115+
dispatcher.Register(sharddistributorv1.BuildShardDistributorExecutorCanaryAPIYARPCProcedures(server))
116+
}),
117+
76118
// Start the YARPC dispatcher
77119
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
78120
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
79121
}),
80122

81-
// Include the canary module
123+
// Include the canary module - it will set up spectator peer choosers and canary client
82124
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
83125
)
84126
}
@@ -110,6 +152,11 @@ func buildCLI() *cli.App {
110152
Value: defaultEphemeralNamespace,
111153
Usage: "namespace for ephemeral shard creation testing",
112154
},
155+
&cli.IntFlag{
156+
Name: "canary-grpc-port",
157+
Value: defaultCanaryGRPCPort,
158+
Usage: "port for canary to receive ping requests",
159+
},
113160
},
114161
Action: func(c *cli.Context) error {
115162
runApp(c)

cmd/sharddistributor-canary/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ import (
88
)
99

1010
func TestDependenciesAreSatisfied(t *testing.T) {
11-
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint)))
11+
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint, defaultCanaryGRPCPort)))
1212
}

service/sharddistributor/canary/module.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func opts(names NamespacesNames) fx.Option {
8484
)),
8585
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures()),
8686

87-
fx.Invoke(func(lc fx.Lifecycle, chooser spectatorclient.SpectatorPeerChooserInterface, spectators spectatorclient.Spectators) {
87+
fx.Invoke(func(lc fx.Lifecycle, chooser spectatorclient.SpectatorPeerChooserInterface, spectators *spectatorclient.Spectators) {
8888
lc.Append(fx.Hook{
8989
OnStart: func(ctx context.Context) error {
9090
chooser.SetSpectators(spectators)

service/sharddistributor/canary/pinger/pingAndLog.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import (
44
"context"
55
"time"
66

7-
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
8-
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
97
"go.uber.org/yarpc"
108
"go.uber.org/zap"
9+
10+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
11+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
1112
)
1213

1314
const (
@@ -26,11 +27,13 @@ func PingShard(ctx context.Context, canaryClient sharddistributorv1.ShardDistrib
2627
response, err := canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, namespace))
2728
if err != nil {
2829
logger.Error("Failed to ping shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.Error(err))
30+
return
2931
}
3032

3133
// Verify response
3234
if !response.GetOwnsShard() {
3335
logger.Warn("Executor does not own shard", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
36+
return
3437
}
3538

3639
logger.Info("Successfully pinged shard owner", zap.String("namespace", namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))

service/sharddistributor/canary/processorephemeral/canary_client_mock_test.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/sharddistributor/client/spectatorclient/peer_chooser.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import (
1313

1414
"github.com/uber/cadence/common/log"
1515
"github.com/uber/cadence/common/log/tag"
16+
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
1617
)
1718

1819
const (
19-
NamespaceHeader = "x-shard-distributor-namespace"
20-
grpcAddressMetadataKey = "grpc_address"
20+
NamespaceHeader = "x-shard-distributor-namespace"
2121
)
2222

2323
// SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method
@@ -127,7 +127,7 @@ func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Reques
127127
}
128128

129129
// Extract GRPC address from owner metadata
130-
grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
130+
grpcAddress, ok := owner.Metadata[clientcommon.GrpcAddressMetadataKey]
131131
if !ok || grpcAddress == "" {
132132
return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
133133
}

service/sharddistributor/client/spectatorclient/peer_chooser_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.uber.org/yarpc/transport/grpc"
1313

1414
"github.com/uber/cadence/common/log/testlogger"
15+
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
1516
)
1617

1718
func TestSpectatorPeerChooser_Choose_MissingShardKey(t *testing.T) {
@@ -128,7 +129,7 @@ func TestSpectatorPeerChooser_Choose_Success(t *testing.T) {
128129
Return(&ShardOwner{
129130
ExecutorID: "executor-1",
130131
Metadata: map[string]string{
131-
grpcAddressMetadataKey: "127.0.0.1:7953",
132+
clientcommon.GrpcAddressMetadataKey: "127.0.0.1:7953",
132133
},
133134
}, nil)
134135

@@ -172,7 +173,7 @@ func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) {
172173
Return(&ShardOwner{
173174
ExecutorID: "executor-1",
174175
Metadata: map[string]string{
175-
grpcAddressMetadataKey: "127.0.0.1:7953",
176+
clientcommon.GrpcAddressMetadataKey: "127.0.0.1:7953",
176177
},
177178
}, nil).Times(2)
178179

service/sharddistributor/leader/process/processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ func (p *namespaceProcessor) identifyStaleExecutors(namespaceState *store.Namesp
256256

257257
for executorID, state := range namespaceState.Executors {
258258
if now.Sub(state.LastHeartbeat) > p.cfg.HeartbeatTTL {
259+
p.logger.Info("Executor has not reported a heartbeat recently", tag.ShardExecutor(executorID), tag.ShardNamespace(p.namespaceCfg.Name), tag.Value(state.LastHeartbeat))
259260
expiredExecutors[executorID] = namespaceState.ShardAssignments[executorID].ModRevision
260261
}
261262
}

0 commit comments

Comments
 (0)