统计在线用户的数量,是应用很常见的需求了。如果需要精准的统计到用户是在线,离线状态,我想只有客户端和服务器通过保持一个TCP长连接来实现。如果应用本身并非一个IM应用的话,这种方式成本极高。
现在的应用都趋向于使用心跳包来标识用户是否在线。用户登录后,每隔一段时间,往服务器推送一个消息,表示当前用户在线。服务器则可以定义一个时间差,例如:5分钟内收到过客户端心跳消息,视为在线用户。
在线用户统计的实现
基于数据库实现
最简单的办法,就是在用户表,添加一个最后心跳包的日期时间字段 last_active
。服务器收到心跳后,每次都去更新这个字段为当前的最新时间。
如果要查询最近5分钟活跃的用户数量,就可以简单的通过一句SQL完成。
SELECT COUNT(1) AS `online_user_count` FROM `user` WHERE `last_active` BETWEEN '2020-12-22 13:00:00' AND '020-12-22 13:05:00';
弊端也是显而易见,为了提高检索效率,不得不为last_active
字段添加索引,而因为心跳的更新,会导致频繁的重新维护索引树,效率极其低下。
基于Redis实现
这是比较理想的一种实现方式了,Redis基于内存进行读写,性能自然比关系型数据库好得多,而且它所提供的Zset可以很方便的构建出一个在线用户的统计服务。
Redis的Zset
这里不会涉及太多redis的东西,简单说明以下zset
。它是一个有序的set
集合,集合中的每个元素由2个东西组成
- member 既然是集合,那么它便是集合中的元素,并且不能重复
- score 既然是有序的,它就是用于排序的权重字段
Zset的部分操作
添加元素
ZADD key score member [score member ...]
一次性添加一个或者多个元素到集合,如果member
已经存在则会使用当前score
进行覆盖
统计所有的元素数量
ZCARD key
统计score值在min和max之间元素数量
ZCOUNT key min max
删除score值在min和max之间的元素
ZREMRANGEBYSCORE key min max
一个示例
我打算,用一个zset
存储我内心中编程语言的评分排名,这个key叫做lang
添加信息,返回新添加的元素个数
> zadd lang 999 php 10 java 9 go 8 python 7 javascript
"5"
查看添加的数量
> zcard lang
"5"
查看评分在8 - 10之间的元素个数,有3个
> zcount lang 8 10
"3"
删除评分在8 - 1000的元素,返回删除的个数
> ZREMRANGEBYSCORE lang 8 1000
"4"
在线用户服务的实现
知道了zset
后,就可以实现一个在线用户的统计服务了。
实现思路
客户端每隔5分钟发送一个心跳到服务器,服务器根据会话获取到用户的ID,作为zset
的member
存入zset
,score
便是当前收到心跳的时间戳,当同一个用户第二次发送心跳的时候,就会更新他对应的score
值,由于更新是在内存,这个速度相当快。
zadd users 1608616915109 10000
需要统计出在线用户的数量,本质上就是需要统计出,最近5分钟有发送心跳的用户,通过zcount
可以很轻松的统计出来。通过程序获取到当前的时间戳,作为maxScore
,时间戳减去5分钟后作为minScore
。
zcount users 1608616615109 1608616915109
因为某些用户可能长时间没有登录过了,可以通过ZREMRANGEBYSCORE
进行清理。通过程序获取到当前的时间戳,减去5分钟后作为maxScore
,使用0
, 作为minScore
,表示清理所有超过5分钟没有发送过心跳包的用户。
ZREMRANGEBYSCORE users 0 1608616615109
实现代码
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import javax.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
*
*
* 在线用户统计
*
* @author Administrator
*
*/
@Component
public class OnlineUserStatsService {
private static final String ONLINE_USERS = "onlie_users";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 添加用户在线信息
* @param userId
* @return
*/
public Boolean online(Integer userId) {
return this.stringRedisTemplate.opsForZSet().add(ONLINE_USERS, userId.toString(), Instant.now().toEpochMilli());
}
/**
* 获取一定时间内,在线的用户数量
* @param duration
* @return
*/
public Long count(Duration duration) {
LocalDateTime now = LocalDateTime.now();
return this.stringRedisTemplate.opsForZSet().count(ONLINE_USERS,
now.minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
/**
* 获取所有在线过的用户数量,不论时间
* @return
*/
public Long count() {
return this.stringRedisTemplate.opsForZSet().zCard(ONLINE_USERS);
}
/**
* 清除超过一定时间没在线的用户数据
* @param duration
* @return
*/
public Long clear(Duration duration) {
return this.stringRedisTemplate.opsForZSet().removeRangeByScore(ONLINE_USERS, 0,
LocalDateTime.now().minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
}
使用示例
@Resource
private OnlineUserStatsService onlineUserStatsService;
@Test
public void test() {
// ID为1的用户发送了心跳包
boolean result = this.onlineUserStatsService.online(1);
System.out.println("online=" + result);
// 获取5分钟内,发送过心跳包的用户数量,也就是在线用户的数量
Long count = this.onlineUserStatsService.count(Duration.ofMinutes(5));
System.out.println("oneline count=" + count);
// 获取所有发送过心跳包的用户数量
count = this.onlineUserStatsService.count();
System.out.println("all count=" + count);
// 清除超过1天都没发送过心跳包的用户
Long clear = this.onlineUserStatsService.clear(Duration.ofDays(1));
System.out.println("clear=" + clear);
}
内存消耗分析
可以通过www.redis.cn/redis_memory/预算Redis的内存消耗
我对Redis的内存分配并不熟悉,只是按照自己的想法去填写了一些数据,所以我在这里理解的东西,可能是错误的。但是我想这并不耽误证明 - 在这种场景使用Zset对内存消耗极低的事实
设想onlie_users
需要存储1亿个用户的状态信息,每个元素score
和member
需要10个字节存储,那么一共大约需要20G内存。20G的内存对于现在的服务器来说,并不是大问题。
最后
- 心跳协议不一定非要HTTP,如果客户端支持的话UDP就很适合,可以节约一些系统开销。
zset
的key,不一定非要用String
,可以修改序列化方式,以固定的字节的形式存储用户ID,在用户ID过大的时候,可以节约一些存储空间。
String userId = "10010";
System.out.println(userId.getBytes().length); // 以字符串形式存储 => 需要5个字节
byte[] bin = ByteBuffer.allocate(4).putInt(Integer.valueOf(userId)).array();
System.out.println(bin.length); // 序列化为字节形式存储 => 需要4个字节
System.out.println(ByteBuffer.wrap(bin).getInt()); // 反序列化为ID => 10010