Redis as a Cache

Redis is an in-memory data structure store that excels as a cache due to its sub-millisecond latency, rich data types, and built-in expiration. When used correctly, Redis can reduce database load by 90% or more while dramatically improving application response times.
Data Structures Overview
| Structure | Use Case | Example | |-----------|----------|---------| | String | Simple values, counters | User session, page views | | Hash | Object fields | User profile fields | | List | Ordered collection | Message queue, timeline | | Set | Unique values | Tags, followers | | Sorted Set | Ranked data | Leaderboard, rate limiting | | HyperLogLog | Cardinality estimation | Unique visitors | | Bitmap | Boolean flags | Daily active users |
Caching Patterns
Cache-Aside (Lazy Loading)
The most common caching pattern. The application checks the cache first; on a miss, it loads data from the database and populates the cache.
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_user(user_id):
cache_key = f"user:{user_id}"
Try cache first
cached = r.get(cache_key)
if cached:
return json.loads(cached)
Cache miss: load from database
user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
if user:
Populate cache with TTL
r.setex(cache_key, 3600, json.dumps(user))
return user
Pros : Only caches data that is requested, resilient to cache failures. Cons : Cache miss penalty (three round trips), stale data until TTL expires.
Write-Through
Data is written to the cache first, then to the database. Reads always hit the cache.
def update_user(user_id, data):
cache_key = f"user:{user_id}"
Write to cache first
r.setex(cache_key, 3600, json.dumps(data))
Then write to database
db.execute(
"UPDATE users SET name = %s, email = %s WHERE id = %s",
[data['name'], data['email'], user_id]
)
Pros : Cache is always consistent with database writes. Cons : Write latency increases, cache stores data that may never be read.
Write-Behind (Write-Back)
Data is written to cache and asynchronously written to the database later.
def write_behind(user_id, data):
cache_key = f"user:{user_id}"
Write to cache immediately
r.setex(cache_key, 3600, json.dumps(data))
Queue database write for batch processing
r.lpush("db:write:queue", json.dumps({
"table": "users",
"id": user_id,
"data": data
}))
Pros : Very fast writes, can batch database operations. Cons : Risk of data loss if cache fails before database write.
Cache Invalidation Strategies
| Strategy | How It Works | Best For | |----------|-------------|----------| | TTL expiration | Automatic expiry after set time | Most applications | | Key deletion | Delete cache key on data update | Write-through | | Versioned keys | Include version in key name | Schema changes | | Pub/sub invalidation | Notify all instances to invalidate | Distributed caches |
Advanced Patterns
Distributed Locking
def acquire_lock(lock_name, acquire_timeout=10):
identifier = str(uuid.uuid4())
end = time.time() + acquire_timeout
while time.time() < end:
if r.setnx(f"lock:{lock_name}", identifier):
r.expire(f"lock:{lock_name}", 10)
return identifier
time.sleep(0.001)
return None
def release_lock(lock_name, identifier):
Use Lua script for atomic release
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
r.eval(script, 1, f"lock:{lock_name}", identifier)
Rate Limiting with Sorted Sets
def is_rate_limited(user_id, max_requests=100, window_seconds=60):
key = f"ratelimit:{user_id}"
now = time.time()
Remove old entries
r.zremrangebyscore(key, 0, now - window_seconds)
Count current requests
if r.zcard(key) >= max_requests:
return True
Add current request
r.zadd(key, {now: now})
r.expire(key, window_seconds)
return False
Session Storage
Store session with hash
def create_session(session_id, user_data, ttl=86400):
key = f"session:{session_id}"
r.hset(key, mapping={
'user_id': user_data['id'],
'username': user_data['username'],
'roles': json.dumps(user_data['roles']),
'ip': user_data['ip'],
'created_at': time.time()
})
r.expire(key, ttl)
def get_session(session_id):
key = f"session:{session_id}"
data = r.hgetall(key)
if data:
data['roles'] = json.loads(data['roles'])
return data
Performance Optimization
Connection Pooling
from redis.connection import ConnectionPool
Reuse connections across requests
pool = ConnectionPool(
host='localhost',
port=6379,
max_connections=50,
socket_timeout=5
)
r = redis.Redis(connection_pool=pool)
Pipeline / Batching
Batch operations to reduce round trips
pipe = r.pipeline()
for user_id in user_ids:
pipe.get(f"user:{user_id}")
results = pipe.execute()
Monitoring
Redis CLI monitoring
redis-cli info stats
Key metrics to watch
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\- hit_rate: keyspace_hits / (keyspace_hits + keyspace_misses)
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\- evicted_keys: keys evicted due to maxmemory
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\- maxmemory: configured memory limit
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\- connected_clients: active connections
Common Pitfalls
| Pitfall | Consequence | Fix | |---------|-------------|-----| | No TTL | Memory exhaustion | Always set TTL | | Cache stampede | Overload database on miss | Use mutex locking | | Too-large values | Memory waste, slow serialization | Compress or split | | Hot keys | Single key becomes bottleneck | Shard or replicate | | Cache-aside without TTL | Stale data lives forever | Always set TTL |
Summary
Redis caching can dramatically improve application performance when the right pattern is applied. Use cache-aside as the default pattern with appropriate TTLs, implement write-through for consistency-critical data, use distributed locking for race conditions, and sorted sets for rate limiting. Monitor hit rates and eviction counts, and always set TTLs to prevent memory exhaustion.
Enjoy this article? Share your thoughts, questions, or experiences in the comments below — your insights help other readers too.
Join the discussion ↓