NATS by Example

Pull Consumers in JetStream

A pull consumer lets an application fetch one or more messages on demand using a subscription bound to the consumer. The application controls the flow of incoming messages, so it can process and ack them in an appropriate amount of time.

A consumer can be either durable or ephemeral. A durable consumer has its state tracked on the server, most importantly the last message acknowledged by the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes. The server automatically cleans up (deletes) the consumer after a period of inactivity.
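
To make the state-tracking point concrete, here is a minimal in-memory sketch. It is not the NATS client API: `DurableModel`, `next`, and `ack` are hypothetical names, and the "server" is just a map from consumer name to the last acknowledged sequence. It only illustrates why a subscriber that binds to an existing durable name resumes where the previous one stopped.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): a "server" that stores, per durable
// consumer name, the stream sequence of the last acknowledged message.
class DurableModel {
    private final List<String> stream;                               // messages in stream order
    private final Map<String, Integer> lastAcked = new HashMap<>();  // name -> last acked sequence (1-based)

    DurableModel(List<String> stream) {
        this.stream = stream;
    }

    // Returns the next unacknowledged message for the consumer, or null at the end of the stream.
    String next(String consumer) {
        int seq = lastAcked.getOrDefault(consumer, 0);
        return seq < stream.size() ? stream.get(seq) : null;
    }

    // Records that the consumer acknowledged the message it was last handed.
    void ack(String consumer) {
        lastAcked.merge(consumer, 1, Integer::sum);
    }

    public static void main(String[] args) {
        DurableModel server = new DurableModel(List.of("events.1", "events.2", "events.3"));

        // First subscriber processes and acks one message, then goes away.
        System.out.println(server.next("processor")); // events.1
        server.ack("processor");

        // A later subscriber bound to the same name resumes after the acked message.
        System.out.println(server.next("processor")); // events.2
    }
}
```

An ephemeral consumer would correspond to the map entry being dropped once the subscriber goes away, so a new consumer starts without any remembered position.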

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

It is important to note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. If there is a need for deterministic partitioning for scalable ordered processing, learn more here.
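
The ordering caveat can be sketched with plain threads. This is a toy model rather than JetStream itself: `SharedPullModel` and its methods are hypothetical names, and an in-memory queue stands in for the consumer's pending messages. Several subscriber threads fetch batches from the same source; each batch is in stream order, but nothing fixes the order in which different batches are processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model only (hypothetical names): a shared pull consumer that hands
// out batches to whichever subscriber asks next.
class SharedPullModel {
    private final Queue<Integer> pending = new ArrayDeque<>();

    SharedPullModel(int messageCount) {
        for (int seq = 1; seq <= messageCount; seq++) {
            pending.add(seq);
        }
    }

    // Hands out up to batchSize messages in stream order; empty when nothing is left.
    synchronized List<Integer> fetch(int batchSize) {
        List<Integer> batch = new ArrayList<>();
        while (batch.size() < batchSize && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        return batch;
    }

    // Runs the given number of subscriber threads and returns every batch that was handed out.
    static List<List<Integer>> run(int messageCount, int subscribers, int batchSize) {
        SharedPullModel consumer = new SharedPullModel(messageCount);
        List<List<Integer>> batches = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            Thread t = new Thread(() -> {
                List<Integer> batch;
                while (!(batch = consumer.fetch(batchSize)).isEmpty()) {
                    batches.add(batch); // "process" the batch
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Each printed batch is ascending; the order of batches across threads is not fixed.
        for (List<Integer> batch : run(10, 2, 3)) {
            System.out.println(batch);
        }
    }
}
```

Every message is handed out exactly once, which is why no extra configuration is needed to share the work, but any ordering guarantee stops at the batch boundary.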

$ nbe run jetstream/pull-consumer/java

Code

package example;


import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;


import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;


public class Main {
    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

Initialize a connection to the server. Connection is AutoCloseable, so the try-with-resources block closes it on exit.

        try (Connection nc = Nats.connect(natsURL)) {

Access JetStream and JetStreamManagement which provide methods to create streams and consumers as well as convenience methods for publishing to streams and consuming messages from the streams.

            JetStream js = nc.jetStream();
            JetStreamManagement jsm = nc.jetStreamManagement();

Declare a simple stream and populate it with a few messages.

            String streamName = "EVENTS";
            StreamConfiguration config = StreamConfiguration.builder()
                    .name(streamName)
                    .subjects("events.>")
                    .build();


            StreamInfo stream = jsm.addStream(config);


            js.publish("events.1", null);
            js.publish("events.2", null);
            js.publish("events.3", null);

Create a consumer bound to the previously created stream. If no durable name is supplied, the consumer is removed once it has not been actively consuming messages for InactiveThreshold (defaults to 5 seconds). The name is optional; if it is not provided, one is auto-generated. For this example, let’s create the consumer with no options, which makes it ephemeral with an auto-generated name.

            StreamContext streamContext = js.getStreamContext(streamName);
            ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().build());

Messages can be consumed continuously through a callback by using the consume method. consume accepts various options, but this example uses the defaults. A CountDownLatch stops processing after 3 messages have been handled (so that it does not interfere with other examples).

            CountDownLatch latch = new CountDownLatch(3);
            MessageHandler handler = msg -> {
                System.out.printf("Received msg on %s\n", msg.getSubject());
                msg.ack();
                latch.countDown();
            };


            try (MessageConsumer messageConsumer = consumerContext.consume(handler)) {
                latch.await();

Consuming can be stopped by calling stop on the returned MessageConsumer. This stops the MessageConsumer from asking the server for any more messages. The consumer will finish all pull requests already in progress, but will not start any new ones.

                messageConsumer.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }

Publish more messages.

            js.publish("events.1", null);
            js.publish("events.2", null);
            js.publish("events.3", null);

We can fetch messages in batches. The fetch can have a maximum number of messages and/or bytes that should be returned. For this first fetch we ask for two, and we will get those since they are in the stream.

            try (FetchConsumer fetchConsumer = consumerContext.fetchMessages(2)) {
                int count = 0;


                Message msg;
                while ((msg = fetchConsumer.nextMessage()) != null) {

Let’s ack the messages so they are not redelivered.

                    msg.ack();
                    count++;
                }


                System.out.printf("Got %d messages\n", count);
                fetchConsumer.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }

A fetch delivers its messages through nextMessage, which returns null once the requested number of messages has been received or the operation times out. If we do not want to wait for the rest of the messages and instead want to return quickly with as many messages as are available (up to the provided batch size), we can use noWait. Here, because we have already received two of the three messages, we will only get one more.

            FetchConsumeOptions fetchConsumeOptions = FetchConsumeOptions.builder().noWait().build();
            try (FetchConsumer fetchConsumer = consumerContext.fetch(fetchConsumeOptions)) {
                int count = 0;


                Message msg;
                while ((msg = fetchConsumer.nextMessage()) != null) {
                    msg.ack();
                    count++;
                }


                System.out.printf("Got %d messages\n", count);
                fetchConsumer.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }

Finally, if we are at the end of the stream and call fetch, the call blocks until the “expires in” time elapses. That time is 30 seconds by default, but it can be set explicitly as an option.

            long start = System.currentTimeMillis();
            fetchConsumeOptions = FetchConsumeOptions.builder()
                    .expiresIn(Duration.ofSeconds(1).toMillis())
                    .build();


            try (FetchConsumer fetchConsumer = consumerContext.fetch(fetchConsumeOptions)) {
                int count = 0;


                Message msg;
                while ((msg = fetchConsumer.nextMessage()) != null) {
                    msg.ack();
                    count++;
                }


                Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - start);
                System.out.printf("Got %d messages in %s\n", count, elapsed);
                fetchConsumer.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }

Durable consumers can be created by specifying the durable name. Durable consumers are not removed automatically regardless of the InactiveThreshold. They can be removed by calling deleteConsumer.

            ConsumerConfiguration durableConfig = ConsumerConfiguration.builder()
                    .durable("processor")
                    .build();
            ConsumerContext durableContext = streamContext.createOrUpdateConsumer(durableConfig);

Consume and fetch work the same way for durable consumers. Since we only want one message here, we can instead simply request the next message.

            Message msg = durableContext.next();
            System.out.printf("Received '%s' from durable consumer\n", msg.getSubject());

While ephemeral consumers will be removed after InactiveThreshold, durable consumers have to be removed explicitly if no longer needed.

            streamContext.deleteConsumer("processor");

Let’s try to get the consumer to make sure it’s gone.

            try {
                streamContext.getConsumerContext("processor");
            } catch (JetStreamApiException e) {
                System.out.printf("Consumer deleted: %s\n", e.getMessage());
            }
        } catch (InterruptedException | IOException | JetStreamApiException | JetStreamStatusCheckedException e) {
            e.printStackTrace();
        }
    }
}
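
The three fetch calls in the program above differ only in when they stop waiting. The following sketch approximates that behavior with a blocking queue from the standard library. It is a simplified model, not the client's implementation: `FetchModel` and its `fetch(batchSize, expiresMillis, noWait)` signature are hypothetical and exist only to show the interplay of batch size, no-wait, and expiry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only (hypothetical names): approximates "batch size", "no wait",
// and "expires in" with a blocking queue.
class FetchModel {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    void publish(String subject) {
        pending.add(subject);
    }

    // Collects up to batchSize messages. With noWait it returns as soon as the queue is empty;
    // otherwise it keeps waiting for more messages until expiresMillis has elapsed.
    List<String> fetch(int batchSize, long expiresMillis, boolean noWait) {
        List<String> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(expiresMillis);
        while (batch.size() < batchSize) {
            long remaining = noWait ? 0 : deadline - System.nanoTime();
            String msg;
            try {
                msg = pending.poll(Math.max(remaining, 0), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (msg == null) {
                break; // nothing available (noWait) or the wait expired
            }
            batch.add(msg);
        }
        return batch;
    }

    public static void main(String[] args) {
        FetchModel consumer = new FetchModel();
        consumer.publish("events.1");
        consumer.publish("events.2");
        consumer.publish("events.3");

        System.out.println(consumer.fetch(2, 1000, false).size());  // 2: batch filled immediately
        System.out.println(consumer.fetch(10, 1000, true).size());  // 1: noWait returns what is there
        System.out.println(consumer.fetch(10, 200, false).size());  // 0: after waiting about 200 ms
    }
}
```

The counts 2, 1, 0 mirror the "Got 2 messages", "Got 1 messages", and "Got 0 messages" lines of the real example: a full batch returns at once, noWait returns whatever is available, and an empty stream makes the call wait out its expiry.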

Output

Received msg on events.1
Received msg on events.2
Received msg on events.3
Got 2 messages
Got 1 messages
Got 0 messages in PT1S
Received 'events.1' from durable consumer
Consumer deleted: consumer not found [10014]
