Skip to content

Pub/Sub Messaging

Valkey GLIDE PubSub aims to unify various nuances into a coherent interface, minimizing differences between Sharded, Cluster, and Standalone configurations. Additionally, GLIDE is responsible for tracking topology changes in real time, ensuring the client remains subscribed regardless of any connectivity issues.

This guide will go over how to configure GLIDE’s PubSub system.

GLIDE uses the RESP3 protocol, which allows PubSub subscriptions and regular commands to coexist on the same client without issues. However, for high-throughput PubSub workloads, consider using a dedicated client for subscriptions to reduce latency on regular commands.

To publish a message, create a separate client to your Valkey instance. Both Standalone and Cluster are supported.

GlideClientConfiguration publisherConfig = GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(6379).build())
.build();
try (var publisher = GlideClient.createClient(publisherConfig).get()) {
// Publish message on 'ch1' channel
publisher.publish("Test message", "ch1").get();
}

GLIDE provides two ways to subscribe to channels: dynamic subscriptions via API calls at runtime, and config-based subscriptions defined at client creation time.

Starting with GLIDE 2.3, you can subscribe to channels and patterns at any point during the client’s lifetime. Dynamic subscriptions come in two variants:

  • Blocking: Waits for server confirmation before returning. Accepts a timeout_ms parameter (0 = wait indefinitely).
  • Non-blocking (lazy): Updates the desired subscription state and returns immediately. The subscription is applied asynchronously by the background synchronizer.

No special configuration is needed at client creation time — you can subscribe dynamically on any client. Messages are buffered and retrievable via polling by default. If you want callback-based delivery instead (explained below), provide a subscription configuration with a callback at client creation time (the initial channel set can be empty).

// Create a regular client — no subscription configuration needed
GlideClientConfiguration config = GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(6379).build())
.requestTimeout(3000)
.build();
try (var client = GlideClient.createClient(config).get()) {
// --- Exact channel subscriptions ---
// Blocking: waits up to 5000ms for server confirmation
client.subscribe(Set.of("news", "updates"), 5000).get();
// Non-blocking (lazy): returns immediately, subscribes in background
client.subscribeLazy(Set.of("alerts")).get();
// --- Pattern subscriptions ---
// Blocking
client.psubscribe(Set.of("chat*", "event*"), 5000).get();
// Non-blocking (lazy)
client.psubscribeLazy(Set.of("log*")).get();
// --- Sharded subscriptions (cluster mode only) ---
// var clusterClient = GlideClusterClient.createClient(clusterConfig).get();
// clusterClient.ssubscribe(Set.of("shard-ch1"), 5000).get();
// clusterClient.ssubscribeLazy(Set.of("shard-ch2")).get();
// Retrieve messages via polling
PubSubMessage msg = client.getPubSubMessage().get();
} // remaining subscriptions cleaned up on close

For GLIDE versions prior to 2.3, or when your subscriptions are known at startup, you can define them in the client configuration. These subscriptions are applied immediately when the client connects.

// Define callback and context
MessageCallback callback =
(msg, ctx) -> System.out.printf("Received %s, context %s\n", msg, ctx);
String context = "example";
// Configure the client to invoke the callback for messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
StandaloneSubscriptionConfiguration subscriptionConfig = StandaloneSubscriptionConfiguration.builder()
.subscription(EXACT, "ch1")
.subscription(EXACT, "ch2")
.subscription(PATTERN, "chat*")
.callback(callback, context)
.build()
GlideClientConfiguration config = GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(6379).build())
.requestTimeout(3000)
.subscriptionConfiguration(subscriptionConfig)
.build();
try (var listeningClient = GlideClient.createClient(config).get()) {
// Do some work/wait - the callback will be invoked on incoming messages
} // Unsubscribe happens here

There are two approaches: callback-based (messages are pushed to your function automatically) and polling-based (you pull messages when ready). The approach is determined by whether a callback is provided in the subscription configuration.

To use callback-based delivery, provide a subscription configuration with a callback at client creation time. The callback fires for every incoming message — from both config-based and dynamic subscriptions.

List<String> received = Collections.synchronizedList(new ArrayList<>());
MessageCallback callback = (msg, context) -> {
received.add(msg.getMessage());
System.out.printf("Received '%s' on '%s'\n", msg.getMessage(), msg.getChannel());
};
GlideClientConfiguration config = GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(6379).build())
.requestTimeout(3000)
.subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
.callback(callback)
.build())
.build();
try (var client = GlideClient.createClient(config).get()) {
// Subscribe dynamically — messages are delivered to the callback
client.subscribe(Set.of("news"), 5000).get();
// Publish a message (from another client)
publishingClient.publish("Hello!", "news").get();
Thread.sleep(500);
// Verify the callback received the message
assert received.contains("Hello!");
}

If no callback is configured, messages are buffered in an unbounded queue. You retrieve them using:

  • Async wait: getPubSubMessage() / get_pubsub_message() — blocks until a message is available.
  • Non-blocking poll: tryGetPubSubMessage() / try_get_pubsub_message() — returns the next message or null/None immediately.
// Configure the client to receive messages published to 'ch1'
// and 'ch2' and to channels matched by 'chat*' glob pattern.
StandaloneSubscriptionConfiguration subscriptionConfig = StandaloneSubscriptionConfiguration.builder()
.subscription(EXACT, Set.of("ch1", "ch2"))
.subscription(PATTERN, "chat*")
.build();
GlideClientConfiguration config = GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(6379).build())
.subscriptionConfiguration(subscriptionConfig)
.build();
try (var listeningClient = GlideClient.createClient(config).get()) {
// Non-blocking: returns null if no message is available
Message msg = listeningClient.tryGetPubSubMessage();
// Async: waits for the next message
Message msg = listeningClient.getPubSubMessage().get();
} // Unsubscribe happens here

Starting with GLIDE 2.3, you can unsubscribe from channels and patterns at runtime. Like subscribing, unsubscribing comes in blocking and non-blocking (lazy) variants. This works for both config-based and dynamically subscribed channels.

To unsubscribe from all channels/patterns of a given type, pass None/null/empty, or use the provided constants: ALL_CHANNELS, ALL_PATTERNS, ALL_SHARDED_CHANNELS.

// Unsubscribe from specific exact channels (blocking)
client.unsubscribe(Set.of("news"), 5000).get();
// Unsubscribe from specific exact channels (lazy)
client.unsubscribeLazy(Set.of("alerts")).get();
// Unsubscribe from all exact channels
client.unsubscribe(PubSubBaseCommands.ALL_CHANNELS, 5000).get();
// Unsubscribe from specific patterns (blocking)
client.punsubscribe(Set.of("chat*"), 5000).get();
// Unsubscribe from specific patterns (lazy)
client.punsubscribeLazy(Set.of("log*")).get();
// Unsubscribe from all patterns
client.punsubscribe(PubSubBaseCommands.ALL_PATTERNS, 5000).get();
// Sharded unsubscribe (cluster mode only)
// clusterClient.sunsubscribe(Set.of("shard-ch1"), 5000).get();
// clusterClient.sunsubscribeLazy(Set.of("shard-ch2")).get();
// clusterClient.sunsubscribe(PubSubClusterCommands.ALL_SHARDED_CHANNELS, 5000).get();

Subscription State Introspection (GLIDE 2.3+)

Section titled “Subscription State Introspection (GLIDE 2.3+)”

Use get_subscriptions() to inspect the current subscription state. It returns both the desired subscriptions (what you’ve requested) and the actual subscriptions (what the server has confirmed). This is useful for verifying that lazy subscriptions have been fully applied.

PubSubState state = client.getSubscriptions().get();
System.out.println("Desired: " + state.getDesiredSubscriptions());
System.out.println("Actual: " + state.getActualSubscriptions());

To learn more about GLIDE’s PubSub model and the synchronizer architecture, see our explanation.