using Masuit.Tools; using NewLife.MQTT; using SafeCampus.Core; using SafeCampus.Cache; namespace SafeCampus.Background; /// /// mqtt后台任务 /// public class MqttWorker : BackgroundService { private readonly ILogger _logger; private readonly ISimpleCacheService _simpleCacheService; private readonly MqttClient _mqtt; public MqttWorker(ILogger logger, ISimpleCacheService simpleCacheService, IMqttClientManager mqttClientManager) { _logger = logger; this._simpleCacheService = simpleCacheService; this._mqtt = mqttClientManager.GetClient(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { //订阅设备上下线主题 await _mqtt.SubscribeAsync("$SYS/brokers/+/clients/+/+", (e) => { var topicList = e.Topic.Split("/");//根据/分割 var clientId = topicList[topicList.Length - 2];//获取客户端ID if (clientId.Contains("_"))//判断客户端ID是否有下划线有下划线表示是web用户登录 { var userId = clientId.Split("_")[0]; //获取redis当前用户的token信息列表 var tokenInfos = _simpleCacheService.HashGetOne>(CacheConst.Cache_UserToken, userId); if (tokenInfos != null) { var connectEvent = topicList.Last();//获取连接事件判断上线还是下线 if (connectEvent == "connected")//如果是上线 { _logger.LogInformation($"设备{clientId}上线了"); var token = _simpleCacheService.Get(CacheConst.Cache_MqttClientUser + clientId);//获取mqtt客户端ID对应的用户token if (token == null) return;//没有token就直接退出 //获取redis中当前token var tokenInfo = tokenInfos.Where(it => it.Token == token).FirstOrDefault(); if (tokenInfo != null) { tokenInfo.ClientIds.Add(clientId);//添加到客户端列表 _simpleCacheService.HashAdd(CacheConst.Cache_UserToken, userId, tokenInfos);//更新Redis } } else //下线 { _logger.LogInformation($"设备{clientId}下线了"); //获取当前客户端ID所在的token信息 var tokenInfo = tokenInfos.Where(it => it.ClientIds.Contains(clientId)).FirstOrDefault(); if (tokenInfo != null) { tokenInfo.ClientIds.RemoveWhere(it => it == clientId);//从客户端列表删除 _simpleCacheService.HashAdd(CacheConst.Cache_UserToken, userId, tokenInfos);//更新Redis } } } } }); } }