一、服务集群深入解析
1. 什么是集群?
形象比喻:
单机 = 一个厨师做菜
客人多了 → 厨师累死 → 上菜慢 → 厨师生病餐厅关门
集群 = 三个厨师做菜
客人多了 → 分配给不同厨师 → 速度快 → 一个厨师生病其他人顶上
2. 集群如何工作?
负载均衡策略
graph TB
Client[客户端请求]
LB[负载均衡器]
subgraph "Java服务集群"
S1[服务器1<br/>192.168.1.10:8080<br/>当前负载: 30%]
S2[服务器2<br/>192.168.1.11:8080<br/>当前负载: 50%]
S3[服务器3<br/>192.168.1.12:8080<br/>当前负载: 20%]
end
Client --> LB
LB -->|轮询算法| S1
LB -->|轮询算法| S2
LB -->|轮询算法| S3
style S1 fill:#c8e6c9
style S2 fill:#fff9c4
style S3 fill:#c8e6c9
常见负载均衡算法:
// 1. 轮询(Round Robin)- 最简单
请求1 → 服务器1
请求2 → 服务器2
请求3 → 服务器3
请求4 → 服务器1 // 循环
请求5 → 服务器2
// 2. 加权轮询(Weighted Round Robin)
服务器1:权重3(性能好)
服务器2:权重2(性能中等)
服务器3:权重1(性能差)
10个请求的分配:
服务器1处理5个
服务器2处理3个
服务器3处理2个
// 3. 最少连接(Least Connections)
服务器1:当前10个连接
服务器2:当前5个连接 ← 新请求给它
服务器3:当前8个连接
// 4. IP哈希(IP Hash)
用户IP: 192.168.1.100 → hash → 永远分配到服务器2
好处:同一用户总是访问同一台服务器(会话保持)
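下面用一段极简的 Java 代码示意上述几种算法的核心选择逻辑(服务器列表、权重均为假设的示例值,仅演示思路,并非生产级实现):
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
// 轮询 / 加权轮询 / IP哈希 选择逻辑的最小示意
public class LoadBalancerDemo {
    private final AtomicInteger counter = new AtomicInteger(0);
    // 1. 轮询:按请求序号对服务器数量取模
    public String roundRobin(List<String> servers) {
        int index = (counter.getAndIncrement() & 0x7fffffff) % servers.size();
        return servers.get(index);
    }
    // 2. 加权轮询(简化版):把权重看作“槽位”,权重越大命中的槽位越多
    public String weightedRoundRobin(List<String> servers, List<Integer> weights) {
        int total = weights.stream().mapToInt(Integer::intValue).sum();
        int slot = (counter.getAndIncrement() & 0x7fffffff) % total;
        for (int i = 0; i < servers.size(); i++) {
            slot -= weights.get(i);
            if (slot < 0) {
                return servers.get(i);
            }
        }
        return servers.get(0); // 理论上不会走到这里
    }
    // 4. IP哈希:同一来源 IP 始终映射到同一台服务器
    public String ipHash(List<String> servers, String clientIp) {
        return servers.get((clientIp.hashCode() & 0x7fffffff) % servers.size());
    }
}
Nginx 实际使用的是“平滑加权轮询”,分配比上面的简化写法更均匀,这里只演示按权重占比分配的思想。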
实际配置示例(Nginx)
# nginx.conf
upstream java_backend {
# 加权轮询
server 192.168.1.10:8080 weight=3;
server 192.168.1.11:8080 weight=2;
server 192.168.1.12:8080 weight=1;
# 健康检查参数直接追加在上面的 server 行即可,不要重复声明同一台服务器,例如:
# server 192.168.1.10:8080 weight=3 max_fails=2 fail_timeout=30s;
# 含义:fail_timeout(30秒)内失败 max_fails(2)次,则踢出 30 秒
}
server {
listen 80;
location / {
proxy_pass http://java_backend;
# 传递真实IP
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
3. 集群的会话问题
问题场景:
用户登录 → 负载到服务器1 → Session存在服务器1的内存
用户刷新 → 负载到服务器2 → 服务器2没有Session → 要求重新登录 ❌
解决方案:Session共享
graph LR
User[用户]
S1[服务器1]
S2[服务器2]
S3[服务器3]
Redis[(Redis<br/>统一Session存储)]
User -->|登录| S1
S1 -->|存Session| Redis
User -->|刷新| S2
S2 -->|读Session| Redis
Redis -->|返回Session| S2
S2 -->|已登录状态| User
style Redis fill:#ffebee
代码实现:
// Spring Boot配置Redis Session
@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800) // 30分钟
public class SessionConfig {
// 自动配置,Session会存到Redis
}
// 使用时完全透明
@RestController
public class UserController {
@PostMapping("/login")
public Result login(HttpSession session, String username, String password) {
// 验证用户名密码...
User user = userService.login(username, password);
// 存入Session(自动存到Redis)
session.setAttribute("user", user);
return Result.success();
}
@GetMapping("/profile")
public Result getProfile(HttpSession session) {
// 从Session获取(自动从Redis读)
User user = (User) session.getAttribute("user");
if (user == null) {
return Result.error("未登录");
}
return Result.success(user);
}
}
Redis中的存储结构:
Key: spring:session:sessions:a1b2c3d4-e5f6-7890
Value: {
"sessionAttr: user": {
"id": 123,
"username": "zhangsan",
"loginTime": "2025-12-24 10:00:00"
}
}
TTL: 1800秒
二、Kafka 消息队列深度解析
1. 核心概念
📦 基本组成部分
graph LR
subgraph "Kafka集群"
B1[Broker 1<br/>服务器1]
B2[Broker 2<br/>服务器2]
B3[Broker 3<br/>服务器3]
end
subgraph "Topic: 订单主题"
P0[Partition 0<br/>分区0]
P1[Partition 1<br/>分区1]
P2[Partition 2<br/>分区2]
end
Producer[生产者<br/>订单服务] -->|发送消息| P0
Producer -->|发送消息| P1
Producer -->|发送消息| P2
P0 -->|消费消息| C1[消费者1<br/>库存服务]
P1 -->|消费消息| C1
P2 -->|消费消息| C1
P0 -->|消费消息| C2[消费者2<br/>积分服务]
P1 -->|消费消息| C2
P2 -->|消费消息| C2
style Producer fill:#e1f5ff
style C1 fill:#e8f5e9
style C2 fill:#fff3e0
📋 关键术语解释
| 术语 | 解释 | 类比 |
|---|---|---|
| Broker | Kafka服务器节点 | 邮局的一个分局 |
| Topic | 消息主题/分类 | 邮件的类型(快递、信件) |
| Partition | 主题的分区 | 每个类型有多个邮箱 |
| Producer | 消息生产者 | 寄信的人 |
| Consumer | 消息消费者 | 收信的人 |
| Consumer Group | 消费者组 | 同一家公司的多个收件人 |
| Offset | 消息偏移量 | 读到第几封信了 |
2. 消息生产与消费详解
生产者发送消息
// 1. 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap. servers", "192.168.1.100:9092"); // Kafka地址
props.put("key.serializer", "org.apache.kafka.common.serialization. StringSerializer");
props.put("value.serializer", "org. apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 2. 创建订单后发送消息
public void createOrder(Order order) {
// 保存订单到MySQL
orderDao.save(order);
// 发送消息到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic", // Topic名称
order.getUserId(), // Key(用于分区)
JSON.toJSONString(order) // Value(消息内容)
);
// 异步发送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("发送失败", e);
// 可以重试或记录到数据库
} else {
log.info("发送成功,Partition: {}, Offset: {}",
metadata.partition(), metadata.offset());
}
}
});
}
消费者消费消息
// 1. 配置Kafka消费者
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.100:9092");
props.put("group.id", "inventory-service"); // 消费者组ID
props.put("key.deserializer", "org.apache. kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 2. 订阅Topic
consumer.subscribe(Arrays.asList("order-topic"));
// 3. 持续拉取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 解析消息
Order order = JSON.parseObject(record.value(), Order.class);
// 处理业务逻辑
inventoryService.reduceStock(order.getProductId(), order.getQuantity());
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
// 手动提交offset(确保消息处理成功后再提交)
consumer.commitSync();
}
3. 分区机制详解
为什么需要分区?
场景:订单Topic每秒10万条消息
❌ 单分区:
所有消息在一个队列 → 只能一个消费者读 → 性能瓶颈
✅ 多分区(比如10个):
消息分散到10个队列 → 10个消费者并行读 → 性能提升10倍
分区策略
// 策略1:根据Key的Hash分区(相同Key进同一分区,保证顺序)
record = new ProducerRecord<>("order-topic", userId, orderData);
// userId相同的订单会进入同一个分区,保证该用户的订单顺序
// 策略2:轮询分区(负载均衡)
record = new ProducerRecord<>("order-topic", null, orderData);
// Key为null时,轮流发送到各个分区
// 策略3:自定义分区器
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// VIP用户发到分区0(高优先级处理)
if (isVipUser(key)) {
return 0;
}
// 普通用户发到其他分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % (cluster.partitionCountForTopic(topic) - 1) + 1;
}
}
4. 消费者组机制
graph TB
subgraph "Kafka Topic: 3个分区"
P0[Partition 0]
P1[Partition 1]
P2[Partition 2]
end
subgraph "消费者组A: 库存服务(3个实例)"
C1A[消费者1]
C2A[消费者2]
C3A[消费者3]
end
subgraph "消费者组B: 积分服务(2个实例)"
C1B[消费者1]
C2B[消费者2]
end
P0 --> C1A
P1 --> C2A
P2 --> C3A
P0 --> C1B
P1 --> C1B
P2 --> C2B
style P0 fill:#e3f2fd
style P1 fill:#e3f2fd
style P2 fill:#e3f2fd
style C1A fill:#e8f5e9
style C2A fill:#e8f5e9
style C3A fill:#e8f5e9
style C1B fill:#fff3e0
style C2B fill:#fff3e0
关键规则(代码示意见下):
- 同一消费者组内:一个分区只能被一个消费者消费(避免重复)
- 不同消费者组:可以重复消费同一条消息(实现广播)
- 消费者数量 > 分区数:部分消费者会空闲
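用 Spring Kafka 的注解可以直观体现这几条规则(topic、groupId、并发数均为示例值,仅作示意):
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class OrderTopicListeners {
    // 库存服务:同一个 groupId 下的监听线程属于同一消费者组,
    // 3 个并发正好对应 3 个分区,组内一个分区同一时刻只被一个线程消费
    @KafkaListener(topics = "order-topic", groupId = "inventory-service", concurrency = "3")
    public void consumeForInventory(String message) {
        // 扣减库存 ...
    }
    // 积分服务:groupId 不同,属于另一个消费者组,同一条消息会被再消费一遍(广播效果)
    @KafkaListener(topics = "order-topic", groupId = "points-service", concurrency = "2")
    public void consumeForPoints(String message) {
        // 增加积分 ...
    }
}
如果把 concurrency 配成 4(大于分区数 3),多出来的那个消费线程就会一直空闲。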
5. Offset 偏移量管理
Offset 的作用
Partition 0的消息队列:
[消息0] [消息1] [消息2] [消息3] [消息4] [消息5] ...
↑
Offset=2(消费者已读到这里)
三种提交方式
// 1. 自动提交(简单但可能丢消息)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000"); // 每秒自动提交
// 风险:消息取出后还没处理完,程序崩溃,offset已提交 → 消息丢失
// 2. 手动同步提交(安全但慢)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // 处理消息
}
consumer.commitSync(); // 处理完才提交,阻塞等待确认
}
// 3. 手动异步提交(性能好)
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
log.error("提交失败", e);
}
}
});
6. 高可用机制
副本机制(Replication)
graph TB
subgraph "Partition 0的副本分布"
L[Leader副本<br/>Broker 1<br/>负责读写]
F1[Follower副本1<br/>Broker 2<br/>同步备份]
F2[Follower副本2<br/>Broker 3<br/>同步备份]
end
Producer[生产者] -->|写入| L
Consumer[消费者] -->|读取| L
L -.->|同步数据| F1
L -.->|同步数据| F2
L -->|Leader挂了| F1
F1 -->|提升为新Leader| NewL[新Leader<br/>Broker 2]
style L fill:#4caf50
style F1 fill:#ffeb3b
style F2 fill:#ffeb3b
style NewL fill:#4caf50
配置示例:
# 每个Partition有3个副本
replication.factor=3
# 至少2个副本同步成功才算写入成功
min.insync.replicas=2
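副本机制要真正发挥作用,生产者端一般还要配合 acks 等参数,确保消息写入足够多的副本后才算发送成功(下面是常见配置的示意,地址与数值仅供参考):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.100:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// acks=all:等待 ISR 中的副本(受 min.insync.replicas 约束)都确认后才算发送成功
props.put("acks", "all");
// 发送失败自动重试
props.put("retries", 3);
// 开启幂等,避免重试导致消息重复
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);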
7. 实际应用场景
场景1:秒杀系统
// 用户点击秒杀按钮
@PostMapping("/seckill")
public Result seckill(Long productId, Long userId) {
// 1. 立即返回"排队中"
String requestId = UUID.randomUUID().toString();
// 2. 发送到Kafka(异步处理)
SeckillRequest request = new SeckillRequest(requestId, productId, userId);
kafkaTemplate.send("seckill-topic", request);
// 3. 立即返回给用户
return Result.success("您的请求正在处理,请稍候查询结果", requestId);
}
// 消费者慢慢处理
@KafkaListener(topics = "seckill-topic", concurrency = "10") // 10个线程并行
public void handleSeckill(SeckillRequest request) {
// 1. 检查库存(Redis)
Long stock = redisTemplate.opsForValue().decrement("stock:" + request.getProductId());
if (stock >= 0) {
// 2. 创建订单(MySQL)
Order order = createOrder(request);
// 3. 通知用户成功
notifyUser(request.getUserId(), "秒杀成功!");
} else {
// 4. 恢复库存
redisTemplate.opsForValue().increment("stock:" + request.getProductId());
// 5. 通知用户失败
notifyUser(request.getUserId(), "商品已售罄");
}
}
场景2:日志收集
// 各个服务打日志
public class KafkaLogAppender extends AppenderBase<ILoggingEvent> {
@Override
protected void append(ILoggingEvent event) {
String log = event.getFormattedMessage();
kafkaProducer.send(new ProducerRecord<>("app-logs", log));
}
}
// 日志消费者(ELK架构)
@KafkaListener(topics = "app-logs")
public void consumeLog(String log) {
// 解析日志
LogEntry entry = parseLog(log);
// 存入Elasticsearch(方便搜索)
elasticsearchTemplate.save(entry);
// 如果是ERROR级别,发送告警
if (entry.getLevel().equals("ERROR")) {
alertService.sendAlert(entry);
}
}
场景3:数据同步
// MySQL数据变化 → 同步到ES
@KafkaListener(topics = "mysql-binlog")
public void syncToElasticsearch(BinlogEvent event) {
if (event.getType() == EventType.INSERT) {
// 新增数据
elasticsearchRepository.save(event.getData());
} else if (event.getType() == EventType.UPDATE) {
// 更新数据
elasticsearchRepository.update(event.getData());
} else if (event.getType() == EventType.DELETE) {
// 删除数据
elasticsearchRepository.delete(event.getId());
}
}
三、Redis 深度解析
1. 数据结构详解
五大基础数据结构
graph LR
Redis[Redis数据类型] --> String[String<br/>字符串]
Redis --> Hash[Hash<br/>哈希表]
Redis --> List[List<br/>列表]
Redis --> Set[Set<br/>集合]
Redis --> ZSet[Sorted Set<br/>有序集合]
String --> S1[缓存对象<br/>计数器<br/>分布式锁]
Hash --> H1[存储对象<br/>购物车]
List --> L1[消息队列<br/>最新列表]
Set --> SE1[去重<br/>共同好友]
ZSet --> Z1[排行榜<br/>延时队列]
style String fill:#e3f2fd
style Hash fill:#e8f5e9
style List fill:#fff3e0
style Set fill:#fce4ec
style ZSet fill:#f3e5f5
String(字符串)
# 1. 简单缓存
SET user:1001 '{"name":"张三","age":25}'
GET user:1001
# 2. 计数器(原子操作)
INCR page:view:count # 页面访问量 +1
INCRBY score:user:1001 10 # 用户积分 +10
DECR stock:product:888 # 库存 -1
# 3. 分布式锁
SET lock:order:12345 "locked" NX EX 10
# NX = Not Exist(键不存在才设置)
# EX 10 = 10秒后过期
Java代码:
// 页面访问计数
public void recordPageView(String pageId) {
redisTemplate.opsForValue().increment("page: view:" + pageId);
}
// 获取访问量(假设使用 StringRedisTemplate,计数值以字符串形式存储)
public Long getPageView(String pageId) {
String value = redisTemplate.opsForValue().get("page:view:" + pageId);
return value == null ? 0L : Long.parseLong(value);
}
// 分布式锁(只演示加锁,解锁见下方示意)
public boolean tryLock(String key, int expireSeconds) {
Boolean ok = redisTemplate.opsForValue()
.setIfAbsent(key, "locked", expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(ok); // 避免 Boolean 为 null 时自动拆箱抛 NPE
}
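释放锁时应先校验持有者再删除,避免锁过期后误删其他请求加的锁。下面是基于 Lua 脚本的加锁/解锁配套示意(与上文不同,这里假设加锁时把每个请求生成的随机值写入 value 作为持有者标识;生产环境可以直接使用后文介绍的 Redisson):
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
public class RedisLockDemo {
    private final StringRedisTemplate redisTemplate;
    public RedisLockDemo(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    // 加锁:返回随机 token 作为持有者标识,失败返回 null
    public String tryLock(String key, int expireSeconds) {
        String token = UUID.randomUUID().toString();
        Boolean ok = redisTemplate.opsForValue()
                .setIfAbsent(key, token, expireSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(ok) ? token : null;
    }
    // 解锁:只有 value 与 token 匹配才删除,Lua 保证“判断 + 删除”的原子性
    public boolean unlock(String key, String token) {
        String script =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "  return redis.call('del', KEYS[1]) " +
                "else return 0 end";
        Long result = redisTemplate.execute(
                new DefaultRedisScript<>(script, Long.class),
                Collections.singletonList(key), token);
        return result != null && result > 0;
    }
}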
Hash(哈希表)
# 存储用户信息
HSET user:1001 name "张三"
HSET user:1001 age 25
HSET user:1001 city "北京"
# 批量设置
HMSET user:1002 name "李四" age 30 city "上海"
# 获取单个字段
HGET user:1001 name # 返回 "张三"
# 获取所有字段
HGETALL user:1001
# 返回:
# 1) "name"
# 2) "张三"
# 3) "age"
# 4) "25"
# 5) "city"
# 6) "北京"
# 购物车场景
HSET cart:user:1001 product:888 2 # 商品888数量为2
HSET cart:user:1001 product:999 1
HINCRBY cart:user:1001 product:888 1 # 商品888数量 +1
Java代码:
// 存储用户对象
public void saveUser(User user) {
Map<String, String> userMap = new HashMap<>();
userMap.put("name", user. getName());
userMap.put("age", String.valueOf(user.getAge()));
userMap.put("city", user.getCity());
redisTemplate.opsForHash().putAll("user:" + user.getId(), userMap);
}
// 获取用户对象
public User getUser(Long userId) {
Map<Object, Object> entries = redisTemplate.opsForHash()
.entries("user:" + userId);
User user = new User();
user.setName((String) entries.get("name"));
user.setAge(Integer.parseInt((String) entries.get("age")));
user.setCity((String) entries.get("city"));
return user;
}
// 购物车加商品
public void addToCart(Long userId, Long productId, int quantity) {
redisTemplate.opsForHash()
.increment("cart: user:" + userId, "product:" + productId, quantity);
}
List(列表)
# 消息队列(左进右出)
LPUSH queue:email "发送邮件给user1"
LPUSH queue:email "发送邮件给user2"
RPOP queue:email # 取出 "发送邮件给user1"
# 最新文章列表
LPUSH articles:latest "文章ID: 999"
LPUSH articles:latest "文章ID:998"
LRANGE articles:latest 0 9 # 获取最新10篇文章
# 阻塞队列
BRPOP queue:email 10 # 阻塞10秒等待消息
Java代码:
// 发布文章(加入最新列表)
public void publishArticle(Long articleId) {
redisTemplate.opsForList().leftPush("articles: latest", articleId. toString());
// 保持列表只有100条
redisTemplate. opsForList().trim("articles:latest", 0, 99);
}
// 获取最新文章
public List<String> getLatestArticles(int count) {
return redisTemplate.opsForList().range("articles:latest", 0, count - 1);
}
// 简单消息队列
public void sendTask(String task) {
redisTemplate.opsForList().leftPush("task:queue", task);
}
public String getTask() {
return redisTemplate.opsForList().rightPop("task:queue");
}
Set(集合)
# 用户标签
SADD user:1001:tags "Java" "Redis" "MySQL"
SADD user:1002:tags "Python" "Redis" "MongoDB"
# 共同标签(交集)
SINTER user:1001:tags user:1002:tags # 返回 "Redis"
# 抽奖(随机取)
SADD lottery:users "user1" "user2" "user3" "user4"
SRANDMEMBER lottery:users 1 # 随机抽1个
SPOP lottery:users 1 # 随机抽1个并移除
# 去重(点赞)
SADD article:999:likes "user1"
SADD article:999:likes "user1" # 重复添加无效
SCARD article:999:likes # 统计点赞数
Java代码:
// 用户点赞
public void likeArticle(Long articleId, Long userId) {
redisTemplate.opsForSet().add("article:" + articleId + ":likes", userId. toString());
}
// 取消点赞
public void unlikeArticle(Long articleId, Long userId) {
redisTemplate.opsForSet().remove("article:" + articleId + ":likes", userId.toString());
}
// 是否已点赞
public boolean hasLiked(Long articleId, Long userId) {
return redisTemplate.opsForSet()
.isMember("article:" + articleId + ":likes", userId.toString());
}
// 点赞总数
public Long getLikeCount(Long articleId) {
return redisTemplate.opsForSet().size("article:" + articleId + ":likes");
}
// 共同好友
public Set<String> getCommonFriends(Long userId1, Long userId2) {
return redisTemplate.opsForSet()
.intersect("user:" + userId1 + ":friends", "user:" + userId2 + ":friends");
}
Sorted Set(有序集合)
# 排行榜
ZADD rank:score user1 100
ZADD rank:score user2 200
ZADD rank:score user3 150
# 获取前3名
ZREVRANGE rank:score 0 2 WITHSCORES
# 返回:
# 1) "user2"
# 2) "200"
# 3) "user3"
# 4) "150"
# 5) "user1"
# 6) "100"
# 获取某个用户的排名
ZREVRANK rank:score user3 # 返回 1(第2名,从0开始)
# 增加分数
ZINCRBY rank:score 50 user1 # user1分数 +50
# 延时队列(按时间排序)
ZADD delay:queue task1 1640000000 # Unix时间戳
ZADD delay:queue task2 1640000100
ZRANGEBYSCORE delay:queue 0 当前时间戳 # 取出到期的任务
Java代码:
// 添加用户分数
public void addScore(Long userId, double score) {
redisTemplate.opsForZSet().add("rank:score", userId.toString(), score);
}
// 增加分数
public void incrementScore(Long userId, double delta) {
redisTemplate.opsForZSet().incrementScore("rank:score", userId.toString(), delta);
}
// 获取排行榜(前N名)
public Set<ZSetOperations.TypedTuple<String>> getTopN(int n) {
return redisTemplate.opsForZSet()
.reverseRangeWithScores("rank:score", 0, n - 1);
}
// 获取用户排名
public Long getUserRank(Long userId) {
return redisTemplate.opsForZSet()
.reverseRank("rank:score", userId.toString());
}
// 延时任务队列
public void addDelayTask(String taskId, long executeTime) {
redisTemplate.opsForZSet().add("delay:queue", taskId, executeTime);
}
// 获取到期任务
public Set<String> getExpiredTasks() {
long now = System.currentTimeMillis();
return redisTemplate.opsForZSet()
.rangeByScore("delay:queue", 0, now);
}
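延时队列还需要一个定时轮询的消费端,把到期任务取出并从 ZSet 中移除。下面是基于 Spring @Scheduled 的最小示意(handleTask 为假设的业务占位方法,需在配置类上开启 @EnableScheduling):
import java.util.Set;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DelayTaskPoller {
    private final StringRedisTemplate redisTemplate;
    public DelayTaskPoller(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    // 每秒扫描一次,取出 score <= 当前时间戳 的任务
    @Scheduled(fixedDelay = 1000)
    public void poll() {
        long now = System.currentTimeMillis();
        Set<String> due = redisTemplate.opsForZSet().rangeByScore("delay:queue", 0, now);
        if (due == null || due.isEmpty()) {
            return;
        }
        for (String taskId : due) {
            // 先从 ZSet 删除成功再执行,避免多个实例重复处理同一任务
            Long removed = redisTemplate.opsForZSet().remove("delay:queue", taskId);
            if (removed != null && removed > 0) {
                handleTask(taskId);
            }
        }
    }
    private void handleTask(String taskId) {
        System.out.println("执行到期任务: " + taskId); // 假设的业务处理占位
    }
}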
2. 缓存策略详解
缓存更新策略
graph TD
A[数据更新请求] --> B{选择策略}
B -->|策略1| C[Cache Aside<br/>旁路缓存]
B -->|策略2| D[Read/Write Through<br/>读写穿透]
B -->|策略3| E[Write Behind<br/>异步写入]
C --> C1[更新数据库]
C1 --> C2[删除缓存]
C2 --> C3[下次读取时<br/>重新加载]
D --> D1[应用只操作缓存]
D1 --> D2[缓存层负责<br/>同步数据库]
E --> E1[先写缓存]
E1 --> E2[异步批量<br/>写入数据库]
style C fill:#e3f2fd
style D fill:#e8f5e9
style E fill:#fff3e0
策略1:Cache Aside(最常用)
// 读数据
public Product getProduct(Long id) {
// 1. 先查缓存
String cacheKey = "product:" + id;
Product product = redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product; // 缓存命中
}
// 2. 缓存未命中,查数据库
product = productDao.selectById(id);
if (product != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
}
return product;
}
// 写数据
public void updateProduct(Product product) {
// 1. 先更新数据库
productDao.updateById(product);
// 2. 删除缓存(而不是更新缓存)
redisTemplate.delete("product:" + product. getId());
// 为什么删除而不是更新?
// 因为可能有多个请求同时更新,删除缓存更安全
}
策略2:Read/Write Through
// 使用Spring Cache自动管理
@Cacheable(value = "products", key = "#id")
public Product getProduct(Long id) {
// Spring自动处理:先查缓存,未命中则执行方法并缓存结果
return productDao.selectById(id);
}
@CachePut(value = "products", key = "#product.id")
public Product updateProduct(Product product) {
// Spring自动处理:更新数据库后更新缓存
productDao.updateById(product);
return product;
}
@CacheEvict(value = "products", key = "#id")
public void deleteProduct(Long id) {
// Spring自动处理:删除数据库后删除缓存
productDao.deleteById(id);
}
3. 缓存问题与解决方案
问题1:缓存穿透(查询不存在的数据)
场景:恶意攻击查询ID=-1的商品
请求 → 缓存没有 → 数据库也没有 → 每次都打到数据库
解决方案:
// 方案1:缓存空值
public Product getProduct(Long id) {
String cacheKey = "product:" + id;
Product product = redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
if (product.getId() == null) {
return null; // 之前查过,数据库没有
}
return product;
}
product = productDao.selectById(id);
if (product == null) {
// 缓存一个空对象,过期时间短一点
redisTemplate.opsForValue().set(cacheKey, new Product(), 5, TimeUnit.MINUTES);
} else {
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
}
return product;
}
// 方案2:布隆过滤器(Bloom Filter)
@Autowired
private RedissonClient redissonClient;
public Product getProduct(Long id) {
// 1. 布隆过滤器判断是否存在
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter("product:bloom");
if (!bloomFilter.contains(id)) {
return null; // 一定不存在,直接返回
}
// 2. 可能存在,查缓存和数据库
// ... 正常逻辑
}
// 初始化布隆过滤器
public void initBloomFilter() {
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter("product:bloom");
bloomFilter.tryInit(100000, 0.01); // 预计10万条数据,1%误判率
// 加载所有商品ID
List<Long> productIds = productDao.selectAllIds();
productIds.forEach(bloomFilter::add);
}
问题2:缓存击穿(热点数据过期)
场景:爆款商品缓存过期瞬间
1000个请求同时到达 → 缓存都没有 → 1000个请求都打到数据库
解决方案:
// 方案1:互斥锁(只让一个请求查数据库)
public Product getProduct(Long id) {
String cacheKey = "product:" + id;
Product product = redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 尝试获取锁
String lockKey = "lock:product:" + id;
boolean locked = Boolean.TRUE.equals(redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS));
if (locked) {
try {
// 拿到锁,查数据库
product = productDao.selectById(id);
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
return product;
} finally {
redisTemplate.delete(lockKey); // 释放锁
}
} else {
// 没拿到锁,稍等片刻后重试
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return getProduct(id); // 递归重试(实际使用时应改成循环并限制重试次数)
}
}
// 方案2:永不过期(逻辑过期)
public Product getProduct(Long id) {
String cacheKey = "product:" + id;
CacheData cacheData = redisTemplate.opsForValue().get(cacheKey);
if (cacheData == null) {
// 缓存重建
return rebuildCache(id);
}
// 检查逻辑过期时间
if (cacheData.getExpireTime().isBefore(LocalDateTime.now())) {
// 已过期,异步重建缓存
executorService.submit(() -> rebuildCache(id));
// 返回旧数据(虽然过期但可用)
return cacheData.getProduct();
}
return cacheData.getProduct();
}
private Product rebuildCache(Long id) {
// 获取互斥锁
String lockKey = "lock:product:" + id;
if (!tryLock(lockKey)) {
return null; // 有其他线程在重建
}
try {
Product product = productDao.selectById(id);
CacheData cacheData = new CacheData();
cacheData.setProduct(product);
cacheData.setExpireTime(LocalDateTime.now().plusHours(1)); // 逻辑过期时间
redisTemplate.opsForValue().set("product:" + id, cacheData); // 不设置Redis过期时间
return product;
} finally {
unlock(lockKey);
}
}
问题3:缓存雪崩(大量缓存同时过期)
场景:凌晨2点批量导入商品,设置1小时过期
凌晨3点,所有缓存同时失效 → 数据库压力暴增
解决方案:
// 方案1:过期时间加随机值
public void cacheProduct(Product product) {
int baseExpire = 3600; // 1小时
int randomExpire = new Random().nextInt(300); // 0-5分钟随机
int expire = baseExpire + randomExpire;
redisTemplate.opsForValue().set(
"product:" + product. getId(),
product,
expire,
TimeUnit.SECONDS
);
}
// 方案2:热点数据永不过期
public void cacheHotProduct(Product product) {
// 热门商品不设置过期时间
redisTemplate.opsForValue().set("hot: product:" + product.getId(), product);
// 定时任务定期更新
}
// 方案3:使用Redis集群 + 多级缓存
public Product getProduct(Long id) {
// L1: 本地缓存(Caffeine)
Product product = localCache.get(id);
if (product != null) return product;
// L2: Redis缓存
product = redisTemplate.opsForValue().get("product:" + id);
if (product != null) {
localCache.put(id, product);
return product;
}
// L3: 数据库
product = productDao.selectById(id);
if (product != null) {
redisTemplate.opsForValue().set("product:" + id, product, 1, TimeUnit. HOURS);
localCache.put(id, product);
}
return product;
}
4. 分布式锁深入
Redisson 实现分布式锁
@Autowired
private RedissonClient redissonClient;
// 简单使用
public void secKill(Long productId) {
RLock lock = redissonClient.getLock("seckill:" + productId);
try {
// 尝试加锁,最多等待10秒,锁30秒后自动释放
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后重试");
}
// 执行业务逻辑
int stock = getStock(productId);
if (stock > 0) {
reduceStock(productId);
createOrder();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock(); // 只有成功持有锁时才释放,避免抛 IllegalMonitorStateException
}
}
}
// 可重入锁演示
public void demo() {
RLock lock = redissonClient.getLock("mylock");
lock.lock();
try {
method1(); // method1内部也尝试获取同一把锁
} finally {
lock.unlock();
}
}
public void method1() {
RLock lock = redissonClient.getLock("mylock");
lock.lock(); // 可重入,不会死锁
try {
// 业务逻辑
} finally {
lock.unlock();
}
}
// 联锁(MultiLock)
public void multiLock() {
RLock lock1 = redissonClient.getLock("lock1");
RLock lock2 = redissonClient.getLock("lock2");
RLock lock3 = redissonClient.getLock("lock3");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
try {
multiLock.lock();
// 同时持有3把锁
} finally {
multiLock.unlock();
}
}
// 红锁(RedLock)- 多个独立Redis实例
public void redLock() {
RLock lock1 = redissonClient1.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 至少在N/2+1个实例上加锁成功才算成功
redLock.lock();
} finally {
redLock.unlock();
}
}
5. 持久化机制
graph LR
Redis[Redis持久化] --> RDB[RDB快照]
Redis --> AOF[AOF日志]
Redis --> Mix[混合持久化]
RDB --> R1[定期全量备份<br/>恢复快<br/>数据可能丢失]
AOF --> A1[记录每个写命令<br/>数据完整<br/>文件大]
Mix --> M1[RDB + AOF<br/>兼顾性能和安全]
style RDB fill:#e3f2fd
style AOF fill:#e8f5e9
style Mix fill:#fff3e0
配置示例:
# RDB配置
save 900 1 # 900秒内至少1个key变化,触发快照
save 300 10 # 300秒内至少10个key变化
save 60 10000 # 60秒内至少10000个key变化
# AOF配置
appendonly yes
appendfsync everysec # 每秒同步一次(推荐)
# appendfsync always # 每个命令都同步(最安全但慢)
# appendfsync no # 由操作系统决定(最快但不安全)
# 混合持久化(Redis 4.0+)
aof-use-rdb-preamble yes
四、MySQL 深度解析
1. 事务ACID详解
// 转账示例
@Transactional(rollbackFor = Exception.class)
public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
// 1. 扣减转出账户
accountDao.deductBalance(fromUserId, amount);
// 2. 模拟异常
if (amount.compareTo(new BigDecimal("1000")) > 0) {
throw new RuntimeException("单笔转账不能超过1000元");
}
// 3. 增加转入账户
accountDao.addBalance(toUserId, amount);
// 4. 记录流水
transactionLogDao.insert(new TransactionLog(fromUserId, toUserId, amount));
}
// 如果抛出异常,1、3、4步骤都会回滚
ACID特性
| 特性 | 说明 | 实现机制 |
|---|---|---|
| 原子性 (Atomicity) | 全部成功或全部失败 | Undo Log(回滚日志) |
| 一致性 (Consistency) | 数据完整性约束 | 事务 + 约束 |
| 隔离性 (Isolation) | 并发事务互不干扰 | 锁 + MVCC |
| 持久性 (Durability) | 提交后永久保存 | Redo Log(重做日志) |
2. 事务隔离级别
graph TB
A[事务隔离级别] --> B[Read Uncommitted<br/>读未提交]
A --> C[Read Committed<br/>读已提交]
A --> D[Repeatable Read<br/>可重复读默认]
A --> E[Serializable<br/>串行化]
B --> B1[脏读❌<br/>不可重复读❌<br/>幻读❌]
C --> C1[脏读✅<br/>不可重复读❌<br/>幻读❌]
D --> D1[脏读✅<br/>不可重复读✅<br/>幻读✅Next-Key Lock]
E --> E1[脏读✅<br/>不可重复读✅<br/>幻读✅<br/>性能最差]
style D fill:#4caf50
并发问题演示
-- 脏读(Read Uncommitted会出现)
-- 事务A
BEGIN;
UPDATE account SET balance = 1000 WHERE id = 1;
-- 未提交
-- 事务B
BEGIN;
SELECT balance FROM account WHERE id = 1; -- 读到1000(事务A未提交的数据)
-- 如果事务A回滚,事务B读到的是脏数据
-- 不可重复读(Read Committed会出现)
-- 事务A
BEGIN;
SELECT balance FROM account WHERE id = 1; -- 读到500
-- 事务B
BEGIN;
UPDATE account SET balance = 1000 WHERE id = 1;
COMMIT;
-- 事务A
SELECT balance FROM account WHERE id = 1; -- 读到1000(同一事务内两次读结果不同)
-- 幻读(Repeatable Read在某些情况会出现)
-- 事务A
BEGIN;
SELECT * FROM account WHERE balance > 500; -- 返回3条
-- 事务B
BEGIN;
INSERT INTO account (id, balance) VALUES (4, 600);
COMMIT;
-- 事务A
SELECT * FROM account WHERE balance > 500; -- 返回4条(多了一条幻影记录)
设置隔离级别:
-- 全局设置
SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 会话级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 单个事务
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
-- ...
COMMIT;
3. 锁机制详解
锁的分类
graph TB
Lock[MySQL锁] --> L1[按粒度分类]
Lock --> L2[按类型分类]
L1 --> T1[表锁<br/>锁整张表]
L1 --> R1[行锁<br/>锁单行记录InnoDB]
L1 --> G1[间隙锁<br/>锁索引间隙]
L2 --> S1[共享锁S<br/>读锁]
L2 --> X1[排他锁X<br/>写锁]
L2 --> I1[意向锁<br/>表级锁]
style R1 fill:#4caf50
style X1 fill:#ff9800
行锁示例
-- 共享锁(S锁):其他事务可以读、也可以再加S锁,但不能修改该行
SELECT * FROM account WHERE id = 1 LOCK IN SHARE MODE; -- MySQL 8.0 亦可写作 FOR SHARE
-- 排他锁(X锁):其他事务不能再对该行加锁或修改(普通快照读不受影响)
SELECT * FROM account WHERE id = 1 FOR UPDATE;
-- 实际场景:秒杀扣库存
BEGIN;
-- 锁定商品库存行
SELECT stock FROM product WHERE id = 888 FOR UPDATE;
-- 检查库存(IF ... END IF 为伪代码,需在存储过程或应用代码中实现,应用层写法见下方示意)
IF stock > 0 THEN
UPDATE product SET stock = stock - 1 WHERE id = 888;
INSERT INTO orders (...) VALUES (...);
END IF;
COMMIT;
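对应到应用层,“锁行 → 判断 → 扣减”的流程大致如下(基于 Spring 事务的示意,ProductDao / OrderDao 为假设的 DAO 接口,对应的 SQL 写在注释里):
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
interface ProductDao {
    Integer selectStockForUpdate(long productId); // SELECT stock FROM product WHERE id = ? FOR UPDATE
    int reduceStock(long productId);              // UPDATE product SET stock = stock - 1 WHERE id = ?
}
interface OrderDao {
    int insertOrder(long productId, long userId); // INSERT INTO orders (...) VALUES (...)
}
@Service
public class SeckillStockService {
    private final ProductDao productDao;
    private final OrderDao orderDao;
    public SeckillStockService(ProductDao productDao, OrderDao orderDao) {
        this.productDao = productDao;
        this.orderDao = orderDao;
    }
    @Transactional(rollbackFor = Exception.class)
    public boolean seckill(long productId, long userId) {
        // FOR UPDATE 锁住库存行,事务提交前其他事务无法修改这一行
        Integer stock = productDao.selectStockForUpdate(productId);
        if (stock == null || stock <= 0) {
            return false; // 已售罄
        }
        productDao.reduceStock(productId);
        orderDao.insertOrder(productId, userId);
        return true; // 事务提交后行锁释放
    }
}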
Next-Key Lock(解决幻读)
-- 假设有索引值:10, 20, 30
-- 间隙:(-∞, 10), (10, 20), (20, 30), (30, +∞)
-- 事务A:范围查询
BEGIN;
SELECT * FROM account WHERE id BETWEEN 10 AND 30 FOR UPDATE;
-- 锁住 10、20、30 这些记录行,以及它们之间的间隙 (10,20)、(20,30)(必要时还包括 30 之后的间隙)
-- 事务B:尝试插入
BEGIN;
INSERT INTO account (id, balance) VALUES (15, 100); -- 被阻塞(15在间隙内)
INSERT INTO account (id, balance) VALUES (5, 100); -- 可以执行(5不在间隙内)
4. 索引优化
索引类型
-- 1. 主键索引
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR(50)
);
-- 2. 唯一索引
CREATE UNIQUE INDEX idx_email ON users(email);
-- 3. 普通索引
CREATE INDEX idx_name ON users(name);
-- 4. 组合索引(最左前缀原则)
CREATE INDEX idx_name_age_city ON users(name, age, city);
-- 可以使用索引的查询:
SELECT * FROM users WHERE name = '张三'; -- ✅
SELECT * FROM users WHERE name = '张三' AND age = 25; -- ✅
SELECT * FROM users WHERE name = '张三' AND age = 25 AND city = '北京'; -- ✅
SELECT * FROM users WHERE name = '张三' AND city = '北京'; -- ⚠️ 只能用到 name 这一段,city 无法利用索引的有序性(可借助索引条件下推在索引内过滤)
-- 不能使用索引的查询:
SELECT * FROM users WHERE age = 25; -- ❌(跳过了name)
SELECT * FROM users WHERE city = '北京'; -- ❌(跳过了name和age)
-- 5. 全文索引
CREATE FULLTEXT INDEX idx_content ON articles(content);
SELECT * FROM articles WHERE MATCH(content) AGAINST('Java Redis');
-- 6. 覆盖索引(索引包含所有查询字段)
CREATE INDEX idx_name_age ON users(name, age);
SELECT name, age FROM users WHERE name = '张三'; -- 不需要回表查询
索引失效场景
-- 1. 使用函数
SELECT * FROM users WHERE YEAR(create_time) = 2023; -- ❌索引失效
SELECT * FROM users WHERE create_time BETWEEN '2023-01-01' AND '2023-12-31'; -- ✅使用索引
-- 2. 类型转换
SELECT * FROM users WHERE phone = 13800138000; -- ❌(phone是VARCHAR,发生隐式转换)
SELECT * FROM users WHERE phone = '13800138000'; -- ✅
-- 3. 模糊查询(前缀匹配)
SELECT * FROM users WHERE name LIKE '%张%'; -- ❌
SELECT * FROM users WHERE name LIKE '张%'; -- ✅
-- 4. OR条件(部分字段无索引)
SELECT * FROM users WHERE name = '张三' OR address = '北京'; -- ❌(address无索引)
SELECT * FROM users WHERE name = '张三' OR email = 'test@qq.com'; -- ✅(都有索引)
-- 5. 不等于
SELECT * FROM users WHERE age != 25; -- ❌
SELECT * FROM users WHERE age > 25 OR age < 25; -- 可能使用索引
-- 6. IS NULL / IS NOT NULL
SELECT * FROM users WHERE email IS NULL; -- 可能失效(取决于NULL值比例)
5. EXPLAIN执行计划
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;
关键字段:
| 字段 | 说明 | 重点关注 |
|---|---|---|
| type | 访问类型 | system > const > eq_ref > ref > range > index > ALL |
| possible_keys | 可能使用的索引 | - |
| key | 实际使用的索引 | NULL表示未使用索引 |
| rows | 扫描行数 | 越小越好 |
| Extra | 额外信息 | Using filesort(需要排序) Using temporary(需要临时表) Using index(覆盖索引) |
示例分析:
-- 全表扫描(最差)
EXPLAIN SELECT * FROM orders WHERE amount > 100;
-- type: ALL, rows: 100000 ❌
-- 使用索引(优化后)
CREATE INDEX idx_amount ON orders(amount);
EXPLAIN SELECT * FROM orders WHERE amount > 100;
-- type: range, rows: 5000 ✅
五、MongoDB 深度解析
1. 文档模型 vs 关系模型
数据建模对比
// MySQL关系模型(需要3张表)
-- users表
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR(50),
email VARCHAR(100)
);
-- orders表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
user_id BIGINT,
total DECIMAL(10,2),
FOREIGN KEY (user_id) REFERENCES users(id)
);
-- order_items表
CREATE TABLE order_items (
id BIGINT PRIMARY KEY,
order_id BIGINT,
product_name VARCHAR(100),
quantity INT,
price DECIMAL(10,2),
FOREIGN KEY (order_id) REFERENCES orders(id)
);
-- 查询需要JOIN
SELECT u.name, o.total, oi.product_name
FROM users u
JOIN orders o ON u.id = o.user_id
JOIN order_items oi ON o.id = oi.order_id;
// MongoDB文档模型(一个集合搞定)
db.orders.insertOne({
_id: ObjectId("507f1f77bcf86cd799439011"),
user: {
id: 1001,
name: "张三",
email: "zhang@example.com"
},
total: 299.99,
items: [
{
product_name: "机械键盘",
quantity: 1,
price: 199.99
},
{
product_name: "鼠标垫",
quantity: 2,
price: 50.00
}
],
status: "completed",
created_at: ISODate("2023-12-01T10:30:00Z")
});
// 查询无需JOIN
db.orders.find({ "user.id": 1001 });
2. CRUD操作详解
插入数据
// 插入单条
db.products.insertOne({
name: "iPhone 15",
price: 5999,
stock: 100,
category: "手机",
specs: {
screen: "6.1英寸",
chip: "A17",
storage: "128GB"
},
tags: ["5G", "双卡", "Face ID"]
});
// 插入多条
db.products.insertMany([
{ name: "iPad", price: 3999, stock: 50 },
{ name: "MacBook", price: 9999, stock: 30 },
{ name: "AirPods", price: 1299, stock: 200 }
]);
查询数据
// 基础查询
db.products.find({ category: "手机" });
// 比较运算符
db.products.find({ price: { $gt: 5000 } }); // 大于5000
db.products.find({ stock: { $gte: 50, $lte: 100 } }); // 50-100之间
db.products.find({ category: { $in: ["手机", "平板"] } }); // 手机或平板
db.products.find({ category: { $ne: "配件" } }); // 不是配件
// 逻辑运算符
db.products.find({
$and: [
{ price: { $lt: 10000 } },
{ stock: { $gt: 0 } }
]
});
db.products.find({
$or: [
{ category: "手机" },
{ price: { $lt: 2000 } }
]
});
// 嵌套文档查询
db.products.find({ "specs.storage": "256GB" });
// 数组查询
db.products.find({ tags: "5G" }); // tags数组包含"5G"
db.products.find({ tags: { $all: ["5G", "双卡"] } }); // 同时包含两个标签
db.products.find({ tags: { $size: 3 } }); // tags数组长度为3
// 正则表达式
db.products.find({ name: /iPhone/i }); // 不区分大小写
// 投影(只返回指定字段)
db.products.find(
{ category: "手机" },
{ name: 1, price: 1, _id: 0 } // 1=返回, 0=不返回
);
// 排序、分页
db.products.find({ category: "手机" })
.sort({ price: -1 }) // 按价格降序
.skip(10) // 跳过10条
.limit(10); // 返回10条
更新数据
// 更新单条
db.products.updateOne(
{ name: "iPhone 15" },
{ $set: { price: 5799, stock: 95 } }
);
// 更新多条
db.products.updateMany(
{ category: "手机" },
{ $set: { on_sale: true } }
);
// 更新运算符
db.products.updateOne(
{ name: "iPhone 15" },
{
$inc: { stock: -1 }, // 库存-1
$set: { last_update: new Date() }, // 设置字段
$push: { tags: "热销" }, // 数组添加元素
$addToSet: { tags: "新品" } // 数组添加(不重复)
}
);
// 数组操作
db.products.updateOne(
{ name: "iPhone 15" },
{ $pull: { tags: "新品" } } // 从数组删除
);
db.products.updateOne(
{ name: "iPhone 15" },
{ $pop: { tags: 1 } } // 删除数组最后一个元素(-1删除第一个)
);
// upsert(不存在则插入)
db.products.updateOne(
{ name: "新产品" },
{ $set: { price: 999, stock: 100 } },
{ upsert: true }
);
// 替换整个文档
db.products.replaceOne(
{ name: "iPhone 15" },
{
name: "iPhone 15 Pro",
price: 7999,
stock: 50
}
);
删除数据
// 删除单条
db.products.deleteOne({ name: "iPhone 15" });
// 删除多条
db.products.deleteMany({ stock: 0 });
// 删除所有
db.products.deleteMany({});
3. 聚合管道(Aggregation)
// 场景:统计每个分类的商品数量和平均价格
db.products.aggregate([
// 阶段1:筛选
{ $match: { stock: { $gt: 0 } } },
// 阶段2:分组
{
$group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" },
minPrice: { $min: "$price" }
}
},
// 阶段3:排序
{ $sort: { count: -1 } },
// 阶段4:限制结果
{ $limit: 5 }
]);
// 输出:
[
{ _id: "手机", count: 15, avgPrice: 4500, maxPrice: 9999, minPrice: 1999 },
{ _id: "平板", count: 10, avgPrice: 3500, maxPrice: 7999, minPrice: 2199 },
...
]
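在 Java 侧,同样的统计可以用 Spring Data MongoDB 的聚合 API 表达(集合名、字段名沿用上例,CategoryStats 为假设的结果映射类):
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
public class ProductStatsRepository {
    private final MongoTemplate mongoTemplate;
    public ProductStatsRepository(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }
    public AggregationResults<CategoryStats> statsByCategory() {
        Aggregation agg = newAggregation(
                match(Criteria.where("stock").gt(0)),   // $match
                group("category")                        // $group
                        .count().as("count")
                        .avg("price").as("avgPrice")
                        .max("price").as("maxPrice")
                        .min("price").as("minPrice"),
                sort(Sort.Direction.DESC, "count"),      // $sort
                limit(5)                                 // $limit
        );
        return mongoTemplate.aggregate(agg, "products", CategoryStats.class);
    }
    // 假设的结果映射类,id 对应分组后的 _id(即分类名)
    public static class CategoryStats {
        public String id;
        public long count;
        public double avgPrice;
        public double maxPrice;
        public double minPrice;
    }
}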
复杂聚合示例
// 场景:订单统计分析
db.orders.aggregate([
// 1. 解构数组(每个item变成一条记录)
{ $unwind: "$items" },
// 2. 筛选2023年的订单
{
$match: {
created_at: {
$gte: ISODate("2023-01-01"),
$lt: ISODate("2024-01-01")
}
}
},
// 3. 计算每个商品的销售额
{
$project: {
product_name: "$items.product_name",
revenue: { $multiply: ["$items.quantity", "$items.price"] },
month: { $month: "$created_at" }
}
},
// 4. 按月份和商品分组
{
$group: {
_id: {
month: "$month",
product: "$product_name"
},
total_revenue: { $sum: "$revenue" },
total_quantity: { $sum: "$quantity" }
}
},
// 5. 重新组织输出
{
$project: {
_id: 0,
month: "$_id.month",
product: "$_id.product",
revenue: "$total_revenue",
quantity: "$total_quantity"
}
},
// 6. 排序
{ $sort: { month: 1, revenue: -1 } }
]);
4. 索引优化
// 创建单字段索引
db.products.createIndex({ name: 1 }); // 1=升序, -1=降序
// 创建复合索引
db.products.createIndex({ category: 1, price: -1 });
// 创建唯一索引
db.users.createIndex({ email: 1 }, { unique: true });
// 创建文本索引(全文搜索)
db.articles.createIndex({ title: "text", content: "text" });
db.articles.find({ $text: { $search: "MongoDB 教程" } });
// 创建TTL索引(自动删除过期数据)
db.sessions.createIndex(
{ created_at: 1 },
{ expireAfterSeconds: 3600 } // 1小时后自动删除
);
// 查看索引
db.products.getIndexes();
// 删除索引
db.products.dropIndex("name_1");
// 分析查询性能
db.products.find({ category: "手机" }).explain("executionStats");
5. 副本集(Replica Set)高可用
graph TB
subgraph "MongoDB副本集"
P[Primary节点<br/>主节点<br/>处理所有写操作]
S1[Secondary节点1<br/>从节点<br/>同步数据备份]
S2[Secondary节点2<br/>从节点<br/>同步数据备份]
end
Client[应用程序] -->|写操作| P
Client -->|读操作可选| S1
Client -->|读操作可选| S2
P -.->|同步Oplog| S1
P -.->|同步Oplog| S2
P -->|Primary挂了| X[选举]
X -->|提升为新Primary| S1
style P fill:#4caf50
style S1 fill:#ffeb3b
style S2 fill:#ffeb3b
副本集配置
// 初始化副本集
rs.initiate({
_id: "myReplicaSet",
members: [
{ _id: 0, host: "mongo1:27017" },
{ _id: 1, host: "mongo2:27017" },
{ _id: 2, host: "mongo3:27017" }
]
});
// 查看副本集状态
rs.status();
// 添加节点
rs.add("mongo4:27017");
// 设置优先级(数字越大越容易成为Primary)
cfg = rs.conf();
cfg.members[1].priority = 2;
rs.reconfig(cfg);
Java连接副本集
// Spring Boot配置
spring:
data:
mongodb:
uri: mongodb://mongo1:27017,mongo2:27017,mongo3:27017/mydb?replicaSet=myReplicaSet&readPreference=secondaryPreferred
// 代码示例
@Autowired
private MongoTemplate mongoTemplate;
// 写操作(自动发送到Primary)
public void saveProduct(Product product) {
mongoTemplate.save(product);
}
// 读操作(可以从Secondary读取,减轻Primary压力)
@ReadPreference(ReadPreferenceType.SECONDARY_PREFERRED)
public List<Product> findProducts(String category) {
Query query = new Query(Criteria.where("category").is(category));
return mongoTemplate.find(query, Product.class);
}
6. 分片集群(Sharding)水平扩展
graph TB
subgraph "客户端"
App[应用程序]
end
subgraph "路由层"
Mongos1[Mongos<br/>路由服务1]
Mongos2[Mongos<br/>路由服务2]
end
subgraph "配置服务器"
Config[Config Server<br/>存储元数据]
end
subgraph "分片1 Shard1"
S1P[Primary]
S1S1[Secondary]
S1S2[Secondary]
end
subgraph "分片2 Shard2"
S2P[Primary]
S2S1[Secondary]
S2S2[Secondary]
end
subgraph "分片3 Shard3"
S3P[Primary]
S3S1[Secondary]
S3S2[Secondary]
end
App --> Mongos1
App --> Mongos2
Mongos1 --> Config
Mongos2 --> Config
Mongos1 --> S1P
Mongos1 --> S2P
Mongos1 --> S3P
S1P -.-> S1S1
S1P -.-> S1S2
S2P -.-> S2S1
S2P -.-> S2S2
S3P -.-> S3S1
S3P -.-> S3S2
style Mongos1 fill:#e1f5ff
style Mongos2 fill:#e1f5ff
style Config fill:#fff3e0
style S1P fill:#4caf50
style S2P fill:#4caf50
style S3P fill:#4caf50
分片策略
// 1. 启用分片
sh.enableSharding("mydb");
// 2. 创建分片键索引
db.products.createIndex({ category: 1, _id: 1 });
// 3. 对集合进行分片
sh.shardCollection("mydb. products", { category: 1, _id: 1 });
// 数据分布示例:
// Shard1: category = "手机"
// Shard2: category = "平板"
// Shard3: category = "配件"
// 查看分片状态
sh.status();
六、Apollo 配置中心深度解析
1. 核心概念
graph LR
subgraph "Apollo架构"
Portal[Portal<br/>管理界面]
AdminService[Admin Service<br/>配置管理]
ConfigService[Config Service<br/>配置读取]
MetaServer[Meta Server<br/>服务发现]
end
subgraph "存储"
MySQL[(MySQL<br/>配置存储)]
end
subgraph "应用端"
Client[Java应用<br/>Apollo Client]
end
Portal --> AdminService
AdminService --> MySQL
ConfigService --> MySQL
Client --> ConfigService
Client --> MetaServer
style Portal fill:#e1f5ff
style ConfigService fill:#e8f5e9
style Client fill:#fff3e0
2. 配置管理实战
创建配置
# application命名空间(公共配置)
app.name=my-service
server.port=8080
# database命名空间(数据库配置)
spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# redis命名空间(Redis配置)
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.database=0
Java集成
// 1. 添加依赖
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
// 2. 配置文件
# application.properties
app.id=my-service
apollo.meta=http://apollo-config-server:8080
apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=application,database,redis
// 3. 启动类
@SpringBootApplication
@EnableApolloConfig
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
// 4. 使用配置
@Service
public class UserService {
// 方式1:@Value注解
@Value("${server.port:8080}")
private int port;
// 方式2:@ConfigurationProperties
@Autowired
private DataSourceProperties dataSourceProperties;
// 方式3:监听配置变化
@ApolloConfigChangeListener
public void onChange(ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
log.info("配置变化: {} - 旧值: {}, 新值: {}",
key, change.getOldValue(), change.getNewValue());
// 根据配置变化执行业务逻辑
if ("max. pool.size".equals(key)) {
updateThreadPool(Integer.parseInt(change.getNewValue()));
}
}
}
}
@ConfigurationProperties(prefix = "spring.datasource")
@Data
public class DataSourceProperties {
private String url;
private String username;
private String password;
}
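除了注解方式,也可以直接用 Apollo 客户端的原生 API 读取配置(命名空间与 key 沿用上文示例):
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
public class ApolloApiDemo {
    public static void main(String[] args) {
        // 读取 application 命名空间
        Config appConfig = ConfigService.getAppConfig();
        int port = appConfig.getIntProperty("server.port", 8080);
        // 读取指定命名空间(例如 database)
        Config dbConfig = ConfigService.getConfig("database");
        String url = dbConfig.getProperty("spring.datasource.url", "");
        System.out.println("server.port = " + port + ", datasource.url = " + url);
    }
}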
3. 灰度发布
sequenceDiagram
participant Admin as 管理员
participant Portal as Apollo Portal
participant S1 as 服务实例1<br/>灰度IP
participant S2 as 服务实例2
participant S3 as 服务实例3
Admin->>Portal: 1. 创建灰度规则<br/>IP=192.168.1.10
Portal->>S1: 2. 推送新配置<br/>timeout=5000
S1-->>Portal: 3. 拉取成功
Note over S1: 使用新配置<br/>timeout=5000
Note over S2,S3: 使用旧配置<br/>timeout=3000
Admin->>Portal: 4. 观察灰度效果
Admin->>Portal: 5. 全量发布
Portal->>S2: 6. 推送新配置
Portal->>S3: 6. 推送新配置
Note over S1,S3: 全部使用新配置<br/>timeout=5000
操作步骤:
- Portal界面点击“创建灰度”
- 输入灰度IP:192.168.1.10
- 修改配置值
- 点击“灰度发布”
- 观察灰度机器的日志和监控
- 确认无问题后点击“全量发布”,或发现问题点击“放弃灰度”
4. 多环境管理
项目结构:
my-service
├── DEV(开发环境)
│ ├── application
│ │ └── spring.profiles.active=dev
│ └── database
│ └── spring.datasource.url=jdbc:mysql://dev-db:3306/mydb
│
├── FAT(测试环境)
│ ├── application
│ │ └── spring.profiles.active=fat
│ └── database
│ └── spring.datasource.url=jdbc:mysql://test-db:3306/mydb
│
└── PROD(生产环境)
├── application
│ └── spring.profiles.active=prod
└── database
└── spring.datasource.url=jdbc:mysql://prod-db:3306/mydb
配置方式:
# 开发环境(-Denv 是 JVM 系统属性,需写在 -jar 之前)
java -Denv=DEV -jar app.jar
# 生产环境
java -Denv=PROD -jar app.jar
七、Docker 深度解析
1. Docker核心概念
graph TB
subgraph "Docker架构"
Client[Docker Client<br/>docker命令]
Daemon[Docker Daemon<br/>dockerd守护进程]
Registry[Docker Registry<br/>镜像仓库]
end
subgraph "本地资源"
Images[Images<br/>镜像]
Containers[Containers<br/>容器]
Volumes[Volumes<br/>数据卷]
Networks[Networks<br/>网络]
end
Client -->|docker build| Daemon
Client -->|docker run| Daemon
Client -->|docker pull| Daemon
Daemon -->|创建| Containers
Daemon -->|拉取| Registry
Daemon -->|管理| Images
Daemon -->|管理| Volumes
Daemon -->|管理| Networks
Images -->|实例化| Containers
Volumes -->|挂载到| Containers
Networks -->|连接| Containers
style Daemon fill:#4caf50
style Containers fill:#2196f3
style Images fill:#ff9800
2. Dockerfile 详解
# 基础镜像
FROM openjdk:11-jre-slim
# 维护者信息
LABEL maintainer="your-email@example.com"
# 设置工作目录
WORKDIR /app
# 复制文件
COPY target/my-service.jar /app/app.jar
# 设置环境变量
ENV JAVA_OPTS="-Xms512m -Xmx1024m" \
APP_ENV=prod \
TZ=Asia/Shanghai
# 暴露端口
EXPOSE 8080
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD curl -f http://localhost:8080/actuator/health || exit 1
# 启动命令
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar /app/app.jar"]
多阶段构建(减小镜像大小)
# 阶段1:构建
FROM maven:3.8-openjdk-11 AS builder
WORKDIR /build
COPY pom.xml .
COPY src ./src
RUN mvn clean package -DskipTests
# 阶段2:运行
FROM openjdk:11-jre-slim
WORKDIR /app
COPY --from=builder /build/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
# 最终镜像只包含JRE和jar包,不包含Maven和源代码
# 镜像大小:从800MB降到200MB
3. 构建和运行
# 构建镜像
docker build -t my-service:1.0.0 .
# 查看镜像
docker images
# 运行容器
docker run -d \
--name my-service \
-p 8080:8080 \
-e SPRING_PROFILES_ACTIVE=prod \
-e JAVA_OPTS="-Xms512m -Xmx1024m" \
-v /data/logs:/app/logs \
--restart=always \
my-service:1.0.0
# 参数说明:
# -d: 后台运行
# --name: 容器名称
# -p: 端口映射(宿主机:容器)
# -e: 环境变量
# -v: 数据卷挂载(宿主机:容器)
# --restart: 重启策略
# 查看运行中的容器
docker ps
# 查看容器日志
docker logs -f my-service
# 进入容器
docker exec -it my-service /bin/bash
# 停止容器
docker stop my-service
# 删除容器
docker rm my-service
# 删除镜像
docker rmi my-service:1.0.0
4. Docker 网络
graph TB
subgraph "bridge网络默认"
C1[容器1<br/>172.17.0.2]
C2[容器2<br/>172.17.0.3]
Bridge[docker0网桥]
end
subgraph "自定义网络"
C3[Java服务<br/>my-network]
C4[MySQL<br/>my-network]
C5[Redis<br/>my-network]
end
C1 --> Bridge
C2 --> Bridge
Bridge --> Host[宿主机]
C3 <--> C4
C3 <--> C5
C4 <--> C5
style Bridge fill:#e3f2fd
style C3 fill:#4caf50
style C4 fill:#2196f3
style C5 fill:#ff9800
# 创建自定义网络
docker network create my-network
# 运行容器并加入网络
docker run -d --name mysql --network my-network \
-e MYSQL_ROOT_PASSWORD=123456 \
mysql:8.0
docker run -d --name redis --network my-network \
redis:7.0
docker run -d --name my-service --network my-network \
-p 8080:8080 \
-e DB_HOST=mysql \
-e REDIS_HOST=redis \
my-service:1.0.0
# 在my-service中可以直接通过主机名访问:
# jdbc:mysql://mysql:3306/mydb
# redis://redis:6379
5. Docker Compose(容器编排)
# docker-compose.yml
version: '3.8'
services:
# MySQL服务
mysql:
image: mysql:8.0
container_name: my-mysql
environment:
MYSQL_ROOT_PASSWORD: root123
MYSQL_DATABASE: mydb
TZ: Asia/Shanghai
ports:
- "3306:3306"
volumes:
- mysql-data:/var/lib/mysql
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- backend
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 3
# Redis服务
redis:
image: redis:7.0-alpine
container_name: my-redis
ports:
- "6379:6379"
volumes:
- redis-data:/data
networks:
- backend
command: redis-server --appendonly yes
# MongoDB服务
mongodb:
image: mongo:6.0
container_name: my-mongodb
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin123
ports:
- "27017:27017"
volumes:
- mongo-data:/data/db
networks:
- backend
# Kafka服务
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- backend
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- backend
# Java应用服务
app:
build:
context: .
dockerfile: Dockerfile
image: my-service:latest
container_name: my-service
depends_on:
mysql:
condition: service_healthy
redis:
condition: service_started
mongodb:
condition: service_started
kafka:
condition: service_started
ports:
- "8080:8080"
environment:
SPRING_PROFILES_ACTIVE: prod
DB_HOST: mysql
DB_PORT: 3306
REDIS_HOST: redis
REDIS_PORT: 6379
MONGO_HOST: mongodb
MONGO_PORT: 27017
KAFKA_SERVERS: kafka:9092
volumes:
- ./logs:/app/logs
networks:
- backend
restart: unless-stopped
# 数据卷定义
volumes:
mysql-data:
redis-data:
mongo-data:
# 网络定义
networks:
backend:
driver: bridge
使用方式:
# 启动所有服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f app
# 停止所有服务
docker-compose stop
# 停止并删除容器
docker-compose down
# 停止并删除容器和数据卷
docker-compose down -v
# 重启单个服务
docker-compose restart app
# 扩容(启动多个实例)
docker-compose up -d --scale app=3
6. 数据持久化
三种挂载方式
# 1. Volume(推荐)- Docker管理
docker run -d --name mysql \
-v mysql-data:/var/lib/mysql \
mysql:8.0
# 查看Volume
docker volume ls
docker volume inspect mysql-data
# 2. Bind Mount - 挂载宿主机目录
docker run -d --name nginx \
-v /home/user/html:/usr/share/nginx/html \
nginx:alpine
# 3. tmpfs - 临时文件系统(内存)
docker run -d --name app \
--tmpfs /tmp \
my-service:1.0.0
7. 最佳实践
优化镜像大小
# ❌ 不好的做法
FROM ubuntu:latest
RUN apt-get update
RUN apt-get install -y openjdk-11-jdk
RUN apt-get install -y maven
COPY . /app
RUN cd /app && mvn package
CMD ["java", "-jar", "/app/target/app.jar"]
# 镜像大小:1.2GB
# ✅ 好的做法
FROM openjdk:11-jre-slim
COPY target/app.jar /app.jar
CMD ["java", "-jar", "/app.jar"]
# 镜像大小:200MB
# ✅ 更好的做法(多阶段构建)
FROM maven:3.8-openjdk-11 AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn package -DskipTests
FROM openjdk:11-jre-slim
COPY --from=build /app/target/app.jar /app.jar
CMD ["java", "-jar", "/app.jar"]
# 镜像大小:200MB,且构建过程标准化
合并RUN指令
# ❌ 每个RUN创建一个镜像层
RUN apt-get update
RUN apt-get install -y curl
RUN apt-get install -y vim
RUN rm -rf /var/lib/apt/lists/*
# ✅ 合并为一个RUN
RUN apt-get update \
&& apt-get install -y curl vim \
&& rm -rf /var/lib/apt/lists/*
使用 .dockerignore
# .dockerignore
target/
.git/
.idea/
*.md
Dockerfile
docker-compose.yml
八、系统监控与运维
1. 日志收集架构
graph LR
subgraph "应用层"
App1[Java服务1]
App2[Java服务2]
App3[Java服务N]
end
subgraph "日志收集"
Filebeat[Filebeat<br/>日志采集]
Kafka[Kafka<br/>消息队列]
end
subgraph "日志处理"
Logstash[Logstash<br/>日志解析]
end
subgraph "存储查询"
ES[Elasticsearch<br/>日志存储]
Kibana[Kibana<br/>日志查询界面]
end
App1 -->|写日志文件| Filebeat
App2 -->|写日志文件| Filebeat
App3 -->|写日志文件| Filebeat
Filebeat --> Kafka
Kafka --> Logstash
Logstash --> ES
ES --> Kibana
style Kafka fill:#fff3e0
style ES fill:#e8f5e9
style Kibana fill:#e1f5ff
Logback配置(输出到文件)
<!-- logback-spring.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 文件输出 -->
<appender name="FILE" class="ch.qos. logback.core.rolling.RollingFileAppender">
<file>/app/logs/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天生成一个日志文件 -->
<fileNamePattern>/app/logs/app.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 保留30天 -->
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<!-- JSON格式,方便ELK解析 -->
<pattern>{"time": "%d{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","thread":"%thread","class":"%logger{40}","message":"%msg"}%n</pattern>
</encoder>
</appender>
<!-- Kafka输出(直接发送到Kafka) -->
<appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>{"time":"%d{yyyy-MM-dd HH:mm:ss. SSS}","level":"%level","message":"%msg"}%n</pattern>
</encoder>
<topic>app-logs</topic>
<keyingStrategy class="com.github.danielwegener. logback.kafka.keying.NoKeyKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka. delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
<appender-ref ref="KAFKA"/>
</root>
</configuration>
2. 监控体系
graph TB
subgraph "应用层"
App[Java应用<br/>暴露Metrics接口]
end
subgraph "数据采集"
Prometheus[Prometheus<br/>时序数据库]
end
subgraph "可视化"
Grafana[Grafana<br/>监控大盘]
end
subgraph "告警"
AlertManager[AlertManager<br/>告警管理]
Email[邮件]
SMS[短信]
Webhook[钉钉/企微]
end
App -->|/actuator/prometheus| Prometheus
Prometheus -->|查询数据| Grafana
Prometheus -->|触发告警| AlertManager
AlertManager --> Email
AlertManager --> SMS
AlertManager --> Webhook
style Prometheus fill:#ff9800
style Grafana fill:#2196f3
style AlertManager fill:#f44336
Spring Boot Actuator集成
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
# application.yml
management:
endpoints:
web:
exposure:
include: health,info,prometheus,metrics
metrics:
tags:
application: ${spring.application.name}
export:
prometheus:
enabled: true
自定义指标
@Component
public class CustomMetrics {
private final MeterRegistry meterRegistry;
public CustomMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
// 计数器(累计值)
public void recordOrder() {
meterRegistry.counter("order.created", "status", "success").increment();
}
// 计时器(统计耗时)
public void recordApiDuration(String api, long duration) {
meterRegistry.timer("api.duration", "path", api)
.record(duration, TimeUnit.MILLISECONDS);
}
// 仪表盘(当前值)
@PostConstruct
public void init() {
meterRegistry.gauge("thread.pool.active", threadPoolExecutor, ThreadPoolExecutor::getActiveCount);
}
}
// 使用示例
@Service
public class OrderService {
@Autowired
private CustomMetrics customMetrics;
public void createOrder(Order order) {
long start = System.currentTimeMillis();
try {
// 业务逻辑
orderDao.save(order);
// 记录成功
customMetrics.recordOrder();
} finally {
// 记录耗时
customMetrics.recordApiDuration("/api/order/create",
System.currentTimeMillis() - start);
}
}
}
Prometheus配置
# prometheus.yml
global:
scrape_interval: 15s # 每15秒采集一次
scrape_configs:
# 采集Java应用
- job_name: 'my-service'
metrics_path: '/actuator/prometheus'
static_configs:
- targets: ['my-service:8080']
# 采集MySQL
- job_name: 'mysql'
static_configs:
- targets: ['mysql-exporter:9104']
# 采集Redis
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
Grafana Dashboard配置
常见监控指标:
# JVM内存使用率
jvm_memory_used_bytes{area="heap"} / jvm_memory_max_bytes{area="heap"} * 100
# CPU使用率
process_cpu_usage * 100
# QPS(每秒请求数)
rate(http_server_requests_seconds_count[1m])
# 接口平均响应时间
rate(http_server_requests_seconds_sum[1m]) / rate(http_server_requests_seconds_count[1m])
# 错误率
rate(http_server_requests_seconds_count{status=~"5.."}[1m]) / rate(http_server_requests_seconds_count[1m]) * 100
# 线程池队列大小
thread_pool_queue_size
# 数据库连接池使用率
hikaricp_connections_active / hikaricp_connections_max * 100
3. 告警规则
# alert_rules.yml
groups:
- name: application_alerts
interval: 30s
rules:
# CPU使用率超过80%
- alert: HighCPUUsage
expr: process_cpu_usage * 100 > 80
for: 5m
labels:
severity: warning
annotations:
summary: "CPU使用率过高"
description: " CPU使用率 %"
# 内存使用率超过85%
- alert: HighMemoryUsage
expr: jvm_memory_used_bytes{area="heap"} / jvm_memory_max_bytes{area="heap"} * 100 > 85
for: 5m
labels:
severity: warning
annotations:
summary: "内存使用率过高"
# 接口错误率超过5%
- alert: HighErrorRate
expr: rate(http_server_requests_seconds_count{status=~"5.."}[5m]) / rate(http_server_requests_seconds_count[5m]) * 100 > 5
for: 2m
labels:
severity: critical
annotations:
summary: "接口错误率过高"
description: "错误率 %"
# 服务宕机
- alert: ServiceDown
expr: up{job="my-service"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "服务宕机"
description: " 已宕机超过1分钟"
4. 链路追踪(分布式追踪)
sequenceDiagram
participant User as 用户
participant Gateway as API网关<br/>TraceID: abc123
participant OrderService as 订单服务<br/>SpanID:001
participant InventoryService as 库存服务<br/>SpanID:002
participant PaymentService as 支付服务<br/>SpanID:003
participant DB as 数据库
User->>Gateway: 下单请求
Note over Gateway: 生成TraceID: abc123
Gateway->>OrderService: 创建订单<br/>TraceID: abc123, SpanID: 001
OrderService->>InventoryService: 检查库存<br/>TraceID: abc123, SpanID: 002, ParentID: 001
InventoryService->>DB: 查询库存
DB-->>InventoryService: 返回结果
InventoryService-->>OrderService: 库存充足
OrderService->>PaymentService: 扣款<br/>TraceID: abc123, SpanID: 003, ParentID: 001
PaymentService->>DB: 扣减余额
DB-->>PaymentService: 扣款成功
PaymentService-->>OrderService: 支付完成
OrderService-->>Gateway: 订单创建成功
Gateway-->>User: 返回结果
Spring Cloud Sleuth + Zipkin
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
</dependency>
# application.yml
spring:
sleuth:
sampler:
probability: 1.0 # 采样率100%(生产环境建议0.1)
zipkin:
base-url: http://zipkin-server:9411
sender:
type: web
查看链路:
- 访问 http://zipkin-server:9411
- 输入 TraceID 查看完整调用链
- 可以看到每个服务的耗时、是否出错
5. 健康检查
// 自定义健康检查
@Component
public class CustomHealthIndicator implements HealthIndicator {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public Health health() {
// 检查Redis连接
try {
redisTemplate.opsForValue().get("health-check");
} catch (Exception e) {
return Health.down()
.withDetail("redis", "连接失败: " + e.getMessage())
.build();
}
// 检查Kafka连接
// ...
// 检查数据库连接池(getDataSource() 表示获取 HikariDataSource 的方式,此处省略其实现)
HikariDataSource dataSource = getDataSource();
int activeConnections = dataSource.getHikariPoolMXBean().getActiveConnections();
int maxConnections = dataSource.getMaximumPoolSize();
if (activeConnections > maxConnections * 0.9) {
return Health.down()
.withDetail("database", "连接池即将耗尽")
.withDetail("active", activeConnections)
.withDetail("max", maxConnections)
.build();
}
return Health.up()
.withDetail("redis", "正常")
.withDetail("kafka", "正常")
.withDetail("database", "正常")
.build();
}
}
访问健康检查接口:
curl http://localhost:8080/actuator/health
# 返回:
{
"status": "UP",
"components": {
"custom": {
"status": "UP",
"details": {
"redis": "正常",
"kafka": "正常",
"database": "正常"
}
},
"diskSpace": {
"status": "UP",
"details": {
"total": 500000000000,
"free": 200000000000
}
}
}
}
6. 完整的Docker Compose运维配置
version: '3.8'
services:
# 应用服务
app:
image: my-service:latest
deploy:
replicas: 3 # 3个实例
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '1'
memory: 1G
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "3"
environment:
JAVA_OPTS: "-Xms1g -Xmx2g -XX:+UseG1GC"
# Prometheus
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
# Grafana
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
environment:
GF_SECURITY_ADMIN_PASSWORD: admin123
# Zipkin
zipkin:
image: openzipkin/zipkin:latest
ports:
- "9411:9411"
# Filebeat
filebeat:
image: docker.elastic.co/beats/filebeat:8.10.0
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
depends_on:
- kafka
volumes:
prometheus-data:
grafana-data: