using Application.Domain.Entities;
using Application.Models;
using Infrastructure.Data;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Infrastructure.Web.SignalR;
using IoT.Shared.Services;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Vibrant.InfluxDB.Client;
using Vibrant.InfluxDB.Client.Rows;

namespace IoTCenter.Services
{
    /// <summary>
    /// SignalR hub of the IoT center. It receives messages through
    /// <see cref="ClientToServer"/>, hands them to the injected <c>DataService</c>,
    /// and can push messages to a SignalR group through <see cref="ServerToClient"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file appear to have been lost
    /// (the repository fields, and the FromJson / Update / UpdateList calls, carry none).
    /// They are deliberately left exactly as found because the intended types cannot be
    /// determined from this file alone - restore them from the original source.
    /// </remarks>
    public class IoTCenterHub : BasePageHub
    {
        private readonly IConfiguration _cfg;
        private readonly IEventPublisher _eventPublisher;
        private readonly IRepository _nodeRepo;
        private readonly IRepository _deviceRepo;
        private readonly IRepository _dataRepo;
        private readonly IRepository _apiRepo;
        private readonly IRepository _commandRepo;
        private readonly IRepository _categoryRepo;
        private readonly IRepository _productRepo;
        private readonly IRepository _sceneRepo;
        private readonly IRepository _sceneCommandRepo;
        private readonly IRepository _iotTimerRepo;
        private readonly IRepository _iotTiggerRepo;
        private readonly DataService _dataService;

        /// <summary>
        /// Stores the injected configuration, event publisher, repositories and data service.
        /// </summary>
        public IoTCenterHub(IConfiguration cfg,
            IEventPublisher eventPublisher,
            IRepository nodeRepo,
            IRepository deviceRepo,
            IRepository dataRepo,
            IRepository apiRepo,
            IRepository commandRepo,
            IRepository categoryRepo,
            IRepository productRepo,
            IRepository sceneRepo,
            IRepository sceneCommandRepo,
            IRepository iotTimerRepo,
            IRepository iotTiggerRepo,
            DataService dataService)
        {
            this._cfg = cfg;
            this._eventPublisher = eventPublisher;
            this._nodeRepo = nodeRepo;
            this._deviceRepo = deviceRepo;
            this._dataRepo = dataRepo;
            this._apiRepo = apiRepo;
            this._commandRepo = commandRepo;
            this._categoryRepo = categoryRepo;
            this._productRepo = productRepo;
            this._sceneRepo = sceneRepo;
            this._sceneCommandRepo = sceneCommandRepo;
            this._iotTimerRepo = iotTimerRepo;
            this._iotTiggerRepo = iotTiggerRepo;
            this._dataService = dataService;
        }

        /// <summary>
        /// Sends <paramref name="method"/> and <paramref name="message"/> to every connection
        /// in the SignalR group named <paramref name="toClient"/>.
        /// </summary>
        /// <param name="method">Logical method name carried in the payload.</param>
        /// <param name="message">Message body; forwarded unchanged.</param>
        /// <param name="toClient">Name of the target SignalR group.</param>
        /// <param name="fromClient">Optional sender identifier; forwarded unchanged.</param>
        public void ServerToClient(string method, string message, string toClient, string fromClient = null)
        {
            // The method is synchronous, so the send cannot be awaited here. Attach a
            // fault-only continuation so a failed send is reported instead of being lost
            // as an unobserved task exception.
            Clients.Group(toClient)
                .SendAsync(Methods.ServerToClient, method, message, toClient, fromClient)
                .ContinueWith(t => t.Exception.PrintStack(), TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Receives a message and dispatches it according to <paramref name="method"/>:
        /// <c>Update*Response</c> methods go to <c>DataService.Update</c> / <c>UpdateList</c>,
        /// <c>Edit*</c> / <c>Delete*</c> methods deserialize the message and go to
        /// <c>DataService.Edit</c> / <c>Delete</c>, and the exec responses are forwarded to
        /// the group <paramref name="to"/> through <see cref="ServerToClient"/>.
        /// Methods that match no branch are ignored. Any exception is caught and printed.
        /// </summary>
        /// <param name="method">Name of the operation that selects the branch.</param>
        /// <param name="message">Serialized payload handed to the data service.</param>
        /// <param name="to">Target group, used only when the message is forwarded.</param>
        /// <param name="from">Sender identifier; used as the node number filter for id lists.</param>
        public void ClientToServer(string method, string message, string to, string from = null)
        {
            Console.WriteLine($"iot center> receive message {method} from {from}");
            try
            {
                // NOTE(review): many branches below look identical only because the generic
                // type arguments are missing; do not merge them until those are restored.

                // Data uploaded by nodes
                if (method == Methods.UpdateNodeResponse) // receive node
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateProductResponse) // receive product
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateApiResponse) // receive API
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateParameterResponse) // receive parameter
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateDeviceIdListResponse) // receive device id list
                {
                    this._dataService.UpdateList(message, o => o.Node.Number == from);
                }
                else if (method == Methods.UpdateDeviceResponse) // receive device
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateDataResponse) // receive data
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateCommandIdListResponse) // receive command id list
                {
                    this._dataService.UpdateList(message, o => o.Device.Node.Number == from);
                }
                else if (method == Methods.UpdateCommandResponse) // receive command
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateSceneIdListResponse) // receive scene id list
                {
                    this._dataService.UpdateList(message, o => o.Node.Number == from);
                }
                else if (method == Methods.UpdateSceneResponse) // receive scene
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateIoTTimerIdListResponse) // receive timer id list
                {
                    this._dataService.UpdateList(message, o => o.Node.Number == from);
                }
                else if (method == Methods.UpdateIoTTimerResponse) // receive timer
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateIoTTiggerIdListResponse) // receive trigger id list
                {
                    this._dataService.UpdateList(message, o => o.Node.Number == from);
                }
                else if (method == Methods.UpdateIoTTiggerResponse) // receive trigger
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateSceneCommandIdListResponse) // receive scene command id list
                {
                    this._dataService.UpdateList(message, o => o.Scene.Node.Number == from);
                }
                else if (method == Methods.UpdateSceneCommandResponse) // receive scene command
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateIoTTimerCommandIdListResponse) // receive timer command id list
                {
                    this._dataService.UpdateList(message, o => o.IoTTimer.Node.Number == from);
                }
                else if (method == Methods.UpdateIoTTimerCommandResponse) // receive timer command
                {
                    this._dataService.Update(message);
                }
                else if (method == Methods.UpdateIoTTiggerCommandIdListResponse) // receive trigger command id list
                {
                    this._dataService.UpdateList(message, o => o.IoTTigger.Node.Number == from);
                }
                else if (method == Methods.UpdateIoTTiggerCommandResponse) // receive trigger command
                {
                    this._dataService.Update(message);
                }
                // Back-office edits
                else if (method == $"Edit{nameof(Node)}") // edit node response
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Edit{nameof(Device)}") // edit device response
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(Device)}") // delete device response
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(Data)}") // edit data, or data reported by a device
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                // Disabled batch variant of the branch above, kept as found:
                //else if (method == $"Edit{nameof(Data)}List")
                //{
                //    var list = message.FromJson>();
                //    foreach (var model in list)
                //    {
                //        this._dataService.Edit(model);
                //    }
                //}
                else if (method == $"Delete{nameof(Data)}") // delete data response
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(Command)}") // edit command response
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(Command)}") // delete command response
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(Scene)}") // edit scene response
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(Scene)}") // delete scene response
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(IoTTimer)}")
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(IoTTimer)}")
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(IoTTigger)}")
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(IoTTigger)}")
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(SceneCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(SceneCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(IoTTimerCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(IoTTimerCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                else if (method == $"Edit{nameof(IoTTiggerCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Edit(model);
                }
                else if (method == $"Delete{nameof(IoTTiggerCommand)}")
                {
                    var model = message.FromJson();
                    this._dataService.Delete(model);
                }
                // Execution results are not stored; they are forwarded to the group named by "to".
                else if (method == Methods.ExecApiResponse)
                {
                    this.ServerToClient(method, message, to, from);
                }
                else if (method == Methods.ExecSceneRsponse)
                {
                    this.ServerToClient(method, message, to, from);
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Creates or updates the scene command whose id matches the deserialized message,
        /// then saves. Exceptions are caught and printed.
        /// </summary>
        /// <remarks>Not referenced anywhere else in this class.</remarks>
        /// <param name="message">Serialized model providing Id, SceneId and CommandId.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void UpdateSceneCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive edit scene command message");
                var model = message.FromJson();
                var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (sceneCommand == null)
                {
                    // Unknown id: register a new entity carrying the incoming id.
                    sceneCommand = new SceneCommand
                    {
                        Id = model.Id
                    };
                    _sceneCommandRepo.Add(sceneCommand);
                }
                sceneCommand.SceneId = model.SceneId.Value;
                sceneCommand.CommandId = model.CommandId.Value;
                _sceneCommandRepo.SaveChanges();
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Deletes the scene command whose id matches the deserialized message, if one exists,
        /// then saves. Exceptions are caught and printed.
        /// </summary>
        /// <remarks>Not referenced anywhere else in this class.</remarks>
        /// <param name="message">Serialized model providing the Id to delete.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        private void DeleteSceneCommand(string message)
        {
            try
            {
                Console.WriteLine("iot center> receive delete scene command message");
                var model = message.FromJson();
                var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
                if (sceneCommand != null)
                {
                    _sceneCommandRepo.Delete(sceneCommand);
                    _sceneCommandRepo.SaveChanges();
                }
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Writes one row for <paramref name="data"/> to the "data" measurement of the
        /// InfluxDB database "iot", using the connection settings under the
        /// <c>influxdb:url</c>, <c>influxdb:usr</c> and <c>influxdb:pwd</c> configuration keys.
        /// Only non-empty values of type Int or Float are written; everything else is skipped.
        /// Exceptions are caught and printed.
        /// </summary>
        /// <param name="data">The data point to record; a null argument is ignored.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")]
        public async Task LogToInfluxdbAsync(Data data)
        {
            if (data == null || string.IsNullOrEmpty(data.Value))
            {
                return;
            }
            if (data.Type != DeviceDataType.Int && data.Type != DeviceDataType.Float)
            {
                return;
            }
            try
            {
                // Resolve the device before opening a connection: the row needs its number
                // and name, and a missing device must not end in a null dereference.
                var device = this._deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.DeviceId);
                if (device == null)
                {
                    Console.WriteLine($"iot center> device {data.DeviceId} not found, skip influxdb logging");
                    return;
                }

                var url = _cfg["influxdb:url"];
                var usr = _cfg["influxdb:usr"];
                var pwd = _cfg["influxdb:pwd"];
                var dbName = "iot";
                var measurementName = "data";
                using var client = new InfluxClient(new Uri(url), usr, pwd);
                await client.CreateDatabaseAsync(dbName);
                var row = new DynamicInfluxRow
                {
                    Timestamp = DateTime.UtcNow
                };
                row.Fields.Add("DeviceNumber", device.Number);
                row.Fields.Add("DeviceName", device.Name);
                row.Fields.Add(data.Key, this.GetDataValue(data));
                // An implicitly typed array lets the row type be inferred.
                await client.WriteAsync(dbName, measurementName, new[] { row });
            }
            catch (Exception ex)
            {
                ex.PrintStack();
            }
        }

        /// <summary>
        /// Converts the string value of <paramref name="model"/> to an <see cref="int"/> for
        /// Int data, a <see cref="float"/> for Float data, and returns the raw string otherwise.
        /// </summary>
        /// <param name="model">The data point whose Value is converted.</param>
        /// <returns>The boxed numeric value, or the original string.</returns>
        private object GetDataValue(Data model)
        {
            // Values are machine-generated, so parse them with the invariant culture;
            // the server's current culture could otherwise misread the decimal separator.
            return model.Type switch
            {
                DeviceDataType.Int => Convert.ToInt32(model.Value, CultureInfo.InvariantCulture),
                DeviceDataType.Float => Convert.ToSingle(model.Value, CultureInfo.InvariantCulture),
                _ => model.Value,
            };
        }
    }
}