Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 67ab74d

Browse files
Add new stream commands (#3711)
* Add new stream commands * optimization changes
1 parent 4c9512b commit 67ab74d

File tree

3 files changed

+351
-0
lines changed

3 files changed

+351
-0
lines changed

‎redis/commands/core.py‎

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3484,6 +3484,28 @@ def xack(self, name: KeyT, groupname: GroupT, *ids: StreamIdT) -> ResponseT:
34843484
"""
34853485
return self.execute_command("XACK", name, groupname, *ids)
34863486

3487+
def xackdel(
3488+
self,
3489+
name: KeyT,
3490+
groupname: GroupT,
3491+
*ids: StreamIdT,
3492+
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
3493+
) -> ResponseT:
3494+
"""
3495+
Combines the functionality of XACK and XDEL. Acknowledges the specified
3496+
message IDs in the given consumer group and simultaneously attempts to
3497+
delete the corresponding entries from the stream.
3498+
"""
3499+
if not ids:
3500+
raise DataError("XACKDEL requires at least one message ID")
3501+
3502+
if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3503+
raise DataError("XACKDEL ref_policy must be one of: KEEPREF, DELREF, ACKED")
3504+
3505+
pieces = [name, groupname, ref_policy, "IDS", len(ids)]
3506+
pieces.extend(ids)
3507+
return self.execute_command("XACKDEL", *pieces)
3508+
34873509
def xadd(
34883510
self,
34893511
name: KeyT,
@@ -3494,6 +3516,7 @@ def xadd(
34943516
nomkstream: bool = False,
34953517
minid: Union[StreamIdT, None] = None,
34963518
limit: Optional[int] = None,
3519+
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
34973520
) -> ResponseT:
34983521
"""
34993522
Add to a stream.
@@ -3507,13 +3530,20 @@ def xadd(
35073530
minid: the minimum id in the stream to query.
35083531
Can't be specified with maxlen.
35093532
limit: specifies the maximum number of entries to retrieve
3533+
ref_policy: optional reference policy for consumer groups when trimming:
3534+
- KEEPREF (default): When trimming, preserves references in consumer groups' PEL
3535+
- DELREF: When trimming, removes all references from consumer groups' PEL
3536+
- ACKED: When trimming, only removes entries acknowledged by all consumer groups
35103537
35113538
For more information see https://redis.io/commands/xadd
35123539
"""
35133540
pieces: list[EncodableT] = []
35143541
if maxlen is not None and minid is not None:
35153542
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")
35163543

3544+
if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3545+
raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED")
3546+
35173547
if maxlen is not None:
35183548
if not isinstance(maxlen, int) or maxlen < 0:
35193549
raise DataError("XADD maxlen must be non-negative integer")
@@ -3530,6 +3560,8 @@ def xadd(
35303560
pieces.extend([b"LIMIT", limit])
35313561
if nomkstream:
35323562
pieces.append(b"NOMKSTREAM")
3563+
if ref_policy is not None:
3564+
pieces.append(ref_policy)
35333565
pieces.append(id)
35343566
if not isinstance(fields, dict) or len(fields) == 0:
35353567
raise DataError("XADD fields must be a non-empty dict")
@@ -3683,6 +3715,26 @@ def xdel(self, name: KeyT, *ids: StreamIdT) -> ResponseT:
36833715
"""
36843716
return self.execute_command("XDEL", name, *ids)
36853717

3718+
def xdelex(
3719+
self,
3720+
name: KeyT,
3721+
*ids: StreamIdT,
3722+
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
3723+
) -> ResponseT:
3724+
"""
3725+
Extended version of XDEL that provides more control over how message entries
3726+
are deleted concerning consumer groups.
3727+
"""
3728+
if not ids:
3729+
raise DataError("XDELEX requires at least one message ID")
3730+
3731+
if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3732+
raise DataError("XDELEX ref_policy must be one of: KEEPREF, DELREF, ACKED")
3733+
3734+
pieces = [name, ref_policy, "IDS", len(ids)]
3735+
pieces.extend(ids)
3736+
return self.execute_command("XDELEX", *pieces)
3737+
36863738
def xgroup_create(
36873739
self,
36883740
name: KeyT,
@@ -4034,6 +4086,7 @@ def xtrim(
40344086
approximate: bool = True,
40354087
minid: Union[StreamIdT, None] = None,
40364088
limit: Optional[int] = None,
4089+
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
40374090
) -> ResponseT:
40384091
"""
40394092
Trims old messages from a stream.
@@ -4044,6 +4097,10 @@ def xtrim(
40444097
minid: the minimum id in the stream to query
40454098
Can't be specified with maxlen.
40464099
limit: specifies the maximum number of entries to retrieve
4100+
ref_policy: optional reference policy for consumer groups:
4101+
- KEEPREF (default): Trims entries but preserves references in consumer groups' PEL
4102+
- DELREF: Trims entries and removes all references from consumer groups' PEL
4103+
- ACKED: Only trims entries that were read and acknowledged by all consumer groups
40474104
40484105
For more information see https://redis.io/commands/xtrim
40494106
"""
@@ -4054,6 +4111,9 @@ def xtrim(
40544111
if maxlen is None and minid is None:
40554112
raise DataError("One of ``maxlen`` or ``minid`` must be specified")
40564113

4114+
if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
4115+
raise DataError("XTRIM ref_policy must be one of: KEEPREF, DELREF, ACKED")
4116+
40574117
if maxlen is not None:
40584118
pieces.append(b"MAXLEN")
40594119
if minid is not None:
@@ -4067,6 +4127,8 @@ def xtrim(
40674127
if limit is not None:
40684128
pieces.append(b"LIMIT")
40694129
pieces.append(limit)
4130+
if ref_policy is not None:
4131+
pieces.append(ref_policy)
40704132

40714133
return self.execute_command("XTRIM", name, *pieces)
40724134

‎tests/test_asyncio/test_commands.py‎

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis):
34653465
# 1 message is trimmed
34663466
assert await r.xtrim(stream, 3, approximate=False) == 1
34673467

3468+
@skip_if_server_version_lt("8.1.224")
3469+
async def test_xdelex(self, r: redis.Redis):
3470+
stream = "stream"
3471+
3472+
m1 = await r.xadd(stream, {"foo": "bar"})
3473+
m2 = await r.xadd(stream, {"foo": "bar"})
3474+
m3 = await r.xadd(stream, {"foo": "bar"})
3475+
m4 = await r.xadd(stream, {"foo": "bar"})
3476+
3477+
# Test XDELEX with default ref_policy (KEEPREF)
3478+
result = await r.xdelex(stream, m1)
3479+
assert result == [1]
3480+
3481+
# Test XDELEX with explicit KEEPREF
3482+
result = await r.xdelex(stream, m2, ref_policy="KEEPREF")
3483+
assert result == [1]
3484+
3485+
# Test XDELEX with DELREF
3486+
result = await r.xdelex(stream, m3, ref_policy="DELREF")
3487+
assert result == [1]
3488+
3489+
# Test XDELEX with ACKED
3490+
result = await r.xdelex(stream, m4, ref_policy="ACKED")
3491+
assert result == [1]
3492+
3493+
# Test with non-existent ID
3494+
result = await r.xdelex(stream, "999999-0", ref_policy="KEEPREF")
3495+
assert result == [-1]
3496+
3497+
# Test with multiple IDs
3498+
m5 = await r.xadd(stream, {"foo": "bar"})
3499+
m6 = await r.xadd(stream, {"foo": "bar"})
3500+
result = await r.xdelex(stream, m5, m6, ref_policy="KEEPREF")
3501+
assert result == [1, 1]
3502+
3503+
# Test error cases
3504+
with pytest.raises(redis.DataError):
3505+
await r.xdelex(stream, "123-0", ref_policy="INVALID")
3506+
3507+
with pytest.raises(redis.DataError):
3508+
await r.xdelex(stream) # No IDs provided
3509+
3510+
@skip_if_server_version_lt("8.1.224")
3511+
async def test_xackdel(self, r: redis.Redis):
3512+
stream = "stream"
3513+
group = "group"
3514+
consumer = "consumer"
3515+
3516+
m1 = await r.xadd(stream, {"foo": "bar"})
3517+
m2 = await r.xadd(stream, {"foo": "bar"})
3518+
m3 = await r.xadd(stream, {"foo": "bar"})
3519+
m4 = await r.xadd(stream, {"foo": "bar"})
3520+
await r.xgroup_create(stream, group, 0)
3521+
3522+
await r.xreadgroup(group, consumer, streams={stream: ">"})
3523+
3524+
# Test XACKDEL with default ref_policy (KEEPREF)
3525+
result = await r.xackdel(stream, group, m1)
3526+
assert result == [1]
3527+
3528+
# Test XACKDEL with explicit KEEPREF
3529+
result = await r.xackdel(stream, group, m2, ref_policy="KEEPREF")
3530+
assert result == [1]
3531+
3532+
# Test XACKDEL with DELREF
3533+
result = await r.xackdel(stream, group, m3, ref_policy="DELREF")
3534+
assert result == [1]
3535+
3536+
# Test XACKDEL with ACKED
3537+
result = await r.xackdel(stream, group, m4, ref_policy="ACKED")
3538+
assert result == [1]
3539+
3540+
# Test with non-existent ID
3541+
result = await r.xackdel(stream, group, "999999-0", ref_policy="KEEPREF")
3542+
assert result == [-1]
3543+
3544+
# Test error cases
3545+
with pytest.raises(redis.DataError):
3546+
await r.xackdel(stream, group, m1, ref_policy="INVALID")
3547+
3548+
with pytest.raises(redis.DataError):
3549+
await r.xackdel(stream, group) # No IDs provided
3550+
3551+
@skip_if_server_version_lt("8.1.224")
3552+
async def test_xtrim_with_options(self, r: redis.Redis):
3553+
stream = "stream"
3554+
3555+
await r.xadd(stream, {"foo": "bar"})
3556+
await r.xadd(stream, {"foo": "bar"})
3557+
await r.xadd(stream, {"foo": "bar"})
3558+
await r.xadd(stream, {"foo": "bar"})
3559+
3560+
# Test XTRIM with KEEPREF ref_policy
3561+
assert (
3562+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="KEEPREF")
3563+
== 2
3564+
)
3565+
3566+
await r.xadd(stream, {"foo": "bar"})
3567+
await r.xadd(stream, {"foo": "bar"})
3568+
3569+
# Test XTRIM with DELREF ref_policy
3570+
assert (
3571+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="DELREF") == 2
3572+
)
3573+
3574+
await r.xadd(stream, {"foo": "bar"})
3575+
await r.xadd(stream, {"foo": "bar"})
3576+
3577+
# Test XTRIM with ACKED ref_policy
3578+
assert (
3579+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="ACKED") == 2
3580+
)
3581+
3582+
# Test error case
3583+
with pytest.raises(redis.DataError):
3584+
await r.xtrim(stream, maxlen=2, ref_policy="INVALID")
3585+
3586+
@skip_if_server_version_lt("8.1.224")
3587+
async def test_xadd_with_options(self, r: redis.Redis):
3588+
stream = "stream"
3589+
3590+
# Test XADD with KEEPREF ref_policy
3591+
await r.xadd(
3592+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3593+
)
3594+
await r.xadd(
3595+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3596+
)
3597+
await r.xadd(
3598+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3599+
)
3600+
assert await r.xlen(stream) == 2
3601+
3602+
# Test XADD with DELREF ref_policy
3603+
await r.xadd(
3604+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF"
3605+
)
3606+
assert await r.xlen(stream) == 2
3607+
3608+
# Test XADD with ACKED ref_policy
3609+
await r.xadd(
3610+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED"
3611+
)
3612+
assert await r.xlen(stream) == 2
3613+
3614+
# Test error case
3615+
with pytest.raises(redis.DataError):
3616+
await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")
3617+
34683618
@pytest.mark.onlynoncluster
34693619
async def test_bitfield_operations(self, r: redis.Redis):
34703620
# comments show affected bits

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /