[feature] Add NATS async consumer #882

Open
matsuev opened this issue Sep 14, 2024 · 6 comments

Comments

@matsuev

matsuev commented Sep 14, 2024

Many projects use NATS as a distributed message bus between microservices.

A NATS async consumer would allow efficient interaction between services and Centrifugo.

You can try the fully working NATS consumer code here:

https://github.com/matsuev/centrifugo/tree/nats-consumer

It uses NATS server (version >= 2.10.0) and JetStream for guaranteed message delivery.

The WorkQueue retention policy distributes message processing across multiple Centrifugo instances (in a cluster architecture).

Automatic reconnection to the NATS server allows you to start the Centrifugo server before the NATS server is available (or to restore the connection after NATS or Centrifugo restarts).
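The reconnection behavior described here can be pictured as a retry loop. This is a minimal stdlib-only sketch, not the code from the linked branch: `connectWithRetry`, `dial`, `errUnavailable`, and the wait interval are hypothetical names and values chosen for illustration, and `dial` stands in for whatever actually opens the NATS connection.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical error for a broker that is not reachable yet.
var errUnavailable = errors.New("broker unavailable")

// connectWithRetry calls dial until it succeeds. If maxAttempts > 0 it gives up
// after that many attempts; otherwise it retries forever.
// It returns the number of attempts made.
func connectWithRetry(dial func() error, maxAttempts int, wait time.Duration) (int, error) {
	for attempt := 1; ; attempt++ {
		err := dial()
		if err == nil {
			return attempt, nil
		}
		if maxAttempts > 0 && attempt >= maxAttempts {
			return attempt, fmt.Errorf("gave up after %d attempts: %w", attempt, err)
		}
		time.Sleep(wait)
	}
}

func main() {
	// Simulate a broker that becomes reachable on the third attempt.
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	attempts, err := connectWithRetry(dial, 0, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```

A real consumer would more likely rely on the reconnect options of the NATS client than on a hand-written loop; the sketch only shows why startup order stops mattering once connection failures are retried.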

Current limitations:

  • TLS connections are not implemented (feature)
  • Distributed delivery (in Centrifugo cluster) does not guarantee the order in which messages are processed
  • The maximum message size is limited by NATS server settings (see Configuring NATS Server)
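For the message size limitation, a producer can check the encoded size before publishing. The sketch below assumes the NATS server default `max_payload` of 1 MiB; `encodeWithinLimit` and `defaultMaxPayload` are hypothetical names, and the real limit depends on the server configuration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// defaultMaxPayload mirrors the NATS server default for max_payload (1 MiB).
// Assumption: the server in use has not changed this setting.
const defaultMaxPayload = 1 << 20

// encodeWithinLimit marshals v to JSON and rejects results larger than limit bytes.
func encodeWithinLimit(v any, limit int) ([]byte, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("encode: %w", err)
	}
	if len(data) > limit {
		return nil, fmt.Errorf("payload is %d bytes, limit is %d", len(data), limit)
	}
	return data, nil
}

func main() {
	cmd := map[string]any{"method": "publish"}
	data, err := encodeWithinLimit(cmd, defaultMaxPayload)
	fmt.Println(len(data), err)
}
```

With nats.go, the limit reported by the connected server should be available through `nc.MaxPayload()`, which avoids hard-coding the default.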

Example of usage:

Define a NATS consumer in the Centrifugo server config:

...
"consumers": [
	...
	{
		"name": "my_nats_consumer",
		"type": "nats",
		"nats": {
			"brokers": ["nats://token@hostname:4222","nats://user:password@hostname:4222", ...],
			"subjects": ["commands"],
			"stream_name": "CENTRIFUGO",
			"consumer_group": "group_name",
			"max_poll_records": 25,			// optional, default 100
			"disable_reconnect": false,		// optional, default false
			"heartbeat_interval": "10s",		// optional, default "5s"
			"create_stream_if_not_exist": true	// !!!EXPERIMENTAL!!!, default false
		}
	},
	... 
],
...

Important configuration options:

  • The "subjects" option must be identical for all consumers with the same "consumer_group"
  • The "create_stream_if_not_exist" option is for development purposes only and will be removed in the future. This option enables the creation of a stream in JetStream if it does not exist. I think this is a bad idea and you should initialize all streams in JetStream before connecting to them.
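The rule about identical "subjects" within one "consumer_group" can be checked before startup. This is a hedged sketch, not part of the linked branch: the `natsConsumer` type and `checkGroups` function are hypothetical, and treating the subject list as order-independent is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// natsConsumer holds only the options relevant to the rule (hypothetical type).
type natsConsumer struct {
	Name          string
	ConsumerGroup string
	Subjects      []string
}

// subjectsKey builds an order-independent key for a subject list.
// Subjects cannot contain whitespace, so a newline is a safe separator.
func subjectsKey(subjects []string) string {
	s := append([]string(nil), subjects...)
	sort.Strings(s)
	return strings.Join(s, "\n")
}

// checkGroups returns an error if two consumers share a group but differ in subjects.
func checkGroups(consumers []natsConsumer) error {
	seen := map[string]natsConsumer{}
	for _, c := range consumers {
		first, ok := seen[c.ConsumerGroup]
		if !ok {
			seen[c.ConsumerGroup] = c
			continue
		}
		if subjectsKey(first.Subjects) != subjectsKey(c.Subjects) {
			return fmt.Errorf("consumers %q and %q share group %q but use different subjects",
				first.Name, c.Name, c.ConsumerGroup)
		}
	}
	return nil
}

func main() {
	consumers := []natsConsumer{
		{Name: "node1", ConsumerGroup: "group_name", Subjects: []string{"commands"}},
		{Name: "node2", ConsumerGroup: "group_name", Subjects: []string{"events"}},
	}
	fmt.Println(checkGroups(consumers))
}
```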

Enable and configure JetStream.

See Configuring JetStream

In application code you can send commands:

package main

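import (
	...
	"context"
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	...
)

func main() {
	...
	// Context for the JetStream publish call below
	ctx := context.Background()

	// Try to connect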
	nc, err := nats.Connect("nats://hostname:4222", nats.UserInfo("user", "password"))
	if err != nil {
		log.Fatalln("NATS connect error:", err)
	}
	defer func() { _ = nc.Drain() }()

	// Try to open JetStream 
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalln("NATS create JetStream error:", err)
	}

	// Prepare command message (see Centrifugo API documentation)
	msg := map[string]any{
		"method": "publish",
		"payload": map[string]any{
			"channel": "news",
			"data": map[string]any{
				"message": "test message",
			},
		},
	}

	// Try to send message
	if data, err := json.Marshal(msg); err == nil {
		if _, err := js.Publish(ctx, "commands", data); err != nil {
			log.Println(err)
		}
	} else {
		log.Println(err)
	}
	...
}
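The command envelope built from nested maps above can also be expressed with typed structs, which catches misspelled keys at compile time. This is a sketch under the assumption that the envelope has exactly the shape shown in the snippet ("method" plus "payload" with "channel" and "data"); the type and function names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// command mirrors the envelope from the snippet above: a method name and its payload.
type command struct {
	Method  string `json:"method"`
	Payload any    `json:"payload"`
}

// publishPayload carries the fields of the "publish" example.
type publishPayload struct {
	Channel string `json:"channel"`
	Data    any    `json:"data"`
}

// newPublishCommand encodes a publish command for the given channel and data.
func newPublishCommand(channel string, data any) ([]byte, error) {
	return json.Marshal(command{
		Method:  "publish",
		Payload: publishPayload{Channel: channel, Data: data},
	})
}

func main() {
	b, err := newPublishCommand("news", map[string]string{"message": "test message"})
	if err != nil {
		fmt.Println(err)
		return
	}
	// prints {"method":"publish","payload":{"channel":"news","data":{"message":"test message"}}}
	fmt.Println(string(b))
}
```

The resulting bytes can be passed to `js.Publish` exactly like `data` in the snippet above.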

@FZambia
Member

FZambia commented Sep 16, 2024

Hello @matsuev

Thanks for bringing this up. Do you personally have a real use case where a NATS JetStream consumer would help? Could you please provide some details? Or is it a contribution that seems useful for Centrifugo users in general?

@matsuev
Author

matsuev commented Sep 16, 2024

Hello @FZambia

I use the Centrifugo server and NATS in my corporate messenger project. All backend microservices are interconnected via NATS. The NATS async consumer allows sending commands to Centrifugo without additional protocols and connectors (such as HTTP or gRPC).

Additionally, I made a Centrifugo adapter that calls proxy endpoints via NATS RPC (I'll post the GitHub link when we finish testing).

I'm planning to implement Centrifugo API endpoints via NATS RPC soon.

If these three components work reliably, then all interactions with Centrifugo can be organized through NATS.

@FZambia
Member

FZambia commented Sep 17, 2024

Thanks for the explanation!

I think I'll ask you to rebase this work on top of the new configuration approach I am preparing for Centrifugo v6, and to open a PR for review after that. It should happen soon, I hope within a month. I will write here.

Regarding NATS RPC: this is very interesting. I looked into it myself before, but did not find an official approach from the NATS team on how to organize RPC over NATS. So I decided to postpone the implementation, as relying on a third-party solution did not seem a robust long-term way forward for such an integration. What are you using for NATS RPC? Maybe things have changed since I investigated.

@matsuev
Author

matsuev commented Sep 17, 2024

@FZambia Thanks!

NATS supports RPC "out of the box" via the Request/Reply mechanism. Since the NATS code is written in Go, it is very easy to integrate and maintain in Go projects (such as Centrifugo).
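The Request/Reply mechanism works by attaching a unique reply subject to each request; the responder publishes its answer to that subject. The following is a toy in-process model of that pattern, assuming nothing about the actual adapter: `bus`, `msg`, and all method names are hypothetical and are not the nats.go API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// msg is a message with an optional reply subject.
type msg struct {
	Subject string
	Reply   string
	Data    string
}

// bus is a toy stand-in for a message bus connection (one handler per subject).
type bus struct {
	mu     sync.Mutex
	subs   map[string]func(msg)
	nextID int
}

var errTimeout = errors.New("request timed out")

func newBus() *bus { return &bus{subs: map[string]func(msg){}} }

func (b *bus) subscribe(subject string, h func(msg)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[subject] = h
}

func (b *bus) unsubscribe(subject string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.subs, subject)
}

// publish delivers m asynchronously to the handler of m.Subject, if any.
func (b *bus) publish(m msg) {
	b.mu.Lock()
	h := b.subs[m.Subject]
	b.mu.Unlock()
	if h != nil {
		go h(m)
	}
}

// request publishes data with a fresh reply subject and waits for one answer.
func (b *bus) request(subject, data string, timeout time.Duration) (string, error) {
	b.mu.Lock()
	b.nextID++
	inbox := fmt.Sprintf("_INBOX.%d", b.nextID)
	b.mu.Unlock()

	replies := make(chan string, 1)
	b.subscribe(inbox, func(m msg) {
		select {
		case replies <- m.Data: // keep the first reply
		default: // drop any extra replies
		}
	})
	defer b.unsubscribe(inbox)

	b.publish(msg{Subject: subject, Reply: inbox, Data: data})
	select {
	case r := <-replies:
		return r, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	b := newBus()
	// Responder: answers on the reply subject carried by the request.
	b.subscribe("greet", func(m msg) {
		b.publish(msg{Subject: m.Reply, Data: "hello, " + m.Data})
	})
	reply, err := b.request("greet", "centrifugo", time.Second)
	fmt.Println(reply, err)
}
```

In nats.go the same roles are played by `nc.Request` on the caller side and `msg.Respond` inside a subscription handler.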

I'm experimenting with my "all in one" little server that embeds NATS and the Centrifuge library into a single binary application and connects them via the internal NATS connector (without opening network connections). So far I consider my experiments quite successful :-)

@FZambia
Member

FZambia commented Sep 17, 2024

> NATS supports RPC "out of the box" via the Request/Reply mechanism. Since the NATS code is written in Go, it is very easy to integrate and maintain in Go projects (such as Centrifugo).

Yeah, sorry, I meant whether there is something on top of raw NATS RPC that can abstract the communication a bit. Last time I dug into this there was https://github.com/nats-rpc/nrpc. Centrifugo has Protobuf service definitions, so building communication on top of that abstraction seemed like a simple way forward, and some Centrifugo users already used nrpc in their projects. At the same time, nrpc does not seem actively maintained and it's not official, so in the end that stopped me from depending on it and proceeding with the implementation: it could become outdated quickly and might require deprecation on the Centrifugo side, while the other proxy protocols we have, HTTP and gRPC, are simple to depend on and we can be sure they will stay relevant for a reasonable time. That's something to consider. Ideally, if there were an official solution from the NATS team on top of a Protobuf schema, that would be a no-brainer.

> I'm experimenting with my "all in one" little server that embeds NATS and the Centrifuge library into a single binary application and connects them via the internal NATS connector (without opening network connections). So far I consider my experiments quite successful :-)

I'll add to this a bit, since I also previously considered whether embedding NATS into Centrifugo would be a good thing. What I came to: it may be good for simple apps, but as an app grows in logic or traffic, keeping things isolated seems a better approach from various design perspectives. For example, you need a StatefulSet for a NATS cluster, but just a simple Deployment for Centrifugo nodes. So eventually you may end up deploying the app with separate flags and enabling network communication anyway. Other things, like maintenance and possible breaking changes in internal interfaces, also add some headache here.

@matsuev
Author

matsuev commented Sep 17, 2024

@FZambia Yes, you are right.

The goal of my project is a small autonomous service with a minimum of external components, without scaling functionality.

In large projects with horizontal scaling, using HTTP or gRPC looks preferable.
