平安校园
 
 
 
 
 
 

530 lines
26 KiB

  1. using System.Drawing;
  2. using System.Drawing.Imaging;
  3. using System.Net.WebSockets;
  4. using System.Text;
  5. using Masuit.Tools.Systems;
  6. using MoYu.DataEncryption;
  7. using MoYu.RemoteRequest.Extensions;
  8. using MoYu.Templates;
  9. using NewLife.Caching;
  10. using Newtonsoft.Json;
  11. using Newtonsoft.Json.Linq;
  12. using OfficeOpenXml.FormulaParsing.Excel.Functions.Text;
  13. using SafeCampus.Application.Services.Business.Warn.Dto;
  14. using SafeCampus.Application.Services.Business.Warn.Service;
  15. namespace SafeCampus.Application.Manager.DeepelephManager;
  16. /// <summary>
  17. /// 深象智能对接实现
  18. /// </summary>
  19. public class DeepelephManager : IDeepelephManager, IScoped
  20. {
  21. private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
  22. private bool _isConnected = false;
  23. private Timer _pingTimer;
  24. private List<ClientWebSocket> webScokets=new List<ClientWebSocket>();
  25. private readonly ISimpleCacheService _simpleCacheService;
  26. public DeepelephManager(ISimpleCacheService simpleCacheService)
  27. {
  28. _simpleCacheService = simpleCacheService;
  29. }
  30. //获取地址
  31. //建立请求,发送订阅参数
  32. //遍历等待接收消息
  33. public string GetToken()
  34. {
  35. var cacheToken = _simpleCacheService.Get<string>(AuthConstants.SXTOKEN);
  36. if (cacheToken != null) return cacheToken;
  37. return GenSXToken();
  38. }
  39. /// <summary>
  40. /// 获取深象智能Token
  41. /// </summary>
  42. /// <returns></returns>
  43. private string GenSXToken()
  44. {
  45. var token = "";
  46. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  47. var nonce = Guid.NewGuid().ToString("N");
  48. var timestamp =DateTimeOffset.Now.ToUnixTimeMilliseconds();
  49. var list = $"{setting.SXAPIURL}/user/center/v1/login/client"
  50. .SetBody(new
  51. {
  52. appKey = setting.AppKey,
  53. appSecret = setting.AppSecret,
  54. nonce ,
  55. timestamp,
  56. sign = MD5Encryption.Encrypt($"{setting.AppSecret}appKey{setting.AppKey}nonce{nonce}timestamp{timestamp}{setting.AppSecret}",true)
  57. })
  58. .SetContentType("application/json")
  59. .PostAsAsync<string>().Result;
  60. var model = JsonConvert.DeserializeObject<JObject>(list);
  61. if (model["success"]!=null)
  62. {
  63. if ((bool)model["success"])
  64. {
  65. token = model["data"]["token"].ToString();
  66. //token有效期2小时,提前20分钟失效
  67. _simpleCacheService.Set(AuthConstants.SXTOKEN, token, 100*60);
  68. }
  69. else
  70. {
  71. LogHelper.WriteToLog("token请求失败", model["message"].ToString());
  72. }
  73. }
  74. return token;
  75. }
  76. /// <summary>
  77. /// 订阅预警更新
  78. /// </summary>
  79. /// <returns></returns>
  80. public async Task SubscribeAlarm()
  81. {
  82. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  83. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  84. var url = GetWebSocketUrl(AuthConstants.SXALARM, AuthConstants.SXALARM_Grpup);
  85. var subscribeJson = JsonConvert.SerializeObject(new
  86. {
  87. token = GetToken(),
  88. type = "subscribe",
  89. timestamp,
  90. payload = new
  91. {
  92. topic = AuthConstants.SXALARM, consumerGroup = AuthConstants.SXALARM_Grpup, tags = setting.TenantCode
  93. }
  94. });
  95. ClientWebSocket _webSocket = new ClientWebSocket();
  96. webScokets.Add(_webSocket);
  97. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  98. LogHelper.WriteToLog($"预警WebSocket 已连接:{url}");
  99. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  100. byte[] buffer = new byte[1024*8];
  101. while (_webSocket.State == WebSocketState.Open)
  102. {
  103. //try
  104. //{
  105. // var jsonstr =
  106. // " {\"body\":{\"extend\":\"{\\\"trackId\\\":\\\"12937198411611620608\\\",\\\"genderScore\\\":[0.85385144,0.14504793,8.212863E-4],\\\"faceSnapshot\\\":\\\"http://deepvision.oss-cn-zhangjiakou.aliyuncs.com/acp/quanjiang/DEMO00001/20240712/edge/group/SXT001_book_wm_1720744889834_2.jpg?Expires=1720752090&OSSAccessKeyId=STS.NT4L37rDr91kevRbXXzw6nfMD&Signature=G2bEhgR7yQFP67upY9xhV6rH6AM%3D&security-token=CAIS0wN1q6Ft5B2yfSjIr5eBB4mDn5tTjvOAZ1DjhlgNdvgagaPmpjz2IHFMf3huCeodsv8%2BlGxS5%2FgelrpqVpZDR03Na8RHwrly1lv5O9KY4yZbWXDm0s%2FLI3OaLjKm9hi7AYygPgK0GJqEb1TDiVUto9%2FTfimjWFqIKICAjYUdAP0cQgi%2Fa0gwZrJRPRAwh8IGEnHTOP2xUHvtmXGCNFd0nQB%2BhGhjk7TdpPeR8R3Dllb35%2FYIroDqWPieYtJrIY10XqWBvqx%2FfbGT1zVLuVoYtvV6gaFc5zbcv9abRFVf4hiCP6%2Ff6MBuNw5%2Fae94efZNp%2BOukuZj6K6B1db7xhtVI%2BBOUiPZA4mr2IzdBeqvNNcwc7m8F1no9YjXbsGs9EEGGStLaVgVI4F8dyAhWEd9FWjgR%2FX5qAyQUGCKULOY1aw6651xwmjz8MCCT1r1GOTBindGasVnMxh5Z0JMjDK9aNkKfgFUbVJ8BrGTCIh%2FYx0bsq7yowDIEyp71TRMo%2Bbu%2FDBhIifKpO4VN7AxMup1DPwu2wNCxFTF%2Bmp%2BKWf0Jz1CnuYdie64csaEylVu4cUzwVR2okivFuX%2By5hlD5o57dNqxAQQCz4KifNxiqzxO%2BBHrfsIeBqAAVdLFq9YtSjPudclLSqdxoekyznbYEtkERJY1iOmTBUChk2ja1KRSUvYSbtTl9r6MTksWA0mlqHfJP416rLsDCoCUhEtZ5OIJ9uobmwBlOhNJOmvNLKts9UatFZKf8fpoabQDjZVOrlt5SGCat11TstwJxt%2Fqk4KlR1z2%2Fvgjk36IAA%3D\\\",\\\"faceSnapshotPath\\\":\\\"deepvision:acp/quanjiang/DEMO00001/20240712/edge/group/SXT001_book_wm_1720744889834_2.jpg\\\",\\\"faceScore\\\":21.902344}\",\"alarmType\":\"hat_detect\",\"func\":\"hat_detect\",\"cameraId\":\"SXT001\",\"alarmId\":\"4fe123dc469c47919f1bb2e83469f088\",\"snapshotUrl\":\"http://deepvision.oss-cn-zhangjiakou.aliyuncs.com/acp/quanjiang/DEMO00001/20240712/edge/group/SXT001/SXT001_d051f448-690e-4135-9659-40dfef3238fb_wm_2.jpg?Expires=1720752090&OSSAccessKeyId=STS.NT4L37rDr91kevRbXXzw6nfMD&Signature=Mq39eyfB5E2sdJ58lT03HMz9QnI%3D&security-token=CAIS0wN1q6Ft5B2yfSjIr5eBB4mDn5tTjvOAZ1DjhlgNdvgagaPmpjz2IHFMf3huCeodsv8%2BlGxS5%2FgelrpqVpZDR03Na8RHwrly1lv5O9KY4yZbWXDm0s%2FLI3OaLjKm9hi7AYygPgK0GJqEb1TDiVUto9%2FTfimjWFqIKICAjYUdAP0cQgi%2Fa0gwZrJRPRAwh8IGEnHTOP2xUHvtmXGCNFd0nQB%2BhGhjk7TdpPeR8R3Dllb35%2FYIroDqWPieYtJrIY10XqWBvqx%2FfbGT1zVLuVoYtvV6gaFc5zbcv9abRFVf4hiCP6%2Ff6MBuNw5%2Fae94efZNp%2BOukuZj6K6B1db7xhtVI%2BBOUiPZA4mr2IzdBeqvNNcwc7m8F1no9YjXbsGs9EEGGStLaVgVI4F8dyAhWEd9FWjgR%2FX5qAyQUGCKULOY1aw6651xwmjz8MCCT1r1GOTBindGasVnMxh5Z0JMjDK9aNkKfgFUbVJ8BrGTCIh%2FYx0bsq7yowDIEyp71TRMo%2Bbu%2FDBhIifKpO4VN7AxMup1DPwu2wNCxFTF%2Bmp%2BKWf0Jz1CnuYdie64csaEylVu4cUzwVR2okivFuX%2By5hlD5o57dNqxAQQCz4KifNxiqzxO%2BBHrfsIeBqAAVdLFq9YtSjPudclLSqdxoekyznbYEtkERJY1iOmTBUChk2ja1KRSUvYSbtTl9r6MTksWA0mlqHfJP416rLsDCoCUhEtZ5OIJ9uobmwBlOhNJOmvNLKts9UatFZKf8fpoabQDjZVOrlt5SGCat11TstwJxt%2Fqk4KlR1z2%2Fvgjk36IAA%3D\",\"rects\":[{\"top\":656,\"left\":1642,\"width\":285,\"height\":767}],\"poiId\":\"DEMO00001\",\"tenantCode\":\"quanjiang\",\"tick\":1720744890634},\"bornTimestamp\":1720744890762,\"consumerGroup\":\"GID_quanjiang_risk\",\"keys\":\"4fe123dc469c47919f1bb2e83469f088\",\"messageId\":\"AC14058A00017A07C5B43A83698A5378\",\"queueId\":1,\"queueOffset\":418623,\"tags\":\"quanjiang\",\"topic\":\"ECOLOGY_PAAS_RISK_ALARM\",\"type\":\"msg\"}";
  107. // var json = JsonConvert.DeserializeObject<JObject>(jsonstr);
  108. // if (json["type"].ToString() == "msg")
  109. // {
  110. // var body = json["body"];
  111. // if (body != null)
  112. // {
  113. // PersonType personEnum;
  114. // if (!Enum.TryParse(body["alarmType"].ToString(), out AlarmType dayEnum))
  115. // {
  116. // dayEnum = AlarmType.visual_fence;
  117. // }
  118. // if (body["extend"]!=null)
  119. // {
  120. // var extend= JsonConvert.DeserializeObject<JObject>(body["extend"].ToString());
  121. // if (extend["personType"] != null)
  122. // {
  123. // if (!Enum.TryParse(body["extend"]?["personType"]?.ToString(), out personEnum))
  124. // {
  125. // personEnum = PersonType.unkonwn;
  126. // }
  127. // }
  128. // else
  129. // {
  130. // personEnum = PersonType.unkonwn;
  131. // }
  132. // }
  133. // else
  134. // {
  135. // personEnum = PersonType.unkonwn;
  136. // }
  137. // var model = new WarnInfoDto
  138. // {
  139. // TenantCode = body["tenantCode"]?.ToString(),
  140. // PoiId = body["poiId"]?.ToString(),
  141. // AlarmId = body["alarmId"]?.ToString(),
  142. // AlarmType = body["alarmType"]?.ToString(),
  143. // AlarmTypeDesc = dayEnum.GetDescription(),
  144. // CameraId = body["cameraId"]?.ToString(),
  145. // Tick = TimestampToDateTime(body["tick"].ToString()),
  146. // SnapshotUrl = body["snapshotUrl"]?.ToString(),
  147. // Rects = body["rects"]?.ToString(),
  148. // Tags = body["tags"]?.ToString(),
  149. // Extend = body["extend"]?.ToString(),
  150. // PersonType = personEnum.GetDescription(),
  151. // WarnHand = 0,
  152. // };
  153. // var signImg = Path.Combine(Directory.GetCurrentDirectory(), "Files", App.Configuration["AppInfo:AlarmImg"], model.AlarmId + ".jpg");
  154. // var steam = await model.SnapshotUrl.GetAsByteArrayAsync();
  155. // model.SnapshotUrl = $"http://192.168.10.186:8003/Files/alarmImg/{model.AlarmId}.jpg";
  156. // using (MemoryStream ms = new MemoryStream(steam))
  157. // {
  158. // using (Bitmap bmp = new Bitmap(ms))
  159. // {
  160. // using (Graphics g = Graphics.FromImage(bmp))
  161. // {
  162. // using (Pen pen = new Pen(Color.Red, 3))
  163. // {
  164. // foreach (var item in body["rects"])
  165. // {
  166. // Rectangle rect = new Rectangle((int)item["left"], (int)item["top"], (int)item["width"], (int)item["height"]);
  167. // g.DrawRectangle(pen, rect);
  168. // }
  169. // }
  170. // }
  171. // bmp.Save(signImg, ImageFormat.Jpeg);
  172. // }
  173. // }
  174. // //await model.SnapshotUrl.GetToSaveAsync(signImg);
  175. // Scoped.Create((_, scope) =>
  176. // {
  177. // var services = scope.ServiceProvider;
  178. // var _repository = services.GetService<IWarnInfoService>();
  179. // _repository.Add(model);
  180. // });
  181. // }
  182. // }
  183. //}
  184. //catch (Exception e)
  185. //{
  186. //}
  187. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  188. if (result.MessageType == WebSocketMessageType.Text)
  189. {
  190. try
  191. {
  192. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  193. LogHelper.WriteToLog($"收到: {message}");
  194. var json = JsonConvert.DeserializeObject<JObject>(message);
  195. if (json["type"].ToString() == "msg")
  196. {
  197. var body = json["body"];
  198. if (body != null)
  199. {
  200. PersonType personEnum;
  201. if (!Enum.TryParse(body["alarmType"].ToString(), out AlarmType dayEnum))
  202. {
  203. dayEnum = AlarmType.visual_fence;
  204. }
  205. if (body["extend"] != null)
  206. {
  207. var extend = JsonConvert.DeserializeObject<JObject>(body["extend"].ToString());
  208. if (extend["personType"] != null)
  209. {
  210. if (!Enum.TryParse(body["extend"]?["personType"]?.ToString(), out personEnum))
  211. {
  212. personEnum = PersonType.unkonwn;
  213. }
  214. }
  215. else
  216. {
  217. personEnum = PersonType.unkonwn;
  218. }
  219. }
  220. else
  221. {
  222. personEnum = PersonType.unkonwn;
  223. }
  224. var model = new WarnInfoDto
  225. {
  226. TenantCode = body["tenantCode"]?.ToString(),
  227. PoiId = body["poiId"]?.ToString(),
  228. AlarmId = body["alarmId"]?.ToString(),
  229. AlarmType = body["alarmType"]?.ToString(),
  230. AlarmTypeDesc = dayEnum.GetDescription(),
  231. CameraId = body["cameraId"]?.ToString(),
  232. Tick = TimestampToDateTime(body["tick"].ToString()),
  233. SnapshotUrl = body["snapshotUrl"]?.ToString(),
  234. Rects = body["rects"]?.ToString(),
  235. Tags = body["tags"]?.ToString(),
  236. Extend = body["extend"]?.ToString(),
  237. PersonType = personEnum.GetDescription(),
  238. WarnHand = 0,
  239. };
  240. var signImg = Path.Combine(Directory.GetCurrentDirectory(), "Files", App.Configuration["AppInfo:AlarmImg"], model.AlarmId + ".jpg");
  241. //await model.SnapshotUrl.GetToSaveAsync(signImg);
  242. var steam = await model.SnapshotUrl.GetAsByteArrayAsync();
  243. model.SnapshotUrl = $"http://192.168.10.186:8003/Files/alarmImg/{model.AlarmId}.jpg";
  244. using (MemoryStream ms = new MemoryStream(steam))
  245. {
  246. using (Bitmap bmp = new Bitmap(ms))
  247. {
  248. using (Graphics g = Graphics.FromImage(bmp))
  249. {
  250. using (Pen pen = new Pen(Color.Red, 3))
  251. {
  252. foreach (var item in body["rects"])
  253. {
  254. Rectangle rect = new Rectangle((int)item["left"], (int)item["top"], (int)item["width"], (int)item["height"]);
  255. g.DrawRectangle(pen, rect);
  256. }
  257. }
  258. }
  259. bmp.Save(signImg, ImageFormat.Jpeg);
  260. }
  261. }
  262. Scoped.Create((_, scope) =>
  263. {
  264. var services = scope.ServiceProvider;
  265. var _repository = services.GetService<IWarnInfoService>();
  266. _repository.Add(model);
  267. });
  268. }
  269. }
  270. }
  271. catch (Exception e)
  272. {
  273. LogHelper.WriteToLog( "预警webSocket处理异常");
  274. LogHelper.WriteToLog(e, "预警webSocket处理异常");
  275. }
  276. }
  277. else if (result.MessageType == WebSocketMessageType.Close)
  278. {
  279. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  280. break;
  281. }
  282. }
  283. }
  284. /// <summary>
  285. /// 订阅点名事件
  286. /// </summary>
  287. /// <returns></returns>
  288. public async Task SubscriberRoomCall()
  289. {
  290. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  291. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  292. var url = GetWebSocketUrl(AuthConstants.SXROOM_CALL, AuthConstants.SXROOM_CAL_Group);
  293. var subscribeJson = JsonConvert.SerializeObject(new
  294. {
  295. token = GetToken(),
  296. type = "subscribe",
  297. timestamp,
  298. payload = new
  299. {
  300. topic = AuthConstants.SXROOM_CALL, consumerGroup = AuthConstants.SXROOM_CAL_Group, tags = setting.TenantCode
  301. }
  302. });
  303. ClientWebSocket _webSocket = new ClientWebSocket();
  304. webScokets.Add(_webSocket);
  305. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  306. LogHelper.WriteToLog($"点名WebSocket 已连接:{url}");
  307. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  308. byte[] buffer = new byte[1024*8];
  309. while (_webSocket.State == WebSocketState.Open)
  310. {
  311. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  312. if (result.MessageType == WebSocketMessageType.Text)
  313. {
  314. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  315. LogHelper.WriteToLog($"收到: {message}");
  316. var json = JsonConvert.DeserializeObject<JObject>(message);
  317. if (json["type"].ToString() == "msg")
  318. {
  319. var body = json["body"];
  320. if (body != null)
  321. {
  322. //TODO 由于未知道数据格式暂不写处理
  323. }
  324. }
  325. }
  326. else if (result.MessageType == WebSocketMessageType.Close)
  327. {
  328. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  329. break;
  330. }
  331. }
  332. }
  333. public async Task SubscriberAttendance()
  334. {
  335. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  336. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  337. var url = GetWebSocketUrl(AuthConstants.SXECOLOGY_ATTENDANCE, AuthConstants.SXECOLOGY_ATTENDANCE_Group);
  338. var subscribeJson = JsonConvert.SerializeObject(new
  339. {
  340. token = GetToken(),
  341. type = "subscribe",
  342. timestamp,
  343. payload = new
  344. {
  345. topic = AuthConstants.SXECOLOGY_ATTENDANCE,
  346. consumerGroup = AuthConstants.SXECOLOGY_ATTENDANCE_Group,
  347. tags = setting.TenantCode
  348. }
  349. });
  350. ClientWebSocket _webSocket = new ClientWebSocket();
  351. webScokets.Add(_webSocket);
  352. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  353. LogHelper.WriteToLog($"考勤WebSocket 已连接:{url}");
  354. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  355. byte[] buffer = new byte[1024 * 8];
  356. while (_webSocket.State == WebSocketState.Open)
  357. {
  358. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  359. if (result.MessageType == WebSocketMessageType.Text)
  360. {
  361. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  362. LogHelper.WriteToLog($"收到: {message}");
  363. var json = JsonConvert.DeserializeObject<JObject>(message);
  364. if (json["type"].ToString() == "msg")
  365. {
  366. var body = json["body"];
  367. if (body != null)
  368. {
  369. //TODO 由于未知道数据格式暂不写处理
  370. }
  371. }
  372. }
  373. else if (result.MessageType == WebSocketMessageType.Close)
  374. {
  375. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  376. break;
  377. }
  378. }
  379. }
  380. private async Task GetWebSocketData(string url, string json)
  381. {
  382. ClientWebSocket _webSocket = new ClientWebSocket();
  383. while (!_cancellationTokenSource.IsCancellationRequested)
  384. {
  385. try
  386. {
  387. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  388. LogHelper.WriteToLog("WebSocket 已连接");
  389. _isConnected = true;
  390. //订阅预警消息
  391. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(json)), WebSocketMessageType.Text, true, CancellationToken.None);
  392. //_pingTimer = new Timer(async (state) =>
  393. //{
  394. // if (_webSocket.State == WebSocketState.Open)
  395. // {
  396. // try
  397. // {
  398. // LogHelper.WriteToLog("发送 ping...");
  399. // byte[] emptyPingMessage = Array.Empty<byte>();
  400. // await _webSocket.SendAsync(new ArraySegment<byte>(emptyPingMessage), WebSocketMessageType.Binary, true, CancellationToken.None);
  401. // //byte[] pingMessage = Encoding.UTF8.GetBytes("pong");
  402. // //await _webSocket.SendAsync(new ArraySegment<byte>(pingMessage), WebSocketMessageType.Text, true, CancellationToken.None);
  403. // }
  404. // catch (Exception ex)
  405. // {
  406. // LogHelper.WriteToLog($"发送ping失败: {ex.Message}");
  407. // }
  408. // }
  409. //}, null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
  410. await ReceiveLoop(_webSocket);
  411. _isConnected = false;
  412. LogHelper.WriteToLog("WebSocket 连接已关闭,正在重连。。。");
  413. }
  414. catch (Exception ex)
  415. {
  416. LogHelper.WriteToLog($"WebSocket连接失败: {ex.Message}");
  417. }
  418. await Task.Delay(GetReconnectInterval());
  419. }
  420. }
  421. private async Task ReceiveLoop(ClientWebSocket _webSocket)
  422. {
  423. byte[] buffer = new byte[1024];
  424. while (_webSocket.State == WebSocketState.Open)
  425. {
  426. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  427. if (result.MessageType == WebSocketMessageType.Text)
  428. {
  429. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  430. LogHelper.WriteToLog($"收到: {message}");
  431. var json= JsonConvert.DeserializeObject<JObject>(message);
  432. if (json["type"].ToString()=="msg")
  433. {
  434. var body = json["body"];
  435. if (body!=null)
  436. {
  437. }
  438. }
  439. }
  440. else if (result.MessageType == WebSocketMessageType.Close)
  441. {
  442. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  443. break;
  444. }
  445. }
  446. }
  447. private TimeSpan GetReconnectInterval()
  448. {
  449. // You can implement your own logic for calculating reconnect interval, e.g., exponential backoff
  450. return TimeSpan.FromSeconds(10); // Example: Attempt to reconnect every 10 seconds
  451. }
  452. public async Task DisconnectAsync()
  453. {
  454. foreach (var clientWebSocket in webScokets.Where(clientWebSocket => clientWebSocket.State == WebSocketState.Open))
  455. {
  456. await clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
  457. }
  458. }
  459. private string GetWebSocketUrl(string topic, string consumerGroup)
  460. {
  461. var serverAddr = "";
  462. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  463. var list = $"{setting.SXAPIURL}/emitter/connection/get"
  464. .SetBody(new
  465. {
  466. token = GetToken(),
  467. topic ,
  468. consumerGroup,
  469. })
  470. .SetContentType("application/json")
  471. .PostAsAsync<string>().Result;
  472. var model = JsonConvert.DeserializeObject<JObject>(list);
  473. if (model["data"]["serverAddr"]!=null)
  474. {
  475. serverAddr = model["data"]["serverAddr"].ToString();
  476. }
  477. return serverAddr;
  478. }
  479. //public async Task<bool> GetData()
  480. //{
  481. // //var setting = App.GetOptionsMonitor<AppInfoOptions>();
  482. // //var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  483. // ////订阅预警消息
  484. // //var warnUrl = GetWebSocketUrl(AuthConstants.SXALARM, AuthConstants.SXALARM_Grpup);
  485. // //GetWebSocketData(warnUrl,
  486. // // JsonConvert.SerializeObject(new
  487. // // {
  488. // // token = GetToken(), type = "subscribe", timestamp,
  489. // // payload = new { topic = AuthConstants.SXALARM, consumerGroup= AuthConstants.SXALARM_Grpup, tags= setting.TenantCode }
  490. // // }));
  491. // //订阅预警更新消息
  492. // await SubscribeAlarm();
  493. // //订阅点名事件
  494. // //await SubscriberRoomCall();
  495. // return true;
  496. //}
  497. /// <summary>
  498. /// 时间戳转本时区日期时间
  499. /// </summary>
  500. /// <param name="timeStamp"></param>
  501. /// <returns></returns>
  502. public static DateTime TimestampToDateTime(string timeStamp)
  503. {
  504. DateTimeOffset utcDateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(timeStamp));
  505. return utcDateTimeOffset.LocalDateTime;
  506. }
  507. }