You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
444 lines
18 KiB
444 lines
18 KiB
using Application.Domain.Entities;
|
|
using Application.Models;
|
|
using Infrastructure.Data;
|
|
using Infrastructure.Events;
|
|
using Infrastructure.Extensions;
|
|
using Infrastructure.Web.SignalR;
|
|
using IoT.Shared.Services;
|
|
using IoT.Shared.Services.IoTCenter;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.Logging;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Net.Http;
|
|
using System.Threading.Tasks;
|
|
using Vibrant.InfluxDB.Client;
|
|
using Vibrant.InfluxDB.Client.Rows;
|
|
|
|
namespace IoTCenter.Services
|
|
{
|
|
public class IoTCenterHub : BasePageHub
|
|
{
|
|
private readonly IConfiguration _cfg;
|
|
private readonly ILogger<IoTCenterHub> _logger;
|
|
private readonly IRepository<Node> _nodeRepo;
|
|
private readonly IRepository<Device> _deviceRepo;
|
|
private readonly IRepository<Data> _dataRepo;
|
|
private readonly IRepository<Api> _apiRepo;
|
|
private readonly IRepository<Command> _commandRepo;
|
|
private readonly IRepository<Category> _categoryRepo;
|
|
private readonly IRepository<Product> _productRepo;
|
|
private readonly IRepository<Scene> _sceneRepo;
|
|
private readonly IRepository<SceneCommand> _sceneCommandRepo;
|
|
private readonly IRepository<SceneTimer> _iotTimerRepo;
|
|
private readonly IRepository<SceneTigger> _iotTiggerRepo;
|
|
private readonly DataService _dataService;
|
|
private readonly IHttpClientFactory _httpClientFactory;
|
|
private readonly IEventPublisher _publisher;
|
|
|
|
public IoTCenterHub(IConfiguration cfg,
|
|
ILogger<IoTCenterHub> logger,
|
|
IRepository<Node> nodeRepo,
|
|
IRepository<Device> deviceRepo,
|
|
IRepository<Data> dataRepo,
|
|
IRepository<Api> apiRepo,
|
|
IRepository<Command> commandRepo,
|
|
IRepository<Category> categoryRepo,
|
|
IRepository<Product> productRepo,
|
|
IRepository<Scene> sceneRepo,
|
|
IRepository<SceneCommand> sceneCommandRepo,
|
|
IRepository<SceneTimer> iotTimerRepo,
|
|
IRepository<SceneTigger> iotTiggerRepo,
|
|
DataService dataService,
|
|
IHttpClientFactory httpClientFactory,
|
|
IEventPublisher publisher)
|
|
{
|
|
this._cfg = cfg;
|
|
this._logger = logger;
|
|
this._nodeRepo = nodeRepo;
|
|
this._deviceRepo = deviceRepo;
|
|
this._dataRepo = dataRepo;
|
|
this._apiRepo = apiRepo;
|
|
this._commandRepo = commandRepo;
|
|
this._categoryRepo = categoryRepo;
|
|
this._productRepo = productRepo;
|
|
this._sceneRepo = sceneRepo;
|
|
this._sceneCommandRepo = sceneCommandRepo;
|
|
this._iotTimerRepo = iotTimerRepo;
|
|
this._iotTiggerRepo = iotTiggerRepo;
|
|
this._dataService = dataService;
|
|
this._httpClientFactory = httpClientFactory;
|
|
this._publisher = publisher;
|
|
}
|
|
|
|
public override Task OnConnectedAsync()
|
|
{
|
|
this.OnConnected();
|
|
this.UpdateNodeStatus(true);
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
public override Task OnDisconnectedAsync(Exception exception)
|
|
{
|
|
this.OnDisconnected(exception);
|
|
this.UpdateNodeStatus(false);
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
private void UpdateNodeStatus(bool status)
|
|
{
|
|
var type = Context.Items["type"] as string;
|
|
if (!string.IsNullOrEmpty(type) && type == "node")
|
|
{
|
|
var group = Context.Items["group"].ToString();
|
|
var node = this._nodeRepo.Table().FirstOrDefault(o => o.Number == group);
|
|
if (node != null)
|
|
{
|
|
node.IsOnline = status;
|
|
try
|
|
{
|
|
this._nodeRepo.SaveChanges();
|
|
var organ = Context.Items["organ"] as string;
|
|
if (!string.IsNullOrEmpty(organ))
|
|
{
|
|
this._publisher.Publish(new NodeClientConnectedEvent { NodeNumber = node.Number, OrganNumber = organ });
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
this._logger.LogError(ex.ToString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public void ServerToClient(string method, string message, string toClient, string fromClient = null)
|
|
{
|
|
Clients.Group(toClient).SendAsync(Methods.ServerToClient, method, message, toClient, fromClient);
|
|
}
|
|
|
|
public void ClientToServer(string method, string message, string to, string from = null)
|
|
{
|
|
this._logger.LogDebug($"iot center> receive message {method} from {from}");
|
|
try
|
|
{
|
|
//节点上传
|
|
if (method == Methods.UpdateNodeResponse)//接收节点
|
|
{
|
|
var number = message.To<Node>().Number;
|
|
this._dataService.Update<Node>(message, o => o.Number == number);
|
|
}
|
|
else if (method == Methods.UpdateProductResponse)//接收产品
|
|
{
|
|
this._dataService.Update<Product>(message);
|
|
}
|
|
else if (method == Methods.UpdateApiResponse)//接收接口
|
|
{
|
|
this._dataService.Update<Api>(message);
|
|
}
|
|
else if (method == Methods.UpdateParameterResponse)//接收参数
|
|
{
|
|
this._dataService.Update<Parameter>(message);
|
|
}
|
|
else if (method == Methods.UpdateDeviceIdListResponse)//接收设备Id列表
|
|
{
|
|
this._dataService.UpdateList<Device>(message, o => o.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateDeviceResponse)//接收设备
|
|
{
|
|
var number = message.To<Device>().Number;
|
|
this._dataService.Update<Device>(message, o => o.Number == number);
|
|
}
|
|
else if (method == Methods.UpdateDataResponse)//接收数据
|
|
{
|
|
this._dataService.Update<Data>(message);
|
|
}
|
|
else if (method == Methods.UpdateCommandIdListResponse)//接收命令Id列表
|
|
{
|
|
this._dataService.UpdateList<Command>(message, o => o.Device.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateCommandResponse)//接收命令
|
|
{
|
|
this._dataService.Update<Command>(message);
|
|
}
|
|
else if (method == Methods.UpdateSceneIdListResponse)//接收场景Id列表
|
|
{
|
|
this._dataService.UpdateList<Scene>(message, o => o.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateSceneResponse)//接收场景
|
|
{
|
|
this._dataService.Update<Scene>(message);
|
|
}
|
|
else if (method == Methods.UpdateIoTTimerIdListResponse)//接收定时器Id列表
|
|
{
|
|
this._dataService.UpdateList<SceneTimer>(message, o => o.Scene.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateIoTTimerResponse)//接收定时器
|
|
{
|
|
this._dataService.Update<SceneTimer>(message);
|
|
}
|
|
else if (method == Methods.UpdateIoTTiggerIdListResponse)//接收触发器Id列表
|
|
{
|
|
this._dataService.UpdateList<SceneTigger>(message, o => o.Scene.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateIoTTiggerResponse)//接收触发器
|
|
{
|
|
this._dataService.Update<SceneTigger>(message);
|
|
}
|
|
else if (method == Methods.UpdateSceneCommandIdListResponse)//接收场景命令Id列表
|
|
{
|
|
this._dataService.UpdateList<SceneCommand>(message, o => o.Scene.Node.Number == from);
|
|
}
|
|
else if (method == Methods.UpdateSceneCommandResponse)//接收场景命令
|
|
{
|
|
this._dataService.Update<SceneCommand>(message);
|
|
}
|
|
//后台编辑
|
|
else if (method == $"Edit{nameof(Node)}")//编辑节点返回
|
|
{
|
|
var model = message.FromJson<EditNodeModel>();
|
|
this._dataService.Edit<Node, EditNodeModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(Device)}")//编辑设备返回
|
|
{
|
|
var model = message.FromJson<EditDeviceModel>();
|
|
this._dataService.Edit<Device, EditDeviceModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(Device)}")//删除设备返回
|
|
{
|
|
var model = message.FromJson<EditDeviceModel>();
|
|
this._dataService.Delete<Device, EditDeviceModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(Data)}")//编辑数据或设备上报数据
|
|
{
|
|
var model = message.FromJson<EditDataModel>();
|
|
this._dataService.Edit<Data, EditDataModel>(model);
|
|
}
|
|
//else if (method == $"Edit{nameof(Data)}List")
|
|
//{
|
|
// var list = message.FromJson<List<EditDataModel>>();
|
|
// foreach (var model in list)
|
|
// {
|
|
// this._dataService.Edit<Data, EditDataModel>(model);
|
|
// }
|
|
//}
|
|
else if (method == $"Delete{nameof(Data)}")//删除数据返回
|
|
{
|
|
var model = message.FromJson<EditDataModel>();
|
|
this._dataService.Delete<Data, EditDataModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(Command)}")//编辑命令返回
|
|
{
|
|
var model = message.FromJson<EditCommandModel>();
|
|
this._dataService.Edit<Command, EditCommandModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(Command)}")//删除命令返回
|
|
{
|
|
var model = message.FromJson<EditCommandModel>();
|
|
this._dataService.Delete<Command, EditCommandModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(Scene)}")//编辑场景返回
|
|
{
|
|
var model = message.FromJson<EditSceneModel>();
|
|
this._dataService.Edit<Scene, EditSceneModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(Scene)}")//删除场景返回
|
|
{
|
|
var model = message.FromJson<EditSceneModel>();
|
|
this._dataService.Delete<Scene, EditSceneModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(SceneTimer)}")
|
|
{
|
|
var model = message.FromJson<EditSceneTimerModel>();
|
|
this._dataService.Edit<SceneTimer, EditSceneTimerModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(SceneTimer)}")
|
|
{
|
|
var model = message.FromJson<EditSceneTimerModel>();
|
|
this._dataService.Delete<SceneTimer, EditSceneTimerModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(SceneTigger)}")
|
|
{
|
|
var model = message.FromJson<EditSceneTiggerModel>();
|
|
this._dataService.Edit<SceneTigger, EditSceneTiggerModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(SceneTigger)}")
|
|
{
|
|
var model = message.FromJson<EditSceneTiggerModel>();
|
|
this._dataService.Delete<SceneTigger, EditSceneTiggerModel>(model);
|
|
}
|
|
else if (method == $"Edit{nameof(SceneCommand)}")
|
|
{
|
|
var model = message.FromJson<EditSceneCommandModel>();
|
|
this._dataService.Edit<SceneCommand, EditSceneCommandModel>(model);
|
|
}
|
|
else if (method == $"Delete{nameof(SceneCommand)}")
|
|
{
|
|
var model = message.FromJson<EditSceneCommandModel>();
|
|
this._dataService.Delete<SceneCommand, EditSceneCommandModel>(model);
|
|
}
|
|
//
|
|
else if (method == Methods.ExecApiResponse)
|
|
{
|
|
this.ServerToClient(method, message, to, from);
|
|
}
|
|
else if (method == Methods.ExecSceneRsponse)
|
|
{
|
|
this.ServerToClient(method, message, to, from);
|
|
}
|
|
else if (method == Methods.UpdateDvr)
|
|
{
|
|
var model = message.FromJson<Data>();
|
|
this.UpdateDvr(model);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
}
|
|
|
|
private void UpdateDvr(Data model)
|
|
{
|
|
try
|
|
{
|
|
var device = this._deviceRepo.ReadOnlyTable().Include(o => o.Node).FirstOrDefault(o => o.Id == model.DeviceId);
|
|
var number = device.Node.Number;
|
|
var url = this._cfg.GetConnectionString("srs");
|
|
var method = Methods.UpdateCamera;
|
|
if (model.Value == "是")
|
|
{
|
|
url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=enable&data=live/{device.Number}";
|
|
}
|
|
else
|
|
{
|
|
url += $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__¶m=disable&data=live/{device.Number}";
|
|
}
|
|
if (CallSrs(url))
|
|
{
|
|
this.ServerToClient(method, model.ToJson(), number);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
}
|
|
|
|
private bool CallSrs(string url)
|
|
{
|
|
var result = false;
|
|
try
|
|
{
|
|
var httpClient = this._httpClientFactory.CreateClient();
|
|
var response = httpClient.GetStringAsync(url).Result;
|
|
var json = JsonConvert.DeserializeObject<Dictionary<string, string>>(response);
|
|
if (json["code"] == "0")
|
|
{
|
|
result = true;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
return result;
|
|
}
|
|
|
|
private void UpdateSceneCommand(string message)
|
|
{
|
|
try
|
|
{
|
|
this._logger.LogDebug("iot center> receive edit scene command message");
|
|
var model = message.FromJson<EditSceneCommandModel>();
|
|
var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
|
|
if (sceneCommand == null)
|
|
{
|
|
sceneCommand = new SceneCommand
|
|
{
|
|
Id = model.Id
|
|
};
|
|
_sceneCommandRepo.Add(sceneCommand);
|
|
}
|
|
sceneCommand.SceneId = model.SceneId.Value;
|
|
sceneCommand.CommandId = model.CommandId.Value;
|
|
_sceneCommandRepo.SaveChanges();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
}
|
|
|
|
private void DeleteSceneCommand(string message)
|
|
{
|
|
try
|
|
{
|
|
this._logger.LogDebug("iot center> receive delete scene command message");
|
|
var model = message.FromJson<EditSceneCommandModel>();
|
|
var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
|
|
if (sceneCommand != null)
|
|
{
|
|
_sceneCommandRepo.Delete(sceneCommand);
|
|
_sceneCommandRepo.SaveChanges();
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
}
|
|
|
|
public async Task LogToInfluxdbAsync(Data data)
|
|
{
|
|
if (string.IsNullOrEmpty(data.Value))
|
|
{
|
|
return;
|
|
}
|
|
if (data.Type != DeviceDataType.Int && data.Type != DeviceDataType.Float)
|
|
{
|
|
return;
|
|
}
|
|
try
|
|
{
|
|
var url = _cfg["influxdb:url"];
|
|
var usr = _cfg["influxdb:usr"];
|
|
var pwd = _cfg["influxdb:pwd"];
|
|
var dbName = "iot";
|
|
var measurementName = "data";
|
|
using var client = new InfluxClient(new Uri(url), usr, pwd);
|
|
await client.CreateDatabaseAsync(dbName);
|
|
var row = new DynamicInfluxRow
|
|
{
|
|
Timestamp = DateTime.UtcNow
|
|
};
|
|
var device = this._deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.DeviceId);
|
|
row.Fields.Add("DeviceNumber", device.Number);
|
|
row.Fields.Add("DeviceName", device.Name);
|
|
row.Fields.Add(data.Key, this.GetDataValue(data));
|
|
|
|
await client.WriteAsync(dbName, measurementName, new List<DynamicInfluxRow> { row });
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
ex.PrintStack();
|
|
}
|
|
}
|
|
|
|
private object GetDataValue(Data model)
|
|
{
|
|
return model.Type switch
|
|
{
|
|
DeviceDataType.Int => Convert.ToInt32(model.Value),
|
|
|
|
DeviceDataType.Float => Convert.ToSingle(model.Value),
|
|
|
|
_ => model.Value,
|
|
};
|
|
}
|
|
}
|
|
} |