Pub/Sub Messaging
Valkey GLIDE PubSub aims to unify various nuances into a coherent interface, minimizing differences between Sharded, Cluster, and Standalone configurations. Additionally, GLIDE is responsible for tracking topology changes in real time, ensuring the client remains subscribed regardless of any connectivity issues.
This guide will go over how to configure GLIDE’s PubSub system.
Using Separate Clients
GLIDE uses the RESP3 protocol, which allows PubSub subscriptions and regular commands to coexist on the same client without issues. However, for high-throughput PubSub workloads, consider using a dedicated client for subscriptions to reduce latency on regular commands.
Publishing
To publish a message, create a separate client connected to your Valkey instance. Both Standalone and Cluster are supported.
Java:

GlideClientConfiguration publisherConfig = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .build();

try (var publisher = GlideClient.createClient(publisherConfig).get()) {
    // Publish message on 'ch1' channel
    publisher.publish("Test message", "ch1").get();
}

Node.js:

const publisherConfig = {
    addresses: [{ host: "localhost", port: 6379 }],
};
const publisher = await GlideClient.createClient(publisherConfig);

// Publish message on 'ch1' channel
await publisher.publish("Test message", "ch1");

publisher.close();

Python:

publisher_config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)]
)

publisher = await GlideClient.create(publisher_config)

# Publish message on 'ch1' channel
await publisher.publish("Test message", "ch1")

Go:

publisher, _ := NewGlideClient(NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}))

// Publish message on 'ch1' channel
publisher.Publish("ch1", "Hello from ch1")

PHP:

$publisher = new ValkeyGlide();
$publisher->connect(addresses: [['host' => 'localhost', 'port' => 6379]]);

// Publish message on 'ch1' channel
$publisher->publish('ch1', 'Test message');

C#:

using Valkey.Glide;
using static Valkey.Glide.ConnectionConfiguration;

var publisherConfig = new StandaloneClientConfigurationBuilder()
    .WithAddress("localhost", 6379)
    .Build();

await using var publisher = await GlideClient.CreateClient(publisherConfig);

// Publish message on 'ch1' channel
await publisher.PublishAsync("ch1", "Test message");

Subscribing
GLIDE provides two ways to subscribe to channels: dynamic subscriptions via API calls at runtime, and config-based subscriptions defined at client creation time.
Dynamic Subscriptions (GLIDE 2.3+)
Starting with GLIDE 2.3, you can subscribe to channels and patterns at any point during the client’s lifetime. Dynamic subscriptions come in two variants:
- Blocking: Waits for server confirmation before returning. Accepts a timeout_ms parameter (0 = wait indefinitely).
- Non-blocking (lazy): Updates the desired subscription state and returns immediately. The subscription is applied asynchronously by the background synchronizer.
No special configuration is needed at client creation time — you can subscribe dynamically on any client. Messages are buffered and retrievable via polling by default. If you want callback-based delivery instead (explained below), provide a subscription configuration with a callback at client creation time (the initial channel set can be empty).
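To make the difference between the two variants concrete, here is a toy model in plain Python. It is only a sketch of the idea described above and is not GLIDE's implementation: the class `ToySubscriptionState`, its methods, and the 10 ms "round trip" are all hypothetical, and no Valkey server is involved.

```python
import asyncio


class ToySubscriptionState:
    """Toy model of desired vs. confirmed subscriptions (hypothetical, not GLIDE code)."""

    def __init__(self) -> None:
        self.desired: set = set()   # what the caller has asked for
        self.actual: set = set()    # what the "server" has confirmed
        self._changed = asyncio.Event()

    def subscribe_lazy(self, channels: set) -> None:
        # Lazy variant: record the intent and return immediately.
        self.desired |= channels

    async def subscribe(self, channels: set, timeout_ms: int = 0) -> None:
        # Blocking variant: record the intent, then wait for confirmation.
        # As in the text above, 0 means "wait indefinitely".
        self.subscribe_lazy(channels)
        timeout = None if timeout_ms == 0 else timeout_ms / 1000
        await asyncio.wait_for(self._wait_confirmed(channels), timeout)

    async def _wait_confirmed(self, channels: set) -> None:
        while not channels <= self.actual:
            self._changed.clear()
            await self._changed.wait()

    async def synchronizer(self) -> None:
        # Background task: keep reconciling actual state with desired state.
        while True:
            await asyncio.sleep(0.01)  # stand-in for a server round trip
            if self.actual != self.desired:
                self.actual = set(self.desired)
                self._changed.set()


async def demo():
    state = ToySubscriptionState()
    task = asyncio.create_task(state.synchronizer())
    state.subscribe_lazy({"alerts"})
    pending = set(state.actual)      # lazy call returned before confirmation
    await state.subscribe({"news"}, timeout_ms=1000)
    confirmed = set(state.actual)    # blocking call returned after confirmation
    task.cancel()
    return pending, confirmed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the lazy call only changes the desired set, so a snapshot taken right after it may not include the new channel yet, while the blocking call returns only once the confirmed set contains the requested channels or the timeout expires.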
Java:

// Create a regular client (no subscription configuration needed)
GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .requestTimeout(3000)
    .build();

try (var client = GlideClient.createClient(config).get()) {

    // --- Exact channel subscriptions ---

    // Blocking: waits up to 5000ms for server confirmation
    client.subscribe(Set.of("news", "updates"), 5000).get();

    // Non-blocking (lazy): returns immediately, subscribes in background
    client.subscribeLazy(Set.of("alerts")).get();

    // --- Pattern subscriptions ---

    // Blocking
    client.psubscribe(Set.of("chat*", "event*"), 5000).get();

    // Non-blocking (lazy)
    client.psubscribeLazy(Set.of("log*")).get();

    // --- Sharded subscriptions (cluster mode only) ---
    // var clusterClient = GlideClusterClient.createClient(clusterConfig).get();
    // clusterClient.ssubscribe(Set.of("shard-ch1"), 5000).get();
    // clusterClient.ssubscribeLazy(Set.of("shard-ch2")).get();

    // Retrieve messages via polling
    PubSubMessage msg = client.getPubSubMessage().get();

} // remaining subscriptions cleaned up on close

Node.js:

// Create a regular client (no subscription configuration needed)
const config = {
    addresses: [{ host: "localhost", port: 6379 }],
};
const client = await GlideClient.createClient(config);

// --- Exact channel subscriptions ---

// Blocking: waits up to 5000ms for server confirmation
await client.subscribe(new Set(["news", "updates"]), 5000);

// Non-blocking (lazy): returns immediately, subscribes in background
await client.subscribeLazy(new Set(["alerts"]));

// --- Pattern subscriptions ---

// Blocking
await client.psubscribe(new Set(["chat*", "event*"]), 5000);

// Non-blocking (lazy)
await client.psubscribeLazy(new Set(["log*"]));

// --- Sharded subscriptions (cluster mode only) ---
// await clusterClient.ssubscribe(new Set(["shard-ch1"]), 5000);
// await clusterClient.ssubscribeLazy(new Set(["shard-ch2"]));

// Retrieve messages via polling
const msg = await client.getPubSubMessage();

await client.close();

Python:

# Create a regular client (no subscription configuration needed)
config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
)
client = await GlideClient.create(config)

# --- Exact channel subscriptions ---

# Blocking: waits up to 5000ms for server confirmation
await client.subscribe({"news", "updates"}, timeout_ms=5000)

# Non-blocking (lazy): returns immediately, subscribes in background
await client.subscribe_lazy({"alerts"})

# --- Pattern subscriptions ---

# Blocking
await client.psubscribe({"chat*", "event*"}, timeout_ms=5000)

# Non-blocking (lazy)
await client.psubscribe_lazy({"log*"})

# --- Sharded subscriptions (cluster mode only) ---
# await cluster_client.ssubscribe({"shard-ch1"}, timeout_ms=5000)
# await cluster_client.ssubscribe_lazy({"shard-ch2"})

# Retrieve messages via polling
msg = await client.get_pubsub_message()

await client.close()

Go:

ctx := context.Background()

// Create a regular client (no subscription configuration needed)
config := NewGlideClientConfiguration().
    WithAddress(&NodeAddress{})
client, _ := NewGlideClient(config)
defer client.Close()

// --- Exact channel subscriptions ---

// Blocking: waits up to 5000ms for server confirmation
err := client.Subscribe(ctx, []string{"news", "updates"}, 5000)

// Non-blocking (lazy): returns immediately, subscribes in background
err = client.SubscribeLazy(ctx, []string{"alerts"})

// --- Pattern subscriptions ---

// Blocking
err = client.PSubscribe(ctx, []string{"chat*", "event*"}, 5000)

// Non-blocking (lazy)
err = client.PSubscribeLazy(ctx, []string{"log*"})

// --- Sharded subscriptions (cluster mode only) ---
// err = clusterClient.SSubscribe(ctx, []string{"shard-ch1"}, 5000)
// err = clusterClient.SSubscribeLazy(ctx, []string{"shard-ch2"})

// Retrieve messages via polling
queue, _ := client.GetQueue()
if msg := queue.Pop(); msg != nil {
    fmt.Printf("Received: %s on %s\n", msg.Message, msg.Channel)
}

PHP:

// Note: PHP uses the PHPRedis-style API where subscribe() is BLOCKING.
// It enters a message loop and the callback fires for each message.
// Use separate processes/threads for subscriber and publisher.

$client = new ValkeyGlide();
$client->connect(addresses: [['host' => 'localhost', 'port' => 6379]]);

// --- Exact channel subscriptions ---
// subscribe() blocks and enters a message loop
$client->subscribe(['news', 'updates'], function ($client, $channel, $message) {
    echo "Received '$message' on '$channel'\n";

    // Unsubscribe from a specific channel
    if ($message === 'stop') {
        $client->unsubscribe([$channel]);
    }
});
// Code here runs after all channels are unsubscribed

// --- Pattern subscriptions ---
$client->psubscribe(['chat*', 'event*'], function ($client, $channel, $message, $pattern) {
    echo "[$pattern] $channel: $message\n";

    // Unsubscribe from a specific pattern
    $client->punsubscribe([$pattern]);
});

// Unsubscribe from all channels/patterns (from within callback):
// $client->unsubscribe();  // all exact channels
// $client->punsubscribe(); // all patterns

$client->close();

C#:

using Valkey.Glide;
using static Valkey.Glide.ConnectionConfiguration;

var config = new ClusterClientConfigurationBuilder()
    .WithAddress("localhost", 6379)
    .Build();

await using var client = await GlideClusterClient.CreateClient(config);

// Wait up to 5 seconds for server confirmation.
var blockFor = TimeSpan.FromSeconds(5);

// --- Exact channel subscriptions ---

await client.SubscribeAsync(new[] { "news", "updates" }, blockFor); // Blocking
await client.SubscribeLazyAsync(new[] { "alerts" });                // Non-blocking (lazy)

// --- Pattern subscriptions ---

await client.PSubscribeAsync(new[] { "chat*", "event*" }, blockFor); // Blocking
await client.PSubscribeLazyAsync(new[] { "log*" });                  // Non-blocking (lazy)

// --- Sharded subscriptions (cluster mode only) ---

await client.SSubscribeAsync(new[] { "shard-ch1" }, blockFor); // Blocking
await client.SSubscribeLazyAsync(new[] { "shard-ch2" });       // Non-blocking (lazy)

// Retrieve messages via polling
var queue = client.PubSubQueue!;
var msg = await queue.GetMessageAsync();

Config-Based Subscriptions (All Versions)
For GLIDE versions prior to 2.3, or when your subscriptions are known at startup, you can define them in the client configuration. These subscriptions are applied immediately when the client connects.
Java:

// Define callback and context
MessageCallback callback = (msg, ctx) ->
    System.out.printf("Received %s, context %s\n", msg, ctx);
String context = "example";

// Configure the client to invoke the callback for messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
StandaloneSubscriptionConfiguration subscriptionConfig = StandaloneSubscriptionConfiguration.builder()
    .subscription(EXACT, "ch1")
    .subscription(EXACT, "ch2")
    .subscription(PATTERN, "chat*")
    .callback(callback, context)
    .build();

GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .requestTimeout(3000)
    .subscriptionConfiguration(subscriptionConfig)
    .build();

try (var listeningClient = GlideClient.createClient(config).get()) {
    // Do some work/wait - the callback will be invoked on incoming messages
} // Unsubscribe happens here

Node.js:

// Define callback and context
const callback = (msg: PubSubMsg, ctx: any) => {
    console.log(`Received ${msg.message}, context ${ctx}`);
};
const context = "example";

// Configure the client to invoke the callback for messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
const subscriptionConfig = {
    channelsAndPatterns: {
        [GlideClientConfiguration.PubSubChannelModes.Exact]: new Set(["ch1", "ch2"]),
        [GlideClientConfiguration.PubSubChannelModes.Pattern]: new Set(["chat*"]),
    },
    callback: callback,
    context: context,
};

const config = {
    addresses: [{ host: "localhost", port: 6379 }],
    pubsubSubscriptions: subscriptionConfig,
};

const listeningClient = await GlideClient.createClient(config);

// Do some work/wait - the callback will be invoked on incoming messages

listeningClient.close(); // Unsubscribe happens here

Python:

# Define callback and context
def callback(msg: CoreCommands.PubSubMsg, ctx: Any):
    print(f"Received {msg}, context {ctx}\n")

context = "example"

# Configure the client to invoke the callback for messages published
# to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
subscription_config = GlideClientConfiguration.PubSubSubscriptions(
    channels_and_patterns={
        GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
        GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"},
    },
    callback=callback,
    context=context,
)

config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
    pubsub_subscriptions=subscription_config,
)

listening_client = await GlideClient.create(config)

# Do some work/wait - the callback will be invoked on incoming messages

await listening_client.close()  # Unsubscribe happens here

Go:

// Define callback and context
callback := func(message *PubSubMessage, context any) {
    fmt.Printf("Received %s, context %s\n", message.Message, context)
}
ctx := "example"

// Configure the client to invoke the callback for messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
subscriptionConfig := NewStandaloneSubscriptionConfig().
    WithSubscription(ExactChannelMode, "ch1").
    WithSubscription(ExactChannelMode, "ch2").
    WithSubscription(PatternChannelMode, "chat*").
    WithCallback(callback, ctx)

// Create a client with subscriptions
listeningClient, _ := NewGlideClient(NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}).
    WithSubscriptionConfig(subscriptionConfig))

// Do some work/wait - the callback will be invoked on incoming messages

C#:

using Valkey.Glide;
using static Valkey.Glide.ConnectionConfiguration;

// Define callback and context
void PubSubCallback(PubSubMessage message, object? context)
    => Console.WriteLine($"Received {message.Message} on {message.Channel}, context {context}");
var context = "example";

// Configure the client to invoke the callback for messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
var subscriptionConfig = new StandalonePubSubSubscriptionConfig()
    .WithChannel("ch1")
    .WithChannel("ch2")
    .WithPattern("chat*")
    .WithCallback(PubSubCallback, context);

var config = new StandaloneClientConfigurationBuilder()
    .WithAddress("localhost", 6379)
    .WithPubSubSubscriptions(subscriptionConfig)
    .Build();

await using var listeningClient = await GlideClient.CreateClient(config);

// Do some work/wait - the callback will be invoked on incoming messages

Receiving Messages
There are two approaches: callback-based (messages are pushed to your function automatically) and polling-based (you pull messages when ready). The approach is determined by whether a callback is provided in the subscription configuration.
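The choice between the two approaches can be pictured with a small stand-alone model. This is a sketch under stated assumptions rather than GLIDE code: `ToyInbox`, `deliver`, and `try_get` are hypothetical names, and the rule that polling raises an error once a callback is set is an assumption of this toy.

```python
from collections import deque
from typing import Any, Callable, Optional


class ToyInbox:
    """Hypothetical stand-in for a client's message delivery path."""

    def __init__(
        self,
        callback: Optional[Callable[[str, Any], None]] = None,
        context: Any = None,
    ) -> None:
        self._callback = callback
        self._context = context
        self._queue: deque = deque()

    def deliver(self, message: str) -> None:
        if self._callback is not None:
            # Callback mode: the message is pushed to user code right away.
            self._callback(message, self._context)
        else:
            # Polling mode: the message is buffered until the user asks for it.
            self._queue.append(message)

    def try_get(self) -> Optional[str]:
        # Assumption of this toy: polling is rejected when a callback is set.
        if self._callback is not None:
            raise RuntimeError("polling is unavailable when a callback is set")
        return self._queue.popleft() if self._queue else None
```

The same `deliver` call ends up in one of two places depending only on how the inbox was constructed, which mirrors how the delivery mode is fixed at client creation time.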
Callback
To use callback-based delivery, provide a subscription configuration with a callback at client creation time. The callback fires for every incoming message, whether it comes from a config-based or a dynamic subscription.

Java:

List<String> received = Collections.synchronizedList(new ArrayList<>());
MessageCallback callback = (msg, context) -> {
    received.add(msg.getMessage());
    System.out.printf("Received '%s' on '%s'\n", msg.getMessage(), msg.getChannel());
};

GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .requestTimeout(3000)
    .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
        .callback(callback)
        .build())
    .build();

try (var client = GlideClient.createClient(config).get()) {
    // Subscribe dynamically; messages are delivered to the callback
    client.subscribe(Set.of("news"), 5000).get();

    // Publish a message (from another client)
    publishingClient.publish("Hello!", "news").get();
    Thread.sleep(500);

    // Verify the callback received the message
    assert received.contains("Hello!");
}

Node.js:

const received: string[] = [];
const callback = (msg: PubSubMsg, context: any) => {
    received.push(msg.message);
    console.log(`Received '${msg.message}' on '${msg.channel}'`);
};

const config = {
    addresses: [{ host: "localhost", port: 6379 }],
    pubsubSubscriptions: {
        channelsAndPatterns: {},
        callback: callback,
        context: null,
    },
};
const client = await GlideClient.createClient(config);

// Subscribe dynamically; messages are delivered to the callback
await client.subscribe(new Set(["news"]), 5000);

// Publish a message (from another client)
await publishingClient.publish("Hello!", "news");
await new Promise((r) => setTimeout(r, 500));

// Verify the callback received the message
console.assert(received.includes("Hello!"));

client.close();

Python:

received = []

def callback(msg: CoreCommands.PubSubMsg, context: Any):
    received.append(msg.message)
    print(f"Received '{msg.message}' on '{msg.channel}'")

config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
    pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
        channels_and_patterns={},
        callback=callback,
        context=None,
    ),
)
client = await GlideClient.create(config)

# Subscribe dynamically; messages are delivered to the callback
await client.subscribe({"news"}, timeout_ms=5000)

# Publish a message (from another client)
await publishing_client.publish("Hello!", "news")
await asyncio.sleep(0.5)

# Verify the callback received the message
assert "Hello!" in received

await client.close()

Go:

var received []string
var mu sync.Mutex
callback := func(message *PubSubMessage, context any) {
    mu.Lock()
    received = append(received, message.Message)
    mu.Unlock()
    fmt.Printf("Received '%s' on '%s'\n", message.Message, message.Channel)
}

sConfig := NewStandaloneSubscriptionConfig().
    WithCallback(callback, nil)
config := NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}).
    WithSubscriptionConfig(sConfig)
client, _ := NewGlideClient(config)
defer client.Close()

// Subscribe dynamically; messages are delivered to the callback
ctx := context.Background()
_ = client.Subscribe(ctx, []string{"news"}, 5000)

// Publish a message (from another client)
publisher.Publish("news", "Hello!")
time.Sleep(500 * time.Millisecond)

// Verify the callback received the message
mu.Lock()
fmt.Println("Received messages:", received)
mu.Unlock()

C#:

using System.Diagnostics;
using Valkey.Glide;
using static Valkey.Glide.ConnectionConfiguration;

var received = new List<string>();

void PubSubCallback(PubSubMessage message, object? context)
{
    lock (received) { received.Add(message.Message); }
    Console.WriteLine($"Received '{message.Message}' on '{message.Channel}'");
}

var subscriptionConfig = new StandalonePubSubSubscriptionConfig()
    .WithCallback(PubSubCallback);

var config = new StandaloneClientConfigurationBuilder()
    .WithAddress("localhost", 6379)
    .WithPubSubSubscriptions(subscriptionConfig)
    .Build();

await using var client = await GlideClient.CreateClient(config);

// Subscribe dynamically; messages are delivered to the callback
await client.SubscribeAsync(new[] { "news" }, TimeSpan.FromSeconds(5));

// Publish a message (from another client)
await publishingClient.PublishAsync("news", "Hello!");
await Task.Delay(500);

// Verify the callback received the message
lock (received) { Debug.Assert(received.Contains("Hello!")); }

Polling
If no callback is configured, messages are buffered in an unbounded queue. You retrieve them using:
- Async wait: getPubSubMessage() / get_pubsub_message() waits until a message is available.
- Non-blocking poll: tryGetPubSubMessage() / try_get_pubsub_message() returns the next message, or null / None immediately if none is buffered.
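The two retrieval styles behave much like the two ways of reading from an `asyncio.Queue`. The sketch below uses only the Python standard library as an analogy; it does not call GLIDE, and the message text and the 50 ms delay are made up for illustration.

```python
import asyncio


async def demo() -> list:
    inbox: asyncio.Queue = asyncio.Queue()  # maxsize=0 means unbounded
    results = []

    # Non-blocking poll: an empty buffer yields None immediately.
    try:
        results.append(inbox.get_nowait())
    except asyncio.QueueEmpty:
        results.append(None)

    # Simulate a publisher delivering a message 50 ms from now.
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, inbox.put_nowait, "Test message")

    # Async wait: suspends this coroutine until a message arrives.
    results.append(await inbox.get())
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The non-blocking form suits loops that have other work to do between checks, while the async wait suits a dedicated consumer task that should sleep until something arrives.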
Java:

// Configure the client to receive messages published to 'ch1'
// and 'ch2' and to channels matched by 'chat*' glob pattern.
StandaloneSubscriptionConfiguration subscriptionConfig = StandaloneSubscriptionConfiguration.builder()
    .subscription(EXACT, "ch1")
    .subscription(EXACT, "ch2")
    .subscription(PATTERN, "chat*")
    .build();

GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .subscriptionConfiguration(subscriptionConfig)
    .build();

try (var listeningClient = GlideClient.createClient(config).get()) {

    // Non-blocking: returns null if no message is available
    PubSubMessage polled = listeningClient.tryGetPubSubMessage();

    // Async: waits for the next message
    PubSubMessage next = listeningClient.getPubSubMessage().get();
} // Unsubscribe happens here

Node.js:

// Configure the client to receive messages published to 'ch1'
// and 'ch2' and to channels matched by 'chat*' glob pattern.
const subscriptionConfig = {
    channelsAndPatterns: {
        [GlideClientConfiguration.PubSubChannelModes.Exact]: new Set(["ch1", "ch2"]),
        [GlideClientConfiguration.PubSubChannelModes.Pattern]: new Set(["chat*"]),
    },
};

const config = {
    addresses: [{ host: "localhost", port: 6379 }],
    pubsubSubscriptions: subscriptionConfig,
};

const listeningClient = await GlideClient.createClient(config);

// Non-blocking: returns null if no message is available
const message2 = listeningClient.tryGetPubSubMessage();

// Async: waits for the next message
const message = await listeningClient.getPubSubMessage();

listeningClient.close(); // Unsubscribe happens here

Python:

# Configure the client to receive messages published to 'ch1'
# and 'ch2' and to channels matched by 'chat*' glob pattern.
subscriptions_config = GlideClientConfiguration.PubSubSubscriptions(
    channels_and_patterns={
        GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
        GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"},
    },
)

config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
    pubsub_subscriptions=subscriptions_config,
)

listening_client = await GlideClient.create(config)

# Non-blocking: returns None if no message is available
message = listening_client.try_get_pubsub_message()

# Async: waits for the next message
message = await listening_client.get_pubsub_message()

await listening_client.close()  # Unsubscribe happens here

Go:

// Create a signal channel to receive notifications of new messages
signalCh := make(chan struct{}, 1)

// Configure the client to receive messages published to 'ch1'
// and 'ch2' and to channels matched by 'chat*' glob pattern.
subscriptionConfig := NewStandaloneSubscriptionConfig().
    WithSubscription(ExactChannelMode, "ch1").
    WithSubscription(ExactChannelMode, "ch2").
    WithSubscription(PatternChannelMode, "chat*")

// Create a client with subscriptions
listeningClient, _ := NewGlideClient(NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}).
    WithSubscriptionConfig(subscriptionConfig))
defer listeningClient.Close()

// Get the message queue from the client
queue, _ := listeningClient.GetQueue()

// Register our signal channel
queue.RegisterSignalChannel(signalCh)
defer queue.UnregisterSignalChannel(signalCh)

// Create a context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start processing messages in a goroutine
go func() {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Receiver shutting down...")
            return
        case <-signalCh:
            // Process all available messages
            for {
                if msg := queue.Pop(); msg != nil {
                    fmt.Printf("Received message: %s on channel: %s\n", msg.Message, msg.Channel)
                } else {
                    break // No more messages
                }
            }
        }
    }
}()

// The main application can continue running...
// When it's time to stop, just call cancel() or close the client

C#:

using Valkey.Glide;
using static Valkey.Glide.ConnectionConfiguration;

// Configure the client to receive messages published
// to 'ch1' and 'ch2' and to channels matched by 'chat*' glob pattern.
var subscriptionConfig = new StandalonePubSubSubscriptionConfig()
    .WithChannel("ch1")
    .WithChannel("ch2")
    .WithPattern("chat*");

var config = new StandaloneClientConfigurationBuilder()
    .WithAddress("localhost", 6379)
    .WithPubSubSubscriptions(subscriptionConfig)
    .Build();

await using var listeningClient = await GlideClient.CreateClient(config);

var queue = listeningClient.PubSubQueue!;

// Non-blocking: returns false if no message is available
if (queue.TryGetMessage(out var msg))
{
    Console.WriteLine($"Received: {msg!.Message} on {msg.Channel}");
}

// Async: waits for the next message
var message = await queue.GetMessageAsync();

Unsubscribing (GLIDE 2.3+)
Starting with GLIDE 2.3, you can unsubscribe from channels and patterns at runtime. Like subscribing, unsubscribing comes in blocking and non-blocking (lazy) variants. This works for both config-based and dynamically subscribed channels.
To unsubscribe from all channels/patterns of a given type, pass None/null/empty, or use the provided constants: ALL_CHANNELS, ALL_PATTERNS, ALL_SHARDED_CHANNELS.
Java:

// Unsubscribe from specific exact channels (blocking)
client.unsubscribe(Set.of("news"), 5000).get();

// Unsubscribe from specific exact channels (lazy)
client.unsubscribeLazy(Set.of("alerts")).get();

// Unsubscribe from all exact channels
client.unsubscribe(PubSubBaseCommands.ALL_CHANNELS, 5000).get();

// Unsubscribe from specific patterns (blocking)
client.punsubscribe(Set.of("chat*"), 5000).get();

// Unsubscribe from specific patterns (lazy)
client.punsubscribeLazy(Set.of("log*")).get();

// Unsubscribe from all patterns
client.punsubscribe(PubSubBaseCommands.ALL_PATTERNS, 5000).get();

// Sharded unsubscribe (cluster mode only)
// clusterClient.sunsubscribe(Set.of("shard-ch1"), 5000).get();
// clusterClient.sunsubscribeLazy(Set.of("shard-ch2")).get();
// clusterClient.sunsubscribe(PubSubClusterCommands.ALL_SHARDED_CHANNELS, 5000).get();

Node.js:

// Unsubscribe from specific exact channels (blocking)
await client.unsubscribe(new Set(["news"]), 5000);

// Unsubscribe from specific exact channels (lazy)
await client.unsubscribeLazy(new Set(["alerts"]));

// Unsubscribe from all exact channels
await client.unsubscribe(ALL_CHANNELS, 5000);

// Unsubscribe from specific patterns (blocking)
await client.punsubscribe(new Set(["chat*"]), 5000);

// Unsubscribe from specific patterns (lazy)
await client.punsubscribeLazy(new Set(["log*"]));

// Unsubscribe from all patterns
await client.punsubscribe(ALL_PATTERNS, 5000);

// Sharded unsubscribe (cluster mode only)
// await clusterClient.sunsubscribe(new Set(["shard-ch1"]), 5000);
// await clusterClient.sunsubscribeLazy(new Set(["shard-ch2"]));
// await clusterClient.sunsubscribe(ALL_SHARDED_CHANNELS, 5000);

Python:

# Unsubscribe from specific exact channels (blocking)
await client.unsubscribe({"news"}, timeout_ms=5000)

# Unsubscribe from specific exact channels (lazy)
await client.unsubscribe_lazy({"alerts"})

# Unsubscribe from all exact channels
await client.unsubscribe(ALL_CHANNELS, timeout_ms=5000)

# Unsubscribe from specific patterns (blocking)
await client.punsubscribe({"chat*"}, timeout_ms=5000)

# Unsubscribe from specific patterns (lazy)
await client.punsubscribe_lazy({"log*"})

# Unsubscribe from all patterns
await client.punsubscribe(ALL_PATTERNS, timeout_ms=5000)

# Sharded unsubscribe (cluster mode only)
# await cluster_client.sunsubscribe({"shard-ch1"}, timeout_ms=5000)
# await cluster_client.sunsubscribe_lazy({"shard-ch2"})
# await cluster_client.sunsubscribe(ALL_SHARDED_CHANNELS, timeout_ms=5000)

Go:

// Unsubscribe from specific exact channels (blocking)
err := client.Unsubscribe(ctx, []string{"news"}, 5000)

// Unsubscribe from specific exact channels (lazy)
err = client.UnsubscribeLazy(ctx, []string{"alerts"})

// Unsubscribe from all exact channels
err = client.Unsubscribe(ctx, AllChannels, 5000)

// Unsubscribe from specific patterns (blocking)
err = client.PUnsubscribe(ctx, []string{"chat*"}, 5000)

// Unsubscribe from specific patterns (lazy)
err = client.PUnsubscribeLazy(ctx, []string{"log*"})

// Unsubscribe from all patterns
err = client.PUnsubscribe(ctx, AllPatterns, 5000)

// Sharded unsubscribe (cluster mode only)
// err = clusterClient.SUnsubscribe(ctx, []string{"shard-ch1"}, 5000)
// err = clusterClient.SUnsubscribeLazy(ctx, []string{"shard-ch2"})

PHP:

// In PHP, unsubscribe is called from within the subscribe callback.

$client->subscribe(['ch1', 'ch2', 'ch3'], function ($client, $channel, $message) {
    // Unsubscribe from a specific channel
    $client->unsubscribe([$channel]);

    // Or unsubscribe from all exact channels at once
    // $client->unsubscribe();
});

$client->psubscribe(['chat*', 'log*'], function ($client, $channel, $message, $pattern) {
    // Unsubscribe from a specific pattern
    $client->punsubscribe([$pattern]);

    // Or unsubscribe from all patterns at once
    // $client->punsubscribe();
});

C#:

var blockFor = TimeSpan.FromSeconds(5);

// Unsubscribe from specific exact channels (blocking)
await client.UnsubscribeAsync(new[] { "news" }, blockFor);

// Unsubscribe from specific exact channels (lazy)
await client.UnsubscribeLazyAsync(new[] { "alerts" });

// Unsubscribe from all exact channels (blocking)
await client.UnsubscribeAsync(PubSub.AllChannels, blockFor);

// Unsubscribe from specific patterns (blocking)
await client.PUnsubscribeAsync(new[] { "chat*" }, blockFor);

// Unsubscribe from specific patterns (lazy)
await client.PUnsubscribeLazyAsync(new[] { "log*" });

// Unsubscribe from all patterns (blocking)
await client.PUnsubscribeAsync(PubSub.AllPatterns, blockFor);

// Sharded unsubscribe (cluster mode only)
await clusterClient.SUnsubscribeAsync(new[] { "shard-ch1" }, blockFor);
await clusterClient.SUnsubscribeLazyAsync(new[] { "shard-ch2" });
await clusterClient.SUnsubscribeAsync(PubSub.AllShardedChannels, blockFor);

Subscription State Introspection (GLIDE 2.3+)
Use get_subscriptions() (getSubscriptions() in Java and Node.js, GetSubscriptions() in Go, GetSubscriptionsAsync() in C#) to inspect the current subscription state. It returns both the desired subscriptions (what you’ve requested) and the actual subscriptions (what the server has confirmed). This is useful for verifying that lazy subscriptions have been fully applied.
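One way to use the desired/actual pair is to poll until the two agree. The helper below is a hedged sketch, not part of GLIDE: `wait_until_synced` and its parameters are hypothetical, and it takes a caller-supplied `get_state` coroutine returning a `(desired, actual)` pair so that it stays independent of the exact shape of the client's state object.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple

State = Tuple[set, set]  # (desired, actual), as produced by the caller's adapter


async def wait_until_synced(
    get_state: Callable[[], Awaitable[State]],
    timeout_s: float = 5.0,
    interval_s: float = 0.05,
) -> bool:
    """Poll get_state until desired == actual; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        desired, actual = await get_state()
        if desired == actual:
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval_s)
```

With a real client, `get_state` would be a small adapter around the introspection call that extracts the two collections you care about; that adapter is left out here because the exact field layout depends on the language binding.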
Java:

PubSubState state = client.getSubscriptions().get();
System.out.println("Desired: " + state.getDesiredSubscriptions());
System.out.println("Actual: " + state.getActualSubscriptions());

Node.js:

const state = await client.getSubscriptions();
console.log("Desired:", state.desiredSubscriptions);
console.log("Actual:", state.actualSubscriptions);

Python:

state = await client.get_subscriptions()
print(f"Desired: {state.desired_subscriptions}")
print(f"Actual: {state.actual_subscriptions}")

Go:

state, err := client.GetSubscriptions(ctx)
fmt.Println("Desired:", state.DesiredSubscriptions)
fmt.Println("Actual:", state.ActualSubscriptions)

C#:

var state = await client.GetSubscriptionsAsync();
Console.WriteLine($"Desired: {state.Desired}");
Console.WriteLine($"Actual: {state.Actual}");

Next Steps
To learn more about GLIDE’s PubSub model and the synchronizer architecture, see our explanation.