using Application.Domain.Entities; using Application.Models; using Infrastructure.Data; using Infrastructure.Events; using Infrastructure.Extensions; using Infrastructure.Web.SignalR; using IoT.Shared.Services; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading.Tasks; using Vibrant.InfluxDB.Client; using Vibrant.InfluxDB.Client.Rows; namespace IoTCenter.Services { public class IoTCenterHub : BasePageHub { private readonly IConfiguration _cfg; private readonly IEventPublisher _eventPublisher; private readonly IRepository _nodeRepo; private readonly IRepository _deviceRepo; private readonly IRepository _dataRepo; private readonly IRepository _apiRepo; private readonly IRepository _commandRepo; private readonly IRepository _categoryRepo; private readonly IRepository _productRepo; private readonly IRepository _sceneRepo; private readonly IRepository _sceneCommandRepo; private readonly IRepository _iotTimerRepo; private readonly IRepository _iotTiggerRepo; private readonly DataService _dataService; private readonly IHttpClientFactory _httpClientFactory; public IoTCenterHub(IConfiguration cfg, IEventPublisher eventPublisher, IRepository nodeRepo, IRepository deviceRepo, IRepository dataRepo, IRepository apiRepo, IRepository commandRepo, IRepository categoryRepo, IRepository productRepo, IRepository sceneRepo, IRepository sceneCommandRepo, IRepository iotTimerRepo, IRepository iotTiggerRepo, DataService dataService, IHttpClientFactory httpClientFactory) { this._cfg = cfg; this._eventPublisher = eventPublisher; this._nodeRepo = nodeRepo; this._deviceRepo = deviceRepo; this._dataRepo = dataRepo; this._apiRepo = apiRepo; this._commandRepo = commandRepo; this._categoryRepo = categoryRepo; this._productRepo = productRepo; this._sceneRepo = sceneRepo; this._sceneCommandRepo = sceneCommandRepo; this._iotTimerRepo = iotTimerRepo; this._iotTiggerRepo = iotTiggerRepo; this._dataService = dataService; this._httpClientFactory = httpClientFactory; } public void ServerToClient(string method, string message, string toClient, string fromClient = null) { Clients.Group(toClient).SendAsync(Methods.ServerToClient, method, message, toClient, fromClient); } public void ClientToServer(string method, string message, string to, string from = null) { Console.WriteLine($"iot center> receive message {method} from {from}"); try { //节点上传 if (method == Methods.UpdateNodeResponse)//接收节点 { this._dataService.Update(message); } else if (method == Methods.UpdateProductResponse)//接收产品 { this._dataService.Update(message); } else if (method == Methods.UpdateApiResponse)//接收接口 { this._dataService.Update(message); } else if (method == Methods.UpdateParameterResponse)//接收参数 { this._dataService.Update(message); } else if (method == Methods.UpdateDeviceIdListResponse)//接收设备Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == from); } else if (method == Methods.UpdateDeviceResponse)//接收设备 { this._dataService.Update(message); } else if (method == Methods.UpdateDataResponse)//接收数据 { this._dataService.Update(message); } else if (method == Methods.UpdateCommandIdListResponse)//接收命令Id列表 { this._dataService.UpdateList(message, o => o.Device.Node.Number == from); } else if (method == Methods.UpdateCommandResponse)//接收命令 { this._dataService.Update(message); } else if (method == Methods.UpdateSceneIdListResponse)//接收场景Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == from); } else if (method == Methods.UpdateSceneResponse)//接收场景 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTimerIdListResponse)//接收定时器Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == from); } else if (method == Methods.UpdateIoTTimerResponse)//接收定时器 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收触发器Id列表 { this._dataService.UpdateList(message, o => o.Node.Number == from); } else if (method == Methods.UpdateIoTTiggerResponse)//接收触发器 { this._dataService.Update(message); } else if (method == Methods.UpdateSceneCommandIdListResponse)//接收场景命令Id列表 { this._dataService.UpdateList(message, o => o.Scene.Node.Number == from); } else if (method == Methods.UpdateSceneCommandResponse)//接收场景命令 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTimerCommandIdListResponse)//接收定时器命令Id列表 { this._dataService.UpdateList(message, o => o.IoTTimer.Node.Number == from); } else if (method == Methods.UpdateIoTTimerCommandResponse)//接收定时器命令 { this._dataService.Update(message); } else if (method == Methods.UpdateIoTTiggerCommandIdListResponse)//接收触发器命令Id列表 { this._dataService.UpdateList(message, o => o.IoTTigger.Node.Number == from); } else if (method == Methods.UpdateIoTTiggerCommandResponse)//接收触发器命令 { this._dataService.Update(message); } //后台编辑 else if (method == $"Edit{nameof(Node)}")//编辑节点返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Edit{nameof(Device)}")//编辑设备返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(Device)}")//删除设备返回 { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(Data)}")//编辑数据或设备上报数据 { var model = message.FromJson(); this._dataService.Edit(model); } //else if (method == $"Edit{nameof(Data)}List") //{ // var list = message.FromJson>(); // foreach (var model in list) // { // this._dataService.Edit(model); // } //} else if (method == $"Delete{nameof(Data)}")//删除数据返回 { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(Command)}")//编辑命令返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(Command)}")//删除命令返回 { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(Scene)}")//编辑场景返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(Scene)}")//删除场景返回 { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(IoTTimer)}") { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(IoTTimer)}") { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(IoTTigger)}") { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(IoTTigger)}") { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(SceneCommand)}") { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(SceneCommand)}") { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(IoTTimerCommand)}") { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(IoTTimerCommand)}") { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(IoTTiggerCommand)}") { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(IoTTiggerCommand)}") { var model = message.FromJson(); this._dataService.Delete(model); } // else if (method == Methods.ExecApiResponse) { this.ServerToClient(method, message, to, from); } else if (method == Methods.ExecSceneRsponse) { this.ServerToClient(method, message, to, from); } else if (method == Methods.UpdateDvr) { var model = message.FromJson(); this.UpdateDvr(model); } } catch (Exception ex) { ex.PrintStack(); } } private void UpdateDvr(Data model) { try { var device = this._deviceRepo.ReadOnlyTable().Include(o => o.Node).FirstOrDefault(o => o.Id == model.DeviceId); var number = device.Node.Number; var url = this._cfg.GetValue("srs", "http://localhost:1985"); var result = ""; var method = Methods.UpdateCamera; if (model.Value == "是") { url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=enable&data=live/{device.Number}"; } else { url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=disable&data=live/{device.Number}"; } if (CallSrs(url)) { this.ServerToClient(method, model.ToJson(), number); } } catch (Exception ex) { ex.PrintStack(); } } private bool CallSrs(string url) { var result = false; try { var httpClient = this._httpClientFactory.CreateClient(); var response = httpClient.GetStringAsync(url).Result; var json = JsonConvert.DeserializeObject>(response); if (json["code"] == "0") { result = true; } } catch (Exception ex) { ex.PrintStack(); } return result; } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void UpdateSceneCommand(string message) { try { Console.WriteLine("iot center> receive edit scene command message"); var model = message.FromJson(); var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand == null) { sceneCommand = new SceneCommand { Id = model.Id }; _sceneCommandRepo.Add(sceneCommand); } sceneCommand.SceneId = model.SceneId.Value; sceneCommand.CommandId = model.CommandId.Value; _sceneCommandRepo.SaveChanges(); } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] private void DeleteSceneCommand(string message) { try { Console.WriteLine("iot center> receive delete scene command message"); var model = message.FromJson(); var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand != null) { _sceneCommandRepo.Delete(sceneCommand); _sceneCommandRepo.SaveChanges(); } } catch (Exception ex) { ex.PrintStack(); } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public async Task LogToInfluxdbAsync(Data data) { if (string.IsNullOrEmpty(data.Value)) { return; } if (data.Type != DeviceDataType.Int && data.Type != DeviceDataType.Float) { return; } try { var url = _cfg["influxdb:url"]; var usr = _cfg["influxdb:usr"]; var pwd = _cfg["influxdb:pwd"]; var dbName = "iot"; var measurementName = "data"; using var client = new InfluxClient(new Uri(url), usr, pwd); await client.CreateDatabaseAsync(dbName); var row = new DynamicInfluxRow { Timestamp = DateTime.UtcNow }; var device = this._deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.DeviceId); row.Fields.Add("DeviceNumber", device.Number); row.Fields.Add("DeviceName", device.Name); row.Fields.Add(data.Key, this.GetDataValue(data)); await client.WriteAsync(dbName, measurementName, new List { row }); } catch (Exception ex) { ex.PrintStack(); } } private object GetDataValue(Data model) { return model.Type switch { DeviceDataType.Int => Convert.ToInt32(model.Value), DeviceDataType.Float => Convert.ToSingle(model.Value), _ => model.Value, }; } } }