Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
/ CyRedis Public

a smarter redis client implmented in Cython for python

License

Notifications You must be signed in to change notification settings

Ugbot/CyRedis

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

23 Commits

Repository files navigation

πŸš€ CyRedis - A wildly experimental High-Performance Redis Client with Enterprise Features

A Cython Redis client that provides FastAPI-style web application support with advanced features for distributed, real-time applications.

CyRedis Logo Python Redis License

🌟 Features

Core Redis Operations

  • High Performance: Cython-optimized Redis operations with native speed
  • Connection Pooling: Efficient connection management with automatic failover
  • Protocol Support: RESP2 and RESP3 protocol negotiation
  • Async Support: Full async/await support with uvloop optimization

Advanced Data Structures

  • Streams: Redis Streams with consumer groups
  • JSON: RedisJSON operations
  • Geospatial: Redis geospatial indexing
  • Time Series: RedisTimeSeries support
  • Bloom Filters: Probabilistic data structures
  • Bitmaps: Bit-level operations

Web Application Support (FastAPI-Style)

  • Worker Coordination: Graceful scaling and recovery of crashed workers
  • JWT Authentication: Token management with refresh tokens and blacklisting
  • Session Management: Multi-session tracking with automatic cleanup
  • 2FA Support: Time-based OTP with backup codes
  • Rate Limiting: Built-in rate limiting capabilities
  • Streaming Iterators: Real-time data streaming over SSE/WebSockets

Distributed Features

  • Distributed Locks: Redis-based distributed locking
  • Shared State: Thread-safe shared dictionaries across processes
  • Cluster Operations: Redis Cluster support with slot management
  • Health Monitoring: Automatic dead worker detection and recovery

πŸš€ Quick Start

Installation

# Install from PyPI
pip install cyredis
# Or install from source
git clone https://github.com/yourusername/cyredis.git
cd cyredis
pip install -e .

Basic Usage

from cy_redis import HighPerformanceRedis
# Synchronous usage
with HighPerformanceRedis() as redis:
 redis.set("key", "value")
 value = redis.get("key")
 print(value) # "value"
# Async usage
import asyncio
async def main():
 async with HighPerformanceRedis() as redis:
 await redis.set_async("key", "value")
 value = await redis.get_async("key")
 print(value)
asyncio.run(main())

🌐 FastAPI Integration

CyRedis provides enterprise-grade web application support that integrates seamlessly with FastAPI:

Basic FastAPI Setup

from fastapi import FastAPI
from cy_redis.web_app_support import WebApplicationSupport
app = FastAPI()
# Initialize CyRedis web app support
app_support = WebApplicationSupport()
@app.on_event("startup")
async def startup():
 app_support.initialize()
@app.on_event("shutdown")
async def shutdown():
 app_support.shutdown_graceful() # Graceful shutdown with workload yielding
@app.post("/login")
async def login(username: str, password: str):
 result = await app_support.authenticate_user(username, password)
 if not result['success']:
 raise HTTPException(status_code=401, detail=result['error'])
 return result
@app.get("/protected")
async def protected_route(token: str):
 payload = app_support.verify_user_access(token)
 if not payload:
 raise HTTPException(status_code=401, detail="Invalid token")
 return {"user_id": payload['user_id']}
if __name__ == "__main__":
 import uvicorn
 uvicorn.run(app, host="0.0.0.0", port=8000)

Real-time Streaming with SSE/WebSockets

from fastapi.responses import StreamingResponse
from fastapi import WebSocket, WebSocketDisconnect
@app.get("/sse/chat/{user_id}")
async def sse_chat(user_id: str, token: str):
 """Real-time chat via Server-Sent Events"""
 # Verify JWT token
 payload = app_support.jwt_middleware(token, required_permissions=['stream_access'])
 if not payload or payload['user_id'] != user_id:
 raise HTTPException(status_code=403, detail="Access denied")
 def generate_sse():
 async def sse_generator():
 async with app_support.stream_iterator(
 "chat_messages",
 consumer_group=f"chat_{user_id}",
 consumer_name=f"user_{user_id}"
 ) as stream_iter:
 async for messages in stream_iter:
 for message in messages:
 yield f"data: {json.dumps(message['data'])}\n\n"
 return sse_generator()
 return StreamingResponse(generate_sse(), media_type="text/event-stream")
@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str, token: str):
 """Real-time chat via WebSocket"""
 # Verify websocket token
 ws_payload = app_support.jwt_middleware(token, required_permissions=['stream_access', 'realtime_access'])
 if not ws_payload or ws_payload['user_id'] != user_id:
 await websocket.send_json({"error": "Invalid websocket token"})
 await websocket.close()
 return
 await websocket.accept()
 try:
 async with app_support.stream_iterator(
 "chat_messages",
 consumer_group=f"ws_chat_{user_id}",
 consumer_name=f"ws_{uuid.uuid4().hex[:8]}"
 ) as stream_iter:
 async for messages in stream_iter:
 for message in messages:
 await websocket.send_json(message['data'])
 except WebSocketDisconnect:
 pass
 except Exception as e:
 await websocket.send_json({"error": str(e)})
 finally:
 await websocket.close()

JWT Authentication Middleware

from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
 """JWT authentication dependency"""
 token = credentials.credentials
 payload = app_support.jwt_middleware(token)
 if not payload:
 raise HTTPException(status_code=401, detail="Invalid authentication token")
 return payload
@app.get("/api/data")
async def get_data(current_user = Depends(get_current_user)):
 """Protected API endpoint"""
 return {"user_id": current_user['user_id'], "data": "sensitive information"}
@app.post("/api/write")
async def write_data(data: dict, current_user = Depends(get_current_user)):
 """Protected write endpoint"""
 # Check if user has write permissions
 if not app_support.require_scopes(current_user['access_token'], ['write']):
 raise HTTPException(status_code=403, detail="Insufficient permissions")
 # Process the write operation
 return {"status": "success"}

🎯 Worker Coordination Features

CyRedis provides enterprise-grade worker coordination for handling scaling and recovery:

Automatic Worker Registration

# Workers automatically register themselves
worker_info = app_support.get_worker_info()
print(f"Worker ID: {worker_info['worker_id']}")
print(f"Status: {worker_info['status']}")

Graceful Shutdown with Workload Yielding

# Graceful shutdown yields workload to other workers
app_support.shutdown_graceful()
# Process:
# 1. Detect active sessions and tasks
# 2. Find available healthy workers
# 3. Redistribute sessions and tasks
# 4. Wait for active work to complete
# 5. Clean shutdown

Dead Worker Detection & Recovery

# Monitor worker health
cluster_info = app_support.get_cluster_info()
print(f"Healthy workers: {cluster_info['healthy_workers']}")
print(f"Dead workers: {cluster_info['dead_workers']}")
# Force recovery of specific worker
app_support.force_worker_recovery("worker_123")

πŸ“Š Monitoring & Statistics

System Statistics

stats = app_support.get_system_stats()
print(json.dumps(stats, indent=2))
# Output includes:
# - User counts and session info
# - Worker cluster health
# - Queue statistics
# - Rate limiting stats
# - Shared dictionary stats

Worker Monitoring

# Monitor individual worker
worker_stats = app_support.get_worker_info()
print(f"Worker uptime: {worker_stats['uptime_seconds']}s")
print(f"Worker health: {worker_stats['status']}")
# Monitor entire cluster
cluster_stats = app_support.get_cluster_info()
print(f"Total workers: {cluster_stats['total_workers']}")
print(f"Healthy workers: {cluster_stats['healthy_workers']}")

πŸ”’ Security Features

JWT Token Management

# Create different token types
access_token = app_support.create_access_token(user_id)
websocket_token = app_support.create_websocket_token(user_id)
api_token = app_support.create_api_token(user_id, scopes=['read', 'write'])
# Verify tokens
payload = app_support.verify_user_access(access_token)
ws_payload = app_support.verify_websocket_token(websocket_token)
api_payload = app_support.verify_api_token(api_token, ['read'])
# Token revocation
app_support.revoke_token(access_token)

Authentication Middleware

# Different authentication levels
@app.get("/public")
async def public_endpoint():
 return {"message": "Public data"}
@app.get("/protected")
async def protected_endpoint(token: str):
 payload = app_support.require_auth(token)
 if not payload:
 raise HTTPException(status_code=401)
 return {"user_id": payload['user_id']}
@app.get("/admin")
async def admin_endpoint(token: str):
 if not app_support.require_scopes(token, ['admin']):
 raise HTTPException(status_code=403)
 return {"message": "Admin data"}

πŸŽͺ Real-time Streaming

Redis Stream Iterators

# Stream chat messages
async with app_support.stream_iterator(
 "chat_messages",
 consumer_group="chat_users",
 consumer_name=f"user_{user_id}"
) as stream_iter:
 async for messages in stream_iter:
 for message in messages:
 # Process each message
 yield f"data: {json.dumps(message['data'])}\n\n"

List-based Notifications

# Stream user notifications
async with app_support.list_iterator(f"notifications:{user_id}") as list_iter:
 async for items in list_iter:
 for item in items:
 # Process each notification
 yield f"data: {json.dumps({'message': item})}\n\n"

Pub/Sub Broadcasting

# Listen to pub/sub channels
async with app_support.pubsub_iterator(["user_notifications", "global_news"]) as pubsub_iter:
 async for message in pubsub_iter:
 # Process broadcast message
 yield f"data: {json.dumps(message)}\n\n"

πŸ§ͺ Testing

Run All Tests

# Run all tests
pytest
# With coverage
pytest --cov=cy_redis --cov-report=html
# Specific test categories
pytest -m integration # Redis integration tests
pytest -m unit # Unit tests
pytest -m worker_coordination # Worker coordination tests

Test Scripts

# Run worker coordination tests
python tests/test_worker_coordination.py
# Run with main test runner
./scripts/run_tests.sh --worker-coord --coverage
# Run example applications
python examples/web_app_example.py
python examples/streaming_example.py

Docker Testing

# Start test environment
./scripts/run_tests.sh --docker-up --all
# Run tests with Docker
./scripts/run_tests.sh --docker-up --integration

πŸ“š Examples

Complete Examples

  • examples/web_app_example.py: Full web application with authentication, sessions, and shared state
  • examples/streaming_example.py: Real-time streaming with SSE/WebSocket integration
  • examples/enhanced_cyredis_demo.py: Advanced features demonstration

Usage Patterns

# 1. Initialize web app support
from cy_redis.web_app_support import WebApplicationSupport
app_support = WebApplicationSupport()
# 2. User authentication
tokens = app_support.authenticate_user(user_id, password)
access_token = tokens['access_token']
# 3. Session management
session_id = app_support.create_user_session(user_id)
session = app_support.get_session(session_id)
# 4. Real-time streaming
async with app_support.stream_iterator("chat_messages") as stream:
 async for messages in stream:
 for message in messages:
 yield message
# 5. Shared state
shared_dict = app_support.get_shared_dict("user_data")
shared_dict[user_id] = {"preferences": {...}}
# 6. Worker coordination
cluster_info = app_support.get_cluster_info()
worker_info = app_support.get_worker_info()

πŸ”§ Configuration Options

Redis Connection

app_support = WebApplicationSupport(
 host="redis-cluster.example.com",
 port=6379,
 max_connections=20,
 max_workers=8
)

Token Configuration

# Custom token settings
token_manager = TokenManager(
 redis_client,
 secret_key="your-secret-key",
 access_token_expiry=900, # 15 minutes
 refresh_token_expiry=604800 # 7 days
)

Session Configuration

session_manager = SessionManager(
 redis_client,
 session_timeout=7200, # 2 hours
 cleanup_interval=600 # 10 minutes
)

🚨 Troubleshooting

Common Issues

  1. High Memory Usage

    # Adjust cache settings
    shared_dict = app_support.get_shared_dict('large_dataset')
    shared_dict.cache_ttl = 10 # Shorter cache for volatile data
  2. Worker Lock Contention

    # Increase lock timeouts
    lock = DistributedLock(redis, "my_lock", ttl_ms=10000)
  3. Token Verification Failures

    # Check Redis connectivity and token blacklisting
    payload = app_support.verify_user_access(token)

Performance Optimization

# Connection pooling
redis_client = CyRedisClient(max_connections=20)
# Caching strategy
fast_dict = app_support.get_shared_dict('fast_changing_data')
fast_dict.cache_ttl = 10 # Short cache
slow_dict = app_support.get_shared_dict('static_config')
slow_dict.cache_ttl = 300 # Long cache

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new features
  4. Run the test suite
  5. Submit a pull request

πŸ“„ License

MIT License - see LICENSE file for details.

πŸ™ Acknowledgments

πŸ“ž Support


CyRedis - Bringing enterprise-grade Redis functionality to Python with FastAPI-style web application support! πŸš€

About

a smarter redis client implmented in Cython for python

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

AltStyle γ«γ‚ˆγ£γ¦ε€‰ζ›γ•γ‚ŒγŸγƒšγƒΌγ‚Έ (->γ‚ͺγƒͺγ‚ΈγƒŠγƒ«) /