Mipu项目中有这样一个场景
1、生成文件时,由于多个用户同时拥有写入和生成的权限,并发请求会导致生成结果出错,所以需要对生成过程加锁。 同一时间只允许一名用户操作。 这时候就需要一个锁来锁定操作过程。
2、使用缓存时,缓存失效可能会导致一下子出现大量并发请求,在渗透到数据库时可能需要使用锁将这个操作锁定在同一个并发进程上。
针对以上两种情况php锁php锁,目前的解决方案是对处理进程实现加锁机制,通过PHP实现如下
使用了Eaccelerator的内存锁和文件锁,原理如下:
1. 确定系统中是否安装了EAccelerator。 如果是,请使用显存锁定。 如果没有,请使用文件锁
2.根据带进来的key不同,可以直接并行处理多个锁,类似于Innodb的行级锁
PHP e加速器安装
e加速器官方网站:
e加速器Github:
eAccelerator下载最新版本的安装包,解压安装包,进入解压后的文件夹目录
场景使用如下:
$lock = new CacheLock('key_name'); $lock->lock(); // logic code here $lock->unlock(); //使用过程中需要注意下文件锁所在路径需要有写权限.
PHP的锁定类如下:
eAccelerator = function_exists("eAccelerator_lock"); // 缓存路径 if(!$this->eAccelerator) { $this->path = $path.($this->_mycrc32($name) % $this->hashnum) . '.txt'; // 文件路径, 例如: ./97.txt } $this->name = $name; } private function _mycrc32($string) { $crc = abs(crc32($string)); if($crc & 0x80000000) { $crc ^= 0xffffffff; $crc += 1; } return $crc; } // 上锁 public function lock() { if($this->eAccelerator) { return eaccelerator_lock($this->name); } else { $this->fp = fopen($this->path, 'w+'); if($this->fp === false) { return false; } return flock($this->fp, LOCK_EX); // 独占锁定 } } // 解锁 public function unlock() { if($this->eAccelerator) { return eaccelerator_unlock($this->name); } else { if($this->fp !== false) { flock($this->fp, LOCK_UN); // 释放锁定 clearstatcache(); // 清除文件状态缓存 } fclose($this->fp); // 关闭文件句柄 } } } $lock = new CacheLock("key_name"); $lock->lock(); // logic code here echo 'https://blog.mimvp.com'; $lock->unlock(); ?>
参考推荐:
PHP eAccelerator安装与使用
PHP多线程应用实例
PHP多线程网页爬取
PHP Pthread多线程操作
MySQL事务隔离级别及实现原理
Spring事务属性解读
Spring分布式事务实现
Spring编程技术经典面试题
Spring常见技术问题总结
MySQL中InnoDB和MyISAM总结
MySQL存储引擎InnoDB和MyISAM的区别
共同实施
说起Redis分布式锁php 锁,大多数人都会想到:setnx+lua,或者知道设置键值px毫秒nx。 后一种方法的核心实现命令如下:
- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000
- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这种实现方法有3个要点(也是笔试概率很高的地方):
set命令必须使用set键值px毫秒nx; 该值必须是唯一的; 释放锁时必须验证该值,不能误解锁;
事实上,这种锁最大的缺点就是加锁时只能作用于一个Redis节点。 即使Redis通过sentinel保证了高可用,如果主节点由于某种原因从master切换到slave,锁也会丢失:
锁是在Redis的主节点上获取的; 但锁定的key还没有同步到从节点; master故障,发生故障转移,从节点升级为主节点; 导致锁丢失。
正因为如此,Redis作者antirez提出了一种基于分布式环境的更加中间的分布式锁实现方法:Redlock。 笔者感觉Redlock也是Redis所有分布式锁实现方法中唯一能让面试官高潮的方法。
红锁实现
antirez提出的redlock算法大致是这样的:
在Redis的分布式环境中,我们假设有N个Redis master。 这些节点彼此完全独立php 锁,不存在主从复制或其他集群协调机制。 我们确保N个实例上的锁获取和释放方式与Redis单实例下相同。 现在我们假设有5个Redis主节点,我们需要在5台服务器上运行这些Redis实例,这样它们就不会同时全部宕机。
为了获取锁,客户端应该执行以下操作:
红锁源代码
Redisson已经封装了redlock算法,接下来简单介绍其用法,并分析核心源码(假设5个redis实例)。
org.redisson
redisson
3.3.2
用法
首先我们看一下redission封装的redlock算法实现的分布式锁的使用。 它非常简单,与 ReentrantLock 类似:
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.0.1:5378")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.0.1:5379")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.0.1:5380")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK_KEY";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
// 向3个redis实例尝试加锁
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
// isLock = redLock.tryLock();
// 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
System.out.println("isLock = "+isLock);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
}
唯一身份
实现分布式锁很重要的一点就是set的值必须是唯一的。 redisson的价值如何保证价值的唯一性? 答案是UUID+threadId。 入口在redissonClient.getLock("REDLOCK_KEY")中,源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
获取锁
获取锁的代码是redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者最终的核心源码都是下面的代码,但是后者获取锁的默认租约时间( leaseTime )是 LOCK_EXPIRATION_INTERVAL_SECONDS,即 30 秒:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 获取锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Collections.
在获取锁的命令中,
释放锁
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture unlockInnerAsync(long threadId) {
// 释放锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.
参考: