Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.
- Installation
- Usage
- Migrating from fs2-google-pubsub
- Contributors to this project
Add the following line to your build.sbt file:
libraryDependencies += "com.permutive" %% "fs2-pubsub" % "1.1.0"
The library is published for Scala 2.12, 2.13 and 3.
To start using the library, you'll need an http4s Client with permission to call Pub/Sub APIs in GCP. You can create one using gcp-auth:
import org.http4s.ember.client.EmberClientBuilder
import cats.effect.IO
import cats.syntax.all._
import com.permutive.gcp.auth.TokenProvider

val client = EmberClientBuilder
  .default[IO]
  .withHttp2
  .build
  .mproduct(client => TokenProvider.userAccount(client).toResource)
  .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) }
To publish messages to Pub/Sub, you can use the PubSubPublisher class:
import fs2.pubsub._

val publisher: PubSubPublisher[IO, String] = PubSubPublisher
  .http[IO, String]
  .projectId(ProjectId("my-project"))
  .topic(Topic("my-topic"))
  .defaultUri
  .httpClient(client)
  .noRetry
Then you can use any of the PubSubPublisher methods to send messages to Pub/Sub.
// Producing a single message
publisher.publishOne("message")

// Producing multiple messages
val records = List(
  PubSubRecord.Publisher("message1"),
  PubSubRecord.Publisher("message2"),
  PubSubRecord.Publisher("message3")
)
publisher.publishMany(records)

// Producing a message with attributes
publisher.publishOne("message", "key" -> "value")

// Producing a message using the record type
val record = PubSubRecord.Publisher("message").withAttribute("key", "value")
publisher.publishOne(record)
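The snippets above can be combined into a single program. Because EmberClientBuilder's build returns a Resource, the authenticated client has to be acquired with use before it is handed to the publisher builder. The following is a minimal sketch of one way to wire this together, assuming a project called my-project and a topic called my-topic already exist and that user-account credentials are available locally; the object name PublishExample is hypothetical.

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import com.permutive.gcp.auth.TokenProvider
import fs2.pubsub._
import org.http4s.ember.client.EmberClientBuilder

// Hypothetical entry point; only the builder calls come from the README.
object PublishExample extends IOApp.Simple {

  // Authenticated http4s client, built as in the snippet above.
  // The value is a Resource, so the client is released when `use` finishes.
  private val authenticatedClient =
    EmberClientBuilder
      .default[IO]
      .withHttp2
      .build
      .mproduct(client => TokenProvider.userAccount(client).toResource)
      .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) }

  def run: IO[Unit] =
    authenticatedClient.use { httpClient =>
      // "my-project" and "my-topic" are placeholder names.
      val publisher = PubSubPublisher
        .http[IO, String]
        .projectId(ProjectId("my-project"))
        .topic(Topic("my-topic"))
        .defaultUri
        .httpClient(httpClient)
        .noRetry

      // Discard the returned message ID; a real program may want to log it.
      publisher.publishOne("message").void
    }
}
```

Keeping the publisher inside the use block ensures it never outlives the underlying HTTP client.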
There are several configuration options available for the publisher:
- projectId: The GCP project ID.
- topic: The Pub/Sub topic name.
- uri: The URI of the Pub/Sub API. By default, it uses the Google Cloud Pub/Sub API.
- httpClient: The http4s Client to use for making requests to the Pub/Sub API.
- retry: The retry policy to use when sending messages to Pub/Sub. By default, it retries up to 3 times with exponential backoff.
These options can be provided either through a configuration object (PubSubPublisher.Config) or through the builder pattern.
You can use PubSubPublisher.grpc to create a publisher that uses gRPC to connect to Pub/Sub. This type of publisher is only available on Scala 2.13 or 3.x.
To publish messages asynchronously, you can use PubSubPublisher.Async. You can create an instance of this class from a regular PubSubPublisher by using the batching method:
import cats.effect.Resource
import scala.concurrent.duration._

val asyncPublisher: Resource[IO, PubSubPublisher.Async[IO, String]] =
  publisher
    .batching
    .batchSize(10)
    .maxLatency(1.second)
Then you can use any of the PubSubPublisher.Async methods to send messages to Pub/Sub. These methods are the same ones you'll find in the regular PubSubPublisher, with two differences: they return an F[Unit] instead of an F[MessageId], and they expect a PubSubRecord.Publisher.WithCallback instead of a regular PubSubRecord.Publisher.
To construct such a record, you can either use the PubSubRecord.Publisher.WithCallback constructor or call the withCallback method on a regular PubSubRecord.Publisher:
val recordWithCallback = PubSubRecord.Publisher("message").withCallback { _ =>
  IO(println("Message sent!"))
}
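Since the batching publisher is wrapped in a Resource, it has to be acquired before a record can be sent. The sketch below shows one possible way to do that, assuming the Async publisher exposes publishOne with the same shape as the regular publisher, as described above. The helper name publishWithCallback is hypothetical.

```scala
import cats.effect.{IO, Resource}
import fs2.pubsub._

// Hypothetical helper: acquires the async publisher and hands it one record.
def publishWithCallback(
    asyncPublisher: Resource[IO, PubSubPublisher.Async[IO, String]]
): IO[Unit] =
  asyncPublisher.use { publisher =>
    // Same record construction as in the snippet above.
    val record = PubSubRecord.Publisher("message").withCallback { _ =>
      IO(println("Message sent!"))
    }

    // Returns IO[Unit] rather than IO[MessageId]; the outcome is reported
    // through the callback attached to the record.
    publisher.publishOne(record)
  }
```

In a long-running service the Resource would usually be acquired once at startup and shared, rather than opened for every record as in this sketch.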
To subscribe to a Pub/Sub subscription, you can use the PubSubSubscriber class:
import fs2.Stream

val subscriber: Stream[IO, Option[String]] = PubSubSubscriber
  .http[IO]
  .projectId(ProjectId("my-project"))
  .subscription(Subscription("my-subscription"))
  .defaultUri
  .httpClient(client)
  .noRetry
  .noErrorHandling
  .withDefaults
  .decodeTo[String]
  .subscribeAndAck
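The subscriber is an ordinary fs2 Stream, so nothing is pulled from Pub/Sub until the stream is compiled and run. The sketch below illustrates one way to consume a Stream[IO, Option[String]] using only standard fs2 operators. To keep it runnable without GCP access, it uses a hypothetical in-memory stand-in (standIn) instead of the real subscriber, and dropping the None elements with unNone is an assumption about what the application wants, not something the library prescribes.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Hypothetical stand-in with the same element type as the subscriber above.
val standIn: Stream[IO, Option[String]] =
  Stream.emits(List(Some("first"), None, Some("second"))).covary[IO]

val handled: List[String] =
  standIn
    .unNone                                                // drop the None elements
    .evalTap(message => IO.println(s"Received: $message")) // side effect per message
    .compile
    .toList
    .unsafeRunSync()                                       // for demonstration only

// handled == List("first", "second")
```

With the real subscriber the stream does not terminate on its own, so an application would typically end the pipeline with compile.drain inside an IOApp rather than collecting to a list.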
There are several configuration options available for the subscriber:
- projectId: The GCP project ID.
- subscription: The Pub/Sub subscription name.
- uri: The URI of the Pub/Sub API. By default, it uses the Google Cloud Pub/Sub API.
- httpClient: The http4s Client to use for making requests to the Pub/Sub API.
- retry: The retry policy to use when receiving messages from Pub/Sub. By default, it retries up to 3 times with exponential backoff.
- errorHandling: The error handling policy to use when performing operations such as decoding messages or acknowledging them.
- batchSize: The maximum number of messages to acknowledge at once.
- maxLatency: The maximum time to wait for a batch of messages before acknowledging them.
- maxMessages: The maximum number of messages to receive in a single batch.
- readConcurrency: The number of concurrent reads from the subscription.
These options can be provided either through a configuration object (PubSubSubscriber.Config) or through the builder pattern.
You can use PubSubSubscriber.grpc to create a subscriber that uses gRPC to connect to Pub/Sub. This type of subscriber is only available on Scala 2.13 or 3.x.
There are two types of subscribers available in the library: raw and decoded.
The raw subscriber returns the raw message received from Pub/Sub, while the decoded subscriber decodes the message to a specific type.
The former is useful when you want to handle the message yourself, while the latter is useful when you want to work with a specific type. You can create a raw subscriber by using the raw method instead of decodeTo.
The library provides a way to load the configuration from a ConfigSource using pureconfig.
You just need to add the following line to your build.sbt file:
libraryDependencies += "com.permutive" %% "fs2-pubsub-pureconfig" % "1.1.0"
And then add the following import when you want to use the pureconfig integration:
import pureconfig.ConfigSource
import fs2.pubsub.PubSubPublisher
import fs2.pubsub.pureconfig._

val config = ConfigSource.default.loadOrThrow[PubSubPublisher.Config]

PubSubPublisher
  .http[IO, String]
  .fromConfig(config)
  .httpClient(client)
  .noRetry
The most important thing you need to take into account while migrating is that the library no longer creates an authenticated Client for you. You need to provide one yourself using permutive-engineering/gcp-auth.
You can use the following table to find the equivalent classes and methods in fs2-pubsub:
Contributors to this project: CremboC, bastewart, TimWSpence, travisbrown, alejandrohdezma, ChristianJohnston97, janstenpickle, chrisjl154, marcelocarlos, desbo, kythyra, mcgizzle, istreeter, Joe8Bit, arunas-cesonis