平安校园
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 
 
 

532 linhas
26 KiB

  1. using System.Drawing;
  2. using System.Drawing.Imaging;
  3. using System.Net.WebSockets;
  4. using System.Text;
  5. using Masuit.Tools.Systems;
  6. using MoYu.DataEncryption;
  7. using MoYu.RemoteRequest.Extensions;
  8. using MoYu.Templates;
  9. using NewLife.Caching;
  10. using Newtonsoft.Json;
  11. using Newtonsoft.Json.Linq;
  12. using OfficeOpenXml.FormulaParsing.Excel.Functions.Text;
  13. using SafeCampus.Application.Services.Business.Warn.Dto;
  14. using SafeCampus.Application.Services.Business.Warn.Service;
  15. using SafeCampus.Core.Utils.TXYSMS;
  16. namespace SafeCampus.Application.Manager.DeepelephManager;
  17. /// <summary>
  18. /// 深象智能对接实现
  19. /// </summary>
  20. public class DeepelephManager : IDeepelephManager, IScoped
  21. {
  22. private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
  23. private bool _isConnected = false;
  24. private Timer _pingTimer;
  25. private List<ClientWebSocket> webScokets=new List<ClientWebSocket>();
  26. private readonly ISimpleCacheService _simpleCacheService;
  27. public DeepelephManager(ISimpleCacheService simpleCacheService)
  28. {
  29. _simpleCacheService = simpleCacheService;
  30. }
  31. //获取地址
  32. //建立请求,发送订阅参数
  33. //遍历等待接收消息
  34. public string GetToken()
  35. {
  36. var cacheToken = _simpleCacheService.Get<string>(AuthConstants.SXTOKEN);
  37. if (cacheToken != null) return cacheToken;
  38. return GenSXToken();
  39. }
  40. /// <summary>
  41. /// 获取深象智能Token
  42. /// </summary>
  43. /// <returns></returns>
  44. private string GenSXToken()
  45. {
  46. var token = "";
  47. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  48. var nonce = Guid.NewGuid().ToString("N");
  49. var timestamp =DateTimeOffset.Now.ToUnixTimeMilliseconds();
  50. var list = $"{setting.SXAPIURL}/user/center/v1/login/client"
  51. .SetBody(new
  52. {
  53. appKey = setting.AppKey,
  54. appSecret = setting.AppSecret,
  55. nonce ,
  56. timestamp,
  57. sign = MD5Encryption.Encrypt($"{setting.AppSecret}appKey{setting.AppKey}nonce{nonce}timestamp{timestamp}{setting.AppSecret}",true)
  58. })
  59. .SetContentType("application/json")
  60. .PostAsAsync<string>().Result;
  61. var model = JsonConvert.DeserializeObject<JObject>(list);
  62. if (model["success"]!=null)
  63. {
  64. if ((bool)model["success"])
  65. {
  66. token = model["data"]["token"].ToString();
  67. //token有效期2小时,提前20分钟失效
  68. _simpleCacheService.Set(AuthConstants.SXTOKEN, token, 100*60);
  69. }
  70. else
  71. {
  72. LogHelper.WriteToLog("token请求失败", model["message"].ToString());
  73. }
  74. }
  75. return token;
  76. }
  77. /// <summary>
  78. /// 订阅预警更新
  79. /// </summary>
  80. /// <returns></returns>
  81. public async Task SubscribeAlarm()
  82. {
  83. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  84. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  85. var url = GetWebSocketUrl(AuthConstants.SXALARM, AuthConstants.SXALARM_Grpup);
  86. var subscribeJson = JsonConvert.SerializeObject(new
  87. {
  88. token = GetToken(),
  89. type = "subscribe",
  90. timestamp,
  91. payload = new
  92. {
  93. topic = AuthConstants.SXALARM, consumerGroup = AuthConstants.SXALARM_Grpup, tags = setting.TenantCode
  94. }
  95. });
  96. ClientWebSocket _webSocket = new ClientWebSocket();
  97. webScokets.Add(_webSocket);
  98. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  99. LogHelper.WriteToLog($"预警WebSocket 已连接:{url}");
  100. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  101. byte[] buffer = new byte[1024*8];
  102. while (_webSocket.State == WebSocketState.Open)
  103. {
  104. //try
  105. //{
  106. // var jsonstr =
  107. // " {\"body\":{\"extend\":\"{\\\"trackId\\\":\\\"12937198411611620608\\\",\\\"genderScore\\\":[0.85385144,0.14504793,8.212863E-4],\\\"faceSnapshot\\\":\\\"http://deepvision.oss-cn-zhangjiakou.aliyuncs.com/acp/quanjiang/DEMO00001/20240712/edge/group/SXT001_book_wm_1720744889834_2.jpg?Expires=1720752090&OSSAccessKeyId=STS.NT4L37rDr91kevRbXXzw6nfMD&Signature=G2bEhgR7yQFP67upY9xhV6rH6AM%3D&security-token=CAIS0wN1q6Ft5B2yfSjIr5eBB4mDn5tTjvOAZ1DjhlgNdvgagaPmpjz2IHFMf3huCeodsv8%2BlGxS5%2FgelrpqVpZDR03Na8RHwrly1lv5O9KY4yZbWXDm0s%2FLI3OaLjKm9hi7AYygPgK0GJqEb1TDiVUto9%2FTfimjWFqIKICAjYUdAP0cQgi%2Fa0gwZrJRPRAwh8IGEnHTOP2xUHvtmXGCNFd0nQB%2BhGhjk7TdpPeR8R3Dllb35%2FYIroDqWPieYtJrIY10XqWBvqx%2FfbGT1zVLuVoYtvV6gaFc5zbcv9abRFVf4hiCP6%2Ff6MBuNw5%2Fae94efZNp%2BOukuZj6K6B1db7xhtVI%2BBOUiPZA4mr2IzdBeqvNNcwc7m8F1no9YjXbsGs9EEGGStLaVgVI4F8dyAhWEd9FWjgR%2FX5qAyQUGCKULOY1aw6651xwmjz8MCCT1r1GOTBindGasVnMxh5Z0JMjDK9aNkKfgFUbVJ8BrGTCIh%2FYx0bsq7yowDIEyp71TRMo%2Bbu%2FDBhIifKpO4VN7AxMup1DPwu2wNCxFTF%2Bmp%2BKWf0Jz1CnuYdie64csaEylVu4cUzwVR2okivFuX%2By5hlD5o57dNqxAQQCz4KifNxiqzxO%2BBHrfsIeBqAAVdLFq9YtSjPudclLSqdxoekyznbYEtkERJY1iOmTBUChk2ja1KRSUvYSbtTl9r6MTksWA0mlqHfJP416rLsDCoCUhEtZ5OIJ9uobmwBlOhNJOmvNLKts9UatFZKf8fpoabQDjZVOrlt5SGCat11TstwJxt%2Fqk4KlR1z2%2Fvgjk36IAA%3D\\\",\\\"faceSnapshotPath\\\":\\\"deepvision:acp/quanjiang/DEMO00001/20240712/edge/group/SXT001_book_wm_1720744889834_2.jpg\\\",\\\"faceScore\\\":21.902344}\",\"alarmType\":\"hat_detect\",\"func\":\"hat_detect\",\"cameraId\":\"SXT001\",\"alarmId\":\"4fe123dc469c47919f1bb2e83469f088\",\"snapshotUrl\":\"http://deepvision.oss-cn-zhangjiakou.aliyuncs.com/acp/quanjiang/DEMO00001/20240712/edge/group/SXT001/SXT001_d051f448-690e-4135-9659-40dfef3238fb_wm_2.jpg?Expires=1720752090&OSSAccessKeyId=STS.NT4L37rDr91kevRbXXzw6nfMD&Signature=Mq39eyfB5E2sdJ58lT03HMz9QnI%3D&security-token=CAIS0wN1q6Ft5B2yfSjIr5eBB4mDn5tTjvOAZ1DjhlgNdvgagaPmpjz2IHFMf3huCeodsv8%2BlGxS5%2FgelrpqVpZDR03Na8RHwrly1lv5O9KY4yZbWXDm0s%2FLI3OaLjKm9hi7AYygPgK0GJqEb1TDiVUto9%2FTfimjWFqIKICAjYUdAP0cQgi%2Fa0gwZrJRPRAwh8IGEnHTOP2xUHvtmXGCNFd0nQB%2BhGhjk7TdpPeR8R3Dllb35%2FYIroDqWPieYtJrIY10XqWBvqx%2FfbGT1zVLuVoYtvV6gaFc5zbcv9abRFVf4hiCP6%2Ff6MBuNw5%2Fae94efZNp%2BOukuZj6K6B1db7xhtVI%2BBOUiPZA4mr2IzdBeqvNNcwc7m8F1no9YjXbsGs9EEGGStLaVgVI4F8dyAhWEd9FWjgR%2FX5qAyQUGCKULOY1aw6651xwmjz8MCCT1r1GOTBindGasVnMxh5Z0JMjDK9aNkKfgFUbVJ8BrGTCIh%2FYx0bsq7yowDIEyp71TRMo%2Bbu%2FDBhIifKpO4VN7AxMup1DPwu2wNCxFTF%2Bmp%2BKWf0Jz1CnuYdie64csaEylVu4cUzwVR2okivFuX%2By5hlD5o57dNqxAQQCz4KifNxiqzxO%2BBHrfsIeBqAAVdLFq9YtSjPudclLSqdxoekyznbYEtkERJY1iOmTBUChk2ja1KRSUvYSbtTl9r6MTksWA0mlqHfJP416rLsDCoCUhEtZ5OIJ9uobmwBlOhNJOmvNLKts9UatFZKf8fpoabQDjZVOrlt5SGCat11TstwJxt%2Fqk4KlR1z2%2Fvgjk36IAA%3D\",\"rects\":[{\"top\":656,\"left\":1642,\"width\":285,\"height\":767}],\"poiId\":\"DEMO00001\",\"tenantCode\":\"quanjiang\",\"tick\":1720744890634},\"bornTimestamp\":1720744890762,\"consumerGroup\":\"GID_quanjiang_risk\",\"keys\":\"4fe123dc469c47919f1bb2e83469f088\",\"messageId\":\"AC14058A00017A07C5B43A83698A5378\",\"queueId\":1,\"queueOffset\":418623,\"tags\":\"quanjiang\",\"topic\":\"ECOLOGY_PAAS_RISK_ALARM\",\"type\":\"msg\"}";
  108. // var json = JsonConvert.DeserializeObject<JObject>(jsonstr);
  109. // if (json["type"].ToString() == "msg")
  110. // {
  111. // var body = json["body"];
  112. // if (body != null)
  113. // {
  114. // PersonType personEnum;
  115. // if (!Enum.TryParse(body["alarmType"].ToString(), out AlarmType dayEnum))
  116. // {
  117. // dayEnum = AlarmType.visual_fence;
  118. // }
  119. // if (body["extend"]!=null)
  120. // {
  121. // var extend= JsonConvert.DeserializeObject<JObject>(body["extend"].ToString());
  122. // if (extend["personType"] != null)
  123. // {
  124. // if (!Enum.TryParse(body["extend"]?["personType"]?.ToString(), out personEnum))
  125. // {
  126. // personEnum = PersonType.unkonwn;
  127. // }
  128. // }
  129. // else
  130. // {
  131. // personEnum = PersonType.unkonwn;
  132. // }
  133. // }
  134. // else
  135. // {
  136. // personEnum = PersonType.unkonwn;
  137. // }
  138. // var model = new WarnInfoDto
  139. // {
  140. // TenantCode = body["tenantCode"]?.ToString(),
  141. // PoiId = body["poiId"]?.ToString(),
  142. // AlarmId = body["alarmId"]?.ToString(),
  143. // AlarmType = body["alarmType"]?.ToString(),
  144. // AlarmTypeDesc = dayEnum.GetDescription(),
  145. // CameraId = body["cameraId"]?.ToString(),
  146. // Tick = TimestampToDateTime(body["tick"].ToString()),
  147. // SnapshotUrl = body["snapshotUrl"]?.ToString(),
  148. // Rects = body["rects"]?.ToString(),
  149. // Tags = body["tags"]?.ToString(),
  150. // Extend = body["extend"]?.ToString(),
  151. // PersonType = personEnum.GetDescription(),
  152. // WarnHand = 0,
  153. // };
  154. // var signImg = Path.Combine(Directory.GetCurrentDirectory(), "Files", App.Configuration["AppInfo:AlarmImg"], model.AlarmId + ".jpg");
  155. // var steam = await model.SnapshotUrl.GetAsByteArrayAsync();
  156. // model.SnapshotUrl = $"http://192.168.10.186:8003/Files/alarmImg/{model.AlarmId}.jpg";
  157. // using (MemoryStream ms = new MemoryStream(steam))
  158. // {
  159. // using (Bitmap bmp = new Bitmap(ms))
  160. // {
  161. // using (Graphics g = Graphics.FromImage(bmp))
  162. // {
  163. // using (Pen pen = new Pen(Color.Red, 3))
  164. // {
  165. // foreach (var item in body["rects"])
  166. // {
  167. // Rectangle rect = new Rectangle((int)item["left"], (int)item["top"], (int)item["width"], (int)item["height"]);
  168. // g.DrawRectangle(pen, rect);
  169. // }
  170. // }
  171. // }
  172. // bmp.Save(signImg, ImageFormat.Jpeg);
  173. // }
  174. // }
  175. // //await model.SnapshotUrl.GetToSaveAsync(signImg);
  176. // Scoped.Create((_, scope) =>
  177. // {
  178. // var services = scope.ServiceProvider;
  179. // var _repository = services.GetService<IWarnInfoService>();
  180. // _repository.Add(model);
  181. // });
  182. // }
  183. // }
  184. //}
  185. //catch (Exception e)
  186. //{
  187. //}
  188. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  189. if (result.MessageType == WebSocketMessageType.Text)
  190. {
  191. try
  192. {
  193. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  194. LogHelper.WriteToLog($"收到: {message}");
  195. var json = JsonConvert.DeserializeObject<JObject>(message);
  196. if (json["type"].ToString() == "msg")
  197. {
  198. var body = json["body"];
  199. if (body != null)
  200. {
  201. PersonType personEnum;
  202. if (!Enum.TryParse(body["alarmType"].ToString(), out AlarmType dayEnum))
  203. {
  204. dayEnum = AlarmType.visual_fence;
  205. }
  206. if (body["extend"] != null)
  207. {
  208. var extend = JsonConvert.DeserializeObject<JObject>(body["extend"].ToString());
  209. if (extend["personType"] != null)
  210. {
  211. if (!Enum.TryParse(body["extend"]?["personType"]?.ToString(), out personEnum))
  212. {
  213. personEnum = PersonType.unkonwn;
  214. }
  215. }
  216. else
  217. {
  218. personEnum = PersonType.unkonwn;
  219. }
  220. }
  221. else
  222. {
  223. personEnum = PersonType.unkonwn;
  224. }
  225. var model = new WarnInfoDto
  226. {
  227. TenantCode = body["tenantCode"]?.ToString(),
  228. PoiId = body["poiId"]?.ToString(),
  229. AlarmId = body["alarmId"]?.ToString(),
  230. AlarmType = body["alarmType"]?.ToString(),
  231. AlarmTypeDesc = dayEnum.GetDescription(),
  232. CameraId = body["cameraId"]?.ToString(),
  233. Tick = TimestampToDateTime(body["tick"].ToString()),
  234. SnapshotUrl = body["snapshotUrl"]?.ToString(),
  235. Rects = body["rects"]?.ToString(),
  236. Tags = body["tags"]?.ToString(),
  237. Extend = body["extend"]?.ToString(),
  238. PersonType = personEnum.GetDescription(),
  239. WarnHand = 0,
  240. };
  241. var signImg = Path.Combine(Directory.GetCurrentDirectory(), "Files", App.Configuration["AppInfo:AlarmImg"], model.AlarmId + ".jpg");
  242. //await model.SnapshotUrl.GetToSaveAsync(signImg);
  243. var steam = await model.SnapshotUrl.GetAsByteArrayAsync();
  244. model.SnapshotUrl = $"http://192.168.10.186:8003/Files/alarmImg/{model.AlarmId}.jpg";
  245. using (MemoryStream ms = new MemoryStream(steam))
  246. {
  247. using (Bitmap bmp = new Bitmap(ms))
  248. {
  249. using (Graphics g = Graphics.FromImage(bmp))
  250. {
  251. using (Pen pen = new Pen(Color.Red, 3))
  252. {
  253. foreach (var item in body["rects"])
  254. {
  255. Rectangle rect = new Rectangle((int)item["left"], (int)item["top"], (int)item["width"], (int)item["height"]);
  256. g.DrawRectangle(pen, rect);
  257. }
  258. }
  259. }
  260. bmp.Save(signImg, ImageFormat.Jpeg);
  261. }
  262. }
  263. Scoped.Create((_, scope) =>
  264. {
  265. var services = scope.ServiceProvider;
  266. var _repository = services.GetService<IWarnInfoService>();
  267. _repository.Add(model);
  268. });
  269. }
  270. }
  271. }
  272. catch (Exception e)
  273. {
  274. LogHelper.WriteToLog( "预警webSocket处理异常");
  275. LogHelper.WriteToLog(e, "预警webSocket处理异常");
  276. }
  277. }
  278. else if (result.MessageType == WebSocketMessageType.Close)
  279. {
  280. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  281. break;
  282. }
  283. }
  284. }
  285. /// <summary>
  286. /// 订阅点名事件
  287. /// </summary>
  288. /// <returns></returns>
  289. public async Task SubscriberRoomCall()
  290. {
  291. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  292. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  293. var url = GetWebSocketUrl(AuthConstants.SXROOM_CALL, AuthConstants.SXROOM_CAL_Group);
  294. var subscribeJson = JsonConvert.SerializeObject(new
  295. {
  296. token = GetToken(),
  297. type = "subscribe",
  298. timestamp,
  299. payload = new
  300. {
  301. topic = AuthConstants.SXROOM_CALL, consumerGroup = AuthConstants.SXROOM_CAL_Group, tags = setting.TenantCode
  302. }
  303. });
  304. ClientWebSocket _webSocket = new ClientWebSocket();
  305. webScokets.Add(_webSocket);
  306. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  307. LogHelper.WriteToLog($"点名WebSocket 已连接:{url}");
  308. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  309. byte[] buffer = new byte[1024*8];
  310. while (_webSocket.State == WebSocketState.Open)
  311. {
  312. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  313. if (result.MessageType == WebSocketMessageType.Text)
  314. {
  315. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  316. LogHelper.WriteToLog($"收到: {message}");
  317. var json = JsonConvert.DeserializeObject<JObject>(message);
  318. if (json["type"].ToString() == "msg")
  319. {
  320. var body = json["body"];
  321. if (body != null)
  322. {
  323. TxySmsUtil.SendSms(new[] { "" }, new[] { "" });
  324. //TODO 由于未知道数据格式暂不写处理
  325. }
  326. }
  327. }
  328. else if (result.MessageType == WebSocketMessageType.Close)
  329. {
  330. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  331. break;
  332. }
  333. }
  334. }
  335. public async Task SubscriberAttendance()
  336. {
  337. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  338. var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  339. var url = GetWebSocketUrl(AuthConstants.SXECOLOGY_ATTENDANCE, AuthConstants.SXECOLOGY_ATTENDANCE_Group);
  340. var subscribeJson = JsonConvert.SerializeObject(new
  341. {
  342. token = GetToken(),
  343. type = "subscribe",
  344. timestamp,
  345. payload = new
  346. {
  347. topic = AuthConstants.SXECOLOGY_ATTENDANCE,
  348. consumerGroup = AuthConstants.SXECOLOGY_ATTENDANCE_Group,
  349. tags = setting.TenantCode
  350. }
  351. });
  352. ClientWebSocket _webSocket = new ClientWebSocket();
  353. webScokets.Add(_webSocket);
  354. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  355. LogHelper.WriteToLog($"考勤WebSocket 已连接:{url}");
  356. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(subscribeJson)), WebSocketMessageType.Text, true, CancellationToken.None);
  357. byte[] buffer = new byte[1024 * 8];
  358. while (_webSocket.State == WebSocketState.Open)
  359. {
  360. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  361. if (result.MessageType == WebSocketMessageType.Text)
  362. {
  363. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  364. LogHelper.WriteToLog($"收到: {message}");
  365. var json = JsonConvert.DeserializeObject<JObject>(message);
  366. if (json["type"].ToString() == "msg")
  367. {
  368. var body = json["body"];
  369. if (body != null)
  370. {
  371. //TODO 由于未知道数据格式暂不写处理
  372. }
  373. }
  374. }
  375. else if (result.MessageType == WebSocketMessageType.Close)
  376. {
  377. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  378. break;
  379. }
  380. }
  381. }
  382. private async Task GetWebSocketData(string url, string json)
  383. {
  384. ClientWebSocket _webSocket = new ClientWebSocket();
  385. while (!_cancellationTokenSource.IsCancellationRequested)
  386. {
  387. try
  388. {
  389. await _webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
  390. LogHelper.WriteToLog("WebSocket 已连接");
  391. _isConnected = true;
  392. //订阅预警消息
  393. await _webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(json)), WebSocketMessageType.Text, true, CancellationToken.None);
  394. //_pingTimer = new Timer(async (state) =>
  395. //{
  396. // if (_webSocket.State == WebSocketState.Open)
  397. // {
  398. // try
  399. // {
  400. // LogHelper.WriteToLog("发送 ping...");
  401. // byte[] emptyPingMessage = Array.Empty<byte>();
  402. // await _webSocket.SendAsync(new ArraySegment<byte>(emptyPingMessage), WebSocketMessageType.Binary, true, CancellationToken.None);
  403. // //byte[] pingMessage = Encoding.UTF8.GetBytes("pong");
  404. // //await _webSocket.SendAsync(new ArraySegment<byte>(pingMessage), WebSocketMessageType.Text, true, CancellationToken.None);
  405. // }
  406. // catch (Exception ex)
  407. // {
  408. // LogHelper.WriteToLog($"发送ping失败: {ex.Message}");
  409. // }
  410. // }
  411. //}, null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
  412. await ReceiveLoop(_webSocket);
  413. _isConnected = false;
  414. LogHelper.WriteToLog("WebSocket 连接已关闭,正在重连。。。");
  415. }
  416. catch (Exception ex)
  417. {
  418. LogHelper.WriteToLog($"WebSocket连接失败: {ex.Message}");
  419. }
  420. await Task.Delay(GetReconnectInterval());
  421. }
  422. }
  423. private async Task ReceiveLoop(ClientWebSocket _webSocket)
  424. {
  425. byte[] buffer = new byte[1024];
  426. while (_webSocket.State == WebSocketState.Open)
  427. {
  428. WebSocketReceiveResult result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
  429. if (result.MessageType == WebSocketMessageType.Text)
  430. {
  431. string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
  432. LogHelper.WriteToLog($"收到: {message}");
  433. var json= JsonConvert.DeserializeObject<JObject>(message);
  434. if (json["type"].ToString()=="msg")
  435. {
  436. var body = json["body"];
  437. if (body!=null)
  438. {
  439. }
  440. }
  441. }
  442. else if (result.MessageType == WebSocketMessageType.Close)
  443. {
  444. LogHelper.WriteToLog("WebSocket 连接已被服务器关闭");
  445. break;
  446. }
  447. }
  448. }
  449. private TimeSpan GetReconnectInterval()
  450. {
  451. // You can implement your own logic for calculating reconnect interval, e.g., exponential backoff
  452. return TimeSpan.FromSeconds(10); // Example: Attempt to reconnect every 10 seconds
  453. }
  454. public async Task DisconnectAsync()
  455. {
  456. foreach (var clientWebSocket in webScokets.Where(clientWebSocket => clientWebSocket.State == WebSocketState.Open))
  457. {
  458. await clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
  459. }
  460. }
  461. private string GetWebSocketUrl(string topic, string consumerGroup)
  462. {
  463. var serverAddr = "";
  464. var setting = App.GetOptionsMonitor<AppInfoOptions>();
  465. var list = $"{setting.SXAPIURL}/emitter/connection/get"
  466. .SetBody(new
  467. {
  468. token = GetToken(),
  469. topic ,
  470. consumerGroup,
  471. })
  472. .SetContentType("application/json")
  473. .PostAsAsync<string>().Result;
  474. var model = JsonConvert.DeserializeObject<JObject>(list);
  475. if (model["data"]["serverAddr"]!=null)
  476. {
  477. serverAddr = model["data"]["serverAddr"].ToString();
  478. }
  479. return serverAddr;
  480. }
  481. //public async Task<bool> GetData()
  482. //{
  483. // //var setting = App.GetOptionsMonitor<AppInfoOptions>();
  484. // //var timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  485. // ////订阅预警消息
  486. // //var warnUrl = GetWebSocketUrl(AuthConstants.SXALARM, AuthConstants.SXALARM_Grpup);
  487. // //GetWebSocketData(warnUrl,
  488. // // JsonConvert.SerializeObject(new
  489. // // {
  490. // // token = GetToken(), type = "subscribe", timestamp,
  491. // // payload = new { topic = AuthConstants.SXALARM, consumerGroup= AuthConstants.SXALARM_Grpup, tags= setting.TenantCode }
  492. // // }));
  493. // //订阅预警更新消息
  494. // await SubscribeAlarm();
  495. // //订阅点名事件
  496. // //await SubscriberRoomCall();
  497. // return true;
  498. //}
  499. /// <summary>
  500. /// 时间戳转本时区日期时间
  501. /// </summary>
  502. /// <param name="timeStamp"></param>
  503. /// <returns></returns>
  504. public static DateTime TimestampToDateTime(string timeStamp)
  505. {
  506. DateTimeOffset utcDateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(timeStamp));
  507. return utcDateTimeOffset.LocalDateTime;
  508. }
  509. }