Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .devcontainer/devcontainer.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM mcr.microsoft.com/vscode/devcontainers/go
RUN sudo chown -R vscode:golang /go
15 changes: 15 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "watermill-nats",

"build": {
"dockerfile": "./devcontainer.Dockerfile"
},

"customizations": {
"vscode": {
"extensions":[
"ms-azuretools.vscode-docker"
]
}
}
}
100 changes: 77 additions & 23 deletions pkg/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,54 @@ type PublisherConfig struct {
// Marshaler is marshaler used to marshal messages between watermill and wire formats
Marshaler Marshaler

// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup).
// Deprecated in favour of SubjectDetailGenerator
SubjectCalculator SubjectCalculator

// JetStream holds JetStream specific settings
JetStream JetStreamConfig

// Stream name is used for the overall stream. This will passed to the subject detail generator and can be used to generate subjects.
// If "" it will use individual topics for the streams by default.
StreamName string

//Config for the individual stream
StreamConfig StreamConfig

// SubjectDetailGenerator is a function used to generate a SubjectDetailer interface which is used to generate subjects.
SubjectDetailGenerator SubjectDetailGenerator
}

// PublisherPublishConfig is the configuration subset needed for an individual publish call
type PublisherPublishConfig struct {
// Marshaler is marshaler used to marshal messages between watermill and wire formats
Marshaler Marshaler

// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup).
// Deprecated in favour of SubjectDetailGenerator
SubjectCalculator SubjectCalculator

// JetStream holds JetStream specific settings
JetStream JetStreamConfig

// Stream name is used for the overall stream. This will passed to the subject detail generator and can be used to generate subjects.
// If "" it will use individual topics for the streams by default.
StreamName string

//Config for the individual stream
StreamConfig StreamConfig

// SubjectDetailGenerator is a function used to generate a SubjectDetailer interface which is used to generate subjects.
SubjectDetailGenerator SubjectDetailGenerator
}

func (c *PublisherConfig) setDefaults() {
if c.Marshaler == nil {
c.Marshaler = &NATSMarshaler{}
}
if c.SubjectCalculator == nil {
c.SubjectCalculator = DefaultSubjectCalculator

if c.SubjectCalculator == nil && c.SubjectDetailGenerator == nil {
c.SubjectDetailGenerator = NewDefaultSubjectDetailer
}
}

Expand All @@ -52,27 +75,30 @@ func (c PublisherConfig) Validate() error {
return errors.New("PublisherConfig.Marshaler is missing")
}

if c.SubjectCalculator == nil {
return errors.New("PublisherConfig.SubjectCalculator is missing")
if c.SubjectCalculator == nil && c.SubjectDetailGenerator == nil {
return errors.New("both PublisherConfig.SubjectCalculator and PublisherConfig.SubjectDetailGenerator are missing")
}
return nil
}

// GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig {
return PublisherPublishConfig{
Marshaler: c.Marshaler,
SubjectCalculator: c.SubjectCalculator,
JetStream: c.JetStream,
Marshaler: c.Marshaler,
SubjectCalculator: c.SubjectCalculator,
JetStream: c.JetStream,
StreamName: c.StreamName,
StreamConfig: c.StreamConfig,
SubjectDetailGenerator: c.SubjectDetailGenerator,
}
}

// Publisher provides the nats implementation for watermill publish operations
type Publisher struct {
conn Connection
config PublisherPublishConfig
logger watermill.LoggerAdapter
topicInterpreter *topicInterpreter
conn Connection
config PublisherPublishConfig
logger watermill.LoggerAdapter
streamManager *streamManager
}

// NewPublisher creates a new Publisher.
Expand All @@ -98,7 +124,7 @@ func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, lo
}

var connection Connection = conn
var interpreter *topicInterpreter
var manager *streamManager

if !config.JetStream.Disabled {
js, err := conn.JetStream(config.JetStream.ConnectOptions...)
Expand All @@ -109,40 +135,68 @@ func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, lo
return nil, err
}

interpreter = newTopicInterpreter(js, config.SubjectCalculator, "")
var detailer SubjectDetailer
if config.SubjectDetailGenerator != nil {
detailer = config.SubjectDetailGenerator(config.StreamName, "")
} else {
detailer = nil
}

manager, err = newStreamManager(
js,
config.StreamConfig,
detailer,
config.SubjectCalculator,
"",
)
if err != nil {
return nil, err
}

if config.JetStream.ShouldAutoProvision() {
err := manager.ensureStream()
if err != nil {
return nil, errors.Wrap(err, "Cannot provision")
}
}
}

return &Publisher{
conn: connection,
config: config,
logger: logger,
topicInterpreter: interpreter,
conn: connection,
config: config,
logger: logger,
streamManager: manager,
}, nil
}

// Publish publishes message to NATS.
//
// Publish will not return until an ack has been received from JetStream.
// When one of messages delivery fails - function is interrupted.
// If using StreamName, then topic being "" will use the streamName as the subject;
// otherwise will append the topic to the stream.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// This is now only required if a stream name is not returned by the underlying topic interpreter
// TODO: should we auto provision on publish? Need durable on publish options...
// should also cache this result to minimize chatter to broker
if p.config.JetStream.ShouldAutoProvision() {
err := p.topicInterpreter.ensureStream(topic)
if len(p.streamManager.StreamName()) > 0 && p.config.JetStream.ShouldAutoProvision() {
err := p.streamManager.ensureStreamForTopic(topic)
if err != nil {
return err
}
}

subject := p.streamManager.Subject(topic)

for _, msg := range messages {
messageFields := watermill.LogFields{
"message_uuid": msg.UUID,
"topic_name": topic,
"topic_name": subject,
}

p.logger.Trace("Publishing message", messageFields)

natsMsg, err := p.config.Marshaler.Marshal(topic, msg)
natsMsg, err := p.config.Marshaler.Marshal(subject, msg)
if err != nil {
return err
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ import (

func TestPublisherConfig_Validate(t *testing.T) {
tests := []struct {
name string
marshaler Marshaler
subjectCalculator SubjectCalculator
wantErr bool
name string
marshaler Marshaler
subjectCalculator SubjectCalculator
subjectDetailGenerator SubjectDetailGenerator
wantErr bool
}{
{name: "OK", marshaler: &GobMarshaler{}, wantErr: false, subjectCalculator: DefaultSubjectCalculator},
{name: "Invalid - No Marshaler", marshaler: nil, wantErr: true, subjectCalculator: DefaultSubjectCalculator},
{name: "Invalid - No Subject Calculator", marshaler: &GobMarshaler{}, wantErr: true, subjectCalculator: nil},
{name: "Invalid - No Subject Calculator, no Subject Detailer", marshaler: &GobMarshaler{}, wantErr: true, subjectCalculator: nil, subjectDetailGenerator: nil},
{name: "Invalid - Subject Calculator, no Subject Detailer", marshaler: &GobMarshaler{}, wantErr: false, subjectCalculator: DefaultSubjectCalculator, subjectDetailGenerator: nil},
{name: "Invalid - No Subject Calculator, Subject Detailer", marshaler: &GobMarshaler{}, wantErr: false, subjectCalculator: nil, subjectDetailGenerator: NewDefaultSubjectDetailer},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &PublisherConfig{
SubjectCalculator: tt.subjectCalculator,
Marshaler: tt.marshaler,
SubjectCalculator: tt.subjectCalculator,
SubjectDetailGenerator: tt.subjectDetailGenerator,
Marshaler: tt.marshaler,
}

if tt.wantErr {
Expand Down
95 changes: 95 additions & 0 deletions pkg/nats/stream_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package nats

import (
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)

// TODO: Add additional stream configuration here
type StreamConfig struct {
AllowRollup bool
}

type streamManager struct {
SubjectDetailer
config StreamConfig
js nats.JetStreamManager
}

func newStreamManager(
js nats.JetStreamManager,
config StreamConfig,
detailer SubjectDetailer,
formatter SubjectCalculator,
queueGroupPrefix string,
) (*streamManager, error) {
if detailer == nil {
//Look to remove queuegroupprefix and formatter from here when subjectcalculator is removed
var err error
detailer, err = newSubjectDetailFromCalculator(formatter, queueGroupPrefix)
if err != nil {
return nil, errors.Wrap(err, "No detailer supplied")
}
}

return &streamManager{
detailer,
config,
js,
}, nil
}

func (s *streamManager) ensureStream() error {
streamName := s.StreamName()
if len(streamName) > 0 {
return s.ensureStreamForStreamName(streamName)
}

return nil
}

func (s *streamManager) ensureStreamForTopic(topic string) error {
streamName := s.StreamName()
if len(streamName) > 0 {
return s.ensureStreamForStreamName(streamName)
} else {
return s.ensureStreamForStreamName(topic)
}
}

func (s *streamManager) ensureStreamForStreamName(streamName string) error {
info, err := s.js.StreamInfo(streamName)

if err != nil {
if errors.Is(err, nats.ErrStreamNotFound) {
// TODO: provision durable as well
// or simply provide override capability
// TODO: Ensure that stream names do not contain disallowed characters
_, err = s.js.AddStream(&nats.StreamConfig{
Name: streamName,
Description: "",
Subjects: s.AllSubjects(streamName),
AllowRollup: s.config.AllowRollup,
})

if err != nil {
return err
}
} else {
_, err = s.js.UpdateStream(&nats.StreamConfig{
Name: streamName,
Description: "",
Subjects: s.AllSubjects(streamName),
AllowRollup: s.config.AllowRollup,
})

if err != nil {
return err
}
}
} else if info.Config.AllowRollup != s.config.AllowRollup {

}

return nil
}
Loading