using Infrastructure.Data; using Infrastructure.Events; using Infrastructure.Extensions; using Infrastructure.Web.SignalR; using Application.Domain.Entities; using IoT.Shared.Application.Models; using IoT.Shared.Services; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading.Tasks; using Vibrant.InfluxDB.Client; using Vibrant.InfluxDB.Client.Rows; namespace Platform.Services { public class IoTCenterHub : BasePageHub { private static object lockObject = new object(); private readonly IConfiguration _cfg; private readonly ILogger _logger; private readonly IRepository _nodeRepo; private readonly IRepository _deviceRepo; private readonly IRepository _dataRepo; private readonly IRepository _apiRepo; //private readonly IRepository _commandRepo; private readonly IRepository _categoryRepo; private readonly IRepository _productRepo; //private readonly IRepository _sceneRepo; //private readonly IRepository _sceneCommandRepo; //private readonly IRepository _iotTimerRepo; //private readonly IRepository _iotTiggerRepo; private readonly DataService _dataService; private readonly IHttpClientFactory _httpClientFactory; private readonly IEventPublisher _publisher; public IoTCenterHub(IConfiguration cfg, ILogger logger, IRepository nodeRepo, IRepository deviceRepo, IRepository dataRepo, IRepository apiRepo, //IRepository commandRepo, IRepository categoryRepo, IRepository productRepo, //IRepository sceneRepo, //IRepository sceneCommandRepo, //IRepository iotTimerRepo, //IRepository iotTiggerRepo, DataService dataService, IHttpClientFactory httpClientFactory, IEventPublisher publisher) { this._cfg = cfg; this._logger = logger; this._nodeRepo = nodeRepo; this._deviceRepo = deviceRepo; this._dataRepo = dataRepo; this._apiRepo = apiRepo; //this._commandRepo = commandRepo; this._categoryRepo = categoryRepo; this._productRepo = productRepo; //this._sceneRepo = sceneRepo; //this._sceneCommandRepo = sceneCommandRepo; //this._iotTimerRepo = iotTimerRepo; //this._iotTiggerRepo = iotTiggerRepo; this._dataService = dataService; this._httpClientFactory = httpClientFactory; this._publisher = publisher; } public override Task OnConnectedAsync() { this.OnConnected(); this.UpdateNodeStatus(true); return Task.CompletedTask; } public override Task OnDisconnectedAsync(Exception exception) { this.OnDisconnected(exception); this.UpdateNodeStatus(false); return Task.CompletedTask; } private void UpdateNodeStatus(bool status) { lock (lockObject) { var type = Context.Items["type"] as string; if (!string.IsNullOrEmpty(type) && type == "node") { var group = Context.Items["group"].ToString(); var node = this._nodeRepo.Table().FirstOrDefault(o => o.Number == group); if (node != null) { node.IsOnline = status; try { this._nodeRepo.SaveChanges(); } catch (Exception ex) { this._logger.LogError(ex.ToString()); } } } } } public void ServerToClient(string method, string message, string toClient, string fromClient = null) { Clients.Group(toClient).SendAsync(Methods.ServerToClient, method, message, toClient, fromClient); } public void ClientToServer(string method, string message, string to, string from = null) { this._logger.LogDebug($"iot center> receive message {method} from {from}"); try { //网关上传 if (method == Methods.UpdateNodeResponse)//接收网关 { var number = message.To().Number; this._dataService.Update(message, o => o.Number == number,new string[] { nameof(IoTGateway.BuildingId) }); } else if (method == Methods.UpdateProductResponse)//接收产品 { this._dataService.Update(message, skips: new string[] { nameof(IoTProduct.IoTProductCategoryId), nameof(IoTProduct.DisplayOrder) }); } else if (method == Methods.UpdateApiResponse)//接收接口 { this._dataService.Update(message); } else if (method == Methods.UpdateParameterResponse)//接收参数 { this._dataService.Update(message); } else if (method == Methods.UpdateDeviceIdListResponse)//接收设备Id列表 { this._dataService.UpdateList(message, o => o.IoTGateway.Number == from); } else if (method == Methods.UpdateDeviceResponse)//接收设备 { var number = message.To().Number; this._dataService.Update(message, o => o.Number == number); } else if (method == Methods.UpdateDataResponse)//接收数据 { this._dataService.Update(message); } //else if (method == Methods.UpdateCommandIdListResponse)//接收命令Id列表 //{ // this._dataService.UpdateList(message, o => o.Device.Node.Number == from); //} //else if (method == Methods.UpdateCommandResponse)//接收命令 //{ // this._dataService.Update(message); //} //else if (method == Methods.UpdateSceneIdListResponse)//接收场景Id列表 //{ // this._dataService.UpdateList(message, o => o.Node.Number == from); //} //else if (method == Methods.UpdateSceneResponse)//接收场景 //{ // this._dataService.Update(message); //} //else if (method == Methods.UpdateIoTTimerIdListResponse)//接收定时器Id列表 //{ // this._dataService.UpdateList(message, o => o.Scene.Node.Number == from); //} //else if (method == Methods.UpdateIoTTimerResponse)//接收定时器 //{ // this._dataService.Update(message); //} //else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收触发器Id列表 //{ // this._dataService.UpdateList(message, o => o.Scene.Node.Number == from); //} //else if (method == Methods.UpdateIoTTiggerResponse)//接收触发器 //{ // this._dataService.Update(message); //} //else if (method == Methods.UpdateSceneCommandIdListResponse)//接收场景命令Id列表 //{ // this._dataService.UpdateList(message, o => o.Scene.Node.Number == from); //} //else if (method == Methods.UpdateSceneCommandResponse)//接收场景命令 //{ // this._dataService.Update(message); //} //后台编辑 else if (method == $"Edit{nameof(IoTGateway)}")//编辑节点返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Edit{nameof(IoTDevice)}")//编辑设备返回 { var model = message.FromJson(); this._dataService.Edit(model); } else if (method == $"Delete{nameof(IoTDevice)}")//删除设备返回 { var model = message.FromJson(); this._dataService.Delete(model); } else if (method == $"Edit{nameof(IoTData)}")//编辑数据或设备上报数据 { var model = message.FromJson(); this._dataService.Edit(model); } //else if (method == $"Edit{nameof(Data)}List") //{ // var list = message.FromJson>(); // foreach (var model in list) // { // this._dataService.Edit(model); // } //} else if (method == $"Delete{nameof(IoTData)}")//删除数据返回 { var model = message.FromJson(); this._dataService.Delete(model); } //else if (method == $"Edit{nameof(IoTCommand)}")//编辑命令返回 //{ // var model = message.FromJson(); // this._dataService.Edit(model); //} //else if (method == $"Delete{nameof(IoTCommand)}")//删除命令返回 //{ // var model = message.FromJson(); // this._dataService.Delete(model); //} //else if (method == $"Edit{nameof(IoTScene)}")//编辑场景返回 //{ // var model = message.FromJson(); // this._dataService.Edit(model); //} //else if (method == $"Delete{nameof(IoTScene)}")//删除场景返回 //{ // var model = message.FromJson(); // this._dataService.Delete(model); //} //else if (method == $"Edit{nameof(IoTSceneTimer)}") //{ // var model = message.FromJson(); // this._dataService.Edit(model); //} //else if (method == $"Delete{nameof(IoTSceneTimer)}") //{ // var model = message.FromJson(); // this._dataService.Delete(model); //} //else if (method == $"Edit{nameof(IoTSceneTigger)}") //{ // var model = message.FromJson(); // this._dataService.Edit(model); //} //else if (method == $"Delete{nameof(IoTSceneTigger)}") //{ // var model = message.FromJson(); // this._dataService.Delete(model); //} //else if (method == $"Edit{nameof(IoTSceneIoTCommand)}") //{ // var model = message.FromJson(); // this._dataService.Edit(model); //} //else if (method == $"Delete{nameof(IoTSceneIoTCommand)}") //{ // var model = message.FromJson(); // this._dataService.Delete(model); //} // else if (method == Methods.ExecApiResponse) { this.ServerToClient(method, message, to, from); } else if (method == Methods.ExecSceneRsponse) { this.ServerToClient(method, message, to, from); } else if (method == Methods.UpdateDvr) { var model = message.FromJson(); this.UpdateDvr(model); } } catch (Exception ex) { ex.PrintStack(); } } private void UpdateDvr(IoTData model) { try { var device = this._deviceRepo.ReadOnlyTable().Include(o => o.IoTGateway).FirstOrDefault(o => o.Id == model.IoTDeviceId); var number = device.IoTGateway.Number; var url = this._cfg.GetConnectionString("srs"); var method = Methods.UpdateCamera; if (model.Value == "是") { url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=enable&data=live/{device.Number}"; } else { url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=disable&data=live/{device.Number}"; } if (CallSrs(url)) { this.ServerToClient(method, model.ToJson(), number); } } catch (Exception ex) { ex.PrintStack(); } } private bool CallSrs(string url) { var result = false; try { var httpClient = this._httpClientFactory.CreateClient(); var response = httpClient.GetStringAsync(url).Result; var json = JsonConvert.DeserializeObject>(response); if (json["code"] == "0") { result = true; } } catch (Exception ex) { ex.PrintStack(); } return result; } //private void UpdateSceneCommand(string message) //{ // try // { // this._logger.LogDebug("iot center> receive edit scene command message"); // var model = message.FromJson(); // var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); // if (sceneCommand == null) // { // sceneCommand = new IoTSceneIoTCommand // { // Id = model.Id // }; // _sceneCommandRepo.Add(sceneCommand); // } // sceneCommand.SceneId = model.SceneId.Value; // sceneCommand.CommandId = model.CommandId.Value; // _sceneCommandRepo.SaveChanges(); // } // catch (Exception ex) // { // ex.PrintStack(); // } //} //private void DeleteSceneCommand(string message) //{ // try // { // this._logger.LogDebug("iot center> receive delete scene command message"); // var model = message.FromJson(); // var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); // if (sceneCommand != null) // { // _sceneCommandRepo.Delete(sceneCommand); // _sceneCommandRepo.SaveChanges(); // } // } // catch (Exception ex) // { // ex.PrintStack(); // } //} public async Task LogToInfluxdbAsync(IoTData data) { if (string.IsNullOrEmpty(data.Value)) { return; } if (data.ValueType != IoTValueType.Int && data.ValueType != IoTValueType.Float) { return; } try { var url = _cfg["influxdb:url"]; var usr = _cfg["influxdb:usr"]; var pwd = _cfg["influxdb:pwd"]; var dbName = "iot"; var measurementName = "data"; using var client = new InfluxClient(new Uri(url), usr, pwd); await client.CreateDatabaseAsync(dbName); var row = new DynamicInfluxRow { Timestamp = DateTime.UtcNow }; var device = this._deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.IoTDeviceId); row.Fields.Add("DeviceNumber", device.Number); row.Fields.Add("DeviceName", device.Name); row.Fields.Add(data.Key, this.GetDataValue(data)); await client.WriteAsync(dbName, measurementName, new List { row }); } catch (Exception ex) { ex.PrintStack(); } } private object GetDataValue(IoTData model) { return model.ValueType switch { IoTValueType.Int => Convert.ToInt32(model.Value), IoTValueType.Float => Convert.ToSingle(model.Value), _ => model.Value, }; } } }