Configure Pub/Sub
Valkey GLIDE PubSub aims to present a single, coherent interface that minimizes the differences between Sharded, Cluster, and Standalone configurations. GLIDE also tracks topology changes in real time, so the client remains subscribed regardless of connectivity issues.
This guide goes over how to configure the GLIDE PubSub system.
Use Separate Clients
Due to the implementation of the resubscription logic, it is recommended to use a dedicated client for PubSub subscriptions. That is, the client that holds subscriptions should not be the same client that issues commands. When the topology changes, the internal connections might be reestablished in order to resubscribe to the correct servers.
Publishing Client
To publish a message, create a separate client connected to your Valkey instance. Both Standalone and Cluster are supported.
Java

    // Publishing is done with a separate client
    GlideClientConfiguration publishConfig = GlideClientConfiguration.builder()
        .address(NodeAddress.builder().port(6379).build())
        .build();
    try (var publishingClient = GlideClient.createClient(publishConfig).get()) {
        // publish message on ch1 channel
        publishingClient.publish("Test message", "ch1").get();
    }

Node.js

    // Publishing is done with a separate client
    const publishConfig = {
        addresses: [{ host: "localhost", port: 6379 }],
    };
    const publishingClient = await GlideClient.createClient(publishConfig);

    // publish message on ch1 channel
    await publishingClient.publish("Test message", "ch1");

    publishingClient.close();

Python

    publishing_config = GlideClientConfiguration(
        [NodeAddress("localhost", 6379)]
    )

    publishing_client = await GlideClient.create(publishing_config)

    # publish message on ch1 channel
    await publishing_client.publish("Test message", "ch1")

Go

    publisher, _ := NewGlideClient(NewGlideClientConfiguration().
        WithAddress(&NodeAddress{}))

    publisher.Publish("ch1", "Hello from ch1")

PHP

    // Publisher process (separate):
    $publishingClient = new ValkeyGlide();
    $publishingClient->connect(addresses: [['host' => 'localhost', 'port' => 6379]]);
    $publishingClient->publish('ch1', 'Test message');

Subscriber Clients
To subscribe to messages, use a separate client with subscriptions configured through the client’s configuration object.
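The subscriber examples in this guide subscribe to the exact channels `ch1` and `ch2` and to the glob pattern `chat*`. As a rough illustration of which published channels such a configuration would receive, here is a minimal Python sketch. It does not use GLIDE: `would_deliver` is a hypothetical helper, and `fnmatchcase` only approximates the server-side glob matching, which should be close enough for simple patterns such as `chat*`.

```python
from fnmatch import fnmatchcase

# Subscriptions mirroring the examples in this guide.
EXACT_CHANNELS = {"ch1", "ch2"}
PATTERNS = {"chat*"}


def would_deliver(channel: str) -> bool:
    """Hypothetical helper: True if a message on `channel` would reach the subscriber."""
    if channel in EXACT_CHANNELS:
        return True
    # Assumption: fnmatchcase stands in for the server's glob matching here.
    return any(fnmatchcase(channel, pattern) for pattern in PATTERNS)


for channel in ["ch1", "ch3", "chat", "chat:lobby", "Chat:lobby"]:
    print(channel, would_deliver(channel))
```

In this sketch the match is case-sensitive, so `Chat:lobby` is not delivered, while `chat` itself matches because `*` can match an empty suffix.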
With Callback
GLIDE clients can register a callback method to handle subscriptions in the client configuration.
Java

    MessageCallback callback =
        (msg, context) -> System.out.printf("Received %s, context %s\n", msg, context);

    GlideClientConfiguration config = GlideClientConfiguration.builder()
        .address(NodeAddress.builder().port(6379).build())
        .requestTimeout(3000)
        // subscriptions are configured here
        .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
            .subscription(EXACT, "ch1") // Listens for messages published to 'ch1' channel, in unsharded mode
            .subscription(EXACT, "ch2") // Listens for messages published to 'ch2' channel, in unsharded mode
            .subscription(PATTERN, "chat*") // Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
            .callback(callback)
            .callback(callback, context) // callback or callback with context are configured here
            .build())
        .build();
    try (var regularClient = GlideClient.createClient(config).get()) {
        // Do some work/wait - the callbacks will be dispatched on incoming messages
    } // unsubscribe happens here

Node.js

    const callback = (msg: PubSubMsg, context: any) => {
        console.log(`Received ${msg.message}, context ${context}`);
    };

    const listeningConfig = {
        addresses: [{ host: "localhost", port: 6379 }],
        // subscriptions are configured here
        pubsubSubscriptions: {
            channelsAndPatterns: {
                // Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
                [GlideClientConfiguration.PubSubChannelModes.Exact]: new Set(["ch1", "ch2"]),
                // Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
                [GlideClientConfiguration.PubSubChannelModes.Pattern]: new Set(["chat*"]),
            },
            callback: callback,
            context: context,
        },
    };

    const listeningClient = await GlideClient.createClient(listeningConfig);

    // Do some work/wait - the callback will receive messages

    listeningClient.close(); // unsubscribe happens here

Python

    def callback(msg: CoreCommands.PubSubMsg, context: Any):
        print(f"Received {msg}, context {context}\n")

    listening_config = GlideClientConfiguration(
        [NodeAddress("localhost", 6379)],
        pubsub_subscriptions = GlideClientConfiguration.PubSubSubscriptions(
            # subscriptions are configured here
            channels_and_patterns={
                # Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            callback=callback,
            context=context,
        )
    )

    listening_client = await GlideClient.create(listening_config)

    # Do some work/wait - the callback will receive messages

    await listening_client.close() # unsubscribe happens here

Go

    // Create a callback and context name
    ctx := "example"
    callback := func(message *PubSubMessage, context any) {
        fmt.Printf("Received %s, context %s\n", message.Message, context)
    }

    // define subscriptions
    sConfig := NewStandaloneSubscriptionConfig().
        WithSubscription(ExactChannelMode, "ch1").
        WithSubscription(ExactChannelMode, "ch2").
        WithSubscription(PatternChannelMode, "chat*").
        WithCallback(callback, ctx)

    // create configuration for subscriber
    configforsub := NewGlideClientConfiguration().
        WithAddress(&NodeAddress{}).
        WithSubscriptionConfig(sConfig)

    // create a client with a subscription
    NewGlideClient(configforsub)

    // Do more work here

PHP

    // Note: subscribe() is BLOCKING in PHP - it enters a message loop
    // Use separate processes/threads for subscriber and publisher

    // Subscriber process:
    $listeningClient = new ValkeyGlide();
    $listeningClient->connect(addresses: [['host' => 'localhost', 'port' => 6379]]);

    // This call BLOCKS and enters message loop
    $listeningClient->subscribe(['ch1', 'ch2'], function($client, $channel, $message) {
        echo "Received $message on channel $channel\n";

        // To exit the loop, call unsubscribe
        if ($message === 'exit') {
            $client->unsubscribe(['ch1', 'ch2']);
        }
    });

    // Code here only runs after unsubscribe
    $listeningClient->close();

Non-Callback
Alternatively, GLIDE clients can poll for messages using each language’s native async features instead of a callback.
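The polling examples in this section use two styles of retrieval: one call that waits for the next message and one that returns immediately when nothing is buffered. The difference can be sketched without a server. In the following minimal Python sketch, `FakeSubscriber` and its methods are hypothetical stand-ins built on `asyncio.Queue`; they are not part of GLIDE and say nothing about how GLIDE buffers messages internally.

```python
import asyncio
from typing import Optional


class FakeSubscriber:
    """Hypothetical stand-in for a subscribed client; not part of GLIDE."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    def deliver(self, message: str) -> None:
        # Simulates a message arriving from the server.
        self._queue.put_nowait(message)

    async def get_message(self) -> str:
        # Waits until a message is available.
        return await self._queue.get()

    def try_get_message(self) -> Optional[str]:
        # Returns immediately; None means nothing is buffered.
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None


async def main() -> list:
    subscriber = FakeSubscriber()
    results = [subscriber.try_get_message()]  # nothing buffered yet

    # Deliver a message shortly after we start waiting for it.
    asyncio.get_running_loop().call_later(0.01, subscriber.deliver, "hello")
    results.append(await subscriber.get_message())

    results.append(subscriber.try_get_message())  # buffer is empty again
    return results


print(asyncio.run(main()))
```

The waiting call suits a dedicated consumer loop, while the non-blocking call suits code that checks for messages in between other work.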
Java

    GlideClientConfiguration config = GlideClientConfiguration.builder()
        .address(NodeAddress.builder().port(6379).build())
        .requestTimeout(3000)
        // subscriptions are configured here
        .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
            .subscription(EXACT, Set.of("ch1", "ch2")) // there is an option to set multiple subscriptions at a time
            .subscription(Map.of(PATTERN, "chat*", EXACT, Set.of("ch1", "ch2"))) // or even all subscriptions at a time
            .build())
        // no callback is configured
        .build();
    try (var regularClient = GlideClient.createClient(config).get()) {
        Message msg = regularClient.tryGetPubSubMessage(); // sync, does not block
        Message nextMsg = regularClient.getPubSubMessage().get(); // async, waits for the next message
    } // unsubscribe happens here

Node.js

    const listeningConfig = {
        addresses: [{ host: "localhost", port: 6379 }],
        // subscriptions are configured here
        pubsubSubscriptions: {
            channelsAndPatterns: {
                // Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
                [GlideClientConfiguration.PubSubChannelModes.Exact]: new Set(["ch1", "ch2"]),
                // Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
                [GlideClientConfiguration.PubSubChannelModes.Pattern]: new Set(["chat*"]),
            },
        },
        // no callback is configured
    };

    const listeningClient = await GlideClient.createClient(listeningConfig);

    // waits for message to arrive
    const message = await listeningClient.getPubSubMessage();

    // returns null if no message is available
    const message2 = listeningClient.tryGetPubSubMessage();

    listeningClient.close(); // unsubscribe happens here

Python

    listening_config = GlideClientConfiguration(
        [NodeAddress("localhost", 6379)],
        pubsub_subscriptions = GlideClientConfiguration.PubSubSubscriptions(
            # subscriptions are configured here
            channels_and_patterns={
                # Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            # no callback or context is configured
            callback=None,
            context=None,
        )
    )

    listening_client = await GlideClient.create(listening_config)

    # waits for message to arrive
    message = await listening_client.get_pubsub_message()

    # returns None if no message is available
    message = listening_client.try_get_pubsub_message()

    await listening_client.close() # unsubscribe happens here

Go

    // Create a signal channel to receive notifications of new messages
    signalCh := make(chan struct{}, 1)

    // Setup client and subscription config
    sConfig := NewStandaloneSubscriptionConfig().
        WithSubscription(ExactChannelMode, "ch1").
        WithSubscription(ExactChannelMode, "ch2").
        WithSubscription(PatternChannelMode, "chat*")

    client, _ := NewGlideClient(NewGlideClientConfiguration().
        WithAddress(&NodeAddress{}).
        WithSubscriptionConfig(sConfig))
    defer client.Close()

    // Get the message queue from the client
    queue, _ := client.GetQueue()

    // Register our signal channel
    queue.RegisterSignalChannel(signalCh)
    defer queue.UnregisterSignalChannel(signalCh)

    // Create a context for cancellation
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start processing messages in a goroutine
    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Receiver shutting down...")
                return
            case <-signalCh:
                // Process all available messages
                for {
                    if msg := queue.Pop(); msg != nil {
                        fmt.Printf("Received message: %s on channel: %s\n", msg.Message, msg.Channel)
                    } else {
                        break // No more messages
                    }
                }
            }
        }
    }()

    // The main application can continue running...
    // When it's time to stop, just call cancel() or close the client

Next Steps
To learn more about GLIDE’s PubSub model, see our explanation.