// iot/projects/Platform/Services/IoTCenterHub.cs

using Application.Domain.Entities;
using Infrastructure.Data;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Infrastructure.Web.SignalR;
using IoT.Shared.Application.Models;
using IoT.Shared.Services;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Vibrant.InfluxDB.Client;
using Vibrant.InfluxDB.Client.Rows;
namespace Platform.Services
{
public class IoTCenterHub : BasePageHub
{
// Gate shared by every hub instance; serializes gateway online-status writes in UpdateNodeStatus.
// readonly so the lock target can never be swapped out while another thread holds it.
private static readonly object lockObject = new object();
// Configuration source: read for the "srs" connection string and the "influxdb:*" settings.
private readonly IConfiguration _cfg;
// Logger used for debug traces of incoming messages and for persistence errors.
private readonly ILogger<IoTCenterHub> _logger;
// Gateway ("node") repository; gateways are looked up by Number when their status changes.
private readonly IRepository<IoTGateway> _nodeRepo;
// Device repository; devices are looked up by Id for DVR updates and InfluxDB logging.
private readonly IRepository<IoTDevice> _deviceRepo;
// Injected but not referenced by the code visible in this file.
private readonly IRepository<IoTData> _dataRepo;
// Injected but not referenced by the code visible in this file.
private readonly IRepository<IoTApi> _apiRepo;
// Injected but not referenced by the code visible in this file.
private readonly IRepository<IoTProductCategory> _categoryRepo;
// Injected but not referenced by the code visible in this file.
private readonly IRepository<IoTProduct> _productRepo;
// Receives the Update / UpdateList / Edit / Delete calls issued by ClientToServer.
private readonly DataService _dataService;
// Creates the HttpClient used by CallSrs.
private readonly IHttpClientFactory _httpClientFactory;
// Injected but not referenced by the code visible in this file.
private readonly IEventPublisher _publisher;
/// <summary>
/// Creates the hub and stores each injected dependency in its matching private field.
/// </summary>
public IoTCenterHub(
    IConfiguration cfg,
    ILogger<IoTCenterHub> logger,
    IRepository<IoTGateway> nodeRepo,
    IRepository<IoTDevice> deviceRepo,
    IRepository<IoTData> dataRepo,
    IRepository<IoTApi> apiRepo,
    IRepository<IoTProductCategory> categoryRepo,
    IRepository<IoTProduct> productRepo,
    DataService dataService,
    IHttpClientFactory httpClientFactory,
    IEventPublisher publisher)
{
    // Configuration and diagnostics.
    _cfg = cfg;
    _logger = logger;

    // Entity repositories.
    _nodeRepo = nodeRepo;
    _deviceRepo = deviceRepo;
    _dataRepo = dataRepo;
    _apiRepo = apiRepo;
    _categoryRepo = categoryRepo;
    _productRepo = productRepo;

    // Services.
    _dataService = dataService;
    _httpClientFactory = httpClientFactory;
    _publisher = publisher;
}
/// <summary>
/// Connection hook: runs <c>OnConnected</c> (defined outside this file) and then
/// marks the caller's gateway, if the caller is one, as online.
/// </summary>
/// <returns>An already-completed task; all work is done synchronously.</returns>
public override Task OnConnectedAsync()
{
    // Order matters: UpdateNodeStatus reads Context.Items, so OnConnected runs first.
    // NOTE(review): presumably OnConnected is what fills Context.Items - confirm.
    this.OnConnected();
    this.UpdateNodeStatus(status: true);

    return Task.CompletedTask;
}
/// <summary>
/// Disconnection hook: runs <c>OnDisconnected</c> (defined outside this file) and then
/// marks the caller's gateway, if the caller is one, as offline.
/// </summary>
/// <param name="exception">The error that closed the connection, forwarded to <c>OnDisconnected</c>.</param>
/// <returns>An already-completed task; all work is done synchronously.</returns>
public override Task OnDisconnectedAsync(Exception exception)
{
    this.OnDisconnected(exception);
    this.UpdateNodeStatus(status: false);

    return Task.CompletedTask;
}
/// <summary>
/// Persists the online/offline flag of the gateway that owns the current connection.
/// </summary>
/// <remarks>
/// Only connections whose <c>Context.Items["type"]</c> is the string <c>"node"</c> are handled.
/// The gateway is located by matching its <c>Number</c> against <c>Context.Items["group"]</c>.
/// A missing key, a null group or an unknown gateway leaves the database untouched.
/// Save failures are logged and not rethrown.
/// </remarks>
/// <param name="status"><c>true</c> to mark the gateway online, <c>false</c> to mark it offline.</param>
private void UpdateNodeStatus(bool status)
{
    // TryGetValue instead of the indexer: a connection without these items is simply
    // "not a gateway" and must not throw out of the connect/disconnect hooks.
    Context.Items.TryGetValue("type", out var typeValue);
    if ((typeValue as string) != "node")
    {
        return;
    }

    Context.Items.TryGetValue("group", out var groupValue);
    var group = groupValue?.ToString();
    if (group == null)
    {
        this._logger.LogWarning("iot center> node connection has no group; online status not updated");
        return;
    }

    // The lock only guards the read-modify-save sequence; the per-connection checks above need no lock.
    lock (lockObject)
    {
        var node = this._nodeRepo.Table().FirstOrDefault(o => o.Number == group);
        if (node == null)
        {
            return;
        }

        node.IsOnline = status;
        try
        {
            this._nodeRepo.SaveChanges();
        }
        catch (Exception ex)
        {
            // Pass the exception object so the logger records type and stack trace in structured form.
            this._logger.LogError(ex, "iot center> failed to save online status {Status} for node {Number}", status, group);
        }
    }
}
/// <summary>
/// Sends a message to every connection in the SignalR group named <paramref name="toClient"/>.
/// </summary>
/// <param name="method">Logical message name, forwarded to the receivers as the first argument.</param>
/// <param name="message">Message payload, forwarded unchanged.</param>
/// <param name="toClient">Name of the target group; also forwarded as an argument.</param>
/// <param name="fromClient">Optional sender identifier, forwarded unchanged.</param>
public void ServerToClient(string method, string message, string toClient, string fromClient = null)
{
    var recipients = Clients.Group(toClient);

    // The task returned by SendAsync is not awaited here, so delivery failures are not observed by this method.
    recipients.SendAsync(Methods.ServerToClient, method, message, toClient, fromClient);
}
/// <summary>
/// Handles a message sent to the server by a connected client, dispatching on <paramref name="method"/>.
/// </summary>
/// <remarks>
/// Entity-sync and edit/delete messages are deserialized from JSON and handed to the data service.
/// API and scene execution responses are relayed to the <paramref name="to"/> group.
/// DVR updates are forwarded to <c>UpdateDvr</c>.
/// Unrecognized method names are ignored. Any exception is passed to <c>PrintStack</c> and not rethrown.
/// </remarks>
/// <param name="method">Message name that selects the handler.</param>
/// <param name="message">JSON payload (or raw payload for relayed responses).</param>
/// <param name="to">Target group, used when a response is relayed.</param>
/// <param name="from">Sender identifier; used as the gateway number when a device id list is received.</param>
public void ClientToServer(string method, string message, string to, string from = null)
{
    this._logger.LogDebug($"iot center> receive message {method} from {from}");

    // Message names derived from the entity type names.
    var gatewayUpdated = nameof(IoTGateway) + "EntityUpdated";
    var productUpdated = nameof(IoTProduct) + "EntityUpdated";
    var deviceUpdated = nameof(IoTDevice) + "EntityUpdated";
    var dataUpdated = nameof(IoTData) + "EntityUpdated";
    var editGateway = "Edit" + nameof(IoTGateway);
    var editDevice = "Edit" + nameof(IoTDevice);
    var deleteDevice = "Delete" + nameof(IoTDevice);
    var editData = "Edit" + nameof(IoTData);
    var deleteData = "Delete" + nameof(IoTData);

    try
    {
        // ---- Uploads from gateways ----
        if (method == gatewayUpdated)
        {
            // Gateway received.
            var gateway = message.FromJson<IoTGateway>();
            var skipped = new[] { nameof(IoTGateway.BuildingId) };
            this._dataService.Update(gateway, o => o.Number == gateway.Number, skipped);
        }
        else if (method == productUpdated)
        {
            // Product received.
            var product = message.FromJson<IoTProduct>();
            var skipped = new[] { nameof(IoTProduct.IoTProductCategoryId), nameof(IoTProduct.DisplayOrder) };
            this._dataService.Update(product, o => o.Number == product.Number, skips: skipped);
        }
        else if (method == Methods.UpdateDeviceIdList)
        {
            // Device id list received for the sending gateway.
            this._dataService.UpdateList<IoTDevice>(message, o => o.IoTGateway.Number == from);
        }
        else if (method == deviceUpdated)
        {
            // Device received.
            var device = message.FromJson<IoTDevice>();
            this._dataService.Update(device, o => o.Number == device.Number);
        }
        else if (method == dataUpdated)
        {
            // Data received.
            var data = message.FromJson<IoTData>();
            this._dataService.Update(data);
        }
        // ---- Results of back-office edits ----
        else if (method == editGateway)
        {
            // Edit-node response.
            var edited = message.FromJson<EditIoTGatewayModel>();
            this._dataService.Edit<IoTGateway, EditIoTGatewayModel>(edited);
        }
        else if (method == editDevice)
        {
            // Edit-device response.
            var edited = message.FromJson<EditIoTDeviceModel>();
            this._dataService.Edit<IoTDevice, EditIoTDeviceModel>(edited);
        }
        else if (method == deleteDevice)
        {
            // Delete-device response.
            var removed = message.FromJson<EditIoTDeviceModel>();
            this._dataService.Delete<IoTDevice, EditIoTDeviceModel>(removed);
        }
        else if (method == editData)
        {
            // Edit-data response, or data reported by a device.
            var edited = message.FromJson<EditIoTDataModel>();
            this._dataService.Edit<IoTData, EditIoTDataModel>(edited);
        }
        else if (method == deleteData)
        {
            // Delete-data response.
            var removed = message.FromJson<EditIoTDataModel>();
            this._dataService.Delete<IoTData, EditIoTDataModel>(removed);
        }
        // NOTE: Edit/Delete handlers for IoTCommand, IoTScene, IoTSceneTimer, IoTSceneTigger and
        // IoTSceneIoTCommand (plus a batch "Edit...List" handler) used to sit here as commented-out
        // code; they are disabled and therefore such messages fall through unhandled.
        else if (method == Methods.ExecApiResponse || method == Methods.ExecSceneRsponse)
        {
            // Execution responses are relayed unchanged to the target group.
            this.ServerToClient(method, message, to, from);
        }
        else if (method == Methods.UpdateDvr)
        {
            var dvrData = message.FromJson<IoTData>();
            this.UpdateDvr(dvrData);
        }
    }
    catch (Exception ex)
    {
        ex.PrintStack();
    }
}
/// <summary>
/// Enables or disables DVR recording for a device's live stream on the SRS server and,
/// when SRS reports success, notifies the device's gateway group with an UpdateCamera message.
/// </summary>
/// <remarks>
/// The SRS base address comes from the "srs" connection string. Recording is enabled when
/// <c>model.Value</c> equals "是" ("yes") and disabled for any other value.
/// A null model, an unknown device or a device without a gateway is logged and skipped.
/// Any exception is passed to <c>PrintStack</c> and not rethrown.
/// </remarks>
/// <param name="model">Data item whose <c>IoTDeviceId</c> selects the device and whose <c>Value</c> selects the action.</param>
private void UpdateDvr(IoTData model)
{
    try
    {
        if (model == null)
        {
            this._logger.LogWarning("iot center> update dvr skipped: message carried no data");
            return;
        }

        var device = this._deviceRepo.ReadOnlyTable()
            .Include(o => o.IoTGateway)
            .FirstOrDefault(o => o.Id == model.IoTDeviceId);

        // Explicit guard instead of relying on a NullReferenceException being swallowed by the catch below.
        if (device == null || device.IoTGateway == null)
        {
            this._logger.LogWarning("iot center> update dvr skipped: device {DeviceId} or its gateway was not found", model.IoTDeviceId);
            return;
        }

        // Only the "param" query value differs between the two requests.
        var action = model.Value == "是" ? "enable" : "disable";
        var url = this._cfg.GetConnectionString("srs")
            + $"/api/v1/raw?rpc=update&scope=dvr&value=__defaultVhost__&param={action}&data=live/{device.Number}";

        if (CallSrs(url))
        {
            this.ServerToClient(Methods.UpdateCamera, model.ToJson(), device.IoTGateway.Number);
        }
    }
    catch (Exception ex)
    {
        ex.PrintStack();
    }
}
/// <summary>
/// Issues a GET request to the given SRS URL and reports whether the JSON response
/// carries <c>"code"</c> equal to <c>"0"</c>.
/// </summary>
/// <param name="url">Absolute URL of the SRS API call.</param>
/// <returns>
/// <c>true</c> when the response parses and its <c>code</c> entry is "0"; <c>false</c> when the
/// entry is missing or different, or when the request or parsing throws (the exception is
/// passed to <c>PrintStack</c>).
/// </returns>
private bool CallSrs(string url)
{
    try
    {
        var httpClient = this._httpClientFactory.CreateClient();

        // The caller chain is synchronous, so this blocks until the request completes.
        // GetAwaiter().GetResult() surfaces the original exception instead of an AggregateException wrapper.
        var response = httpClient.GetStringAsync(url).GetAwaiter().GetResult();

        var json = JsonConvert.DeserializeObject<Dictionary<string, string>>(response);

        // An empty payload deserializes to null and "code" may be absent; both simply mean "not successful".
        return json != null
            && json.TryGetValue("code", out var code)
            && code == "0";
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        return false;
    }
}
//private void UpdateSceneCommand(string message)
//{
// try
// {
// this._logger.LogDebug("iot center> receive edit scene command message");
// var model = message.FromJson<EditSceneCommandModel>();
// var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
// if (sceneCommand == null)
// {
// sceneCommand = new IoTSceneIoTCommand
// {
// Id = model.Id
// };
// _sceneCommandRepo.Add(sceneCommand);
// }
// sceneCommand.SceneId = model.SceneId.Value;
// sceneCommand.CommandId = model.CommandId.Value;
// _sceneCommandRepo.SaveChanges();
// }
// catch (Exception ex)
// {
// ex.PrintStack();
// }
//}
//private void DeleteSceneCommand(string message)
//{
// try
// {
// this._logger.LogDebug("iot center> receive delete scene command message");
// var model = message.FromJson<EditSceneCommandModel>();
// var sceneCommand = _sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id);
// if (sceneCommand != null)
// {
// _sceneCommandRepo.Delete(sceneCommand);
// _sceneCommandRepo.SaveChanges();
// }
// }
// catch (Exception ex)
// {
// ex.PrintStack();
// }
//}
/// <summary>
/// Writes a numeric data point to InfluxDB (database "iot", measurement "data").
/// </summary>
/// <remarks>
/// Nothing is written when <paramref name="data"/> is null, its value is empty, or its value type
/// is neither Int nor Float. The row carries the current UTC timestamp, the owning device's
/// number and name, and one field named after <c>data.Key</c> holding the converted value.
/// Connection settings are read from "influxdb:url", "influxdb:usr" and "influxdb:pwd".
/// Any exception is passed to <c>PrintStack</c> and not rethrown.
/// </remarks>
/// <param name="data">The data item to record.</param>
public async Task LogToInfluxdbAsync(IoTData data)
{
    // The null check keeps a missing payload from throwing outside the try block below.
    if (data == null || string.IsNullOrEmpty(data.Value))
    {
        return;
    }
    if (data.ValueType != IoTValueType.Int && data.ValueType != IoTValueType.Float)
    {
        return;
    }

    try
    {
        // Resolve the device first so an unknown device costs no round-trip to InfluxDB.
        var device = this._deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.IoTDeviceId);
        if (device == null)
        {
            this._logger.LogWarning("iot center> influxdb logging skipped: device {DeviceId} was not found", data.IoTDeviceId);
            return;
        }

        var url = _cfg["influxdb:url"];
        var usr = _cfg["influxdb:usr"];
        var pwd = _cfg["influxdb:pwd"];
        var dbName = "iot";
        var measurementName = "data";

        using var client = new InfluxClient(new Uri(url), usr, pwd);
        await client.CreateDatabaseAsync(dbName);

        var row = new DynamicInfluxRow
        {
            Timestamp = DateTime.UtcNow
        };
        row.Fields.Add("DeviceNumber", device.Number);
        row.Fields.Add("DeviceName", device.Name);
        row.Fields.Add(data.Key, this.GetDataValue(data));

        await client.WriteAsync(dbName, measurementName, new List<DynamicInfluxRow> { row });
    }
    catch (Exception ex)
    {
        ex.PrintStack();
    }
}
/// <summary>
/// Converts the textual value of a data item to a CLR value that matches its declared value type.
/// </summary>
/// <param name="model">Data item providing <c>ValueType</c> and the string <c>Value</c>.</param>
/// <returns>
/// A boxed <see cref="int"/> for Int, a boxed <see cref="float"/> for Float,
/// and the unchanged string for every other value type.
/// </returns>
/// <exception cref="FormatException">The value is not a valid number for the declared type.</exception>
/// <exception cref="OverflowException">The value is outside the range of the target type.</exception>
private object GetDataValue(IoTData model)
{
    // Values are machine-generated, so parse with the invariant culture: under the server's
    // current culture a value such as "1.5" can be misread where '.' is not the decimal separator.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    return model.ValueType switch
    {
        IoTValueType.Int => Convert.ToInt32(model.Value, invariant),
        IoTValueType.Float => Convert.ToSingle(model.Value, invariant),
        _ => model.Value,
    };
}
}
}