最佳实践与踩坑(续)
[TOC]
Noah最佳实践与踩坑(续)
代码重复:搞定代码重复的三个绝招
- 可维护性是大型项目成熟度的一个重要指标,而提升可维护性非常重要的一个手 段就是减少代码重复。那为什么这样说呢?
- 如果多处重复代码实现完全相同的功能,很容易修改一处忘记修改另一处,造成 Bug;
- 有一些代码并不是完全重复,而是相似度很高,修改这些类似的代码容易改(复制粘 贴)错,把原本有区别的地方改为了一样。
- 如何使用 Java 中的一些高级特 性、设计模式,以及一些工具消除重复代码,才能既优雅又高端。
利用工厂模式 + 模板方法模式,消除 if…else 和重复代码
需求背景:假设要开发一个购物车下单的功能,针对不同用户进行不同处理:
- 普通用户需要收取运费,运费是商品价格的 10%,无商品折扣;
- VIP 用户同样需要收取商品价格 10% 的快递费,但购买两件以上相同商品时,第三件开 始享受一定折扣
- 内部用户可以免运费,无商品折扣。
代码最佳实践:理论分析
- 设计模式:模板方法+工厂方法
- 如果我们熟记抽象类和抽象方法的定义的话,这时或许就会想到,是否可以把重复的逻辑定 义在抽象类中,三个购物车只要分别实现不同的那份逻辑呢?
- 通过spring的IOC容器能力,通过Bean的名称直接获取到AbstractCat,调用其process方法即可实现
- 这样一来,我们就利用工厂模式 + 模板方法模式,不仅消除了重复代码,还避免了修改既 有代码的风险。这就是设计模式中的开闭原则:对修改关闭,对扩展开放。
代码最佳实践:源码分析
/**
* 购物车实体类
*/
public class Cart {
//商品清单
private List<Item> items = new ArrayList<>();
//总优惠
private BigDecimal totalDiscount;
//商品总价
private BigDecimal totalItemPrice;
//总运费
private BigDecimal totalDeliveryPrice;
//应付总价
private BigDecimal payPrice;
}
/**
* 商品实体类
*/
public class Item {
//商品Id
private long id;
//商品数量
private int quantity;
//商品单价
private BigDecimal price;
//商品优惠
private BigDecimal couponPrice;
//商品运费
private BigDecimal deliveryPrice;
}
/**
* 设计模式:模板方法
* <p>
* 我们在父类中实现了购物车处理的流程模板,然后把需 要特殊处理的地方留空白也就是留抽象方法定义,让子类去实现其中的逻辑。
* 由于父类的逻 辑不完整无法单独工作,因此需要定义为抽象类。
*/
public abstract class AbstractCart {
/**
* 模板方法
*
* @param userId
* @param items
* @return
*/
public Cart process(long userId, Map<Long, Integer> items) {
Cart cart = new Cart();
List<Item> itemList = new ArrayList<>();
items.entrySet().stream().forEach(entry -> {
Item item = new Item();
item.setId(entry.getKey());
item.setPrice(Db.getItemPrice(entry.getKey()));
item.setQuantity(entry.getValue());
itemList.add(item);
});
cart.setItems(itemList);
itemList.stream().forEach(item -> {
processCouponPrice(userId, item);
processDeliveryPrice(userId, item);
});
cart.setTotalItemPrice(cart.getItems().stream().map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))).reduce(BigDecimal.ZERO, BigDecimal::add));
cart.setTotalDeliveryPrice(cart.getItems().stream().map(Item::getDeliveryPrice).reduce(BigDecimal.ZERO, BigDecimal::add));
cart.setTotalDiscount(cart.getItems().stream().map(Item::getCouponPrice).reduce(BigDecimal.ZERO, BigDecimal::add));
cart.setPayPrice(cart.getTotalItemPrice().add(cart.getTotalDeliveryPrice()).subtract(cart.getTotalDiscount()));
return cart;
}
/**
* 抽象方法:计算优惠券
*
* @param userId
* @param item
*/
protected abstract void processCouponPrice(long userId, Item item);
/**
* 抽象方法:计算邮费
*
* @param userId
* @param item
*/
protected abstract void processDeliveryPrice(long userId, Item item);
}
/**
* 普通用户购物车
*/
public class NormalUserCart extends AbstractCart {
protected void processCouponPrice(long userId, Item item) {
item.setCouponPrice(BigDecimal.ZERO);
}
protected void processDeliveryPrice(long userId, Item item) {
item.setDeliveryPrice(item.getPrice()
.multiply(BigDecimal.valueOf(item.getQuantity()))
.multiply(new BigDecimal("0.1")));
}
}
/**
* VIP 用户的购物车 VipUserCart,直接继承了 NormalUserCart,只需要修改多买优惠策 略
*/
public class VipUserCart extends NormalUserCart {
protected void processCouponPrice(long userId, Item item) {
if (item.getQuantity() > 2) {
item.setCouponPrice(item.getPrice()
.multiply(BigDecimal.valueOf(100 - Db.getUserCouponPercent(userId)).divide(new BigDecimal("100")))
.multiply(BigDecimal.valueOf(item.getQuantity() - 2)));
} else {
item.setCouponPrice(BigDecimal.ZERO);
}
}
}
/**
* 模板方法:优雅实现
*
* @param userId
* @return
*/
public Cart right(int userId) {
String userCategory = Db.getUserCategory(userId);
AbstractCart cart = (AbstractCart) applicationContext.getBean(userCategory + "UserCart");
return cart.process(userId, items);
}图解设计模式-模板方法
利用注解 + 反射消除重复代码
需求背景:假设银行提供了一些 API 接口,对参数的序列化有点特殊,不使用 JSON,而是需要我们 把参数依次拼在一起构成一个大字符串。
按照银行提供的 API 文档的顺序,把所有参数构成定长的数据,然后拼接在一起作为整 个字符串。
- 因为每一种参数都有固定长度,未达到长度时需要做填充处理:
- 字符串类型的参数不满长度部分需要以下划线右填充,也就是字符串内容靠左;
- 数字类型的参数不满长度部分以 0 左填充,也就是实际数字靠右;
- 货币类型的表示需要把金额向下舍入 2 位到分,以分为单位,作为数字类型同样进行左填充。
- 对所有参数做 MD5 操作作为签名(为了方便理解,Demo 中不涉及加盐处理)。
- 因为每一种参数都有固定长度,未达到长度时需要做填充处理:
普通实现代码问题:
- 三种标准数据类型的处理逻辑有重复,稍有不慎就会出现 Bug;
- 处理流程中字符串拼接、加签和发请求的逻辑,在所有方法重复;
- 实际方法的入参的参数类型和顺序,不一定和接口要求一致,容易出错;
- 代码层面针对每一个参数硬编码,无法清晰地进行核对,如果参数达到几十个、上百个,出错的概率极大。
图解-第三方请求参数Api:
代码最佳实践:理论分析
- 就是要用注解和反射!
- 所有处理参数排序、填充、加签、请求调用的核心逻辑,都汇聚在了 remoteCall 方法中
- 许多涉及类结构性的通用处理,都可以按照这个模式来减少重复代码
- 反射给予了我 们在不知晓类结构的时候,按照固定的逻辑处理类的成员;
- 而注解给了我们为这些成员补充 元数据的能力,使得我们利用反射实现通用逻辑的时候,可以从外部获得更多我们关心的数 据
代码最佳实践:源码
/**
* 我们就能通过自定义注解为接口和所有参数增加一些元数据.
* 我们定义一个接口 API 的注解 BankAPI,包含接口 URL 地址和接口说明
*/
public BankAPI {
String desc() default "";
String url() default "";
}
/**
* 用于描述接口的每一个字段规范,包 含参数的次序、类型和长度三个属性
*/
public BankAPIField {
int order() default -1;
int length() default -1;
String type() default "";
}
/**
* 要实现接口逻辑和逻辑实现的剥离,首先需要以 POJO 类(只有属性没有任何业务逻辑的 数据类)的方式定义所有的接口参数
* 创建用户API
*/
public class CreateUserAPI extends AbstractAPI {
private String name;
private String identity;
private String mobile;
private int age;
}
/**
* AbstractAPI 类是一个空实现,因为这个案例中的接口并没有公共数据可 以抽象放到基类。
*/
public abstract class AbstractAPI {
}
/**
* 反射配合注解,动态获取接口参数类
*/
public class BetterBankService {
/**
* 创建用户api接口
*
* @param name
* @param identity
* @param mobile
* @param age
* @return
* @throws IOException
*/
public static String createUser(String name, String identity, String mobile, int age) throws IOException {
CreateUserAPI createUserAPI = new CreateUserAPI();
//封装参数
createUserAPI.setName(name);
createUserAPI.setIdentity(identity);
createUserAPI.setAge(age);
createUserAPI.setMobile(mobile);
return remoteCall(createUserAPI);
}
public static String pay(long userId, BigDecimal amount) throws IOException {
PayAPI payAPI = new PayAPI();
//封装参数
payAPI.setUserId(userId);
payAPI.setAmount(amount);
return remoteCall(payAPI);
}
//所有处理参数排序、填充、加签、请求调用的核心逻辑,都汇聚在了 remoteCall 方法中
private static String remoteCall(AbstractAPI api) throws IOException {
//从BankAPI注解获取请求地址
BankAPI bankAPI = api.getClass().getAnnotation(BankAPI.class);
bankAPI.url();
StringBuilder stringBuilder = new StringBuilder();
Arrays.stream(api.getClass().getDeclaredFields()) //获得所有字段
.filter(field -> field.isAnnotationPresent(BankAPIField.class)) //查找标记了注解的字段
.sorted(Comparator.comparingInt(a -> a.getAnnotation(BankAPIField.class).order())) //根据注解中的order对字段排序
.peek(field -> field.setAccessible(true)) //设置可以访问私有字段
.forEach(field -> {
//获得注解
BankAPIField bankAPIField = field.getAnnotation(BankAPIField.class);
Object value = "";
try {
//反射获取字段值
value = field.get(api);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
//根据字段类型以正确的填充方式格式化字符串
switch (bankAPIField.type()) {
case "S": {
stringBuilder.append(String.format("%-" + bankAPIField.length() + "s", value.toString()).replace(' ', '_'));
break;
}
case "N": {
stringBuilder.append(String.format("%" + bankAPIField.length() + "s", value.toString()).replace(' ', '0'));
break;
}
case "M": {
if (!(value instanceof BigDecimal))
throw new RuntimeException(String.format("{} 的 {} 必须是BigDecimal", api, field));
stringBuilder.append(String.format("%0" + bankAPIField.length() + "d", ((BigDecimal) value).setScale(2, RoundingMode.DOWN).multiply(new BigDecimal("100")).longValue()));
break;
}
default:
break;
}
});
//签名逻辑
stringBuilder.append(DigestUtils.md2Hex(stringBuilder.toString()));
String param = stringBuilder.toString();
long begin = System.currentTimeMillis();
//发请求
String result = Request.Post("http://localhost:45678/reflection" + bankAPI.url())
.bodyString(param, ContentType.APPLICATION_JSON)
.execute().returnContent().asString();
log.info("调用银行API {} url:{} 参数:{} 耗时:{}ms", bankAPI.desc(), bankAPI.url(), param, System.currentTimeMillis() - begin);
return result;
}
}
利用属性拷贝工具消除重复代码
使用类似BeanUtils这种Mapping工具来做Bean的转换
BeanUtils.copyProperties(orderDTO, orderDO, "id"); |
问题讨论
- 除了模板方法设计模式是减少重复代码的一把好手,观察者模式也常用于减少代码重复 (并且是松耦合方式)。Spring 也提供了类似工具,你能想到有哪 些应用场景吗?
- 有名的当属guava的EventBus了
- 关于 Bean 属性复制工具,除了最简单的 Spring 的 BeanUtils 工具类的使用,你还知道 哪些对象映射类库吗?它们又有什么功能呢?
重点回顾
- 第一种代码重复是,有多个并行的类实现相似的代码逻辑。我们可以考虑提取相同逻辑在父 类中实现,差异逻辑通过抽象方法留给子类实现。使用类似的模板方法把相同的流程和逻辑 固定成模板,保留差异的同时尽可能避免代码重复。同时,可以使用 Spring 的 IoC 特性注 入相应的子类,来避免实例化子类时的大量 if…else 代码
- 第二种代码重复是,使用硬编码的方式重复实现相同的数据处理算法。我们可以考虑把规则 转换为自定义注解,作为元数据对类或对字段、方法进行描述,然后通过反射动态读取这些 元数据、字段或调用方法,实现规则参数和规则定义的分离。也就是说,把变化的部分也就 是规则的参数放入注解,规则的定义统一处理
- 第三种代码重复是,业务代码中常见的 DO、DTO、VO 转换时大量字段的手动赋值,遇到 有上百个属性的复杂类型,非常非常容易出错。我的建议是,不要手动进行赋值,考虑使用 Bean 映射工具进行。此外,还可以考虑采用单元测试对所有字段进行赋值正确性校验
TODO:使用了并发工具类库,线程安全就高枕无忧了吗?
TODO:代码加锁:不要让“锁”事成为烦心事
TODO:线程池:业务代码最常用也最容易犯错的组件
TODO:用好Java8的日期时间类,少踩一些“老三样”的坑
TODO:别以为“自动挡”就不可能出现OOM
TODO:接口设计:系统间对话的语言,一定要统一
缓存设计:缓存可以锦上添花也可以落井下石
- 通常我们会使用更快的介质(比如内存)作为缓存,来解决较慢介质(比如磁盘)读取数据 慢的问题,缓存是用空间换时间,来解决性能问题的一种架构设计模式。更重要的是,磁盘 上存储的往往是原始数据,而缓存中保存的可以是面向呈现的数据。这样一来,缓存不仅仅 是加快了 IO,还可以减少原始数据的计算工作。
- 缓存系统一般设计简单,功能相对单一,所以诸如 Redis 这种缓存系统的整体吞吐 量,能达到关系型数据库的几倍甚至几十倍,因此缓存特别适用于互联网应用的高并发场 景。
- 使用 Redis 做缓存虽然简单好用,但使用和设计缓存并不是 set 一下这么简单,需要注意 缓存的同步、雪崩、并发、穿透等问题。今天,我们就来详细聊聊。
不要把 Redis 当作数据库
- 通常,我们会使用 Redis 等分布式缓存数据库来缓存数据,但是千万别把 Redis 当做数据 库来使用。因为 Redis 中数据消失导致业务逻辑错误,并且因为没有 保留原始数据,业务都无法恢复。
- Redis 的确具有数据持久化功能,可以实现服务重启后数据不丢失。这一点,很容易让我们 误认为 Redis 可以作为高性能的 KV 数据库
- 从本质上来看,Redis(免费版)是一个内存数据库,所有数据保存在内存中,并且 直接从内存读写数据响应操作,只不过具有数据持久化能力。所以,Redis 的特点是,处理 请求很快,但无法保存超过内存大小的数据。
- VM 模式虽然可以保存超过内存大小的数据,但是因为性能原因从 2.6 开始已经 被废弃。此外,Redis 企业版提供了 Redis on Flash 可以实现 Key+ 字典 + 热数据保存 在内存中,冷数据保存在 SSD 中
- 把 Redis 用作缓存,我们需要注意两点。
- 从客户端的角度来说,缓存数据的特点一定是有原始数据来源,且允许丢失,即使设 置的缓存时间是 1 分钟,在 30 秒时缓存数据因为某种原因消失了,我们也要能接受。当数 据丢失后,我们需要从原始数据重新加载数据,不能认为缓存系统是绝对可靠的,更不能认 为缓存系统不会删除没有过期的数据。
- 从 Redis 服务端的角度来说,缓存系统可以保存的数据量一定是小于原始数据的。 首先,我们应该限制 Redis 对内存的使用量,也就是设置 maxmemory 参数;其次,我们 应该根据数据特点,明确 Redis 应该以怎样的算法来驱逐数据。常用的数据淘汰策略有
- allkeys-lru,针对所有 Key,优先删除最近最少使用的 Key;
- volatile-lru,针对带有过期时间的 Key,优先删除最近最少使用的 Key;
- volatile-ttl,针对带有过期时间的 Key,优先删除即将过期的 Key(根据 TTL 的值);
- allkeys-lfu(Redis 4.0 以上),针对所有 Key,优先删除最少使用的 Key;
- volatile-lfu(Redis 4.0 以上),针对带有过期时间的 Key,优先删除最少使用的 Key。
- 其实,这些算法是 Key 范围 +Key 选择算法的搭配组合,其中范围有 allkeys 和 volatile 两种,算法有 LRU、TTL 和 LFU 三种。接下来,我就从 Key 范围和算法角度,和你说说如 何选择合适的驱逐算法。
- 首先,从算法角度来说,Redis 4.0 以后推出的 LFU 比 LRU 更“实用”。试想一下,如果 一个 Key 访问频率是 1 天一次,但正好在 1 秒前刚访问过,那么 LRU 可能不会选择优先 淘汰这个 Key,反而可能会淘汰一个 5 秒访问一次但最近 2 秒没有访问过的 Key,而 LFU 算法不会有这个问题。而 TTL 会比较“头脑简单”一点,优先删除即将过期的 Key,但有 可能这个 Key 正在被大量访问。
- 然后,从 Key 范围角度来说,allkeys 可以确保即使 Key 没有 TTL 也能回收,如果使用的 时候客户端总是“忘记”设置缓存的过期时间,那么可以考虑使用这个系列的算法。而 volatile 会更稳妥一些,万一客户端把 Redis 当做了长效缓存使用,只是启动时候初始化一 次缓存,那么一旦删除了此类没有 TTL 的数据,可能就会导致客户端出错。
- 所以,不管是使用者还是管理者都要考虑 Redis 的使用方式,使用者需要考虑应该以缓存 的姿势来使用 Redis,管理者应该为 Redis 设置内存限制和合适的驱逐策略,避免出现 OOM。
注意缓存雪崩问题
由于缓存系统的 IOPS 比数据库高很多,因此要特别小心短时间内大量缓存失效的情况。这 种情况一旦发生,可能就会在瞬间有大量的数据需要回源到数据库查询,对数据库造成极大 的压力,极限情况下甚至导致后端数据库直接崩溃。这就是我们常说的缓存失效,也叫作缓 存雪崩。
从广义上说,产生缓存雪崩的原因有两种:
- 第一种是,缓存系统本身不可用,导致大量请求直接回源到数据库;
- 第二种是,应用设计层面大量的 Key 在同一时间过期,导致大量的数据回源。
- 第一种原因,主要涉及缓存系统本身高可用的配置,不属于缓存设计层面的问题,所以今天 我主要和你说说如何确保大量 Key 不在同一时间被动过期。
栗子源码
/**
* 产生缓存雪崩:大量的key在同一时间过期,请求流量到db去了
*/
//@PostConstruct
public void wrongInit() {
IntStream.rangeClosed(1, 1000).forEach(i -> stringRedisTemplate.opsForValue().set("city" + i, getCityFromDb(i), 30, TimeUnit.SECONDS));
log.info("Cache init finished");
//每秒一次,输出数据库访问的QPS
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("DB QPS : {}", atomicInteger.getAndSet(0));
}, 0, 1, TimeUnit.SECONDS);
}
/**
* 缓存雪崩解决方案1:ThreadLocalRandom.current().nextInt(10),增加随机休眠时间
*/
public void rightInit1() {
IntStream.rangeClosed(1, 1000).forEach(i -> stringRedisTemplate.opsForValue().set("city" + i, getCityFromDb(i), 30 + ThreadLocalRandom.current().nextInt(10), TimeUnit.SECONDS));
log.info("Cache init finished");
//每秒一次,输出数据库访问的QPS
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("DB QPS : {}", atomicInteger.getAndSet(0));
}, 0, 1, TimeUnit.SECONDS);
}
/**
* 让缓存不主动过期。初始化缓存数据的时候设置缓存永不过期,然后启动一个后台 线程 30 秒一次定时把所有数据更新到缓存,而且通过适当的休眠.
* DB QPS : 37
* DB QPS : 37
* DB QPS : 35
* DB QPS : 36
* DB QPS : 38
* DB QPS : 40
* DB QPS : 40
*
* @throws InterruptedException
*/
//@PostConstruct
public void rightInit2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
//模拟每30s,更新数据库
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
IntStream.rangeClosed(1, 1000).forEach(i -> {
String data = getCityFromDb(i);
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
}
if (!StringUtils.isEmpty(data)) {
stringRedisTemplate.opsForValue().set("city" + i, data);
}
});
log.info("Cache update finished");
//启动程序的时候需要等待首次更新缓存完成
countDownLatch.countDown();
}, 0, 30, TimeUnit.SECONDS);
//每秒一次,输出数据库访问的QPS
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("DB QPS : {}", atomicInteger.getAndSet(0));
}, 0, 1, TimeUnit.SECONDS);
countDownLatch.await();
}
解决缓存 Key 同时大规模失效需要回源,导致数据库压力激增问题的方式有两种。
- 差异化缓存过期时间,不要让大量的 Key 在同一时间过期。比如,在初始化缓存 的时候,设置缓存的过期时间是 30 秒 +10 秒以内的随机延迟(扰动值)。这样,这些 Key 不会集中在 30 秒这个时刻过期,而是会分散在 30~40 秒之间过期
- 方案二,让缓存不主动过期。初始化缓存数据的时候设置缓存永不过期,然后启动一个后台 线程 30 秒一次定时把所有数据更新到缓存,而且通过适当的休眠,控制从数据库更新数据 的频率,降低数据库压力。并且数据库的压力会比较稳定
- 关于这两种解决方案,我们需要特别注意以下三点:
- 方案一和方案二是截然不同的两种缓存方式,如果无法全量缓存所有数据,那么只能使用方案一;
- 即使使用了方案二,缓存永不过期,同样需要在查询的时候,确保有回源的逻辑。正如 之前所说,我们无法确保缓存系统中的数据永不丢失
- 不管是方案一还是方案二,在把数据从数据库加入缓存的时候,都需要判断来自数据库 的数据是否合法,比如进行最基本的判空检查。把数据加入缓存之前一 定要校验数据。
说到这里,我们再仔细看一下回源 QPS 超过 700 的截图,可以看到在并发情况下,总共 1000 条数据回源达到了 1002 次,说明有一些条目出现了并发回源。这,就是我后面要讲 到的缓存并发问题
注意缓存击穿问题
在某些 Key 属于极端热点数据,且并发量很大的情况下,如果这个 Key 过期,可能会在某 个瞬间出现大量的并发请求同时回源,相当于大量的并发请求直接打到了数据库。这种情 况,就是我们常说的缓存击穿或缓存并发问题。
缓存击穿问题,理论分析:
- 如果回源操作特别昂贵,那么这种并发就不能忽略不计。这时,我们可以考虑使用锁机制来 限制回源的并发。
- 在真实的业务场景下,不一定要这么严格地使用双重检查分布式锁进行全局的并发限制,因 为这样虽然可以把数据库回源并发降到最低,但也限制了缓存失效时的并发。可以考虑的方式是下面两种
- 解决方案一:使用进程内的锁进行限制,这样每一个节点都可以以一个并发回源数据库;
- 解决方案二:不使用锁进行限制,而是使用类似 Semaphore 的工具限制并发数,比如限制 为 10,这样既限制了回源并发数不至于太大,又能使得一定量的线程可以同时回源。
缓存击穿问题,源码分析:
/**
* qps能达到20,产生了回源并发问题
*
* @return
*/
public String wrong() {
String data = stringRedisTemplate.opsForValue().get("hotsopt");
if (StringUtils.isEmpty(data)) {
data = getExpensiveData();
stringRedisTemplate.opsForValue().set("hotsopt", data, 5, TimeUnit.SECONDS);
}
return data;
}
/**
* 缓存击穿(缓存并发问题,)
* 分布式锁:解决回源并发问题,如果回源的成本很高,这是不能忽略的问题。
* 方案1:分布式锁+双重检查限制了全局的并发只有一个。
* <p>
* 在真实的业务场景下,不一定要这么严格地使用双重检查分布式锁进行全局的并发限制,因 为这样虽然可以把数据库回源并发降到最低,但也限制了缓存失效时的并发。可以考虑的方 式是:
* <p>
* 1、使用进程内的锁进行限制,这样每一个节点都可以以一个并发回源数据库;
* 2、不使用锁进行限制,而是使用类似 Semaphore 的工具限制并发数,比如限制 为 10,这样既限制了回源并发数不至于太大,又能使得一定量的线程可以同时回源
*
* @return
*/
public String right() {
String data = stringRedisTemplate.opsForValue().get("hotsopt");
if (StringUtils.isEmpty(data)) {
RLock locker = redissonClient.getLock("locker");
if (locker.tryLock()) {
try {
//双重检查,因为可能已经有一个B线程过了第一次判断,在等锁,然后A线程已经把结果返回了
data = stringRedisTemplate.opsForValue().get("hotsopt");
if (StringUtils.isEmpty(data)) {
data = getExpensiveData();
stringRedisTemplate.opsForValue().set("hotsopt", data, 5, TimeUnit.SECONDS);
}
} finally {
locker.unlock();
}
}
}
return data;
}
注意缓存穿透问题
- 在之前的例子中,缓存回源的逻辑都是当缓存中查不到需要的数据时,回源到数据库查询。 这里容易出现的一个漏洞是,缓存中没有数据不一定代表数据没有缓存,还有一种可能是原 始数据压根就不存在。
- 缓存穿透和缓存击穿的区别
- 缓存穿透是指,缓存没有起到压力缓冲的作用;
- 而缓存击穿是指,缓存失效时瞬时的并发打到数据库。
- 缓存穿透问题的解决方案:
- 对于不存在的数据,同样设置一个特殊的 Value 到缓存中,比如当数据库中查出 的用户信息为空的时候,设置 NODATA 这样具有特殊含义的字符串到缓存中。这样下次请 求缓存的时候还是可以命中缓存,即直接从缓存返回结果,不查询数据库
- 布隆过滤器做前置过滤。
- 布隆过滤器是一种概率型数据库结构,由一个很长的二进制向量和一系列随机映射函数组 成。它的原理是,当一个元素被加入集合时,通过 k 个散列函数将这个元素映射成一个 m 位 bit 数组中的 k 个点,并置为 1
- 检索时,我们只要看看这些点是不是都是 1 就(大概)知道集合中有没有它了。如果这些 点有任何一个 0,则被检元素一定不在;如果都是 1,则被检元素很可能在。
- 布隆过滤器不保存原始值,空间效率很高,平均每一个元素占用 2.4 字节就可以达到万分 之一的误判率。这里的误判率是指,过滤器判断值存在而实际并不存在的概率。我们可以设 置布隆过滤器使用更大的存储空间,来得到更小的误判率。
- 对于方案二,我们需要同步所有可能存在的值并加入布隆过滤器,这是比较麻烦的地方。如 果业务规则明确的话,你也可以考虑直接根据业务规则判断值是否存在。
- 其实,方案二可以和方案一同时使用,即将布隆过滤器前置,对于误判的情况再保存特殊值 到缓存,双重保险避免无效数据查询请求打到数据库。
注意缓存数据同步策略
- 前面提到的 3 个案例,其实都属于缓存数据过期后的被动删除。在实际情况下,修改了原 始数据后,考虑到缓存数据更新的及时性,我们可能会采用主动更新缓存的策略。这些策略 可能是:
- 先更新缓存,再更新数据库;
- 先更新数据库,再更新缓存;
- 先删除缓存,再更新数据库,访问的时候按需加载数据到缓存。
- 先更新数据库,再删除缓存,访问的时候按需加载数据到缓存。(最佳实践)
- “先更新缓存再更新数据库”策略不可行。数据库设计复杂,压力集中,数据库因为超时等 原因更新操作失败的可能性较大,此外还会涉及事务,很可能因为数据库更新失败,导致缓 存和数据库的数据不一致。
- “先更新数据库再更新缓存”策略不可行。一是,如果线程 A 和 B 先后完成数据库更新, 但更新缓存时却是 B 和 A 的顺序,那很可能会把旧数据更新到缓存中引起数据不一致;二 是,我们不确定缓存中的数据是否会被访问,不一定要把所有数据都更新到缓存中去。
- “先删除缓存再更新数据库,访问的时候按需加载数据到缓存”策略也不可行。在并发的情 况下,很可能删除缓存后还没来得及更新数据库,就有另一个线程先读取了旧值到缓存中, 如果并发量很大的话这个概率也会很大。
- “先更新数据库再删除缓存,访问的时候按需加载数据到缓存”策略是最好的。虽然在极端 情况下,这种策略也可能出现数据不一致的问题,但概率非常低,基本可以忽略。举一 个“极端情况”的例子,比如更新数据的时间节点恰好是缓存失效的瞬间,这时 A 先读取 到了旧值,随后在 B 操作数据库完成更新并且删除了缓存之后,A 再把旧值加入缓存。
- 需要注意的是,更新数据库后删除缓存的操作可能失败,如果失败则考虑把任务加入延迟队 列进行延迟重试,确保数据可以删除,缓存可以及时更新。因为删除操作是幂等的,所以即 使重复删问题也不是太大,这又是删除比更新好的一个原因。
- 因此,针对缓存更新更推荐的方式是,缓存中的数据不由数据更新操作主动触发,统一在需 要使用的时候按需加载,数据更新后及时删除缓存中的数据即可。
问题讨论
- 在聊到缓存并发问题时,我们说到热点 Key 回源会对数据库产生的压力问题,如果 Key 特别热的话,可能缓存系统也无法承受,毕竟所有的访问都集中打到了一台缓存服务 器。如果我们使用 Redis 来做缓存,那可以把一个热点 Key 的缓存查询压力,分散到多 个 Redis 节点上吗?
- 加随机前缀后缀是一个办法
- 热key:https://www.infoq.cn/article/3L3zAQ4H8xpNoM2glSyi
- 大 Key 也是数据缓存容易出现的一个问题。如果一个 Key 的 Value 特别大,那么可能 会对 Redis 产生巨大的性能影响,因为 Redis 是单线程模型,对大 Key 进行查询或删除 等操作,可能会引起 Redis 阻塞甚至是高可用切换。你知道怎么查询 Redis 中的大 Key,以及如何在设计上实现大 Key 的拆分吗?
布隆过滤器数据量大的问题
- https://krisives.github.io/bloom-calculator/ 10亿的数据量,期望千分之一的错误率,推荐10个Hash函数,占用内存空间不到1.8GB
重点回顾
- 第一,我们不能把诸如 Redis 的缓存数据库完全当作数据库来使用。我们不能假设缓存始 终可靠,也不能假设没有过期的数据必然可以被读取到,需要处理好缓存的回源逻辑;而且 要显式设置 Redis 的最大内存使用和数据淘汰策略,避免出现 OOM 的问题。
- 第二,缓存的性能比数据库好很多,我们需要考虑大量请求绕过缓存直击数据库造成数据库 瘫痪的各种情况。对于缓存瞬时大面积失效的缓存雪崩问题,可以通过差异化缓存过期时间 解决;对于高并发的缓存 Key 回源问题,可以使用锁来限制回源并发数;对于不存在的数 据穿透缓存的问题,可以通过布隆过滤器进行数据存在性的预判,或在缓存中也设置一个值 来解决。
- 第三,当数据库中的数据有更新的时候,需要考虑如何确保缓存中数据的一致性。我们看 到,“先更新数据库再删除缓存,访问的时候按需加载数据到缓存”的策略是最为妥当的, 并且要尽量设置合适的缓存过期时间,这样即便真的发生不一致,也可以在缓存过期后数据 得到及时同步。
TODO:业务代码写完,就意味着生产就绪了?
- 生产就绪需要做哪些工作呢?我认为,以下三方面的工作最重要。
- 第一,提供健康检测接口。传统采用 ping 的方式对应用进行探活检测并不准确。有的时 候,应用的关键内部或外部依赖已经离线,导致其根本无法正常工作,但其对外的 Web 端 口或管理端口是可以 ping 通的。我们应该提供一个专有的监控检测接口,并尽可能触达一 些内部组件
- 第二,暴露应用内部信息。应用内部诸如线程池、内存队列等组件,往往在应用内部扮演了 重要的角色,如果应用或应用框架可以对外暴露这些重要信息,并加以监控,那么就有可能 在诸如 OOM 等重大问题暴露之前发现蛛丝马迹,避免出现更大的问题。
- 第三,建立应用指标 Metrics 监控。Metrics 可以翻译为度量或者指标,指的是对于一些 关键信息以可聚合的、数值的形式做定期统计,并绘制出各种趋势图表。这里的指标监控, 包括两个方面:一是,应用内部重要组件的指标监控,比如 JVM 的一些指标、接口的 QPS 等;二是,应用的业务数据的监控,比如电商订单量、游戏在线人数等
准备工作:配置 Spring Boot Actuator
消息队列-最佳实践
- 异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处 理和定时任务处理三种模式相辅相成实现的。
- 区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:
- 服务于主流程的分支流程。
- 用户不需要实时看到结果的流程。
- 同时,异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处 理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。MQ中间件虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的 可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。使用 Spring AMQP 来操作 RabbitMQ
异步处理需要消息补偿闭环
使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可 以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、 传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可 用,需要考虑不可用时异步流程如何继续进行。
- 因此,对于异步处理流程,必须考虑补偿或者说建立主备双活流程。
栗子场景:用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程, 会员服务收到消息后发送欢迎消息的流程为异步流程
- 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线 代表异步调用);
- 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消 息;
- 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水 平。
栗子理论分析:
- 消费MQ信息一定要实现幂等,处理逻辑务必考虑去重。
- MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。
- 自动补偿重复。比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且 考虑到高内聚,补偿 Job 本身不会做去重处理。
- 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台 进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时 间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故, MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了 就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大 量资金重复发放。
- 定义补偿Job,备线操作。为了实现高内聚,主线和备线处理消息,最好使用同一个方法。生产级别补偿job需要完善如下:
- 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足 补偿的吞吐量。
- 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以 方便和主线 MQ 实时流程错开,避免冲突。
- 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
- 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。
- 消费MQ信息一定要实现幂等,处理逻辑务必考虑去重。
栗子源码实现:
/**
* 用户注册接口
*/
public class UserController {
private UserService userService;
private RabbitTemplate rabbitTemplate;
/**
* 模拟用户注册,一次注册10个用户,50%发送MQ失败
*/
public void register() {
IntStream.rangeClosed(1, 10).forEach(i -> {
User user = userService.register();
if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
log.info("sent mq user {}", user.getId());
}
});
}
}
/**
* 会员服务:监听(消费)MQ信息
*/
public class MemberService {
/**
* 幂等实现
*/
private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
/**
* 监听MQ信息
*
* @param user
*/
public void listen(User user) {
log.info("receive mq user {}", user.getId());
welcome(user);
}
/**
* 会员服务
*
* @param user
*/
public void welcome(User user) {
//不存在的用户,才执行方法里面的逻辑
if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
log.info("memberService: welcome new user {}", user.getId());
}
}
/**
* {@link ConcurrentHashMap}不允许key和value为Null
* <p>
* putIfAbsent,key存在的话,返回获取到的value值。
* putIfAbsent,key不存在的话,返回null.
*/
private void testPutIfAbsent() {
Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
welcomeStatus.put(1L, true);
if (welcomeStatus.putIfAbsent(2L, true) == null) {
System.out.println("id=2被执行了");
}
if (welcomeStatus.putIfAbsent(1L, true) == null) {
System.out.println("id=1被执行了");
}
System.out.println(welcomeStatus.toString());
}
}
/**
* 全量补偿补偿Job
* <p>
* 生产级别的todo:
* 1、考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足 补偿的吞吐量。
* 2、考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以 方便和主线 MQ 实时流程错开,避免冲突。
* 3、诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
* 4、补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。
*/
public class CompensationJob {
/**
* 补偿job异步线程池
*/
private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
10, 10,
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
private UserService userService;
private MemberService memberService;
/**
* 目前补偿到哪个用户id
*/
private long offset = 0;
/**
* 系统启动后10s补偿,每隔5s补偿一次,每次补偿5个用户
*/
public void compensationJob() {
log.info("开始从用户ID {} 补偿", offset);
userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
compensationThreadPool.execute(() -> memberService.welcome(user));
offset = user.getId();
});
}
}
注意消息模式是广播还是工作队列
谈谈MQ消息模式两者的区别:广播和队列模式
- 消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分 别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个 消费者消费一次。
- 比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听 以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个 用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比 如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式):
对于类似 Kafka/RocketMQ 这样的 MQ 来说,实现类似功能比较简单直白:如果消费者属于一 个组,那么消息只会由同一个组的一个消费者来消费;如果消费者属于不同组,那么每个组 都能消费一遍消息。
- 而对于 RabbitMQ 来说,消息路由的模式采用的是队列 + 交换器,队列是消息的载体,交 换器决定了消息路由到队列的方式,配置比较复杂,容易出错。所以,接下来我重点和你讲 讲 RabbitMQ 的相关代码实现。使用RabbitMq实现广播模式和工作队列模式,上图的栗子。
第一步,实现会员服务监听用户服务发出的新用户注册消息的那部分逻辑。
如果我们启动两个会员服务,那么同一个用户的注册消息应该只能被其中一个实例消费。
图解Mq工作队列模式:
一个服务多个实例,mq工作队列模式代码最佳实践:
/**
* rabbitmq实现工作队列模式,一个服务,多个实例,只有一个实例消费MQ信息。
* <p>
* 1、为了代码简洁直观,我们把消息发布者、消费者、以及MQ的配置代码都放在了一起.
* 2、同一个会员服务两个实例都收到了消息:出现这个问题的原因是,我们没有理清楚 RabbitMQ 直接交换器和队列的绑定关系。
* 3、轮训方式负载均衡
*/
public class WorkQueueRight {
private static final String EXCHANGE = "newuserExchange";
private static final String QUEUE = "newuserQueue";
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "test", UUID.randomUUID().toString());
}
/**
* 定义相同队列名称的队列,两个实例绑定同一个队列
*
* @return
*/
public Queue queue() {
return new Queue(QUEUE);
}
/**
* 声明DirectExchange交换器,绑定队列到交换器
*
* @return
*/
public Declarables declarables() {
DirectExchange exchange = new DirectExchange(EXCHANGE);
return new Declarables(queue(), exchange,
BindingBuilder.bind(queue()).to(exchange).with("test"));
}
/**
* mq消息,消费者
*
* @param userName
*/
public void memberService(String userName) {
log.info("memberService: welcome message sent to new user {}", userName);
}
}
第二步,进一步完整实现用户服务需要广播消息给会员服务和营销服务的逻辑。
我们希望会员服务和营销服务都可以收到广播消息,但会员服务或营销服务中的每个实例只 需要收到一次消息。
源码分析:
/**
* 广播模式:代码最佳实践
*/
public class FanoutQueueRight {
private static final String MEMBER_QUEUE = "newusermember";
private static final String PROMOTION_QUEUE = "newuserpromotion";
private static final String EXCHANGE = "newuser";
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
}
/**
* 声明两个队列绑定到交换器
*
* @return
*/
public Declarables declarables() {
Queue memberQueue = new Queue(MEMBER_QUEUE);
Queue promotionQueue = new Queue(PROMOTION_QUEUE);
FanoutExchange exchange = new FanoutExchange(EXCHANGE);
return new Declarables(memberQueue, promotionQueue, exchange,
BindingBuilder.bind(memberQueue).to(exchange),
BindingBuilder.bind(promotionQueue).to(exchange));
}
public void memberService1(String userName) {
log.info("memberService1: welcome message sent to new user {}", userName);
}
public void memberService2(String userName) {
log.info("memberService2: welcome message sent to new user {}", userName);
}
public void promotionService1(String userName) {
log.info("promotionService1: gift sent to new user {}", userName);
}
public void promotionService2(String userName) {
log.info("promotionService2: gift sent to new user {}", userName);
}
}
别让死信堵塞了消息队列
在定义线程池的时候,如果线程池的任务队列没有上限,那么最终可能会导致 OOM。使用消息队列处理异步流程的时候,我们也同样要注意消息队列的任务堆积问题。 对于突发流量引起的消息队列堆积,问题并不大,适当调整消费者的消费能力应该就可以解 决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。
比如,用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但 因为用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理 失败。这种在 MQ 中像幽灵一样回荡的同一条消息,就是死信。
- 随着 MQ 被越来越多的死信填满,消费者需要花费大量时间反复处理死信,导致正常消息 的消费受阻,最终 MQ 可能因为数据量过大而崩溃。
如何解决死信问题,理论分析:
- 解决死信无限重复进入队列最简单的方式是,在程序处理出错的时候,直接抛出 {@link AmqpRejectAndDontRequeueException}异常
- 对于同一条消息,能够先进行几次重试,解决因为网络问题导致 的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。对于来自 死信队列的数据,我们可能只是记录日志发送报警,即使出现异常也不会再重复投递。
- 针对这个问题,Spring AMQP 提供了非常方便的解决方案:
- 首先,定义死信交换器和死信队列。其实,这些都是普通的交换器和队列,只不过被我 们专门用于处理死信消息。
- 然后,通过 RetryInterceptorBuilder 构建一个 RetryOperationsInterceptor,用于处 理失败时候的重试。这里的策略是,最多尝试 5 次(重试 4 次);并且采取指数退避重 试,首次重试延迟 1 秒,第二次 2 秒,以此类推,最大延迟是 10 秒;如果第 4 次重试 还是失败,则使用 RepublishMessageRecoverer 把消息重新投入一个“死信交换 器”中。
- 最后,定义死信队列的处理程序。这个案例中,我们只是简单记录日志。
如何解决死信问题,源码分析
/**
* 死信MQ定义
*/
public class RabbitConfiguration {
private RabbitTemplate rabbitTemplate;
/**
* 快速声明一组对象,包含队列、交换器,以及队列到交换器的绑定
*
* @return
*/
public Declarables declarables() {
Queue queue = new Queue(Consts.QUEUE);
DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
}
/**
* 定义死信交换器和队列,并且进行绑定
*
* @return
*/
public Declarables declarablesForDead() {
Queue queue = new Queue(Consts.DEAD_QUEUE);
DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
}
/**
* 定义重试操作拦截器,对于同一条消息,能够先进行几次重试,解决因为网络问题导致 的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。
*
* @return
*/
public RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000)
.recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY))
.build();
}
/**
* 通过定义SimpleRabbitListenerContainerFactory,设置其adviceChain属性为之前定义的RetryOperationsInterceptor
* <p>
* 默认情况下 SimpleMessageListenerContainer 只有一个消费线程。只有等msg1消费完成之后,msg2才开始消费
*
* @param connectionFactory
* @return
*/
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(interceptor());
factory.setConcurrentConsumers(10);
return factory;
}
}
/**
* MQ消息消费者
*/
public class MQListener {
/**
* 模拟MQ消费信息出现异常的情况:
* <p>
* 解决方案1:解决死信无限重复进入队列最简单的方式是,在程序处理出错的时候,直接抛出 {@link AmqpRejectAndDontRequeueException}异常
* 解决方案2:对于同一条消息,能够先进行几次重试,解决因为网络问题导致 的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。
*
* @param data
*/
public void handler(String data) {
//http://localhost:15672/#/
log.info("got message {}", data);
throw new NullPointerException("error");
//throw new AmqpRejectAndDontRequeueException("error");
}
/**
* 死信队列处理程序
*
* @param data
*/
public void deadHandler(String data) {
log.error("got dead message {}", data);
}
}
问题讨论
- 在用户注册后发送消息到 MQ,然后会员服务监听消息进行异步处理的场景下,有些时 候我们会发现,虽然用户服务先保存数据再发送 MQ,但会员服务收到消息后去查询数 据库,却发现数据库中还没有新用户的信息。你觉得,这可能是什么问题呢,又该如何 解决呢?
- 思考题一是我真实遇到的问题,当时倒不是因为主从的问题,而是因为业务代码把保存数据和发 MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方 式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来 一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业 务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的 消息到MQ
- 除了使用 Spring AMQP 实现死信消息的重投递外,RabbitMQ 2.8.0 后支持的死信交 换器 DLX 也可以实现类似功能。你能尝试用 DLX 实现吗,并比较下这两种处理机制?
回顾总结
- 第一,要考虑异步流程丢消息或处理中断的情况,异步流程需要有备线进行补偿。比如,我 们今天介绍的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。
- 第二,异步处理的时候需要考虑消息重复的可能性,处理逻辑需要实现幂等,防止重复处 理。
- 第三,微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的 消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认 MQ 的消息路由配置是 否满足需求,以避免消息重复或漏发问题。
- 第四,要注意始终无法处理的死信消息,可能会引发堵塞 MQ 的问题。一般在遇到消息处 理失败的时候,我们可以设置一定的重试策略。如果重试还是不行,那可以把这个消息扔到 专有的死信队列特别处理,不要让死信影响到正常消息的处理。
TODO:数据存储:NoSQL与RDBMS如何取长补短、相辅相成?
数据源头:任何客户端的东西都不可信任
- 对于 HTTP 请求,我们要在脑子里有一个根深蒂固的概念,那就是任何客户端传过来的数 据都是不能直接信任的。客户端传给服务端的数据只是信息收集,数据需要经过有效性验 证、权限验证等后才能使用,并且这些数据只能认为是用户操作的意图,不能直接代表数据 当前的状态。
客户端的计算不可信
栗子场景:客户端下单,可能会暴露这么一个 /order 的 POST 接口给客户端,让客户端直接把组装 后的订单信息 Order 传给服务端。订单信息 Order 可能包括商品 ID、商品价格、数量、商品总价
虽然用户下单时客户端肯定有商品的价格等信息,也会计算出订单的总价给用户确认,但是 这些信息只能用于呈现和核对。即使客户端传给服务端的 POJO 中包含了这些信息,服务 端也一定要重新从数据库来初始化商品的价格,重新计算最终的订单价格。如果不这么做的 话,很可能会被黑客利用,商品总价被恶意修改为比较低的价格。
因此,我们真正直接使用的、可信赖的只是客户端传过来的商品 ID 和数量,服务端会根据 这些信息重新计算最终的总价。如果服务端计算出来的商品价格和客户端传过来的价格不匹 配的话,可以给客户端友好提示,让用户重新下单。
还有一种可行的做法是,让客户端仅传入需要的数据给服务端,像这样重新定义一个 POJO CreateOrderRequest 作为接口入参,比直接使用领域模型 Order 更合理。在设计接口 时,我们会思考哪些数据需要客户端提供,而不是把一个大而全的对象作为参数提供给服务 端,以避免因为忘记在服务端重置客户端数据而导致的安全问题。
源码分析
/**
* 错误实现:完全使用客户端的传递过来数据(不安全)
*
* @param order
*/
public void wrong( { Order order)
this.createOrder(order);
}
/**
* 正确实现:服务器校验客户端传递过来的参数,服务器计算价格
*
* @param order
*/
public void right( { Order order)
Item item = Db.getItem(order.getItemId());
if (!order.getItemPrice().equals(item.getItemPrice())) {
throw new RuntimeException("您选购的商品价格有变化,请重新下单");
}
order.setItemPrice(item.getItemPrice());
BigDecimal totalPrice = item.getItemPrice().multiply(BigDecimal.valueOf(order.getQuantity()));
if (order.getItemTotalPrice().compareTo(totalPrice) != 0) {
throw new RuntimeException("您选购的商品总价有变化,请重新下单");
}
order.setItemTotalPrice(totalPrice);
createOrder(order);
}
/**
* 正确实现:客户端只传递需要的参数,而不是传入一个大而全的参数
*
* @param createOrderRequest
* @return
*/
public Order right2( { CreateOrderRequest createOrderRequest)
Item item = Db.getItem(createOrderRequest.getItemId());
Order order = new Order();
order.setItemPrice(item.getItemPrice());
order.setItemTotalPrice(item.getItemPrice().multiply(BigDecimal.valueOf(order.getQuantity())));
createOrder(order);
return order;
}通过这个案例我们可以看到,在处理客户端提交过来的数据时,服务端需要明确区分,哪些 数据是需要客户端提供的,哪些数据是客户端从服务端获取后在客户端计算的。其中,前者 可以信任;而后者不可信任,服务端需要重新计算,如果客户端和服务端计算结果不一致的 话,可以给予友好提示。
客户端提交的参数需要校验
对于客户端的数据,我们还容易忽略的一点是,误以为客户端的数据来源是服务端,客户端就不可能提交异常数据。
源码分析
/**
* 下发给客户端的数据
*
* @param modelMap
* @return
*/
public String index(ModelMap modelMap) {
List<Country> countries = new ArrayList<>();
countries.addAll(allCountries.values().stream().filter(country -> country.getId() < 4).collect(Collectors.toList()));
modelMap.addAttribute("countries", countries);
return "index";
}
/**
* 错误的接口定义,不对客户端传递过来的数据校验
*
* @param countryId
* @return
*/
public String wrong(int countryId) {
return allCountries.get(countryId).getName();
}
/**
* 正确实现校验数据
*
* @param countryId
* @return
*/
public String right(int countryId) {
if (countryId < 1 || countryId > 3)
throw new RuntimeException("非法参数");
return allCountries.get(countryId).getName();
}
/**
* 使用spring来做参数校验
*
* @param countryId
* @return
*/
public String better(
int countryId) {
return allCountries.get(countryId).getName();
}
不能信任请求头里的任何内容
栗子场景:一个比较常见的需求是,为了防刷,我们需要判断用户的唯一性。比如,针对未注册的新用 户发送一些小奖品,我们不希望相同用户多次获得奖品。考虑到未注册的用户因为没有登录 过所以没有用户标识,我们可能会想到根据请求的 IP 地址,来判断用户是否已经领过奖 品。
源码分析
HashSet<String> activityLimit = new HashSet<>();
/**
* 错误实现
*
* @param request
* @return
*/
public String test(HttpServletRequest request) {
String ip = getClientIp(request);
if (activityLimit.contains(ip)) {
return "您已经领取过奖品";
} else {
activityLimit.add(ip);
return "奖品领取成功";
}
}
/**
* IP 地址的获取方式是:优先通过 X-Forwarded- For 请求头来获取,如果没有的话再通过 HttpServletRequest 的 getRemoteAddr 方法来 获取。
* <p>
* 之所以这么做是因为,通常我们的应用之前都部署了反向代理或负载均衡器,remoteAddr 获得的只能是代理的 IP 地址,
* 而不是访问用户实际的 IP。这不符合我们的需求,因为反向 代理在转发请求时,通常会把用户真实 IP 放入 X-Forwarded-For 这个请求头中
*
* @param request
* @return
*/
private String getClientIp(HttpServletRequest request) {
String xff = request.getHeader("X-Forwarded-For");
if (xff == null) {
return request.getRemoteAddr();
} else {
return xff.contains(",") ? xff.split(",")[0] : xff;
}
}这种过于依赖 X-Forwarded-For 请求头来判断用户唯一性的实现方式,是有问题的:
- 通过curl模拟请求来设置头部信息
- 网吧、学校等机构的出口 IP 往往是同一个,在这个场景下,可能只有最先打开这个页面 的用户才能领取到奖品,而其他用户会被阻拦。
因此,IP 地址或者说请求头里的任何信息,包括 Cookie 中的信息、Referer,只能用作参 考,不能用作重要逻辑判断的依据。而对于类似这个案例唯一性的判断需求,更好的做法 是,让用户进行登录或三方授权登录(比如微信),拿到用户标识来做唯一性判断。
用户标识不能从客户端获取
常见出错场景:
- 开发同学没有正确认识接口或服务面向的用户。如果接口面向内部服务,由服务调用方 传入用户 ID 没什么不合理,但是这样的接口不能直接开放给客户端或 H5 使用。
- 在测试阶段为了方便测试调试,我们通常会实现一些无需登录即可使用的接口,直接使 用客户端传过来的用户标识,却在上线之前忘记删除类似的超级接口。
- 一个大型网站前端可能由不同的模块构成,不一定是一个系统,而用户登录状态可能也 没有打通。有些时候,我们图简单可能会在 URL 中直接传用户 ID,以实现通过前端传 值来打通用户登录状态。
如果你的接口直面用户(比如给客户端或 H5 页面调用),那么一定需要用户先登录才能使 用。登录后用户标识保存在服务端,接口需要从服务端(比如 Session 中)获取。这里有 段代码演示了一个最简单的登录操作,登录后在 Session 中设置了当前用户的标识。
分享一个spring web要登录了才能访问的权限:注解+HandlerMethodArgumentResolver接口+WebMvcConfigurer接口。
源码分析
/**
* 标识接口,需要登录了才能访问
*
* @author codingprh
*/
public LoginRequired {
String sessionKey() default "currentUser";
}
/**
* HandlerMethodArgumentResolver,请求参数解析接口
*/
public class LoginRequiredArgumentResolver implements HandlerMethodArgumentResolver {
/**
* 解析哪些参数
*
* @param methodParameter
* @return
*/
public boolean supportsParameter(MethodParameter methodParameter) {
//匹配参数上具有@LoginRequired注解的参数
return methodParameter.hasParameterAnnotation(LoginRequired.class);
}
/**
* @param methodParameter
* @param modelAndViewContainer
* @param nativeWebRequest
* @param webDataBinderFactory
* @return
* @throws Exception
*/
public Object resolveArgument(MethodParameter methodParameter, ModelAndViewContainer modelAndViewContainer, NativeWebRequest nativeWebRequest, WebDataBinderFactory webDataBinderFactory) throws Exception {
//从参数上获得注解
LoginRequired loginRequired = methodParameter.getParameterAnnotation(LoginRequired.class);
//根据注解中的Session Key,从Session中查询用户信息
Object object = nativeWebRequest.getAttribute(loginRequired.sessionKey(), NativeWebRequest.SCOPE_SESSION);
if (object == null) {
log.error("接口 {} 非法调用!", methodParameter.getMethod().toString());
throw new RuntimeException("请先登录!");
}
return object;
}
}
public class NoahApplication implements WebMvcConfigurer {
/**
* 往webmvc中增加自定义处理参数解析器
*
* @param resolvers
*/
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers) {
resolvers.add(new LoginRequiredArgumentResolver());
}
}
public String right( { Long userId)
return "当前用户Id:" + userId;
}
public long login( { String username, String password, HttpSession session)
if (username.equals("admin") && password.equals("admin")) {
session.setAttribute("currentUser", 1L);
return 1L;
}
return 0L;
}
回顾总结
- 客户端的计算不可信。虽然目前很多项目的前端都是富前端,会做大量的逻辑计算, 无需访问服务端接口就可以顺畅完成各种功能,但来自客户端的计算结果不能直接信任。最 终在进行业务操作时,客户端只能扮演信息收集的角色,虽然可以将诸如价格等信息传给服 务端,但只能用于校对比较,最终要以服务端的计算结果为准。
- 所有来自客户端的参数都需要校验判断合法性。即使我们知道用户是在一个下拉列表 选择数据,即使我们知道用户通过网页正常操作不可能提交不合法的值,服务端也应该进行 参数校验,防止非法用户绕过浏览器 UI 页面通过工具直接向服务端提交参数。
- 除了请求 Body 中的信息,请求头里的任何信息同样不能信任。我们要知道,来自 请求头的 IP、Referer 和 Cookie 都有被篡改的可能性,相关数据只能用来参考和记录,不 能用作重要业务逻辑。
- 如果接口面向外部用户,那么一定不能出现用户标识这样的参数,当前用户的标识一 定来自服务端,只有经过身份认证后的用户才会在服务端留下标识。如果你的接口现在面向 内部其他服务,那么也要千万小心这样的接口只能内部使用,还可能需要进一步考虑服务端 调用方的授权问题。
如何正确保存和传输敏感数据?
- 今天,我们从安全角度来聊聊用户名、密码、身份证等敏感信息,应该怎么保存和传输。同 时,你还可以进一步复习加密算法中的散列、对称加密和非对称加密算法,以及 HTTPS 等 相关知识。
应该怎样保存用户密码?
最敏感的数据恐怕就是用户的密码了。黑客一旦窃取了用户密码,或许就可以登录进用户的 账号,消耗其资产、发布不良信息等;更可怕的是,有些用户至始至终都是使用一套密码, 密码一旦泄露,就可以被黑客用来登录全网。为了防止密码泄露,最重要的原则是不要保存用户密码。你可能会觉得很好笑,不保存用户 密码,之后用户登录的时候怎么验证?其实,我指的是不保存原始密码,这样即使拖库也不 会泄露用户密码。
我经常会听到大家说,不要明文保存用户密码,应该把密码通过 MD5 加密后保存。这的确 是一个正确的方向,但这个说法并不准确。
首先,MD5 其实不是真正的加密算法。所谓加密算法,是可以使用密钥把明文加密为密 文,随后还可以使用密钥解密出明文,是双向的。
而 MD5 是散列、哈希算法或者摘要算法。不管多长的数据,使用 MD5 运算后得到的都是 固定长度的摘要信息或指纹信息,无法再解密为原始数据。所以,MD5 是单向的。最重要 的是,仅仅使用 MD5 对密码进行摘要,并不安全。
大多数MD5破解网站都是使用彩虹表:是一种使用时间空间平衡的技术,即可以使用 更大的空间来降低破解时间,也可以使用更长的破解时间来换取更小的空间。把所有20位内数字和字母组合的密码MD5之后存进去。
此外,你可能会觉得多次 MD5 比较安全,其实并不是这样。一样秒破解。所以直接保存 MD5 后的密码是不安全的。一些开发可能会说,还需要加盐。是的,但是加 盐如果不当,还是非常不安全,比较重要的有两点。
- 第一,不能在代码中写死盐,且盐需要有一定的长度。
- 其实,知道盐是什么没什么关系,关键的是我们是在代码里写死了盐,并且盐很短、所有用 户都是这个盐。这么做有三个问题:
- 因为盐太短、太简单了,如果用户原始密码也很简单,那么整个拼起来的密码也很短, 这样一般的 MD5 破解网站都可以直接解密这个 MD5,除去盐就知道原始密码了。
- 相同的盐,意味着使用相同密码的用户 MD5 值是一样的,知道了一个用户的密码就可 能知道了多个。
- 我们也可以使用这个盐来构建一张彩虹表,虽然会花不少代价,但是一旦构建完成,所 有人的密码都可以被破解。
- 所以,最好是每一个密码都有独立的盐,并且盐要长一点,比如超过 20 位。
- 第二,虽然说每个人的盐最好不同,但我也不建议将一部分用户数据作为盐。比如,使用用户名作为盐
- 所以,盐最好是随机的值,并且是全球唯一的,意味着全球不可能有现成的彩虹表给你用。
- 正确的做法是,使用全球唯一的、和用户无关的、足够长的随机值作为盐。比如,可以使用 UUID 作为盐,把盐一起保存到数据库中。
- 并且每次用户修改密码的时候都重新计算盐,重新保存新的密码。你可能会问,盐保存在数 据库中,那被拖库了不是就可以看到了吗?难道不应该加密保存吗?
在我看来,盐没有必要加密保存。盐的作用是,防止通过彩虹表快速实现密码“解密”,如 果用户的盐都是唯一的,那么生成一次彩虹表只可能拿到一个用户的密码,这样黑客的动力 会小很多。
MD5最佳实践源码
/**
* 错误实现:无盐的md5加密
*
* @param name
* @param password
* @return
*/
public UserData wrong1( { String name, String password)
UserData userData = new UserData();
userData.setId(1L);
userData.setName(name);
userData.setPassword(DigestUtils.md5Hex(password));
return userRepository.save(userData);
}
/**
* 错误实现:盐值不够长度和随机
*
* @param name
* @param password
* @return
*/
public UserData wrong2( { String name, String password)
UserData userData = new UserData();
userData.setId(1L);
userData.setName(name);
userData.setPassword(DigestUtils.md5Hex("salt" + password));
return userRepository.save(userData);
}
/**
* 错误实现:盐值跟用户名相关,有专门的彩虹表
*
* @param name
* @param password
* @return
*/
public UserData wrong3( { String name, String password)
UserData userData = new UserData();
userData.setId(1L);
userData.setName(name);
userData.setPassword(DigestUtils.md5Hex(name + password));
return userRepository.save(userData);
}
/**
* 错误实现:两次的md5加密也秒破解
*
* @param name
* @param password
* @return
*/
public UserData wrong4( { String name, String password)
UserData userData = new UserData();
userData.setId(1L);
userData.setName(name);
userData.setPassword(DigestUtils.md5Hex(DigestUtils.md5Hex(password)));
return userRepository.save(userData);
}
/**
* 最佳实践:盐值要随机、并且长度要超过20位,每个用户单独一个盐值:可明文存储
*
* @param name
* @param password
* @return
*/
public UserData right( { String name, String password)
UserData userData = new UserData();
userData.setId(1L);
userData.setName(name);
userData.setSalt(UUID.randomUUID().toString());
userData.setPassword(DigestUtils.md5Hex(userData.getSalt() + password));
return userRepository.save(userData);
}更好的做法是,不要使用像 MD5 这样快速的摘要算法,而是使用慢一点的算法。比如 Spring Security 已经废弃了 MessageDigestPasswordEncoder,推荐使用 BCryptPasswordEncoder,也就是BCrypt来进行密码哈希。BCrypt 是为保存密码设计 的算法,相比 MD5 要慢很多。
- 如果制作 8 位密码长度的 MD5 彩虹表需要 5 个月,那么对于 BCrypt 来说, 可能就需要几十年,大部分黑客应该都没有这个耐心。
- 我们写一段代码观察下,BCryptPasswordEncoder 生成的密码哈希的三个规律:
- 第一,我们调用 encode、matches 方法进行哈希、做密码比对的时候,不需要传入盐。BCrypt 把盐作为了算法的一部分,强制我们遵循安全保存密码的最佳实践。
- 第二,生成的盐和哈希后的密码拼在了一起:$是字段分隔符,其中第一个$后的 2a 代表算 法版本,第二个$后的 10 是代价因子(默认是 10,代表 2 的 10 次方次哈希),第三个 $后的 22 个字符是盐,再后面是摘要。所以说,我们不需要使用单独的数据库字段来保存 盐。(
$<ver>$<cost>$<salt><digest>
) - 第三,代价因子的值越大,BCrypt 哈希的耗时越久。因此,对于代价因子的值,更建议的 实践是,根据用户的忍耐程度和硬件,设置一个尽可能大的值。
最后,我们需要注意的是,虽然黑客已经很难通过彩虹表来破解密码了,但是仍然有可能暴 力破解密码,也就是对于同一个用户名使用常见的密码逐一尝试登录。因此,除了做好密码 哈希保存的工作外,我们还要建设一套完善的安全防御机制,在感知到暴力破解危害的时 候,开启短信验证、图形验证码、账号暂时锁定等防御机制来抵御暴力破解。
TODO:应该怎么保存姓名和身份证?
Noah-Java最佳实践与踩坑
本篇文章是学习极客时间,朱晔老师Java业务开发常见错误100例。如需要转载请联系博主本人和朱晔老师。