from typing import Dict, List, Optional, Tuple, Union from redis.exceptions import DataError from redis.typing import KeyT, Number ADD_CMD = "TS.ADD" ALTER_CMD = "TS.ALTER" CREATERULE_CMD = "TS.CREATERULE" CREATE_CMD = "TS.CREATE" DECRBY_CMD = "TS.DECRBY" DELETERULE_CMD = "TS.DELETERULE" DEL_CMD = "TS.DEL" GET_CMD = "TS.GET" INCRBY_CMD = "TS.INCRBY" INFO_CMD = "TS.INFO" MADD_CMD = "TS.MADD" MGET_CMD = "TS.MGET" MRANGE_CMD = "TS.MRANGE" MREVRANGE_CMD = "TS.MREVRANGE" QUERYINDEX_CMD = "TS.QUERYINDEX" RANGE_CMD = "TS.RANGE" REVRANGE_CMD = "TS.REVRANGE" class TimeSeriesCommands: """RedisTimeSeries Commands.""" def create( self, key: KeyT, retention_msecs: Optional[int] = None, uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, ): """ Create a new time-series. For more information see https://redis.io/commands/ts.create/ Args: key: The time-series key. retention_msecs: Maximum age for samples, compared to the highest reported timestamp in milliseconds. If `None` or `0` is passed, the series is not trimmed at all. uncompressed: Changes data storage from compressed (default) to uncompressed. labels: A dictionary of label-value pairs that represent metadata labels of the key. chunk_size: Memory size, in bytes, allocated for each data chunk. Must be a multiple of 8 in the range `[48..1048576]`. In earlier versions of the module the minimum value was different. duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': An error will occur and the new value will be ignored. - 'first': Ignore the new value. - 'last': Override with the latest value. - 'min': Only override if the value is lower than the existing value. - 'max': Only override if the value is higher than the existing value. - 'sum': If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last timestamp and the new timestamp is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_val_diff` is also set. Available since RedisTimeSeries version 1.12.0. ignore_max_val_diff: A non-negative floating point value, that sets an ignore threshold for added values. If the difference between the last value and the new value is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is also set. Available since RedisTimeSeries version 1.12.0. """ params = [key] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) self._append_insertion_filters( params, ignore_max_time_diff, ignore_max_val_diff ) return self.execute_command(CREATE_CMD, *params) def alter( self, key: KeyT, retention_msecs: Optional[int] = None, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, ): """ Update an existing time series. For more information see https://redis.io/commands/ts.alter/ Args: key: The time-series key. retention_msecs: Maximum age for samples, compared to the highest reported timestamp in milliseconds. If `None` or `0` is passed, the series is not trimmed at all. labels: A dictionary of label-value pairs that represent metadata labels of the key. chunk_size: Memory size, in bytes, allocated for each data chunk. Must be a multiple of 8 in the range `[48..1048576]`. In earlier versions of the module the minimum value was different. Changing this value does not affect existing chunks. duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': An error will occur and the new value will be ignored. - 'first': Ignore the new value. - 'last': Override with the latest value. - 'min': Only override if the value is lower than the existing value. - 'max': Only override if the value is higher than the existing value. - 'sum': If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last timestamp and the new timestamp is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_val_diff` is also set. Available since RedisTimeSeries version 1.12.0. ignore_max_val_diff: A non-negative floating point value, that sets an ignore threshold for added values. If the difference between the last value and the new value is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is also set. Available since RedisTimeSeries version 1.12.0. """ params = [key] self._append_retention(params, retention_msecs) self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) self._append_insertion_filters( params, ignore_max_time_diff, ignore_max_val_diff ) return self.execute_command(ALTER_CMD, *params) def add( self, key: KeyT, timestamp: Union[int, str], value: Number, retention_msecs: Optional[int] = None, uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, on_duplicate: Optional[str] = None, ): """ Append a sample to a time series. When the specified key does not exist, a new time series is created. For more information see https://redis.io/commands/ts.add/ Args: key: The time-series key. timestamp: Timestamp of the sample. `*` can be used for automatic timestamp (using the system clock). value: Numeric data value of the sample. retention_msecs: Maximum age for samples, compared to the highest reported timestamp in milliseconds. If `None` or `0` is passed, the series is not trimmed at all. uncompressed: Changes data storage from compressed (default) to uncompressed. labels: A dictionary of label-value pairs that represent metadata labels of the key. chunk_size: Memory size, in bytes, allocated for each data chunk. Must be a multiple of 8 in the range `[48..1048576]`. In earlier versions of the module the minimum value was different. duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': An error will occur and the new value will be ignored. - 'first': Ignore the new value. - 'last': Override with the latest value. - 'min': Only override if the value is lower than the existing value. - 'max': Only override if the value is higher than the existing value. - 'sum': If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last timestamp and the new timestamp is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_val_diff` is also set. Available since RedisTimeSeries version 1.12.0. ignore_max_val_diff: A non-negative floating point value, that sets an ignore threshold for added values. If the difference between the last value and the new value is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is also set. Available since RedisTimeSeries version 1.12.0. on_duplicate: Use a specific duplicate policy for the specified timestamp. Overrides the duplicate policy set by `duplicate_policy`. """ params = [key, timestamp, value] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) self._append_insertion_filters( params, ignore_max_time_diff, ignore_max_val_diff ) self._append_on_duplicate(params, on_duplicate) return self.execute_command(ADD_CMD, *params) def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]): """ Append new samples to one or more time series. Each time series must already exist. The method expects a list of tuples. Each tuple should contain three elements: (`key`, `timestamp`, `value`). The `value` will be appended to the time series identified by 'key', at the given 'timestamp'. For more information see https://redis.io/commands/ts.madd/ Args: ktv_tuples: A list of tuples, where each tuple contains: - `key`: The key of the time series. - `timestamp`: The timestamp at which the value should be appended. - `value`: The value to append to the time series. Returns: A list that contains, for each sample, either the timestamp that was used, or an error, if the sample could not be added. """ params = [] for ktv in ktv_tuples: params.extend(ktv) return self.execute_command(MADD_CMD, *params) def incrby( self, key: KeyT, value: Number, timestamp: Optional[Union[int, str]] = None, retention_msecs: Optional[int] = None, uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, ): """ Increment the latest sample's of a series. When the specified key does not exist, a new time series is created. This command can be used as a counter or gauge that automatically gets history as a time series. For more information see https://redis.io/commands/ts.incrby/ Args: key: The time-series key. value: Numeric value to be added (addend). timestamp: Timestamp of the sample. `*` can be used for automatic timestamp (using the system clock). `timestamp` must be equal to or higher than the maximum existing timestamp in the series. When equal, the value of the sample with the maximum existing timestamp is increased. If it is higher, a new sample with a timestamp set to `timestamp` is created, and its value is set to the value of the sample with the maximum existing timestamp plus the addend. retention_msecs: Maximum age for samples, compared to the highest reported timestamp in milliseconds. If `None` or `0` is passed, the series is not trimmed at all. uncompressed: Changes data storage from compressed (default) to uncompressed. labels: A dictionary of label-value pairs that represent metadata labels of the key. chunk_size: Memory size, in bytes, allocated for each data chunk. Must be a multiple of 8 in the range `[48..1048576]`. In earlier versions of the module the minimum value was different. duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': An error will occur and the new value will be ignored. - 'first': Ignore the new value. - 'last': Override with the latest value. - 'min': Only override if the value is lower than the existing value. - 'max': Only override if the value is higher than the existing value. - 'sum': If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last timestamp and the new timestamp is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_val_diff` is also set. Available since RedisTimeSeries version 1.12.0. ignore_max_val_diff: A non-negative floating point value, that sets an ignore threshold for added values. If the difference between the last value and the new value is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is also set. Available since RedisTimeSeries version 1.12.0. Returns: The timestamp of the sample that was modified or added. """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) self._append_insertion_filters( params, ignore_max_time_diff, ignore_max_val_diff ) return self.execute_command(INCRBY_CMD, *params) def decrby( self, key: KeyT, value: Number, timestamp: Optional[Union[int, str]] = None, retention_msecs: Optional[int] = None, uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, ): """ Decrement the latest sample's of a series. When the specified key does not exist, a new time series is created. This command can be used as a counter or gauge that automatically gets history as a time series. For more information see https://redis.io/commands/ts.decrby/ Args: key: The time-series key. value: Numeric value to subtract (subtrahend). timestamp: Timestamp of the sample. `*` can be used for automatic timestamp (using the system clock). `timestamp` must be equal to or higher than the maximum existing timestamp in the series. When equal, the value of the sample with the maximum existing timestamp is decreased. If it is higher, a new sample with a timestamp set to `timestamp` is created, and its value is set to the value of the sample with the maximum existing timestamp minus subtrahend. retention_msecs: Maximum age for samples, compared to the highest reported timestamp in milliseconds. If `None` or `0` is passed, the series is not trimmed at all. uncompressed: Changes data storage from compressed (default) to uncompressed. labels: A dictionary of label-value pairs that represent metadata labels of the key. chunk_size: Memory size, in bytes, allocated for each data chunk. Must be a multiple of 8 in the range `[48..1048576]`. In earlier versions of the module the minimum value was different. duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': An error will occur and the new value will be ignored. - 'first': Ignore the new value. - 'last': Override with the latest value. - 'min': Only override if the value is lower than the existing value. - 'max': Only override if the value is higher than the existing value. - 'sum': If a previous sample exists, add the new sample to it so that the updated value is equal to (previous + new). If no previous sample exists, set the updated value equal to the new value. ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last timestamp and the new timestamp is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_val_diff` is also set. Available since RedisTimeSeries version 1.12.0. ignore_max_val_diff: A non-negative floating point value, that sets an ignore threshold for added values. If the difference between the last value and the new value is lower than this threshold, the new entry is ignored. Only applicable if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is also set. Available since RedisTimeSeries version 1.12.0. Returns: The timestamp of the sample that was modified or added. """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) self._append_insertion_filters( params, ignore_max_time_diff, ignore_max_val_diff ) return self.execute_command(DECRBY_CMD, *params) def delete(self, key: KeyT, from_time: int, to_time: int): """ Delete all samples between two timestamps for a given time series. The given timestamp interval is closed (inclusive), meaning that samples whose timestamp equals `from_time` or `to_time` are also deleted. For more information see https://redis.io/commands/ts.del/ Args: key: The time-series key. from_time: Start timestamp for the range deletion. to_time: End timestamp for the range deletion. Returns: The number of samples deleted. """ return self.execute_command(DEL_CMD, key, from_time, to_time) def createrule( self, source_key: KeyT, dest_key: KeyT, aggregation_type: str, bucket_size_msec: int, align_timestamp: Optional[int] = None, ): """ Create a compaction rule from values added to `source_key` into `dest_key`. For more information see https://redis.io/commands/ts.createrule/ Args: source_key: Key name for source time series. dest_key: Key name for destination (compacted) time series. aggregation_type: Aggregation type: One of the following: [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Duration of each bucket, in milliseconds. align_timestamp: Assure that there is a bucket that starts at exactly align_timestamp and align all other buckets accordingly. """ params = [source_key, dest_key] self._append_aggregation(params, aggregation_type, bucket_size_msec) if align_timestamp is not None: params.append(align_timestamp) return self.execute_command(CREATERULE_CMD, *params) def deleterule(self, source_key: KeyT, dest_key: KeyT): """ Delete a compaction rule from `source_key` to `dest_key`. For more information see https://redis.io/commands/ts.deleterule/ """ return self.execute_command(DELETERULE_CMD, source_key, dest_key) def __range_params( self, key: KeyT, from_time: Union[int, str], to_time: Union[int, str], count: Optional[int], aggregation_type: Optional[str], bucket_size_msec: Optional[int], filter_by_ts: Optional[List[int]], filter_by_min_value: Optional[int], filter_by_max_value: Optional[int], align: Optional[Union[int, str]], latest: Optional[bool], bucket_timestamp: Optional[str], empty: Optional[bool], ): """Create TS.RANGE and TS.REVRANGE arguments.""" params = [key, from_time, to_time] self._append_latest(params, latest) self._append_filer_by_ts(params, filter_by_ts) self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value) self._append_count(params, count) self._append_align(params, align) self._append_aggregation(params, aggregation_type, bucket_size_msec) self._append_bucket_timestamp(params, bucket_timestamp) self._append_empty(params, empty) return params def range( self, key: KeyT, from_time: Union[int, str], to_time: Union[int, str], count: Optional[int] = None, aggregation_type: Optional[str] = None, bucket_size_msec: Optional[int] = 0, filter_by_ts: Optional[List[int]] = None, filter_by_min_value: Optional[int] = None, filter_by_max_value: Optional[int] = None, align: Optional[Union[int, str]] = None, latest: Optional[bool] = False, bucket_timestamp: Optional[str] = None, empty: Optional[bool] = False, ): """ Query a range in forward direction for a specific time-series. For more information see https://redis.io/commands/ts.range/ Args: key: Key name for timeseries. from_time: Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). to_time: End timestamp for range query, `+` can be used to express the maximum possible timestamp. count: Limits the number of returned samples. aggregation_type: Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: Filter result by minimum value (must mention also `filter by_max_value`). filter_by_max_value: Filter result by maximum value (must mention also `filter by_min_value`). align: Timestamp for alignment control for aggregation. latest: Used when a time series is a compaction, reports the compacted value of the latest possibly partial bucket. bucket_timestamp: Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, `high`, `~`, `mid`]. empty: Reports aggregations for empty buckets. """ params = self.__range_params( key, from_time, to_time, count, aggregation_type, bucket_size_msec, filter_by_ts, filter_by_min_value, filter_by_max_value, align, latest, bucket_timestamp, empty, ) return self.execute_command(RANGE_CMD, *params) def revrange( self, key: KeyT, from_time: Union[int, str], to_time: Union[int, str], count: Optional[int] = None, aggregation_type: Optional[str] = None, bucket_size_msec: Optional[int] = 0, filter_by_ts: Optional[List[int]] = None, filter_by_min_value: Optional[int] = None, filter_by_max_value: Optional[int] = None, align: Optional[Union[int, str]] = None, latest: Optional[bool] = False, bucket_timestamp: Optional[str] = None, empty: Optional[bool] = False, ): """ Query a range in reverse direction for a specific time-series. **Note**: This command is only available since RedisTimeSeries >= v1.4 For more information see https://redis.io/commands/ts.revrange/ Args: key: Key name for timeseries. from_time: Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). to_time: End timestamp for range query, `+` can be used to express the maximum possible timestamp. count: Limits the number of returned samples. aggregation_type: Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: Filter result by minimum value (must mention also `filter_by_max_value`). filter_by_max_value: Filter result by maximum value (must mention also `filter_by_min_value`). align: Timestamp for alignment control for aggregation. latest: Used when a time series is a compaction, reports the compacted value of the latest possibly partial bucket. bucket_timestamp: Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, `high`, `~`, `mid`]. empty: Reports aggregations for empty buckets. """ params = self.__range_params( key, from_time, to_time, count, aggregation_type, bucket_size_msec, filter_by_ts, filter_by_min_value, filter_by_max_value, align, latest, bucket_timestamp, empty, ) return self.execute_command(REVRANGE_CMD, *params) def __mrange_params( self, aggregation_type: Optional[str], bucket_size_msec: Optional[int], count: Optional[int], filters: List[str], from_time: Union[int, str], to_time: Union[int, str], with_labels: Optional[bool], filter_by_ts: Optional[List[int]], filter_by_min_value: Optional[int], filter_by_max_value: Optional[int], groupby: Optional[str], reduce: Optional[str], select_labels: Optional[List[str]], align: Optional[Union[int, str]], latest: Optional[bool], bucket_timestamp: Optional[str], empty: Optional[bool], ): """Create TS.MRANGE and TS.MREVRANGE arguments.""" params = [from_time, to_time] self._append_latest(params, latest) self._append_filer_by_ts(params, filter_by_ts) self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value) self._append_with_labels(params, with_labels, select_labels) self._append_count(params, count) self._append_align(params, align) self._append_aggregation(params, aggregation_type, bucket_size_msec) self._append_bucket_timestamp(params, bucket_timestamp) self._append_empty(params, empty) params.extend(["FILTER"]) params += filters self._append_groupby_reduce(params, groupby, reduce) return params def mrange( self, from_time: Union[int, str], to_time: Union[int, str], filters: List[str], count: Optional[int] = None, aggregation_type: Optional[str] = None, bucket_size_msec: Optional[int] = 0, with_labels: Optional[bool] = False, filter_by_ts: Optional[List[int]] = None, filter_by_min_value: Optional[int] = None, filter_by_max_value: Optional[int] = None, groupby: Optional[str] = None, reduce: Optional[str] = None, select_labels: Optional[List[str]] = None, align: Optional[Union[int, str]] = None, latest: Optional[bool] = False, bucket_timestamp: Optional[str] = None, empty: Optional[bool] = False, ): """ Query a range across multiple time-series by filters in forward direction. For more information see https://redis.io/commands/ts.mrange/ Args: from_time: Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). to_time: End timestamp for range query, `+` can be used to express the maximum possible timestamp. filters: Filter to match the time-series labels. count: Limits the number of returned samples. aggregation_type: Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: Include in the reply all label-value pairs representing metadata labels of the time series. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: Filter result by minimum value (must mention also `filter_by_max_value`). filter_by_max_value: Filter result by maximum value (must mention also `filter_by_min_value`). groupby: Grouping by fields the results (must mention also `reduce`). reduce: Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. select_labels: Include in the reply only a subset of the key-value pair labels of a series. align: Timestamp for alignment control for aggregation. latest: Used when a time series is a compaction, reports the compacted value of the latest possibly partial bucket. bucket_timestamp: Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, `high`, `~`, `mid`]. empty: Reports aggregations for empty buckets. """ params = self.__mrange_params( aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels, filter_by_ts, filter_by_min_value, filter_by_max_value, groupby, reduce, select_labels, align, latest, bucket_timestamp, empty, ) return self.execute_command(MRANGE_CMD, *params) def mrevrange( self, from_time: Union[int, str], to_time: Union[int, str], filters: List[str], count: Optional[int] = None, aggregation_type: Optional[str] = None, bucket_size_msec: Optional[int] = 0, with_labels: Optional[bool] = False, filter_by_ts: Optional[List[int]] = None, filter_by_min_value: Optional[int] = None, filter_by_max_value: Optional[int] = None, groupby: Optional[str] = None, reduce: Optional[str] = None, select_labels: Optional[List[str]] = None, align: Optional[Union[int, str]] = None, latest: Optional[bool] = False, bucket_timestamp: Optional[str] = None, empty: Optional[bool] = False, ): """ Query a range across multiple time-series by filters in reverse direction. For more information see https://redis.io/commands/ts.mrevrange/ Args: from_time: Start timestamp for the range query. '-' can be used to express the minimum possible timestamp (0). to_time: End timestamp for range query, '+' can be used to express the maximum possible timestamp. filters: Filter to match the time-series labels. count: Limits the number of returned samples. aggregation_type: Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`]. bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: Include in the reply all label-value pairs representing metadata labels of the time series. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: Filter result by minimum value (must mention also `filter_by_max_value`). filter_by_max_value: Filter result by maximum value (must mention also `filter_by_min_value`). groupby: Grouping by fields the results (must mention also `reduce`). reduce: Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. select_labels: Include in the reply only a subset of the key-value pair labels of a series. align: Timestamp for alignment control for aggregation. latest: Used when a time series is a compaction, reports the compacted value of the latest possibly partial bucket. bucket_timestamp: Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, `high`, `~`, `mid`]. empty: Reports aggregations for empty buckets. """ params = self.__mrange_params( aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels, filter_by_ts, filter_by_min_value, filter_by_max_value, groupby, reduce, select_labels, align, latest, bucket_timestamp, empty, ) return self.execute_command(MREVRANGE_CMD, *params) def get(self, key: KeyT, latest: Optional[bool] = False): """ Get the last sample of `key`. For more information see https://redis.io/commands/ts.get/ Args: latest: Used when a time series is a compaction, reports the compacted value of the latest (possibly partial) bucket. """ params = [key] self._append_latest(params, latest) return self.execute_command(GET_CMD, *params) def mget( self, filters: List[str], with_labels: Optional[bool] = False, select_labels: Optional[List[str]] = None, latest: Optional[bool] = False, ): """ Get the last samples matching the specific `filter`. For more information see https://redis.io/commands/ts.mget/ Args: filters: Filter to match the time-series labels. with_labels: Include in the reply all label-value pairs representing metadata labels of the time series. select_labels: Include in the reply only a subset of the key-value pair labels o the time series. latest: Used when a time series is a compaction, reports the compacted value of the latest possibly partial bucket. """ params = [] self._append_latest(params, latest) self._append_with_labels(params, with_labels, select_labels) params.extend(["FILTER"]) params += filters return self.execute_command(MGET_CMD, *params) def info(self, key: KeyT): """ Get information of `key`. For more information see https://redis.io/commands/ts.info/ """ return self.execute_command(INFO_CMD, key) def queryindex(self, filters: List[str]): """ Get all time series keys matching the `filter` list. For more information see https://redis.io/commands/ts.queryindex/ """ return self.execute_command(QUERYINDEX_CMD, *filters) @staticmethod def _append_uncompressed(params: List[str], uncompressed: Optional[bool]): """Append UNCOMPRESSED tag to params.""" if uncompressed: params.extend(["ENCODING", "UNCOMPRESSED"]) @staticmethod def _append_with_labels( params: List[str], with_labels: Optional[bool], select_labels: Optional[List[str]], ): """Append labels behavior to params.""" if with_labels and select_labels: raise DataError( "with_labels and select_labels cannot be provided together." ) if with_labels: params.extend(["WITHLABELS"]) if select_labels: params.extend(["SELECTED_LABELS", *select_labels]) @staticmethod def _append_groupby_reduce( params: List[str], groupby: Optional[str], reduce: Optional[str] ): """Append GROUPBY REDUCE property to params.""" if groupby is not None and reduce is not None: params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()]) @staticmethod def _append_retention(params: List[str], retention: Optional[int]): """Append RETENTION property to params.""" if retention is not None: params.extend(["RETENTION", retention]) @staticmethod def _append_labels(params: List[str], labels: Optional[List[str]]): """Append LABELS property to params.""" if labels: params.append("LABELS") for k, v in labels.items(): params.extend([k, v]) @staticmethod def _append_count(params: List[str], count: Optional[int]): """Append COUNT property to params.""" if count is not None: params.extend(["COUNT", count]) @staticmethod def _append_timestamp(params: List[str], timestamp: Optional[int]): """Append TIMESTAMP property to params.""" if timestamp is not None: params.extend(["TIMESTAMP", timestamp]) @staticmethod def _append_align(params: List[str], align: Optional[Union[int, str]]): """Append ALIGN property to params.""" if align is not None: params.extend(["ALIGN", align]) @staticmethod def _append_aggregation( params: List[str], aggregation_type: Optional[str], bucket_size_msec: Optional[int], ): """Append AGGREGATION property to params.""" if aggregation_type is not None: params.extend(["AGGREGATION", aggregation_type, bucket_size_msec]) @staticmethod def _append_chunk_size(params: List[str], chunk_size: Optional[int]): """Append CHUNK_SIZE property to params.""" if chunk_size is not None: params.extend(["CHUNK_SIZE", chunk_size]) @staticmethod def _append_duplicate_policy(params: List[str], duplicate_policy: Optional[str]): """Append DUPLICATE_POLICY property to params.""" if duplicate_policy is not None: params.extend(["DUPLICATE_POLICY", duplicate_policy]) @staticmethod def _append_on_duplicate(params: List[str], on_duplicate: Optional[str]): """Append ON_DUPLICATE property to params.""" if on_duplicate is not None: params.extend(["ON_DUPLICATE", on_duplicate]) @staticmethod def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]): """Append FILTER_BY_TS property to params.""" if ts_list is not None: params.extend(["FILTER_BY_TS", *ts_list]) @staticmethod def _append_filer_by_value( params: List[str], min_value: Optional[int], max_value: Optional[int] ): """Append FILTER_BY_VALUE property to params.""" if min_value is not None and max_value is not None: params.extend(["FILTER_BY_VALUE", min_value, max_value]) @staticmethod def _append_latest(params: List[str], latest: Optional[bool]): """Append LATEST property to params.""" if latest: params.append("LATEST") @staticmethod def _append_bucket_timestamp(params: List[str], bucket_timestamp: Optional[str]): """Append BUCKET_TIMESTAMP property to params.""" if bucket_timestamp is not None: params.extend(["BUCKETTIMESTAMP", bucket_timestamp]) @staticmethod def _append_empty(params: List[str], empty: Optional[bool]): """Append EMPTY property to params.""" if empty: params.append("EMPTY") @staticmethod def _append_insertion_filters( params: List[str], ignore_max_time_diff: Optional[int] = None, ignore_max_val_diff: Optional[Number] = None, ): """Append insertion filters to params.""" if (ignore_max_time_diff is None) != (ignore_max_val_diff is None): raise ValueError( "Both ignore_max_time_diff and ignore_max_val_diff must be set." ) if ignore_max_time_diff is not None and ignore_max_val_diff is not None: params.extend( ["IGNORE", str(ignore_max_time_diff), str(ignore_max_val_diff)] )