using System.Drawing;
using System.Drawing.Imaging;
using System.Net.WebSockets;
using System.Text;
using Masuit.Tools.Systems;
using MoYu.DataEncryption;
using MoYu.RemoteRequest.Extensions;
using MoYu.Templates;
using NewLife.Caching;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using OfficeOpenXml.FormulaParsing.Excel.Functions.Text;
using SafeCampus.Application.Services.Business.Warn.Dto;
using SafeCampus.Application.Services.Business.Warn.Service;
using SafeCampus.Core.Utils.TXYSMS;

namespace SafeCampus.Application.Manager.DeepelephManager;

/// <summary>
/// Integration with the Deepeleph platform: obtains an API token, resolves the
/// WebSocket endpoint for a topic, subscribes to it and processes pushed messages.
/// </summary>
public class DeepelephManager : IDeepelephManager, IScoped
{
    private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private bool _isConnected = false;
    private Timer _pingTimer;

    // Every socket opened by a Subscribe* method, so DisconnectAsync can close them.
    private List<ClientWebSocket> webScokets = new List<ClientWebSocket>();
    private readonly ISimpleCacheService _simpleCacheService;

    public DeepelephManager(ISimpleCacheService simpleCacheService)
    {
        _simpleCacheService = simpleCacheService;
    }

    /// <summary>
    /// Returns the cached platform token, requesting a new one when the cache holds none.
    /// </summary>
    /// <returns>The token, or an empty string when the login request did not succeed.</returns>
    public string GetToken()
    {
        var cacheToken = _simpleCacheService.Get<string>(AuthConstants.SXTOKEN);
        if (!string.IsNullOrEmpty(cacheToken))
        {
            return cacheToken;
        }

        return GenSXToken();
    }

    /// <summary>
    /// Requests a new token from the platform login endpoint and caches it on success.
    /// </summary>
    /// <returns>The new token, or an empty string when the request failed.</returns>
    private string GenSXToken()
    {
        var token = "";
        var setting = App.GetOptionsMonitor<AppInfoOptions>();
        var nonce = Guid.NewGuid().ToString("N");
        var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();

        // NOTE(review): blocking on .Result keeps the synchronous GetToken() contract;
        // consider an async variant if the interface can be changed.
        var list = $"{setting.SXAPIURL}/user/center/v1/login/client"
            .SetBody(new
            {
                appKey = setting.AppKey,
                appSecret = setting.AppSecret,
                nonce,
                timestamp,
                sign = MD5Encryption.Encrypt($"{setting.AppSecret}appKey{setting.AppKey}nonce{nonce}timestamp{timestamp}{setting.AppSecret}", true)
            })
            .SetContentType("application/json")
            .PostAsAsync<string>().Result;

        var model = JsonConvert.DeserializeObject<JObject>(list);
        var success = model?["success"];
        if (success == null)
        {
            return token;
        }

        if (success.Type != JTokenType.Null && (bool)success)
        {
            // "data" may be absent or JSON null; only index into it when it is an object.
            var issued = (model["data"] as JObject)?["token"]?.ToString();
            if (!string.IsNullOrEmpty(issued))
            {
                token = issued;
                // The token is valid for 2 hours; expire the cache entry 20 minutes early.
                // NOTE(review): 100*60 is presumably seconds - confirm against ISimpleCacheService.Set.
                _simpleCacheService.Set(AuthConstants.SXTOKEN, token, 100*60);
            }
            else
            {
                LogHelper.WriteToLog("token请求失败", "响应中缺少token");
            }
        }
        else
        {
            LogHelper.WriteToLog("token请求失败", model["message"]?.ToString() ?? string.Empty);
        }

        return token;
    }

    /// <summary>
    /// Subscribes to alarm messages and stores each received alarm together with an
    /// annotated copy of its snapshot image.
    /// </summary>
    /// <returns>A task that completes when the server closes the connection.</returns>
    public async Task SubscribeAlarm()
    {
        await RunSubscriptionAsync(AuthConstants.SXALARM, AuthConstants.SXALARM_Grpup, "预警", HandleAlarmAsync);
    }

    /// <summary>
    /// Subscribes to roll-call events.
    /// </summary>
    /// <returns>A task that completes when the server closes the connection.</returns>
    public async Task SubscriberRoomCall()
    {
        await RunSubscriptionAsync(AuthConstants.SXROOM_CALL, AuthConstants.SXROOM_CAL_Group, "点名", (_, _) =>
        {
            // TODO: the payload format is not known yet, so no real processing is implemented.
            // NOTE(review): this sends an SMS with empty arguments for every message - looks
            // like a placeholder; confirm before enabling this subscription.
            TxySmsUtil.SendSms(new[] { "" }, new[] { "" });
            return Task.CompletedTask;
        });
    }

    /// <summary>
    /// Subscribes to attendance events.
    /// </summary>
    /// <returns>A task that completes when the server closes the connection.</returns>
    public async Task SubscriberAttendance()
    {
        // TODO: the payload format is not known yet, so messages are only logged.
        await RunSubscriptionAsync(AuthConstants.SXECOLOGY_ATTENDANCE, AuthConstants.SXECOLOGY_ATTENDANCE_Group, "考勤", (_, _) => Task.CompletedTask);
    }

    /// <summary>
    /// Shared subscription flow: resolve the WebSocket address, connect, send the
    /// subscribe request, then loop over incoming messages until the server closes.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="consumerGroup">Consumer group used for the subscription.</param>
    /// <param name="displayName">Prefix used in the log messages of this subscription.</param>
    /// <param name="handleBody">
    /// Invoked with the message envelope and its "body" for every message whose "type" is "msg".
    /// Exceptions it throws are logged and do not end the subscription.
    /// </param>
    private async Task RunSubscriptionAsync(string topic, string consumerGroup, string displayName, Func<JObject, JToken, Task> handleBody)
    {
        var setting = App.GetOptionsMonitor<AppInfoOptions>();
        var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
        var url = GetWebSocketUrl(topic, consumerGroup);
        var subscribeJson = JsonConvert.SerializeObject(new
        {
            token = GetToken(),
            type = "subscribe",
            timestamp,
            payload = new
            {
                topic,
                consumerGroup,
                tags = setting.TenantCode
            }
        });

        ClientWebSocket socket = new ClientWebSocket();
        webScokets.Add(socket);
        await socket.ConnectAsync(new Uri(url), CancellationToken.None);
        LogHelper.WriteToLog($"{displayName}WebSocket 已连接:{url}");
        await socket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);

        byte[] buffer = new byte[1024 * 8];
        while (socket.State == WebSocketState.Open)
        {
            var (messageType, message) = await ReceiveMessageAsync(socket, buffer);
            if (messageType == WebSocketMessageType.Close)
            {
                LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
                break;
            }

            if (messageType != WebSocketMessageType.Text)
            {
                continue;
            }

            try
            {
                LogHelper.WriteToLog($"收到: {message}");
                var json = JsonConvert.DeserializeObject<JObject>(message);
                if (json?["type"]?.ToString() != "msg")
                {
                    continue;
                }

                var body = json["body"];
                if (body != null)
                {
                    await handleBody(json, body);
                }
            }
            catch (Exception e)
            {
                // A single bad message must not end the subscription.
                LogHelper.WriteToLog($"{displayName}webSocket处理异常");
                LogHelper.WriteToLog(e, $"{displayName}webSocket处理异常");
            }
        }
    }

    /// <summary>
    /// Receives one complete WebSocket message, concatenating frames until EndOfMessage.
    /// </summary>
    /// <param name="socket">An open socket to read from.</param>
    /// <param name="buffer">Scratch buffer used for each individual frame.</param>
    /// <returns>
    /// The message type, plus the UTF-8 decoded payload for text messages
    /// (null for close and binary messages).
    /// </returns>
    private static async Task<(WebSocketMessageType Type, string Text)> ReceiveMessageAsync(ClientWebSocket socket, byte[] buffer)
    {
        using var assembled = new MemoryStream();
        WebSocketReceiveResult result;
        do
        {
            result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                return (WebSocketMessageType.Close, null);
            }

            assembled.Write(buffer, 0, result.Count);
        }
        while (!result.EndOfMessage);

        // Decode only after the whole message arrived so multi-byte characters that
        // straddle a frame boundary are not corrupted.
        var text = result.MessageType == WebSocketMessageType.Text
            ? Encoding.UTF8.GetString(assembled.GetBuffer(), 0, (int)assembled.Length)
            : null;
        return (result.MessageType, text);
    }

    /// <summary>
    /// Converts one alarm message into a <see cref="WarnInfoDto"/>, saves the snapshot with the
    /// alarm rectangles drawn on it, and stores the record through <see cref="IWarnInfoService"/>.
    /// </summary>
    /// <remarks>
    /// Shape of a sample message seen during development: the envelope carries
    /// "type", "topic", "tags", "consumerGroup", "keys", "messageId", "queueId", "queueOffset"
    /// and "bornTimestamp"; "body" carries "alarmId", "alarmType", "func", "cameraId",
    /// "snapshotUrl", "rects" (array of top/left/width/height), "poiId", "tenantCode", "tick"
    /// and "extend", where "extend" is a JSON document encoded as a string.
    /// </remarks>
    /// <param name="envelope">The full message.</param>
    /// <param name="body">The "body" token of the message.</param>
    private async Task HandleAlarmAsync(JObject envelope, JToken body)
    {
        if (!Enum.TryParse(body["alarmType"]?.ToString(), out AlarmType dayEnum))
        {
            dayEnum = AlarmType.visual_fence;
        }

        var personEnum = PersonType.unkonwn;
        var extendRaw = body["extend"];
        if (extendRaw != null)
        {
            // "extend" arrives as a JSON string, so personType must be read from the parsed
            // object; indexing the raw string token by key would throw.
            var extend = JsonConvert.DeserializeObject<JObject>(extendRaw.ToString());
            var personToken = extend?["personType"];
            if (personToken == null || !Enum.TryParse(personToken.ToString(), out personEnum))
            {
                personEnum = PersonType.unkonwn;
            }
        }

        var model = new WarnInfoDto
        {
            TenantCode = body["tenantCode"]?.ToString(),
            PoiId = body["poiId"]?.ToString(),
            AlarmId = body["alarmId"]?.ToString(),
            AlarmType = body["alarmType"]?.ToString(),
            AlarmTypeDesc = dayEnum.GetDescription(),
            CameraId = body["cameraId"]?.ToString(),
            Tick = TimestampToDateTime(body["tick"].ToString()),
            SnapshotUrl = body["snapshotUrl"]?.ToString(),
            Rects = body["rects"]?.ToString(),
            // The sample message carries "tags" on the envelope rather than in the body.
            Tags = body["tags"]?.ToString() ?? envelope["tags"]?.ToString(),
            Extend = body["extend"]?.ToString(),
            PersonType = personEnum.GetDescription(),
            WarnHand = 0,
        };

        var signImg = Path.Combine(Directory.GetCurrentDirectory(), "Files", App.Configuration["AppInfo:AlarmImg"], model.AlarmId + ".jpg");
        var targetDirectory = Path.GetDirectoryName(signImg);
        if (!string.IsNullOrEmpty(targetDirectory))
        {
            // Bitmap.Save fails with a generic GDI+ error when the directory is missing.
            Directory.CreateDirectory(targetDirectory);
        }

        var steam = await model.SnapshotUrl.GetAsByteArrayAsync();
        // NOTE(review): host and path are hard-coded; consider moving them to configuration.
        model.SnapshotUrl = $"http://192.168.10.186:8003/Files/alarmImg/{model.AlarmId}.jpg";

        // The stream must stay open for the lifetime of the Bitmap created from it.
        using (MemoryStream ms = new MemoryStream(steam))
        using (Bitmap bmp = new Bitmap(ms))
        {
            using (Graphics g = Graphics.FromImage(bmp))
            using (Pen pen = new Pen(Color.Red, 3))
            {
                if (body["rects"] is JArray rects)
                {
                    foreach (var item in rects)
                    {
                        Rectangle rect = new Rectangle((int)item["left"], (int)item["top"], (int)item["width"], (int)item["height"]);
                        g.DrawRectangle(pen, rect);
                    }
                }
            }

            bmp.Save(signImg, ImageFormat.Jpeg);
        }

        Scoped.Create((_, scope) =>
        {
            var services = scope.ServiceProvider;
            var _repository = services.GetService<IWarnInfoService>();
            _repository.Add(model);
        });
    }

    /// <summary>
    /// Connects to <paramref name="url"/>, sends <paramref name="json"/> as the subscribe
    /// request and receives messages, reconnecting after every disconnect or failure until
    /// cancellation is requested on the internal token source.
    /// </summary>
    /// <param name="url">WebSocket address to connect to.</param>
    /// <param name="json">Subscribe request sent right after connecting.</param>
    private async Task GetWebSocketData(string url, string json)
    {
        while (!_cancellationTokenSource.IsCancellationRequested)
        {
            // A ClientWebSocket can only be connected once, so each attempt needs a new instance.
            using (var socket = new ClientWebSocket())
            {
                try
                {
                    await socket.ConnectAsync(new Uri(url), CancellationToken.None);
                    LogHelper.WriteToLog("WebSocket 已连接");
                    _isConnected = true;

                    await socket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(json)), WebSocketMessageType.Text, true, CancellationToken.None);
                    await ReceiveLoop(socket);
                    LogHelper.WriteToLog("WebSocket 连接已关闭,正在重连。。。");
                }
                catch (Exception ex)
                {
                    LogHelper.WriteToLog($"WebSocket连接失败: {ex.Message}");
                }
                finally
                {
                    _isConnected = false;
                }
            }

            await Task.Delay(GetReconnectInterval());
        }
    }

    /// <summary>
    /// Reads and logs messages from <paramref name="_webSocket"/> until it is no longer open
    /// or the server sends a close message.
    /// </summary>
    /// <param name="_webSocket">A connected socket.</param>
    private async Task ReceiveLoop(ClientWebSocket _webSocket)
    {
        byte[] buffer = new byte[1024];
        while (_webSocket.State == WebSocketState.Open)
        {
            var (messageType, message) = await ReceiveMessageAsync(_webSocket, buffer);
            if (messageType == WebSocketMessageType.Close)
            {
                LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
                break;
            }

            if (messageType != WebSocketMessageType.Text)
            {
                continue;
            }

            LogHelper.WriteToLog($"收到: {message}");
            var json = JsonConvert.DeserializeObject<JObject>(message);
            if (json?["type"]?.ToString() == "msg" && json["body"] != null)
            {
                // No processing is implemented for this generic loop yet.
            }
        }
    }

    /// <summary>
    /// Delay between reconnect attempts. Currently a fixed 10 seconds; a back-off strategy
    /// could be implemented here.
    /// </summary>
    private TimeSpan GetReconnectInterval()
    {
        return TimeSpan.FromSeconds(10);
    }

    /// <summary>
    /// Closes every socket opened by the Subscribe* methods that is still open.
    /// </summary>
    public async Task DisconnectAsync()
    {
        // Work on a snapshot: a subscription started while we await would otherwise
        // modify the list during enumeration.
        var openSockets = webScokets.Where(clientWebSocket => clientWebSocket.State == WebSocketState.Open).ToArray();
        foreach (var clientWebSocket in openSockets)
        {
            try
            {
                await clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            }
            catch (Exception e)
            {
                // Best effort: one failing socket must not keep the others open.
                LogHelper.WriteToLog(e, "WebSocket 关闭异常");
            }
        }
    }

    /// <summary>
    /// Asks the platform for the WebSocket server address of a topic / consumer group.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="consumerGroup">Consumer group used for the subscription.</param>
    /// <returns>The server address, or an empty string when the response does not contain one.</returns>
    private string GetWebSocketUrl(string topic, string consumerGroup)
    {
        var serverAddr = "";
        var setting = App.GetOptionsMonitor<AppInfoOptions>();

        // NOTE(review): blocking on .Result; see GenSXToken.
        var list = $"{setting.SXAPIURL}/emitter/connection/get"
            .SetBody(new
            {
                token = GetToken(),
                topic,
                consumerGroup,
            })
            .SetContentType("application/json")
            .PostAsAsync<string>().Result;

        var model = JsonConvert.DeserializeObject<JObject>(list);
        // "data" may be absent or JSON null (e.g. on an error response).
        var address = (model?["data"] as JObject)?["serverAddr"];
        if (address != null)
        {
            serverAddr = address.ToString();
        }

        return serverAddr;
    }

    /// <summary>
    /// Converts a Unix timestamp in milliseconds to a date-time in the local time zone.
    /// </summary>
    /// <param name="timeStamp">Unix time in milliseconds, as a decimal string.</param>
    /// <returns>The corresponding local date-time.</returns>
    public static DateTime TimestampToDateTime(string timeStamp)
    {
        DateTimeOffset utcDateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(timeStamp));
        return utcDateTimeOffset.LocalDateTime;
    }
}