Skip to content

glide

GlideClient

Bases: BaseClient, StandaloneCommands

Client used for connection to standalone servers. Use :func:~BaseClient.create to request a client. For full documentation, see Valkey GLIDE Documentation

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/glide_client.py
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
class GlideClient(BaseClient, StandaloneCommands):
    """
    Client used for connection to standalone servers.
    Use :func:`~BaseClient.create` to request a client.
    For full documentation, see
    [Valkey GLIDE Documentation](https://glide.valkey.io/how-to/client-initialization/#standalone)
    """

    async def get_subscriptions(
        self,
    ) -> GlideClientConfiguration.PubSubState:
        """
        Retrieves both the desired and current subscription states as tracked by the client.

        This allows verification of synchronization between what the client intends to be
        subscribed to (desired) and what it is actually subscribed to on the server (actual).

        Returns:
            GlideClientConfiguration.PubSubState: An object containing two attributes:
                - desired_subscriptions: Dict[PubSubChannelModes, Set[str]]
                - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

        Examples:
            >>> from glide import GlideClientConfiguration
            >>> PubSubChannelModes = GlideClientConfiguration.PubSubChannelModes
            >>>
            >>> # Get both subscription states
            >>> state = await client.get_subscriptions()
            >>> desired = state.desired_subscriptions
            >>> actual = state.actual_subscriptions
            >>>
            >>> # Check if subscribed to specific channel
            >>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
            >>>     print("Subscribed to channel1")
            >>>
            >>> # Direct comparison with config
            >>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
            >>>     print("Config matches desired state")
            >>>
            >>> # Check if synchronized
            >>> if desired == actual:
            >>>     print("Subscriptions are synchronized")
            >>>
            >>> # Find missing subscriptions
            >>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
            >>> if missing:
            >>>     print(f"Not yet subscribed to: {missing}")
        """
        result = await self._execute_command(RequestType.GetSubscriptions, [])
        return cast(
            GlideClientConfiguration.PubSubState,
            self._parse_pubsub_state(result, is_cluster=False),
        )

get_subscriptions() async

Retrieves both the desired and current subscription states as tracked by the client.

This allows verification of synchronization between what the client intends to be subscribed to (desired) and what it is actually subscribed to on the server (actual).

Returns:

Type Description
PubSubState

GlideClientConfiguration.PubSubState: An object containing two attributes: - desired_subscriptions: Dict[PubSubChannelModes, Set[str]] - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

Examples:

>>> from glide import GlideClientConfiguration
>>> PubSubChannelModes = GlideClientConfiguration.PubSubChannelModes
>>>
>>> # Get both subscription states
>>> state = await client.get_subscriptions()
>>> desired = state.desired_subscriptions
>>> actual = state.actual_subscriptions
>>>
>>> # Check if subscribed to specific channel
>>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
>>>     print("Subscribed to channel1")
>>>
>>> # Direct comparison with config
>>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
>>>     print("Config matches desired state")
>>>
>>> # Check if synchronized
>>> if desired == actual:
>>>     print("Subscriptions are synchronized")
>>>
>>> # Find missing subscriptions
>>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
>>> if missing:
>>>     print(f"Not yet subscribed to: {missing}")
Source code in doc-gen/valkey-glide/python/glide-async/python/glide/glide_client.py
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
async def get_subscriptions(
    self,
) -> GlideClientConfiguration.PubSubState:
    """
    Retrieves both the desired and current subscription states as tracked by the client.

    This allows verification of synchronization between what the client intends to be
    subscribed to (desired) and what it is actually subscribed to on the server (actual).

    Returns:
        GlideClientConfiguration.PubSubState: An object containing two attributes:
            - desired_subscriptions: Dict[PubSubChannelModes, Set[str]]
            - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

    Examples:
        >>> from glide import GlideClientConfiguration
        >>> PubSubChannelModes = GlideClientConfiguration.PubSubChannelModes
        >>>
        >>> # Get both subscription states
        >>> state = await client.get_subscriptions()
        >>> desired = state.desired_subscriptions
        >>> actual = state.actual_subscriptions
        >>>
        >>> # Check if subscribed to specific channel
        >>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
        >>>     print("Subscribed to channel1")
        >>>
        >>> # Direct comparison with config
        >>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
        >>>     print("Config matches desired state")
        >>>
        >>> # Check if synchronized
        >>> if desired == actual:
        >>>     print("Subscriptions are synchronized")
        >>>
        >>> # Find missing subscriptions
        >>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
        >>> if missing:
        >>>     print(f"Not yet subscribed to: {missing}")
    """
    result = await self._execute_command(RequestType.GetSubscriptions, [])
    return cast(
        GlideClientConfiguration.PubSubState,
        self._parse_pubsub_state(result, is_cluster=False),
    )

GlideClusterClient

Bases: BaseClient, ClusterCommands

Client used for connection to cluster servers. Use :func:~BaseClient.create to request a client. For full documentation, see Valkey GLIDE Documentation

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/glide_client.py
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
class GlideClusterClient(BaseClient, ClusterCommands):
    """
    Client used for connection to cluster servers.
    Use :func:`~BaseClient.create` to request a client.
    For full documentation, see
    [Valkey GLIDE Documentation](https://glide.valkey.io/how-to/client-initialization/#cluster)
    """

    async def _cluster_scan(
        self,
        cursor: ClusterScanCursor,
        match: Optional[TEncodable] = None,
        count: Optional[int] = None,
        type: Optional[ObjectType] = None,
        allow_non_covered_slots: bool = False,
    ) -> List[Union[ClusterScanCursor, List[bytes]]]:
        if self._is_closed:
            raise ClosingError(
                "Unable to execute requests; the client is closed. Please create a new client."
            )
        request = CommandRequest()
        request.callback_idx = self._get_callback_index()
        # Take out the id string from the wrapping object
        cursor_string = cursor.get_cursor()
        request.cluster_scan.cursor = cursor_string
        request.cluster_scan.allow_non_covered_slots = allow_non_covered_slots
        if match is not None:
            request.cluster_scan.match_pattern = (
                self._encode_arg(match) if isinstance(match, str) else match
            )
        if count is not None:
            request.cluster_scan.count = count
        if type is not None:
            request.cluster_scan.object_type = type.value
        response = await self._write_request_await_response(request)
        return [ClusterScanCursor(bytes(response[0]).decode()), response[1]]

    def _get_protobuf_conn_request(self) -> ConnectionRequest:
        return self.config._create_a_protobuf_conn_request(cluster_mode=True)

    async def get_subscriptions(
        self,
    ) -> GlideClusterClientConfiguration.PubSubState:
        """
        Retrieves both the desired and current subscription states as tracked by the client.

        This allows verification of synchronization between what the client intends to be
        subscribed to (desired) and what it is actually subscribed to on the server (actual).

        Returns:
            GlideClusterClientConfiguration.PubSubState: An object containing two attributes:
                - desired_subscriptions: Dict[PubSubChannelModes, Set[str]]
                - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

        Examples:
            >>> from glide import GlideClusterClientConfiguration
            >>> PubSubChannelModes = GlideClusterClientConfiguration.PubSubChannelModes
            >>>
            >>> # Get both subscription states
            >>> state = await client.get_subscriptions()
            >>> desired = state.desired_subscriptions
            >>> actual = state.actual_subscriptions
            >>>
            >>> # Check if subscribed to specific channel
            >>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
            >>>     print("Subscribed to channel1")
            >>>
            >>> # Direct comparison with config
            >>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
            >>>     print("Config matches desired state")
            >>>
            >>> # Check if synchronized
            >>> if desired == actual:
            >>>     print("Subscriptions are synchronized")
            >>>
            >>> # Find missing subscriptions
            >>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
            >>> if missing:
            >>>     print(f"Not yet subscribed to: {missing}")
        """
        result = await self._execute_command(RequestType.GetSubscriptions, [])
        return cast(
            GlideClusterClientConfiguration.PubSubState,
            self._parse_pubsub_state(result, is_cluster=True),
        )

get_subscriptions() async

Retrieves both the desired and current subscription states as tracked by the client.

This allows verification of synchronization between what the client intends to be subscribed to (desired) and what it is actually subscribed to on the server (actual).

Returns:

Type Description
PubSubState

GlideClusterClientConfiguration.PubSubState: An object containing two attributes: - desired_subscriptions: Dict[PubSubChannelModes, Set[str]] - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

Examples:

>>> from glide import GlideClusterClientConfiguration
>>> PubSubChannelModes = GlideClusterClientConfiguration.PubSubChannelModes
>>>
>>> # Get both subscription states
>>> state = await client.get_subscriptions()
>>> desired = state.desired_subscriptions
>>> actual = state.actual_subscriptions
>>>
>>> # Check if subscribed to specific channel
>>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
>>>     print("Subscribed to channel1")
>>>
>>> # Direct comparison with config
>>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
>>>     print("Config matches desired state")
>>>
>>> # Check if synchronized
>>> if desired == actual:
>>>     print("Subscriptions are synchronized")
>>>
>>> # Find missing subscriptions
>>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
>>> if missing:
>>>     print(f"Not yet subscribed to: {missing}")
Source code in doc-gen/valkey-glide/python/glide-async/python/glide/glide_client.py
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
async def get_subscriptions(
    self,
) -> GlideClusterClientConfiguration.PubSubState:
    """
    Retrieves both the desired and current subscription states as tracked by the client.

    This allows verification of synchronization between what the client intends to be
    subscribed to (desired) and what it is actually subscribed to on the server (actual).

    Returns:
        GlideClusterClientConfiguration.PubSubState: An object containing two attributes:
            - desired_subscriptions: Dict[PubSubChannelModes, Set[str]]
            - actual_subscriptions: Dict[PubSubChannelModes, Set[str]]

    Examples:
        >>> from glide import GlideClusterClientConfiguration
        >>> PubSubChannelModes = GlideClusterClientConfiguration.PubSubChannelModes
        >>>
        >>> # Get both subscription states
        >>> state = await client.get_subscriptions()
        >>> desired = state.desired_subscriptions
        >>> actual = state.actual_subscriptions
        >>>
        >>> # Check if subscribed to specific channel
        >>> if "channel1" in actual.get(PubSubChannelModes.Exact, set()):
        >>>     print("Subscribed to channel1")
        >>>
        >>> # Direct comparison with config
        >>> if client.config.pubsub_subscriptions.channels_and_patterns == desired:
        >>>     print("Config matches desired state")
        >>>
        >>> # Check if synchronized
        >>> if desired == actual:
        >>>     print("Subscriptions are synchronized")
        >>>
        >>> # Find missing subscriptions
        >>> missing = desired.get(PubSubChannelModes.Exact, set()) - actual.get(PubSubChannelModes.Exact, set())
        >>> if missing:
        >>>     print(f"Not yet subscribed to: {missing}")
    """
    result = await self._execute_command(RequestType.GetSubscriptions, [])
    return cast(
        GlideClusterClientConfiguration.PubSubState,
        self._parse_pubsub_state(result, is_cluster=True),
    )

Logger

A singleton class that allows logging which is consistent with logs from the internal GLIDE core. The logger can be set up in 2 ways - 1. By calling Logger.init, which configures the logger only if it wasn't previously configured. 2. By calling Logger.set_logger_config, which replaces the existing configuration, and means that new logs will not be saved with the logs that were sent before the call. If none of these functions are called, the first log attempt will initialize a new logger with default configuration.

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/logger.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class Logger:
    """
    A singleton class that allows logging which is consistent with logs from the internal GLIDE core.
    The logger can be set up in 2 ways -
    1. By calling Logger.init, which configures the logger only if it wasn't previously configured.
    2. By calling Logger.set_logger_config, which replaces the existing configuration, and means that new logs will not be
    saved with the logs that were sent before the call.
    If none of these functions are called, the first log attempt will initialize a new logger with default configuration.
    """

    _instance = None
    logger_level: internalLevel

    def __init__(self, level: Optional[Level] = None, file_name: Optional[str] = None):
        level_value = level.value if level else None
        Logger.logger_level = py_init(level_value, file_name)

    @classmethod
    def init(cls, level: Optional[Level] = None, file_name: Optional[str] = None):
        """
        Initialize a logger if it wasn't initialized before - this method is meant to be used when there is no intention to
        replace an existing logger. Otherwise, use `set_logger_config` for overriding the existing logger configs.
        The logger will filter all logs with a level lower than the given level.
        If given a file_name argument, will write the logs to files postfixed with file_name. If file_name isn't provided,
        the logs will be written to the console.

        Args:
            level (Optional[Level]): Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF].
                If log level isn't provided, the logger will be configured with default configuration.
                To turn off logging completely, set the level to Level.OFF.
            file_name (Optional[str]): If provided the target of the logs will be the file mentioned.
                Otherwise, logs will be printed to the console.
        """
        if cls._instance is None:
            cls._instance = cls(level, file_name)

    @classmethod
    def log(
        cls,
        log_level: Level,
        log_identifier: str,
        message: str,
        err: Optional[Exception] = None,
    ):
        """
        Logs the provided message if the provided log level is lower then the logger level.

        Args:
            log_level (Level): The log level of the provided message.
            log_identifier (str): The log identifier should give the log a context.
            message (str): The message to log.
            err (Optional[Exception]): The exception or error to log.
        """
        if not cls._instance:
            cls._instance = cls(None)
        if not log_level.value.is_lower(Logger.logger_level):
            return
        if err:
            message = f"{message}: {traceback.format_exception(type(err), err, err.__traceback__)}"
        py_log(log_level.value, log_identifier, message)

    @classmethod
    def set_logger_config(
        cls, level: Optional[Level] = None, file_name: Optional[str] = None
    ):
        """
        Creates a new logger instance and configure it with the provided log level and file name.

        Args:
            level (Optional[Level]): Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF].
                If log level isn't provided, the logger will be configured with default configuration.
                To turn off logging completely, set the level to OFF.
            file_name (Optional[str]): If provided the target of the logs will be the file mentioned.
                Otherwise, logs will be printed to the console.
        """
        Logger._instance = Logger(level, file_name)

init(level=None, file_name=None) classmethod

Initialize a logger if it wasn't initialized before - this method is meant to be used when there is no intention to replace an existing logger. Otherwise, use set_logger_config for overriding the existing logger configs. The logger will filter all logs with a level lower than the given level. If given a file_name argument, will write the logs to files postfixed with file_name. If file_name isn't provided, the logs will be written to the console.

Parameters:

Name Type Description Default
level Optional[Level]

Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF]. If log level isn't provided, the logger will be configured with default configuration. To turn off logging completely, set the level to Level.OFF.

None
file_name Optional[str]

If provided the target of the logs will be the file mentioned. Otherwise, logs will be printed to the console.

None
Source code in doc-gen/valkey-glide/python/glide-async/python/glide/logger.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@classmethod
def init(cls, level: Optional[Level] = None, file_name: Optional[str] = None):
    """
    Initialize a logger if it wasn't initialized before - this method is meant to be used when there is no intention to
    replace an existing logger. Otherwise, use `set_logger_config` for overriding the existing logger configs.
    The logger will filter all logs with a level lower than the given level.
    If given a file_name argument, will write the logs to files postfixed with file_name. If file_name isn't provided,
    the logs will be written to the console.

    Args:
        level (Optional[Level]): Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF].
            If log level isn't provided, the logger will be configured with default configuration.
            To turn off logging completely, set the level to Level.OFF.
        file_name (Optional[str]): If provided the target of the logs will be the file mentioned.
            Otherwise, logs will be printed to the console.
    """
    if cls._instance is None:
        cls._instance = cls(level, file_name)

log(log_level, log_identifier, message, err=None) classmethod

Logs the provided message if the provided log level is lower then the logger level.

Parameters:

Name Type Description Default
log_level Level

The log level of the provided message.

required
log_identifier str

The log identifier should give the log a context.

required
message str

The message to log.

required
err Optional[Exception]

The exception or error to log.

None
Source code in doc-gen/valkey-glide/python/glide-async/python/glide/logger.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@classmethod
def log(
    cls,
    log_level: Level,
    log_identifier: str,
    message: str,
    err: Optional[Exception] = None,
):
    """
    Logs the provided message if the provided log level is lower then the logger level.

    Args:
        log_level (Level): The log level of the provided message.
        log_identifier (str): The log identifier should give the log a context.
        message (str): The message to log.
        err (Optional[Exception]): The exception or error to log.
    """
    if not cls._instance:
        cls._instance = cls(None)
    if not log_level.value.is_lower(Logger.logger_level):
        return
    if err:
        message = f"{message}: {traceback.format_exception(type(err), err, err.__traceback__)}"
    py_log(log_level.value, log_identifier, message)

set_logger_config(level=None, file_name=None) classmethod

Creates a new logger instance and configure it with the provided log level and file name.

Parameters:

Name Type Description Default
level Optional[Level]

Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF]. If log level isn't provided, the logger will be configured with default configuration. To turn off logging completely, set the level to OFF.

None
file_name Optional[str]

If provided the target of the logs will be the file mentioned. Otherwise, logs will be printed to the console.

None
Source code in doc-gen/valkey-glide/python/glide-async/python/glide/logger.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@classmethod
def set_logger_config(
    cls, level: Optional[Level] = None, file_name: Optional[str] = None
):
    """
    Creates a new logger instance and configure it with the provided log level and file name.

    Args:
        level (Optional[Level]): Set the logger level to one of [ERROR, WARN, INFO, DEBUG, TRACE, OFF].
            If log level isn't provided, the logger will be configured with default configuration.
            To turn off logging completely, set the level to OFF.
        file_name (Optional[str]): If provided the target of the logs will be the file mentioned.
            Otherwise, logs will be printed to the console.
    """
    Logger._instance = Logger(level, file_name)

OpenTelemetry

Singleton class for managing OpenTelemetry configuration and operations.

This class provides a centralized way to initialize OpenTelemetry and control sampling behavior at runtime.

Example usage
from glide import OpenTelemetry, OpenTelemetryConfig, OpenTelemetryTracesConfig, OpenTelemetryMetricsConfig

OpenTelemetry.init(OpenTelemetryConfig(
    traces=OpenTelemetryTracesConfig(
        endpoint="http://localhost:4318/v1/traces",
        sample_percentage=10  # Optional, defaults to 1. Can also be changed at runtime via set_sample_percentage().
    ),
    metrics=OpenTelemetryMetricsConfig(
        endpoint="http://localhost:4318/v1/metrics"
    ),
    flush_interval_ms=1000  # Optional, defaults to 5000
))
Note

OpenTelemetry can only be initialized once per process. Subsequent calls to init() will be ignored. This is by design, as OpenTelemetry is a global resource that should be configured once at application startup.

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class OpenTelemetry:
    """
    Singleton class for managing OpenTelemetry configuration and operations.

    This class provides a centralized way to initialize OpenTelemetry and control
    sampling behavior at runtime.

    Example usage:
        ```python
        from glide import OpenTelemetry, OpenTelemetryConfig, OpenTelemetryTracesConfig, OpenTelemetryMetricsConfig

        OpenTelemetry.init(OpenTelemetryConfig(
            traces=OpenTelemetryTracesConfig(
                endpoint="http://localhost:4318/v1/traces",
                sample_percentage=10  # Optional, defaults to 1. Can also be changed at runtime via set_sample_percentage().
            ),
            metrics=OpenTelemetryMetricsConfig(
                endpoint="http://localhost:4318/v1/metrics"
            ),
            flush_interval_ms=1000  # Optional, defaults to 5000
        ))
        ```

    Note:
        OpenTelemetry can only be initialized once per process. Subsequent calls to
        init() will be ignored. This is by design, as OpenTelemetry is a global
        resource that should be configured once at application startup.
    """

    _instance: Optional["OpenTelemetry"] = None
    _config: Optional[OpenTelemetryConfig] = None

    @classmethod
    def init(cls, config: OpenTelemetryConfig) -> None:
        """
        Initialize the OpenTelemetry instance.

        Args:
            config: The OpenTelemetry configuration

        Note:
            OpenTelemetry can only be initialized once per process.
            Subsequent calls will be ignored and a warning will be logged.
        """
        if not cls._instance:
            cls._config = config
            # Convert shared config to PyO3 config and initialize
            pyo3_config = _convert_to_pyo3_config(config)
            init_opentelemetry(pyo3_config)
            cls._instance = OpenTelemetry()
            Logger.log(
                Level.INFO,
                "GlideOpenTelemetry",
                "OpenTelemetry initialized successfully",
            )
            return

        Logger.log(
            Level.WARN,
            "GlideOpenTelemetry",
            "OpenTelemetry already initialized - ignoring new configuration",
        )

    @classmethod
    def is_initialized(cls) -> bool:
        """
        Check if the OpenTelemetry instance is initialized.

        Returns:
            bool: True if the OpenTelemetry instance is initialized, False otherwise
        """
        return cls._instance is not None

    @classmethod
    def get_sample_percentage(cls) -> Optional[int]:
        """
        Get the sample percentage for traces.

        Returns:
            Optional[int]: The sample percentage for traces only if OpenTelemetry is initialized
                and the traces config is set, otherwise None.
        """
        if cls._config and cls._config.traces:
            return cls._config.traces.sample_percentage
        return None

    @classmethod
    def should_sample(cls) -> bool:
        """
        Determines if the current request should be sampled for OpenTelemetry tracing.
        Uses the configured sample percentage to randomly decide whether to create a span for this request.

        Returns:
            bool: True if the request should be sampled, False otherwise
        """
        percentage = cls.get_sample_percentage()
        return (
            cls.is_initialized()
            and percentage is not None
            and random.random() * 100 < percentage
        )

    @classmethod
    def set_sample_percentage(cls, percentage: int) -> None:
        """
        Set the percentage of requests to be sampled and traced. Must be a value between 0 and 100.
        This setting only affects traces, not metrics.

        Args:
            percentage: The sample percentage 0-100

        Raises:
            ConfigurationError: If OpenTelemetry is not initialized or traces config is not set

        Remarks:
            This method can be called at runtime to change the sampling percentage
            without reinitializing OpenTelemetry.
        """
        if not cls._config or not cls._config.traces:
            raise ConfigurationError("OpenTelemetry traces not initialized")

        if percentage < 0 or percentage > 100:
            raise ConfigurationError("Sample percentage must be between 0 and 100")

        # Update the shared config directly
        cls._config.traces.sample_percentage = percentage

init(config) classmethod

Initialize the OpenTelemetry instance.

Parameters:

Name Type Description Default
config OpenTelemetryConfig

The OpenTelemetry configuration

required
Note

OpenTelemetry can only be initialized once per process. Subsequent calls will be ignored and a warning will be logged.

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@classmethod
def init(cls, config: OpenTelemetryConfig) -> None:
    """
    Initialize the OpenTelemetry instance.

    Args:
        config: The OpenTelemetry configuration

    Note:
        OpenTelemetry can only be initialized once per process.
        Subsequent calls will be ignored and a warning will be logged.
    """
    if not cls._instance:
        cls._config = config
        # Convert shared config to PyO3 config and initialize
        pyo3_config = _convert_to_pyo3_config(config)
        init_opentelemetry(pyo3_config)
        cls._instance = OpenTelemetry()
        Logger.log(
            Level.INFO,
            "GlideOpenTelemetry",
            "OpenTelemetry initialized successfully",
        )
        return

    Logger.log(
        Level.WARN,
        "GlideOpenTelemetry",
        "OpenTelemetry already initialized - ignoring new configuration",
    )

is_initialized() classmethod

Check if the OpenTelemetry instance is initialized.

Returns:

Name Type Description
bool bool

True if the OpenTelemetry instance is initialized, False otherwise

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
119
120
121
122
123
124
125
126
127
@classmethod
def is_initialized(cls) -> bool:
    """
    Check if the OpenTelemetry instance is initialized.

    Returns:
        bool: True if the OpenTelemetry instance is initialized, False otherwise
    """
    return cls._instance is not None

get_sample_percentage() classmethod

Get the sample percentage for traces.

Returns:

Type Description
Optional[int]

Optional[int]: The sample percentage for traces only if OpenTelemetry is initialized and the traces config is set, otherwise None.

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
129
130
131
132
133
134
135
136
137
138
139
140
@classmethod
def get_sample_percentage(cls) -> Optional[int]:
    """
    Get the sample percentage for traces.

    Returns:
        Optional[int]: The sample percentage for traces only if OpenTelemetry is initialized
            and the traces config is set, otherwise None.
    """
    if cls._config and cls._config.traces:
        return cls._config.traces.sample_percentage
    return None

should_sample() classmethod

Determines if the current request should be sampled for OpenTelemetry tracing. Uses the configured sample percentage to randomly decide whether to create a span for this request.

Returns:

Name Type Description
bool bool

True if the request should be sampled, False otherwise

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@classmethod
def should_sample(cls) -> bool:
    """
    Determines if the current request should be sampled for OpenTelemetry tracing.
    Uses the configured sample percentage to randomly decide whether to create a span for this request.

    Returns:
        bool: True if the request should be sampled, False otherwise
    """
    percentage = cls.get_sample_percentage()
    return (
        cls.is_initialized()
        and percentage is not None
        and random.random() * 100 < percentage
    )

set_sample_percentage(percentage) classmethod

Set the percentage of requests to be sampled and traced. Must be a value between 0 and 100. This setting only affects traces, not metrics.

Parameters:

Name Type Description Default
percentage int

The sample percentage 0-100

required

Raises:

Type Description
ConfigurationError

If OpenTelemetry is not initialized or traces config is not set

Remarks

This method can be called at runtime to change the sampling percentage without reinitializing OpenTelemetry.

Source code in doc-gen/valkey-glide/python/glide-async/python/glide/opentelemetry.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@classmethod
def set_sample_percentage(cls, percentage: int) -> None:
    """
    Set the percentage of requests to be sampled and traced. Must be a value between 0 and 100.
    This setting only affects traces, not metrics.

    Args:
        percentage: The sample percentage 0-100

    Raises:
        ConfigurationError: If OpenTelemetry is not initialized or traces config is not set

    Remarks:
        This method can be called at runtime to change the sampling percentage
        without reinitializing OpenTelemetry.
    """
    if not cls._config or not cls._config.traces:
        raise ConfigurationError("OpenTelemetry traces not initialized")

    if percentage < 0 or percentage > 100:
        raise ConfigurationError("Sample percentage must be between 0 and 100")

    # Update the shared config directly
    cls._config.traces.sample_percentage = percentage