A Python client library for Redis with Sentinel support with command auto routing to master/slave replicas.
Credis-Python is a Redis client library that provides a simplified interface for working with Redis Sentinel for high availability Redis deployments. It wraps the standard Redis-py library and provides both synchronous and asynchronous APIs.
- Built-in Redis Sentinel support for high availability
- Master/Slave separation for read/write operations
- Application prefix for key namespace isolation
- Both synchronous and asynchronous APIs
- Comprehensive Redis commands support
- Pipeline support for batching operations
pip install git+https://github.com/wealthy/credis-python.git
uv add git+https://github.com/wealthy/credis-python.git
from credis import Client # Initialize the client client = Client( host="127.0.0.1", # Redis Sentinel host port="26379", # Redis Sentinel port app_prefix="myapp", # Application prefix for namespace isolation password="password", # Optional: Redis password socket_timeout=0.5, # Optional: Socket timeout in seconds masterset_name="mymaster" # Optional: Master set name in Sentinel ) # Basic operations client.set("key", "value") value = client.get("key") print(value) # Output: value # Delete a key client.delete("key") # Check if a key exists exists = client.exists("key") print(exists) # Output: 0 (key doesn't exist) # Working with hashes client.hset("myhash", "field1", "value1") client.hset("myhash", mapping={"field2": "value2", "field3": "value3"}) value = client.hget("myhash", "field1") print(value) # Output: value1 all_values = client.hgetall("myhash") print(all_values) # Output: {b'field1': b'value1', b'field2': b'value2', b'field3': b'value3'} # Working with sets client.sadd("myset", "member1", "member2", "member3") members = client.smembers("myset") print(members) # Output: {b'member1', b'member2', b'member3'} is_member = client.sismember("myset", "member1") print(is_member) # Output: 1 (member exists) # Working with sorted sets client.zadd("myzset", {"member1": 1.0, "member2": 2.0, "member3": 3.0}) members = client.zrange("myzset", 0, -1, withscores=True) print(members) # Output: [(b'member1', 1.0), (b'member2', 2.0), (b'member3', 3.0)] # Using pipeline for batching operations # This write pipeline connects to the master node. Its usage should be prioritized for write operations. # For reading purposes, use the read pipeline which connects to slave nodes instead. # Use client.make_key() to add the app_prefix to your keys in pipeline operations pipe = client.write_pipeline() pipe.set(client.make_key("key1"), "value1") # Using make_key to add prefix pipe.set(client.make_key("key2"), "value2") # Using make_key to add prefix pipe.get(client.make_key("key1")) # Using make_key to add prefix pipe.get(client.make_key("key2")) # Using make_key to add prefix results = pipe.execute() print(results) # Output: [True, True, b'value1', b'value2'] # Close the connection when done client.close()
import asyncio from credis.asyncio import AsyncClient async def main(): # Initialize the async client client = AsyncClient( host="127.0.0.1", # Redis Sentinel host port="26379", # Redis Sentinel port app_prefix="myapp", # Application prefix for namespace isolation password="password", # Optional: Redis password socket_timeout=0.5, # Optional: Socket timeout in seconds masterset_name="mymaster" # Optional: Master set name in Sentinel ) # Connect to Redis. await client.connect() # This is optional # Basic operations await client.set("key", "value") value = await client.get("key") print(value) # Output: value # Delete a key await client.delete("key") # Check if a key exists exists = await client.exists("key") print(exists) # Output: 0 (key doesn't exist) # Working with hashes await client.hset("myhash", "field1", "value1") await client.hset("myhash", mapping={"field2": "value2", "field3": "value3"}) value = await client.hget("myhash", "field1") print(value) # Output: value1 all_values = await client.hgetall("myhash") print(all_values) # Output: {b'field1': b'value1', b'field2': b'value2', b'field3': b'value3'} # Using pipeline for batching operations # Use client.make_key() to add the app_prefix to your keys in pipeline operations pipe = client.write_pipeline() pipe.set(client.make_key("key1"), "value1") # Using make_key to add prefix pipe.set(client.make_key("key2"), "value2") # Using make_key to add prefix pipe.get(client.make_key("key1")) # Using make_key to add prefix pipe.get(client.make_key("key2")) # Using make_key to add prefix results = await pipe.execute() print(results) # Output: [True, True, b'value1', b'value2'] # Close the connection when done await client.close() # Run the async example asyncio.run(main())
All keys are automatically prefixed with your application prefix to avoid key collisions in a shared Redis instance:
client = Client(host="127.0.0.1", port="26379", app_prefix="myapp") client.set("user:1", "John") # Actually sets "myapp:user:1"
Credis automatically routes write operations to the master and read operations to the slave:
- Write operations (set, delete, hset, etc.) go to the master
- Read operations (get, exists, hget, etc.) go to the slave
Credis provides separate pipelines for read and write operations:
# Write pipeline (to the master) write_pipe = client.write_pipeline() write_pipe.set("key1", "value1") write_pipe.set("key2", "value2") write_results = write_pipe.execute() # Read pipeline (from the slave) read_pipe = client.read_pipeline() read_pipe.get("key1") read_pipe.get("key2") read_results = read_pipe.execute()
Note: When using pipelines, the application prefix is not automatically applied. You must manually add the prefix to your keys in pipeline operations. For example, if your app_prefix is "myapp", you should use
pipe.set("myapp:key1", "value1")instead ofpipe.set("key1", "value1")to maintain consistency with regular operations.
Credis provides several exception types for different error cases:
from credis.exceptions import ConnectionError, SentinelError, InitError try: client = Client(host="127.0.0.1", port="26379", app_prefix="myapp") client.set("key", "value") except SentinelError as e: print(f"Error connecting to Redis Sentinel: {e}") except ConnectionError as e: print(f"Error connecting to Redis: {e}") except InitError as e: print(f"Initialization error: {e}")
Credis can be easily integrated with FastAPI using its dependency injection system. The context manager methods (write_pipeline_ctx and read_pipeline_ctx) can be used directly with FastAPI's Depends():
from fastapi import FastAPI, Depends from credis.asyncio import AsyncClient from redis.asyncio.client import Pipeline app = FastAPI() # Create a Redis client redis_client = AsyncClient( host="127.0.0.1", port="26379", app_prefix="myapp", password="password", socket_timeout=0.5, masterset_name="mymaster" ) # Example endpoint using write_pipeline_ctx directly with dependency injection @app.post("/users/{user_id}") async def create_user( user_id: str, name: str, pipe: Pipeline = Depends(redis_client.write_pipeline_ctx) ): pipe.hset(f"user:{user_id}", mapping={"name": name, "created_at": "2025-04-21"}) pipe.sadd("users", user_id) result = await pipe.execute() return {"success": True, "results": result} # Example endpoint using read_pipeline_ctx directly with dependency injection @app.get("/users/{user_id}") async def get_user( user_id: str, pipe: Pipeline = Depends(redis_client.read_pipeline_ctx) ): pipe.hgetall(f"user:{user_id}") pipe.sismember("users", user_id) result = await pipe.execute() if not result[1]: return {"error": "User not found"} return {"user_id": user_id, "data": result[0]} # Example combining both pipelines in the same endpoint @app.put("/users/{user_id}") async def update_user( user_id: str, name: str, read_pipe: Pipeline = Depends(redis_client.read_pipeline_ctx), write_pipe: Pipeline = Depends(redis_client.write_pipeline_ctx) ): # First check if user exists read_pipe.sismember("users", user_id) exists = (await read_pipe.execute())[0] if not exists: return {"error": "User not found"} # Then update user data write_pipe.hset(f"user:{user_id}", "name", name) write_pipe.hset(f"user:{user_id}", "updated_at", "2025-04-21") await write_pipe.execute() return {"success": True, "user_id": user_id}
This approach offers several benefits:
- Direct usage of the context manager methods with FastAPI's dependency system
- Automatic connection management and cleanup
- Proper context management of pipelines
- Clear separation of read and write operations
- Efficient batching of Redis commands
You can also use Credis normally in FastAPI without dependency injection. This gives you more flexibility in how you manage the client lifecycle:
from fastapi import FastAPI from credis.asyncio import AsyncClient app = FastAPI() # Create a Redis client redis_client = AsyncClient( host="127.0.0.1", port="26379", app_prefix="myapp", password="password" ) # Example using pipeline context managers directly @app.post("/users/{user_id}/with-pipeline") async def create_user_with_pipeline(user_id: str, name: str): # Use pipeline context manager directly async with redis_client.write_pipeline_ctx() as pipe: pipe.hset(f"user:{user_id}", mapping={"name": name, "created_at": "2025-04-21"}) pipe.sadd("users", user_id) result = await pipe.execute() return {"success": True, "results": result} @app.get("/users/{user_id}/with-pipeline") async def get_user_with_pipeline(user_id: str): # Use pipeline context manager directly async with redis_client.read_pipeline_ctx() as pipe: pipe.hgetall(f"user:{user_id}") pipe.sismember("users", user_id) result = await pipe.execute() if not result[1]: return {"error": "User not found"} return {"user_id": user_id, "data": result[0]} # Examples using simple async/await without pipelines @app.post("/users/{user_id}") async def create_user(user_id: str, name: str): # Simple operations using async/await directly await redis_client.hset( f"user:{user_id}", mapping={"name": name, "created_at": "2025-04-21"} ) await redis_client.sadd("users", user_id) return {"success": True} @app.get("/users/{user_id}") async def get_user(user_id: str): # Perform simple operations using async/await directly exists = await redis_client.sismember("users", user_id) if not exists: return {"error": "User not found"} user_data = await redis_client.hgetall(f"user:{user_id}") return {"user_id": user_id, "data": user_data} @app.put("/users/{user_id}") async def update_user(user_id: str, name: str): # Check if user exists first exists = await redis_client.sismember("users", user_id) if not exists: return {"error": "User not found"} # Update user data await redis_client.hset(f"user:{user_id}", "name", name) await redis_client.hset(f"user:{user_id}", "updated_at", "2025-04-21") return {"success": True, "user_id": user_id} @app.delete("/users/{user_id}") async def delete_user(user_id: str): # Check if user exists first exists = await redis_client.sismember("users", user_id) if not exists: return {"error": "User not found"} # Delete user data and remove from users set await redis_client.delete(f"user:{user_id}") await redis_client.srem("users", user_id) return {"success": True, "message": f"User {user_id} deleted"}
This approach is useful when:
- You want more control over when the pipeline is created and executed
- You need to use the Redis client for various purposes within a single endpoint
- You prefer simpler code without pipelines for basic operations
- You need to perform conditional operations based on intermediate results