PHP 实现 Redis 高并发解决方案
使用 PHP 实现 Redis 在缓存加速、分布式锁和队列场景中的应用。
首先确保已安装 PHP Redis 扩展
一、缓存加速实现
1. 基本缓存操作
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 设置缓存
function setCache($key, $value, $expire = 3600) {
global $redis;
$serialized = serialize($value);
return $redis->setex($key, $expire, $serialized);
}
// 获取缓存
function getCache($key) {
global $redis;
$serialized = $redis->get($key);
return $serialized ? unserialize($serialized) : false;
}
// 删除缓存
function deleteCache($key) {
global $redis;
return $redis->del($key);
}
// 示例:用户数据缓存
function getUser($userId) {
$cacheKey = "user:{$userId}";
$user = getCache($cacheKey);
if ($user === false) {
// 模拟数据库查询
$user = [
'id' => $userId,
'name' => 'User ' . $userId,
'email' => "user{$userId}@example.com"
];
// 写入缓存,有效期1小时
setCache($cacheKey, $user, 3600);
}
return $user;
}
// 使用示例
$user = getUser(123);
print_r($user);
2. 防止缓存穿透
function getProduct($productId) {
global $redis;
$cacheKey = "product:{$productId}";
$product = getCache($cacheKey);
if ($product === false) {
// 使用互斥锁防止缓存击穿
$lockKey = "lock:product:{$productId}";
$locked = $redis->set($lockKey, 1, ['nx', 'ex' => 10]);
if ($locked) {
try {
// 模拟数据库查询
$product = [
'id' => $productId,
'name' => 'Product ' . $productId,
'price' => rand(100, 1000)
];
if (empty($product)) {
// 缓存空对象防止穿透,有效期5分钟
setCache($cacheKey, [], 300);
} else {
setCache($cacheKey, $product, 3600);
}
} finally {
$redis->del($lockKey);
}
} else {
// 等待其他进程完成缓存设置
usleep(500000); // 等待500ms
return getProduct($productId); // 重试
}
}
return $product ?: null;
}
二、 分布式锁实现
1. 基本分布式锁
class RedisLock {
private $redis;
private $lockKey;
private $identifier;
private $lockTimeout;
public function __construct($redis, $lockKey, $lockTimeout = 10) {
$this->redis = $redis;
$this->lockKey = $lockKey;
$this->lockTimeout = $lockTimeout;
$this->identifier = uniqid();
}
public function acquire($waitTimeout = 5) {
$end = microtime(true) + $waitTimeout;
while (microtime(true) < $end) {
if ($this->redis->set(
$this->lockKey,
$this->identifier,
['nx', 'ex' => $this->lockTimeout]
)) {
return true;
}
usleep(10000); // 等待10ms
}
return false;
}
public function release() {
$script = '
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
';
return $this->redis->eval($script, [$this->lockKey, $this->identifier], 1);
}
public function __destruct() {
$this->release();
}
}
// 使用示例
$lock = new RedisLock($redis, 'lock:order:123');
if ($lock->acquire()) {
try {
// 执行需要加锁的操作
echo "Lock acquired, doing critical section...\n";
sleep(2);
} finally {
$lock->release();
}
} else {
echo "Failed to acquire lock\n";
}
2. 可重入锁实现
class ReentrantRedisLock extends RedisLock {
private $heldCount = 0;
public function acquire($waitTimeout = 5) {
// 检查是否已经持有锁
if ($this->heldCount > 0) {
$this->heldCount++;
return true;
}
$acquired = parent::acquire($waitTimeout);
if ($acquired) {
$this->heldCount = 1;
}
return $acquired;
}
public function release() {
if ($this->heldCount > 1) {
$this->heldCount--;
return true;
}
$released = parent::release();
if ($released) {
$this->heldCount = 0;
}
return $released;
}
}
三、队列实现
1. 简单队列
// 生产者
function enqueue($queueName, $data) {
global $redis;
return $redis->lPush($queueName, json_encode($data));
}
// 消费者
function dequeue($queueName, $timeout = 30) {
global $redis;
$result = $redis->brPop($queueName, $timeout);
return $result ? json_decode($result[1], true) : null;
}
// 示例
enqueue('email_queue', [
'to' => 'user@example.com',
'subject' => 'Welcome',
'body' => 'Thank you for registering'
]);
$task = dequeue('email_queue');
if ($task) {
// 处理任务
echo "Sending email to: {$task['to']}\n";
}
2. 延迟队列
function enqueueDelayed($queueName, $data, $delaySeconds) {
global $redis;
$score = time() + $delaySeconds;
return $redis->zAdd($queueName, $score, json_encode($data));
}
function processDelayedQueue($queueName) {
global $redis;
$now = time();
$items = $redis->zRangeByScore($queueName, 0, $now);
if (!empty($items)) {
// 使用事务移除已处理项
$redis->multi();
foreach ($items as $item) {
$redis->zRem($queueName, $item);
}
$redis->exec();
return array_map(function($item) {
return json_decode($item, true);
}, $items);
}
return [];
}
// 示例
enqueueDelayed('reminder_queue', ['user_id' => 123, 'message' => 'Pay your bill'], 60);
$tasks = processDelayedQueue('reminder_queue');
foreach ($tasks as $task) {
echo "Sending reminder to user {$task['user_id']}: {$task['message']}\n";
}
3. Pub/Sub 模式
// 发布者
function publishMessage($channel, $message) {
global $redis;
return $redis->publish($channel, json_encode($message));
}
// 订阅者
function subscribe($channels, $callback) {
global $redis;
$pubsub = $redis->pSubscribe($channels);
try {
foreach ($pubsub as $message) {
if ($message->kind === 'message') {
$data = json_decode($message->payload, true);
call_user_func($callback, $message->channel, $data);
}
}
} catch (Exception $e) {
$redis->pUnsubscribe();
throw $e;
}
}
// 示例使用
// 在一个进程中
publishMessage('notifications', ['user_id' => 123, 'text' => 'New message']);
// 在另一个进程中
subscribe(['notifications'], function($channel, $message) {
echo "Received on {$channel}: ";
print_r($message);
});
四、综合应用示例 - 秒杀系统
class SeckillService {
private $redis;
private $productKey;
public function __construct($redis, $productId) {
$this->redis = $redis;
$this->productKey = "seckill:product:{$productId}";
}
public function initInventory($inventory) {
// 设置商品库存
$this->redis->set($this->productKey, $inventory);
}
public function seckill($userId) {
// 使用分布式锁
$lock = new RedisLock($this->redis, "lock:{$this->productKey}");
if (!$lock->acquire(1)) {
return ['success' => false, 'message' => '系统繁忙'];
}
try {
// 检查库存
$inventory = $this->redis->get($this->productKey);
if ($inventory <= 0) {
return ['success' => false, 'message' => '已售罄'];
}
// 扣减库存
$this->redis->decr($this->productKey);
// 记录订单 (实际应用中应该入队列异步处理)
$orderId = uniqid();
$this->redis->hSet("seckill:orders", $orderId, json_encode([
'user_id' => $userId,
'product_key' => $this->productKey,
'created_at' => time()
]));
return ['success' => true, 'order_id' => $orderId];
} finally {
$lock->release();
}
}
}
// 使用示例
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$seckill = new SeckillService($redis, 1001);
$seckill->initInventory(100); // 初始化100件库存
// 模拟并发请求
for ($i = 0; $i < 5; $i++) {
$result = $seckill->seckill(rand(1000, 9999));
print_r($result);
}
五、最佳实践建议
1. 连接管理:使用连接池或持久连接
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
2. 错误处理:添加重试机制
function redisRetry($callback, $maxRetries = 3) {
$retries = 0;
while ($retries < $maxRetries) {
try {
return $callback();
} catch (RedisException $e) {
$retries++;
if ($retries >= $maxRetries) {
throw $e;
}
usleep(100000 * $retries); // 指数退避
}
}
}
3. 性能优化:使用管道(pipeline)
$pipe = $redis->pipeline();
$pipe->set('key1', 'value1');
$pipe->set('key2', 'value2');
$pipe->incr('counter');
$pipe->expire('key1', 60);
$results = $pipe->exec();
4. Lua脚本:保证原子性
$script = '
local current = redis.call("GET", KEYS[1])
if current == ARGV[1] then
return redis.call("INCRBY", KEYS[1], ARGV[2])
else
return nil
end
';
$result = $redis->eval($script, ['counter', 'expected_value', 'increment'], 1);