using Application.Domain.Entities;
using Application.Models;
using Infrastructure.Data;
using Infrastructure.Extensions;
using Infrastructure.Web.SignalR;
using IoT.Shared.Infrastructure;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Vibrant.InfluxDB.Client;
using Vibrant.InfluxDB.Client.Rows;

namespace IoTCenter.Services
{
    // NOTE(review): this copy of the file appears to have lost its generic type arguments
    // (for example `IRepository`, `message.FromJson()` / `message.FromJson>()` and
    // `new List { row }`). Those sites are left exactly as found; the type arguments must be
    // restored from the original source before this file compiles.

    /// <summary>
    /// Hub that handles messages arriving through <see cref="ClientToServer"/>: it persists
    /// reported nodes, products, devices, data, scenes and commands through the injected
    /// repositories and forwards several of the changes to the "page" client group.
    /// </summary>
    public class PageHub : BasePageHub
    {
        private readonly IConfiguration _cfg;
        private readonly IRepository _nodeRepo;
        private readonly IRepository _deviceRepo;
        private readonly IRepository _dataRepo;
        private readonly IRepository _apiRepo;
        private readonly IRepository _commandRepo;
        private readonly IRepository _categoryRepo;
        private readonly IRepository _productRepo;
        private readonly IRepository _sceneRepo;
        private readonly IRepository _sceneCommandRepo;
        private readonly IRepository _timerRepo;
        private readonly IRepository _tiggerRepo;

        /// <summary>
        /// Stores the configuration and the repositories used by the message handlers.
        /// </summary>
        public PageHub(IConfiguration cfg,
            IRepository nodeRepo,
            IRepository deviceRepo,
            IRepository dataRepo,
            IRepository apiRepo,
            IRepository commandRepo,
            IRepository categoryRepo,
            IRepository productRepo,
            IRepository sceneRepo,
            IRepository sceneCommandRepo,
            IRepository timerRepo,
            IRepository tiggerRepo)
        {
            this._cfg = cfg;
            this._nodeRepo = nodeRepo;
            this._deviceRepo = deviceRepo;
            this._dataRepo = dataRepo;
            this._apiRepo = apiRepo;
            this._commandRepo = commandRepo;
            this._categoryRepo = categoryRepo;
            this._productRepo = productRepo;
            this._sceneRepo = sceneRepo;
            this._sceneCommandRepo = sceneCommandRepo;
            this._timerRepo = timerRepo;
            this._tiggerRepo = tiggerRepo;
        }

        /// <summary>
        /// Dispatches an incoming message to the handler that matches <paramref name="method"/>.
        /// Unrecognised method names are ignored.
        /// </summary>
        /// <param name="method">Name of the response method, compared against the <c>Methods</c> constants.</param>
        /// <param name="message">Payload; a node number for health checks, JSON for the other methods.</param>
        /// <param name="connectionId">Identifier of the sender; the refresh handlers match it against the node number.</param>
        public override void ClientToServer(string method, string message, string connectionId)
        {
            Console.WriteLine($"iot center> receive message from {connectionId}");
            if (method == Methods.HealthCheckResponse)
            {
                var node = _nodeRepo.Table().FirstOrDefault(o => o.Number == message);
                if (node != null)
                {
                    node.IsOnline = true;
                    _nodeRepo.SaveChanges();
                }
                else
                {
                    // A health check for a node that is not stored yet must not fail the hub call.
                    Console.WriteLine($"iot center> health check from unknown node {message}");
                }
            }
            else if (method == Methods.RefreshDeviceListResponse)
            {
                this.RefreshDeviceList(message, connectionId);
            }
            else if (method == Methods.RefreshSceneListResponse)
            {
                this.RefreshSceneList(message, connectionId);
            }
            else if (method == Methods.RefreshCommandListResponse)
            {
                this.RefreshCommandList(message, connectionId);
            }
            else if (method == Methods.EditNodeResponse) // node reported
            {
                this.UpdateNode(message);
            }
            else if (method == Methods.GetProductResponse) // product reported
            {
                this.UpdateProduct(message);
            }
            else if (method == Methods.EditDeviceResponse) // device reported
            {
                this.UpdateDevice(message);
            }
            else if (method == Methods.EditDataResponse) // data reported
            {
                this.UpdateData(message);
            }
            else if (method == Methods.DeleteDeviceResponse)
            {
                this.DeleteDevice(message);
            }
            else if (method == Methods.DeleteDataResponse)
            {
                this.DeleteData(message);
            }
            else if (method == Methods.EditSceneResponse)
            {
                this.UpdateScene(message);
            }
            else if (method == Methods.DeleteSceneResponse)
            {
                this.DeleteScene(message);
            }
            else if (method == Methods.EditCommandResponse)
            {
                this.UpdateCommand(message);
            }
            else if (method == Methods.DeleteCommandResponse)
            {
                this.DeleteCommand(message);
            }
            else if (method == Methods.EditSceneCommandResponse)
            {
                this.UpdateSceneCommand(message);
            }
            else if (method == Methods.DeleteSceneCommandResponse)
            {
                this.DeleteSceneCommand(message);
            }
        }

        /// <summary>
        /// Deletes the stored devices of the node numbered <paramref name="connectionId"/> whose
        /// numbers are not contained in the list deserialized from <paramref name="message"/>.
        /// </summary>
        private void RefreshDeviceList(string message, string connectionId)
        {
            var list = message.FromJson>();
            var devices = _deviceRepo.Table().Where(o => o.Node.Number == connectionId && !list.Contains(o.Number)).ToList();
            foreach (var device in devices)
            {
                _deviceRepo.Delete(device);
            }
            _deviceRepo.SaveChanges();
        }

        /// <summary>
        /// Deletes the stored scenes of the node numbered <paramref name="connectionId"/> whose
        /// ids are not contained in the list deserialized from <paramref name="message"/>.
        /// </summary>
        private void RefreshSceneList(string message, string connectionId)
        {
            var list = message.FromJson>();
            var scenes = _sceneRepo.Table().Where(o => o.Node.Number == connectionId && !list.Contains(o.Id)).ToList();
            foreach (var scene in scenes)
            {
                _sceneRepo.Delete(scene);
            }
            _sceneRepo.SaveChanges();
        }

        /// <summary>
        /// Deletes the stored commands belonging to devices of the node numbered
        /// <paramref name="connectionId"/> whose ids are not contained in the list deserialized
        /// from <paramref name="message"/>.
        /// </summary>
        private void RefreshCommandList(string message, string connectionId)
        {
            var list = message.FromJson>();
            var commands = _commandRepo.Table().Where(o => o.Device.Node.Number == connectionId && !list.Contains(o.Id)).ToList();
            foreach (var command in commands)
            {
                _commandRepo.Delete(command);
            }
            _commandRepo.SaveChanges();
        }

        /// <summary>
        /// Creates or updates the node described by <paramref name="message"/> and forwards the
        /// message to the "page" group as "UpdateNode". Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateNode(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive node message");
                var nodeDto = message.FromJson();
                var node = _nodeRepo.Table().FirstOrDefault(o => o.Number == nodeDto.Number);
                if (node == null)
                {
                    node = new Node();
                    _nodeRepo.Add(node);
                }
                node.FromDto(nodeDto);
                _nodeRepo.SaveChanges();
                this.Clients.Group("page").SendAsync("UpdateNode", message);
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Stores the product described by <paramref name="message"/> when no product with the
        /// same number exists yet; an existing product is left untouched. Exceptions are
        /// printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateProduct(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive product message");
                var productDto = message.FromJson();
                var category = _categoryRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == productDto.CategoryNumber);
                var product = _productRepo.Table().FirstOrDefault(o => o.Number == productDto.Number);
                if (product == null)
                {
                    if (category == null)
                    {
                        // Fail with a descriptive error instead of a bare NullReferenceException.
                        throw new InvalidOperationException($"category {productDto.CategoryNumber} not found");
                    }
                    product = new Product().FromDto(productDto);
                    product.CategoryId = category.Id;
                    _productRepo.Add(product);
                    OpenApiService.UpdateApi(product);
                    _productRepo.SaveChanges();
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the device described by <paramref name="message"/>, creating its
        /// node when missing, and sends the saved device (with its data) to the "page" group as
        /// "UpdateDevice". When the product is unknown, "GetDeviceInfo" is requested from the
        /// sender and the update is abandoned. Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateDevice(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive device message");
                var deviceDto = message.FromJson();
                var product = _productRepo.Table().FirstOrDefault(o => o.Number == deviceDto.ProductNumber);
                if (product == null)
                {
                    this.ServerToClient(deviceDto.ConnectId, "GetDeviceInfo", deviceDto.ProductNumber);
                    throw new InvalidOperationException("need device info");
                }
                var node = _nodeRepo.Table().FirstOrDefault(o => o.Number == deviceDto.NodeNumber);
                if (node == null)
                {
                    node = new Node
                    {
                        Number = deviceDto.NodeNumber,
                        Name = deviceDto.NodeNumber
                    };
                    _nodeRepo.Add(node);
                    // Saved immediately so that node.Id is available for the device below.
                    _nodeRepo.SaveChanges();
                }
                var device = _deviceRepo.Table().FirstOrDefault(o => o.Number == deviceDto.Number);
                if (device == null)
                {
                    device = new Device();
                    device.ProductId = product.Id;
                    device.NodeId = node.Id;
                    _deviceRepo.Add(device);
                }
                device.FromDto(deviceDto);
                _deviceRepo.SaveChanges();
                // Re-read the device together with its data for the notification payload.
                var device2 = _deviceRepo.ReadOnlyTable().Include(o => o.Data).FirstOrDefault(o => o.Number == deviceDto.Number);
                this.Clients.Group("page").SendAsync("UpdateDevice", device2.ToJson());
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Deletes the device whose number is given in <paramref name="message"/> and, when it
        /// existed, notifies the "page" group with "DeleteDevice". Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteDevice(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive device message");
                var model = message.FromJson();
                var device = _deviceRepo.Table().FirstOrDefault(o => o.Number == model.Number);
                if (device != null)
                {
                    _deviceRepo.Delete(device);
                    _deviceRepo.SaveChanges();
                    this.Clients.Group("page").SendAsync("DeleteDevice", model.Number);
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the data entries listed in <paramref name="message"/> on the device
        /// named by the first entry, notifies the "page" group with "UpdateDevice" and starts an
        /// InfluxDB write for every entry. Nothing happens for an empty list or an unknown device.
        /// </summary>
        private void UpdateData(string message)
        {
            Console.WriteLine("iot center> receive data message");
            var dataDtoList = message.FromJson>();
            if (dataDtoList == null || dataDtoList.Count == 0)
            {
                return;
            }

            // All entries are applied to the device named by the first entry.
            var number = dataDtoList[0].DeviceNumber;
            var device = _deviceRepo.Table().Include(o => o.Data).FirstOrDefault(o => o.Number == number);
            if (device == null)
            {
                return;
            }

            foreach (var dataDto in dataDtoList)
            {
                var data = device.Data.FirstOrDefault(o => o.Key == dataDto.Key);
                if (data == null)
                {
                    data = new Data();
                    device.Data.Add(data);
                }
                data.FromDto(dataDto);
            }
            _deviceRepo.SaveChanges();
            this.Clients.Group("page").SendAsync("UpdateDevice", device.ToJson());
            foreach (var dataDto in dataDtoList)
            {
                // Started without awaiting; LogToInfluxdbAsync handles exceptions in its own try block.
                _ = this.LogToInfluxdbAsync(dataDto);
            }
        }

        /// <summary>
        /// Deletes the data entry identified by device number and key in <paramref name="message"/>
        /// and, when it existed, forwards the message to the "page" group as "DeleteData".
        /// Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteData(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive delete data message");
                var model = message.FromJson();
                var data = _dataRepo.Table().FirstOrDefault(o => o.Device.Number == model.DeviceNumber && o.Key == model.Key);
                if (data != null)
                {
                    _dataRepo.Delete(data);
                    _dataRepo.SaveChanges();
                    this.Clients.Group("page").SendAsync("DeleteData", message);
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the scene described by <paramref name="message"/>, links it to the
        /// node with the given number and forwards the message to the "page" group as
        /// "UpdateScene". Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateScene(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive edit scene message");
                var model = message.FromJson();
                // Resolve the node first so that an unknown node fails before a scene is added.
                var node = _nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber);
                if (node == null)
                {
                    throw new InvalidOperationException($"node {model.NodeNumber} not found");
                }
                var scene = _sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (scene == null)
                {
                    scene = new Scene
                    {
                        Id = model.Id
                    };
                    _sceneRepo.Add(scene);
                }
                scene.FromDto(model);
                scene.NodeId = node.Id;
                _sceneRepo.SaveChanges();
                this.Clients.Group("page").SendAsync("UpdateScene", message);
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Deletes the scene whose id is given in <paramref name="message"/> and, when it existed,
        /// forwards the message to the "page" group as "DeleteScene". Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteScene(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive delete scene message");
                var model = message.FromJson();
                var scene = _sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (scene != null)
                {
                    _sceneRepo.Delete(scene);
                    _sceneRepo.SaveChanges();
                    this.Clients.Group("page").SendAsync("DeleteScene", message);
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the command described by <paramref name="message"/> and links it to
        /// the device with the given number and to the API with the given name on that device's
        /// product. Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive edit command message");
                var model = message.FromJson();
                var device = _deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.DeviceNumber);
                if (device == null)
                {
                    throw new InvalidOperationException($"device {model.DeviceNumber} not found");
                }
                var api = _apiRepo.ReadOnlyTable().FirstOrDefault(o => o.ProductId == device.ProductId && o.Name == model.ApiName);
                if (api == null)
                {
                    throw new InvalidOperationException($"api {model.ApiName} not found");
                }
                var command = _commandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (command == null)
                {
                    command = new Command
                    {
                        Id = model.Id
                    };
                    _commandRepo.Add(command);
                }
                command.FromDto(model);
                command.DeviceId = device.Id;
                command.ApiId = api.Id;
                _commandRepo.SaveChanges();
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Deletes the command whose id is given in <paramref name="message"/>, if it exists.
        /// Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive delete command message");
                var model = message.FromJson();
                var command = _commandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (command != null)
                {
                    _commandRepo.Delete(command);
                    _commandRepo.SaveChanges();
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the scene/command link described by <paramref name="message"/>.
        /// Exceptions (including a missing scene or command id) are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateSceneCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive edit scene command message");
                var model = message.FromJson();
                var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (sceneCommand == null)
                {
                    sceneCommand = new SceneCommand
                    {
                        Id = model.Id
                    };
                    _sceneCommandRepo.Add(sceneCommand);
                }
                sceneCommand.SceneId = model.SceneId.Value;
                sceneCommand.CommandId = model.CommandId.Value;
                _sceneCommandRepo.SaveChanges();
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Deletes the scene/command link whose id is given in <paramref name="message"/>, if it
        /// exists. Exceptions are printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteSceneCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive delete scene command message");
                var model = message.FromJson();
                var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (sceneCommand != null)
                {
                    _sceneCommandRepo.Delete(sceneCommand);
                    _sceneCommandRepo.SaveChanges();
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Forwards <paramref name="message"/> to the client identified by
        /// <paramref name="connectionId"/> using the <c>Methods.ApiCallback</c> method name.
        /// Does nothing when the connection id is null or empty.
        /// </summary>
        public void ApiCallback(string message, string connectionId)
        {
            if (!string.IsNullOrEmpty(connectionId))
            {
                this.ServerToClient(connectionId, Methods.ApiCallback, message);
            }
        }

        /// <summary>
        /// Writes one Int or Float data value to the "data" measurement of the "iot" InfluxDB
        /// database, using the "influxdb:url", "influxdb:usr" and "influxdb:pwd" settings.
        /// Empty values and other data types are skipped. Exceptions raised while writing are
        /// printed, not rethrown.
        /// </summary>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private async Task LogToInfluxdbAsync(EditDataModel model)
        {
            if (string.IsNullOrEmpty(model.Value))
            {
                return;
            }
            if (model.Type != DeviceDataType.Int && model.Type != DeviceDataType.Float)
            {
                return;
            }
            try
            {
                var url = _cfg["influxdb:url"];
                var usr = _cfg["influxdb:usr"];
                var pwd = _cfg["influxdb:pwd"];
                var dbName = "iot";
                var measurementName = "data";

                using var client = new InfluxClient(new Uri(url), usr, pwd);
                await client.CreateDatabaseAsync(dbName);
                var row = new DynamicInfluxRow
                {
                    Timestamp = DateTime.UtcNow
                };
                row.Fields.Add("DeviceNumber", model.DeviceNumber);
                row.Fields.Add("DeviceName", model.Name);
                row.Fields.Add(model.Key, this.GetDataValue(model));
                await client.WriteAsync(dbName, measurementName, new List { row });
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Converts the textual value of <paramref name="model"/> to an <see cref="int"/> or a
        /// <see cref="float"/> according to its type; other types are returned as the raw string.
        /// </summary>
        /// <exception cref="FormatException">The value is not a valid number for its declared type.</exception>
        /// <exception cref="OverflowException">The value is outside the range of the target type.</exception>
        private object GetDataValue(EditDataModel model)
        {
            // Parse with the invariant culture so that "1.5" is read the same way on every host,
            // regardless of the server's regional settings.
            var culture = System.Globalization.CultureInfo.InvariantCulture;
            return model.Type switch
            {
                DeviceDataType.Int => Convert.ToInt32(model.Value, culture),
                DeviceDataType.Float => Convert.ToSingle(model.Value, culture),
                _ => model.Value,
            };
        }
    }
}