using Application.Domain.Entities; using Application.Models; using Infrastructure.Data; using Infrastructure.Extensions; using Infrastructure.Web.SignalR; using IoT.Shared.Services; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Vibrant.InfluxDB.Client; using Vibrant.InfluxDB.Client.Rows; namespace IoTCenter.Services { public class IoTCenterHub : BasePageHub { private readonly IConfiguration _cfg; private readonly IRepository _nodeRepo; private readonly IRepository _deviceRepo; private readonly IRepository _dataRepo; private readonly IRepository _apiRepo; private readonly IRepository _commandRepo; private readonly IRepository _categoryRepo; private readonly IRepository _productRepo; private readonly IRepository _sceneRepo; private readonly IRepository _sceneCommandRepo; private readonly IRepository _iotTimerRepo; private readonly IRepository _iotTiggerRepo; private readonly DataService _dataService; public IoTCenterHub(IConfiguration cfg, IRepository nodeRepo, IRepository deviceRepo, IRepository dataRepo, IRepository apiRepo, IRepository commandRepo, IRepository categoryRepo, IRepository productRepo, IRepository sceneRepo, IRepository sceneCommandRepo, IRepository iotTimerRepo, IRepository iotTiggerRepo, DataService dataService) { this._cfg = cfg; this._nodeRepo = nodeRepo; this._deviceRepo = deviceRepo; this._dataRepo = dataRepo; this._apiRepo = apiRepo; this._commandRepo = commandRepo; this._categoryRepo = categoryRepo; this._productRepo = productRepo; this._sceneRepo = sceneRepo; this._sceneCommandRepo = sceneCommandRepo; this._iotTimerRepo = iotTimerRepo; this._iotTiggerRepo = iotTiggerRepo; this._dataService = dataService; } public override void ClientToServer(string method, string message, string connectionId) { Console.WriteLine($"iot center> receive message {method} from {connectionId}"); try { //节点上传 if (method == Methods.UpdateNodeResponse)//接收节点 { this._dataService.Update(message); } else if (method == Methods.UpdateProductResponse)//接收产品、接口和参数 { this._dataService.UpdateProduct(message); } else if (method == Methods.UpdateDeviceIdListResponse)//接收设备Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == connectionId); } else if (method == Methods.UpdateDeviceResponse)//接收设备和数据 { this._dataService.UpdateDevice(message); } else if (method == Methods.UpdateCommandIdListResponse)//接收命令Id列表 { this._dataService.UpdateList(message, o => o.Device.Node.Number == connectionId); } else if (method == Methods.UpdateCommandResponse)//接收命令 { this._dataService.Update(message); } else if (method == Methods.UpdateSceneIdListResponse)//接收场景Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == connectionId); } else if (method == Methods.UpdateSceneResponse)//接收场景 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收定时器Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == connectionId); } else if (method == Methods.UpdateSceneResponse)//接收定时器 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收触发器Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == connectionId); } else if (method == Methods.UpdateIoTTiggerResponse)//接收触发器 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收场景命令Id列表 { this._dataService.UpdateList(message, o => o.Scene.Node.Number == connectionId); } else if (method == Methods.UpdateIoTTiggerResponse)//接收场景命令 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收定时器命令Id列表 { this._dataService.UpdateList(message, o => o.IoTTimer.Node.Number == connectionId); } else if (method == Methods.UpdateIoTTimerCommandResponse)//接收定时器命令 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收触发器命令Id列表 { this._dataService.UpdateList(message, o => o.IoTTigger.Node.Number == connectionId); } else if (method == Methods.UpdateIoTTiggerResponse)//接收触发器命令 { this._dataService.Update(message); } //后台编辑 else if (method == Methods.EditNodeResponse) { var model = message.FromJson(); this._dataService.Edit(model); // this.Clients.Group("page").SendAsync("UpdateNode", model.ToJson()); } else if (method == Methods.EditDeviceResponse) { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == Methods.DeleteDeviceResponse) { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == Methods.EditDataResponse)//后台编辑或设备上报 { var model = message.FromJson(); this._dataService.Edit(model); // var device = _deviceRepo.ReadOnlyTable().Include(o => o.Data).Where(o => o.Id == model.DeviceId).FirstOrDefault(); this.Clients.Group("page").SendAsync("UpdateDevice", device.ToJson()); } else if (method == Methods.DeleteDataResponse) { var model = message.FromJson(); this._dataService.Delete(model); } /////// else if (method == Methods.EditSceneResponse) { this.UpdateScene(message); } else if (method == Methods.DeleteSceneResponse) { this.DeleteScene(message); } else if (method == Methods.EditCommandResponse) { this.UpdateCommand(message); } else if (method == Methods.DeleteCommandResponse) { this.DeleteCommand(message); } else if (method == Methods.EditSceneCommandResponse) { this.UpdateSceneCommand(message); } else if (method == Methods.DeleteSceneCommandResponse) { this.DeleteSceneCommand(message); } else if (method == Methods.EditIoTTimerResponse) { this.UpdateIoTTimer(message); } else if (method == Methods.DeleteIoTTimerResponse) { this.DeleteIoTTimer(message); } else if (method == Methods.EditIoTTiggerResponse) { this.UpdateIoTTigger(message); } else if (method == Methods.DeleteIoTTiggerResponse) { this.DeleteIoTTigger(message); } } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void UpdateScene(string message) { try { Console.WriteLine("iot center> receive edit scene message"); var model = message.FromJson(); var scene = _sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (scene == null) { scene = new Scene { Id = model.Id }; _sceneRepo.Add(scene); } scene.FromDto(model); scene.NodeId = _nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; _sceneRepo.SaveChanges(); this.Clients.Group("page").SendAsync("UpdateScene", message); } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void DeleteScene(string message) { try { Console.WriteLine("iot center> receive delete scene message"); var model = message.FromJson(); var scene = _sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (scene != null) { _sceneRepo.Delete(scene); _sceneRepo.SaveChanges(); this.Clients.Group("page").SendAsync("DeleteScene", message); } } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void UpdateCommand(string message) { try { Console.WriteLine("iot center> receive edit command message"); var model = message.FromJson(); var device = _deviceRepo.ReadOnlyTable().Where(o => o.Number == model.DeviceNumber).FirstOrDefault(); var api = _apiRepo.ReadOnlyTable().Where(o => o.ProductId == device.ProductId && o.Name == model.ApiName).FirstOrDefault(); var command = _commandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (command == null) { command = new Command { Id = model.Id }; _commandRepo.Add(command); } command.FromDto(model); command.DeviceId = device.Id; command.ApiId = api.Id; _commandRepo.SaveChanges(); } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void DeleteCommand(string message) { try { Console.WriteLine("iot center> receive delete scene message"); var model = message.FromJson(); var command = _commandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (command != null) { _commandRepo.Delete(command); _commandRepo.SaveChanges(); } } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void UpdateSceneCommand(string message) { try { Console.WriteLine("iot center> receive edit scene command message"); var model = message.FromJson(); var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand == null) { sceneCommand = new SceneCommand { Id = model.Id }; _sceneCommandRepo.Add(sceneCommand); } sceneCommand.SceneId = model.SceneId.Value; sceneCommand.CommandId = model.CommandId.Value; _sceneCommandRepo.SaveChanges(); } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void DeleteSceneCommand(string message) { try { Console.WriteLine("iot center> receive delete scene command message"); var model = message.FromJson(); var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand != null) { _sceneCommandRepo.Delete(sceneCommand); _sceneCommandRepo.SaveChanges(); } } catch (Exception ex) { ex.PrintStack(); } } private void UpdateIoTTimer(string message) { try { Console.WriteLine("iot center> receive edit iot timer message"); var model = message.FromJson(); var iotTimer = _iotTimerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (iotTimer == null) { iotTimer = new IoTTimer { Id = model.Id }; _iotTimerRepo.Add(iotTimer); } iotTimer.FromDto(model); iotTimer.NodeId = _nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; _iotTimerRepo.SaveChanges(); } catch (Exception ex) { ex.PrintStack(); } } private void DeleteIoTTimer(string message) { try { Console.WriteLine("iot center> receive delete iot timer message"); var model = message.FromJson(); var iotTimer = _iotTimerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (iotTimer != null) { _iotTimerRepo.Delete(iotTimer); _iotTimerRepo.SaveChanges(); } } catch (Exception ex) { ex.PrintStack(); } } private void UpdateIoTTigger(string message) { try { Console.WriteLine("iot center> receive edit iot tigger message"); var model = message.FromJson(); var iotTigger = _iotTiggerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (iotTigger == null) { iotTigger = new IoTTigger { Id = model.Id }; _iotTiggerRepo.Add(iotTigger); } iotTigger.FromDto(model); iotTigger.NodeId = _nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; iotTigger.DataId = _dataRepo.ReadOnlyTable().FirstOrDefault(o => o.Device.Number == model.DeviceNumber && o.Key == model.DataKey).Id; _iotTiggerRepo.SaveChanges(); } catch (Exception ex) { ex.PrintStack(); } } private void DeleteIoTTigger(string message) { try { Console.WriteLine("iot center> receive delete iot tigger message"); var model = message.FromJson(); var iotTigger = _iotTiggerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (iotTigger != null) { _iotTiggerRepo.Delete(iotTigger); _iotTiggerRepo.SaveChanges(); } } catch (Exception ex) { ex.PrintStack(); } } public void ApiCallback(string message, string connectionId) { if (!string.IsNullOrEmpty(connectionId)) { this.ServerToClient(connectionId, Methods.ApiCallback, message); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private async Task LogToInfluxdbAsync(EditDataModel model, string deviceNumber) { if (string.IsNullOrEmpty(model.Value)) { return; } if (model.Type != DeviceDataType.Int && model.Type != DeviceDataType.Float) { return; } try { var url = _cfg["influxdb:url"]; var usr = _cfg["influxdb:usr"]; var pwd = _cfg["influxdb:pwd"]; var dbName = "iot"; var measurementName = "data"; using var client = new InfluxClient(new Uri(url), usr, pwd); await client.CreateDatabaseAsync(dbName); var row = new DynamicInfluxRow { Timestamp = DateTime.UtcNow }; row.Fields.Add("DeviceNumber", deviceNumber); row.Fields.Add("DeviceName", model.Name); row.Fields.Add(model.Key, this.GetDataValue(model)); await client.WriteAsync(dbName, measurementName, new List { row }); } catch (Exception ex) { ex.PrintStack(); } } private object GetDataValue(EditDataModel model) { return model.Type switch { DeviceDataType.Int => Convert.ToInt32(model.Value), DeviceDataType.Float => Convert.ToSingle(model.Value), _ => model.Value, }; } } }