A feature-rich, high-performance Java client library compatible with MQTT 5.0 and 3.1.1, with different API flavors and backpressure support.
- Documentation: https://hivemq.github.io/hivemq-mqtt-client/
- Community forum: https://community.hivemq.com/
- HiveMQ website: https://www.hivemq.com/
- Contribution guidelines: CONTRIBUTING.md
- License: LICENSE
- MQTT resources:
- All MQTT 3.1.1 and MQTT 5.0 features
- API flavors:
- Reactive API: Reactive Streams compatible, RxJava and Reactor APIs available
- Asynchronous API: futures and callbacks
- Blocking API: for quick start and testing
- Switch flexibly between flavors and use them concurrently
- Flavors are clearly separated but have a consistent API style
- Backpressure support:
- QoS 1 and 2
- QoS 0 (dropping incoming messages, if necessary)
- Bringing MQTT flow control and reactive pull backpressure together
- Transports:
- TCP
- SSL/TLS
- All TLS versions up to TLS 1.3 are supported
- TLS mutual authentication
- TLS Server Name Indication (SNI)
- TLS Session Resumption
- Default and customizable hostname verification
- WebSocket, Secure WebSocket
- Proxy: SOCKS4, SOCKS5, HTTP CONNECT
- All possible combinations
- Automatic and configurable thread management
- Automatic and configurable reconnect handling and message redelivery
- Automatic and configurable resubscribe if the session expired
- Manual message acknowledgment
- Selectively enable manual acknowledgment for specific streams
- Acknowledge messages that are emitted to multiple streams independently per stream (the client aggregates the acknowledgments before sending MQTT acknowledgments)
- Order of manual acknowledgment does not matter (the client automatically ensures the order of MQTT acknowledgments for 100% compatibility with the MQTT specification)
- Lifecycle listeners
- When connected
- When disconnected or connection failed
- MQTT 5 specific:
- Pluggable Enhanced Authentication support (in addition to the MQTT specification: server-triggered re-authentication)
- Automatic Topic Alias mapping
- Interceptors for QoS flows
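The manual acknowledgment items in the feature list say that acknowledgments from several streams are aggregated and that MQTT acknowledgments still go out in order. The following is a minimal sketch of that idea in plain Java, not the library's implementation; `AckAggregator` and all of its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not part of the HiveMQ MQTT Client API. */
final class AckAggregator {
    // Packet id -> number of streams that still have to acknowledge, kept in arrival order.
    private final Map<Integer, Integer> pending = new LinkedHashMap<>();
    // Packet ids whose MQTT acknowledgment has been released, in release order.
    private final List<Integer> sent = new ArrayList<>();

    /** Registers an incoming message that was emitted to streamCount streams. */
    void onMessage(int packetId, int streamCount) {
        pending.put(packetId, streamCount);
    }

    /** Called when one stream acknowledges the message with the given packet id. */
    void acknowledge(int packetId) {
        pending.computeIfPresent(packetId, (id, remaining) -> remaining - 1);
        // Release acknowledgments from the head of the queue while they are complete,
        // so the released order always matches the arrival order.
        Iterator<Map.Entry<Integer, Integer>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Integer> head = it.next();
            if (head.getValue() > 0) {
                break;
            }
            sent.add(head.getKey());
            it.remove();
        }
    }

    /** Returns a copy of the released packet ids. */
    List<Integer> sentAcks() {
        return new ArrayList<>(sent);
    }
}
```

In this sketch, acknowledging message 2 before message 1 releases nothing; once every stream has acknowledged message 1, both acknowledgments are released as 1 and then 2.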
If you use the HiveMQ MQTT Client in a project that is not listed here, feel free to open an issue or pull request.
Java 8 or higher is required.
If you use Gradle, include the following in your build.gradle(.kts) file.
dependencies {
implementation("com.hivemq:hivemq-mqtt-client:1.2.2")
}
For optional features you can choose to include additional modules:
dependencies {
implementation(platform("com.hivemq:hivemq-mqtt-client-websocket:1.2.2"))
implementation(platform("com.hivemq:hivemq-mqtt-client-proxy:1.2.2"))
implementation(platform("com.hivemq:hivemq-mqtt-client-epoll:1.2.2"))
implementation("com.hivemq:hivemq-mqtt-client-reactor:1.2.2")
}
If you use Maven, include the following in your pom.xml file.
<project>
...
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
...
</project>
NOTE: You have to set the compiler version to 1.8 or higher.
<project>
...
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
...
</project>
For optional features you can choose to include additional modules:
<project>
...
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-websocket</artifactId>
<version>1.2.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-proxy</artifactId>
<version>1.2.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-epoll</artifactId>
<version>1.2.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-reactor</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
...
</project>
If you are experiencing problems with transitive dependencies, you can try the shaded version.
This version relocates the transitive dependencies that are only used internally to a different package name.
The shaded version includes the websocket, proxy and epoll modules.
To use the shaded version, append -shaded to the artifact name.
dependencies {
implementation("com.hivemq:hivemq-mqtt-client-shaded:1.2.2")
}
<project>
...
<dependencies>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-shaded</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
...
</project>
Snapshots can be obtained using JitPack.
repositories {
...
maven { url 'https://jitpack.io' }
}
dependencies {
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:develop-SNAPSHOT")
// snapshots for optional modules
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-websocket:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-proxy:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-epoll:develop-SNAPSHOT"))
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-reactor:develop-SNAPSHOT")
}
<project>
...
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.github.hivemq.hivemq-mqtt-client</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>develop-SNAPSHOT</version>
</dependency>
<!-- snapshots for optional modules -->
<dependency>
<groupId>com.github.hivemq.hivemq-mqtt-client</groupId>
<artifactId>hivemq-mqtt-client-websocket</artifactId>
<version>develop-SNAPSHOT</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.github.hivemq.hivemq-mqtt-client</groupId>
<artifactId>hivemq-mqtt-client-proxy</artifactId>
<version>develop-SNAPSHOT</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.github.hivemq.hivemq-mqtt-client</groupId>
<artifactId>hivemq-mqtt-client-epoll</artifactId>
<version>develop-SNAPSHOT</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.github.hivemq.hivemq-mqtt-client</groupId>
<artifactId>hivemq-mqtt-client-reactor</artifactId>
<version>develop-SNAPSHOT</version>
</dependency>
</dependencies>
...
</project>
Change the artifact name to hivemq-mqtt-client-shaded to get snapshots of the shaded version.
JitPack works for all branches and also specific commits.
Just specify <branch>-SNAPSHOT or the first 10 characters of the commit id as the version.
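As a small illustration of the two version forms just described, the following sketch builds the version string for a branch or a commit. `JitPackVersions` is a hypothetical helper, not something shipped with the library; the group and artifact names are the ones used in the snapshot dependencies above.

```java
/** Hypothetical helper that only formats strings; it does not contact JitPack. */
final class JitPackVersions {

    /** Version for the latest snapshot of a branch, for example "develop-SNAPSHOT". */
    static String forBranch(String branch) {
        return branch + "-SNAPSHOT";
    }

    /** Version for a specific commit: the first 10 characters of its id. */
    static String forCommit(String commitId) {
        if (commitId.length() < 10) {
            throw new IllegalArgumentException("commit id must have at least 10 characters");
        }
        return commitId.substring(0, 10);
    }

    /** Dependency coordinate of the main module for the given version. */
    static String coordinate(String version) {
        return "com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:" + version;
    }
}
```

For example, `JitPackVersions.coordinate(JitPackVersions.forBranch("develop"))` yields the coordinate used in the Gradle snippet above.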
- API and implementation are clearly separated. Classes inside internal packages must not be used directly.
- The API is mostly fluent and uses fluent builders to create clients, configurations and messages.
- The API is designed to be consistent:
- The same principles are used throughout the library.
- The MQTT 3 and 5 interfaces are as consistent as possible with only version-specific differences.
Base classes: Mqtt3Client, Mqtt5Client
Mqtt5Client client = MqttClient.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.useMqttVersion5()
.build();
Mqtt3Client client = MqttClient.builder()...useMqttVersion3().build();
Or if the version is known upfront:
Mqtt5Client client = Mqtt5Client.builder()...build();
Mqtt3Client client = Mqtt3Client.builder()...build();
Each API style has a specific build...() method.
Each API style has its own interface to separate them clearly. It is possible to switch the API style at any time.
- Builder method: buildBlocking()
- Switch method: client.toBlocking()
final Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
try (final Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
client.subscribeWith().topicFilter("test/topic").qos(MqttQos.AT_LEAST_ONCE).send();
publishes.receive(1, TimeUnit.SECONDS).ifPresent(System.out::println);
publishes.receive(100, TimeUnit.MILLISECONDS).ifPresent(System.out::println);
} finally {
client.disconnect();
}
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
client.publishWith().topic("test/topic").qos(MqttQos.AT_LEAST_ONCE).payload("1".getBytes()).send();
client.disconnect();
client.connect();
Or with customized properties of the Connect message:
client.connectWith().keepAlive(10).send();
Or with pre-built Connect message:
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(10).build();
client.connect(connectMessage);
client.publishWith()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.send();
Or with pre-built Publish message:
Mqtt5Publish publishMessage = Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build();
client.publish(publishMessage);
client.subscribeWith().topicFilter("test/topic").qos(MqttQos.EXACTLY_ONCE).send();
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
client.subscribe(subscribeMessage);
client.unsubscribeWith().topicFilter("test/topic").send();
Or with pre-built Unsubscribe message:
Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter("test/topic").build();
client.unsubscribe(unsubscribeMessage);
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
// blocks until a message is received
Mqtt5Publish publishMessage = publishes.receive();
// or with timeout
Optional<Mqtt5Publish> publishWithTimeout = publishes.receive(10, TimeUnit.SECONDS);
// or without blocking
Optional<Mqtt5Publish> publishIfAvailable = publishes.receiveNow();
}
publishes must be called before subscribe to ensure no message is lost.
It can be called before connect to receive messages of a previous session.
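To illustrate why the order matters, here is a deliberately simplified model in plain Java. It assumes that a message arriving while no consumer is registered is simply dropped; `IncomingMessages` and its methods are hypothetical stand-ins and not the library's code.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified model of the incoming message path; not the library's code. */
final class IncomingMessages {
    // Stays null until publishes() registers a consumer.
    private BlockingQueue<String> consumer;

    /** Plays the role of publishes(...): from now on incoming messages are buffered. */
    void publishes() {
        consumer = new LinkedBlockingQueue<>();
    }

    /** Simulates a message arriving from the broker; returns false if nobody consumed it. */
    boolean onIncoming(String message) {
        return consumer != null && consumer.offer(message);
    }

    /** Plays the role of receiveNow(): returns a buffered message without blocking. */
    Optional<String> receiveNow() {
        return consumer == null ? Optional.empty() : Optional.ofNullable(consumer.poll());
    }
}
```

In this model a message that arrives before `publishes()` was called cannot be received later, which is the situation the rule above avoids.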
client.disconnect();
Or with customized properties of the Disconnect message (only MQTT 5):
client.disconnectWith().reasonString("test").send();
Or with pre-built Disconnect message (only MQTT 5):
Mqtt5Disconnect disconnectMessage = Mqtt5Disconnect.builder().reasonString("test").build();
client.disconnect(disconnectMessage);
client.reauth();
- Builder method: buildAsync()
- Switch method: client.toAsync()
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
client.toAsync().subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.callback(System.out::println)
.send();
Mqtt5AsyncClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildAsync();
client.connect()
.thenCompose(connAck -> client.publishWith().topic("test/topic").payload("1".getBytes()).send())
.thenCompose(publishResult -> client.disconnect());
connect(), connectWith() and connect(Mqtt3/5Connect) method calls are analogous to the Blocking API but return CompletableFuture.
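Because the result is a standard CompletableFuture, the usual composition and error handling operators apply. The sketch below shows this with plain JDK types only; `connect` and `publish` are hypothetical stand-ins that complete immediately and return strings instead of the library's message types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-ins for client calls; only the CompletableFuture handling is the point. */
final class AsyncChainSketch {

    static CompletableFuture<String> connect(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (succeed) {
            future.complete("CONNACK");
        } else {
            future.completeExceptionally(new IllegalStateException("connection refused"));
        }
        return future;
    }

    static CompletableFuture<String> publish(String payload) {
        return CompletableFuture.completedFuture("PUBACK for " + payload);
    }

    /** Connects, then publishes, and maps a failure of either step to a readable message. */
    static String run(boolean connectSucceeds) {
        return connect(connectSucceeds)
                .thenCompose(connAck -> publish("1"))
                .handle((result, throwable) -> {
                    if (throwable == null) {
                        return result;
                    }
                    // Dependent stages wrap the original failure in a CompletionException.
                    Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                            ? throwable.getCause() : throwable;
                    return "failed: " + cause.getMessage();
                })
                .join();
    }
}
```

With a real client the same pattern applies; only the stand-in methods are replaced by the client calls.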
publishWith() and publish(Mqtt3/5Publish) method calls are analogous to the Blocking API but return CompletableFuture.
subscribeWith() and subscribe(Mqtt3/5Subscribe) method calls are analogous to the Blocking API but return CompletableFuture.
Additionally, messages can be consumed per subscribe:
client.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(System.out::println)
.executor(executor) // optional
.send();
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
client.subscribe(subscribeMessage, System.out::println);
client.subscribe(subscribeMessage, System.out::println, executor);
unsubscribeWith() and unsubscribe(Mqtt3/5Unsubscribe) method calls are analogous to the Blocking API but return CompletableFuture.
Messages can either be consumed per subscribe (described above) or globally:
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println);
Or execute the callback on a specified executor:
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println, executor);
publishes must be called before subscribe to ensure no message is lost.
It can be called before connect to receive messages of a previous session.
disconnect(), disconnectWith() and disconnect(Mqtt5Disconnect) method calls are analogous to the Blocking API but return CompletableFuture.
The reauth() method call is analogous to the Blocking API but returns CompletableFuture.
- Builder method: buildRx()
- Switch method: client.toRx()
Mqtt5RxClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildRx();
// As we use the reactive API, the following line does not connect yet, but returns a reactive type.
// e.g. Single is something like a lazy and reusable future. Think of it as a source for the ConnAck message.
Single<Mqtt5ConnAck> connAckSingle = client.connect();
// Same here: the following line does not subscribe yet, but returns a reactive type.
// FlowableWithSingle is a combination of the single SubAck message and a Flowable of Publish messages.
// A Flowable is an asynchronous stream that enables backpressure from the application over the client to the broker.
FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subAckAndMatchingPublishes = client.subscribeStreamWith()
.topicFilter("a/b/c").qos(MqttQos.AT_LEAST_ONCE)
.addSubscription().topicFilter("a/b/c/d").qos(MqttQos.EXACTLY_ONCE).applySubscription()
.applySubscribe();
// The reactive types offer many operators that will not be covered here.
// Here we register callbacks to print messages when we received the CONNACK, SUBACK and matching PUBLISH messages.
Completable connectScenario = connAckSingle
.doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode()))
.doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
.ignoreElement();
Completable subscribeScenario = subAckAndMatchingPublishes
.doOnSingle(subAck -> System.out.println("Subscribed, " + subAck.getReasonCodes()))
.doOnNext(publish -> System.out.println(
"Received publish" + ", topic: " + publish.getTopic() + ", QoS: " + publish.getQos() +
", payload: " + new String(publish.getPayloadAsBytes())))
.ignoreElements();
// Reactive types can be easily and flexibly combined
connectScenario.andThen(subscribeScenario).blockingAwait();
Mqtt5RxClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildRx();
// As we use the reactive API, the following line does not connect yet, but returns a reactive type.
Completable connectScenario = client.connect()
.doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode()))
.doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
.ignoreElement();
// Fake a stream of Publish messages with an incrementing number in the payload
Flowable<Mqtt5Publish> messagesToPublish = Flowable.range(0, 10_000)
.map(i -> Mqtt5Publish.builder()
.topic("a/b/c")
.qos(MqttQos.AT_LEAST_ONCE)
.payload(("test " + i).getBytes())
.build())
// Emit 1 message only every 100 milliseconds
.zipWith(Flowable.interval(100, TimeUnit.MILLISECONDS), (publish, i) -> publish);
// As we use the reactive API, the following line does not publish yet, but returns a reactive type.
Completable publishScenario = client.publish(messagesToPublish)
.doOnNext(publishResult -> System.out.println(
"Publish acknowledged: " + new String(publishResult.getPublish().getPayloadAsBytes())))
.ignoreElements();
// As we use the reactive API, the following line does not disconnect yet, but returns a reactive type.
Completable disconnectScenario = client.disconnect().doOnComplete(() -> System.out.println("Disconnected"));
// Reactive types can be easily and flexibly combined
connectScenario.andThen(publishScenario).andThen(disconnectScenario).blockingAwait();
connect(), connectWith() and connect(Mqtt3/5Connect) method calls are analogous to the Async and Blocking API but return Single<ConnAck>.
publish takes a reactive stream of Publish messages (Flowable) and returns a reactive stream of Publish results (Flowable).
The Reactive API is usually not used for publishing single messages. Nevertheless, it is possible with the following code:
Single<Mqtt5PublishResult> result =
client.publish(Flowable.just(Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build())).singleOrError();
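The reactive streams used for publishing are pull-based: a source emits only as many items as the consumer has requested. The following sketch shows that mechanism with the JDK's java.util.concurrent.Flow interfaces (Java 9 or higher, which is an assumption beyond the Java 8 baseline) instead of RxJava. `PullBackpressureSketch`, `range` and `PullingSubscriber` are hypothetical names, and the publisher is a naive synchronous one for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical illustration of pull backpressure; not how the client is implemented. */
final class PullBackpressureSketch {

    /** Emits 0..count-1 synchronously, but never more items than were requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean finished;

            @Override
            public void request(long n) {
                while (n-- > 0 && !finished) {
                    if (next < count) {
                        subscriber.onNext(next++);
                    }
                    if (next >= count && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /** Collects items and only pulls more when the application asks for them. */
    static final class PullingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            throw new IllegalStateException(throwable);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Signals demand for up to n more items. */
        void pull(long n) {
            subscription.request(n);
        }
    }
}
```

Subscribing alone delivers nothing; each call to `pull(n)` lets at most n further items through, which is the demand signal a slow consumer uses to hold back a fast producer.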
subscribeWith() and subscribe(Mqtt3/5Subscribe) method calls are analogous to the Async and Blocking API but return Single<SubAck>.
Additionally, messages can be consumed per subscribe:
Flowable<Mqtt5Publish> result =
client.subscribeStreamWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.applySubscribe()
.doOnSingle(subAck -> System.out.println("subscribed"))
.doOnNext(publish -> System.out.println("received publish"));
Or with pre-built Subscribe message:
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
Flowable<Mqtt5Publish> result =
client.subscribeStream(subscribeMessage)
.doOnSingle(subAck -> System.out.println("subscribed"))
.doOnNext(publish -> System.out.println("received publish"));
unsubscribeWith() and unsubscribe(Mqtt3/5Unsubscribe) method calls are analogous to the Async and Blocking API but return Single<UnsubAck>.
Messages can either be consumed per subscribe (described above) or globally:
Flowable<Mqtt5Publish> result =
client.publishes(MqttGlobalPublishFilter.ALL).doOnNext(System.out::println);
publishes must be called before subscribe to ensure no message is lost.
It can be called before connect to receive messages of a previous session.
disconnect(), disconnectWith() and disconnect(Mqtt5Disconnect) method calls are analogous to the Async and Blocking API but return Completable.
The reauth() method call is analogous to the Async and Blocking API but returns Completable.
Semantic Versioning is used.
All code inside com.hivemq.client.internal packages must not be used directly. It can change at any time and is not part of the public API.
Interfaces annotated with DoNotImplement must not be implemented. The implementation is provided by the library.
This allows the library to later add methods to the interface without breaking backwards compatibility with implementing classes.
If you want to contribute to HiveMQ MQTT Client, see the contribution guidelines in CONTRIBUTING.md.
HiveMQ MQTT Client is licensed under the APACHE LICENSE, VERSION 2.0. A copy of the license can be found in the LICENSE file.