You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
465 lines
16 KiB
465 lines
16 KiB
using Application.Domain.Entities;
|
|
using Hangfire;
|
|
using Hangfire.Storage;
|
|
using Infrastructure.Application.Services.Settings;
|
|
using Infrastructure.Data;
|
|
using Infrastructure.Events;
|
|
using Infrastructure.Extensions;
|
|
using IoT.Shared.Application.Models;
|
|
using IoT.Shared.Services;
|
|
using Jint;
|
|
using Jint.Native;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.Logging;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Net.Http;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Vibrant.InfluxDB.Client;
|
|
using Vibrant.InfluxDB.Client.Rows;
|
|
|
|
namespace Platform.Services
|
|
{
|
|
public class IoTCenterEventHandler :
|
|
IEventHander<EntityInserted<Organ>>,
|
|
IEventHander<EntityUpdated<Organ>>,
|
|
IEventHander<EntityDeleted<Organ>>,
|
|
IEventHander<EntityInserted<Building>>,
|
|
IEventHander<EntityUpdated<Building>>,
|
|
IEventHander<EntityDeleted<Building>>,
|
|
IEventHander<EntityInserted<IoTProduct>>,
|
|
IEventHander<EntityUpdated<IoTProduct>>,
|
|
IEventHander<EntityDeleted<IoTProduct>>,
|
|
IEventHander<EntityInserted<IoTTimer>>,
|
|
IEventHander<EntityUpdated<IoTTimer>>,
|
|
IEventHander<EntityDeleted<IoTTimer>>,
|
|
IEventHander<EntityInserted<IoTGateway>>,
|
|
IEventHander<EntityUpdated<IoTGateway>>,
|
|
IEventHander<EntityDeleted<IoTGateway>>,
|
|
IEventHander<EntityInserted<IoTDevice>>,
|
|
IEventHander<EntityUpdated<IoTDevice>>,
|
|
IEventHander<EntityDeleted<IoTDevice>>,
|
|
IEventHander<EntityInserted<IoTData>>,
|
|
IEventHander<EntityUpdated<IoTData>>,
|
|
IEventHander<EntityDeleted<IoTData>>,
|
|
IEventHander<EntityInserted<IoTCommand>>,
|
|
IEventHander<EntityUpdated<IoTCommand>>,
|
|
IEventHander<EntityDeleted<IoTCommand>>
|
|
{
|
|
private readonly IConfiguration _cfg;
|
|
|
|
private readonly ISettingService _settingService;
|
|
|
|
private readonly ILogger<IoTCenterEventHandler> _logger;
|
|
|
|
private readonly IRepository<IoTProduct> _productrepo;
|
|
|
|
private readonly IRepository<IoTGateway> _nodeRepo;
|
|
|
|
private readonly IRepository<Organ> _organRepo;
|
|
|
|
private readonly IRepository<BuildingIoTGateway> _buildingIoTGatewayRepo;
|
|
|
|
private readonly IRepository<IoTDevice> _deviceRepo;
|
|
|
|
private readonly IRepository<IoTTimer> _organSceneTimerRepo;
|
|
|
|
private readonly IRepository<IoTTigger> _sceneTiggerRepo;
|
|
|
|
private readonly ISceneTiggerService _sceneTiggerService;
|
|
|
|
private readonly IHubContext<IoTCenterHub> _hub;
|
|
|
|
private readonly IHttpClientFactory _httpClientFactory;
|
|
|
|
/// <summary>
/// Creates the event handler and stores each injected dependency in its backing field.
/// </summary>
public IoTCenterEventHandler(
    IConfiguration cfg,
    ISettingService settingService,
    ILogger<IoTCenterEventHandler> logger,
    IRepository<IoTProduct> productRepo,
    IRepository<IoTGateway> nodeRepo,
    IRepository<Organ> organRepo,
    IRepository<BuildingIoTGateway> buildingIoTGatewayRepo,
    IRepository<IoTDevice> deviceRepo,
    IRepository<IoTTimer> organSceneTimerRepo,
    IRepository<IoTTigger> sceneTiggerRepo,
    ISceneTiggerService sceneTiggerService,
    IHubContext<IoTCenterHub> hub,
    IHttpClientFactory httpClientFactory)
{
    // Configuration, settings and logging.
    _cfg = cfg;
    _settingService = settingService;
    _logger = logger;

    // Repositories.
    _productrepo = productRepo;
    _nodeRepo = nodeRepo;
    _organRepo = organRepo;
    _buildingIoTGatewayRepo = buildingIoTGatewayRepo;
    _deviceRepo = deviceRepo;
    _organSceneTimerRepo = organSceneTimerRepo;
    _sceneTiggerRepo = sceneTiggerRepo;

    // Services and outbound channels.
    _sceneTiggerService = sceneTiggerService;
    _hub = hub;
    _httpClientFactory = httpClientFactory;
}
|
|
|
|
//Organ
|
|
/// <summary>Relays an organ-inserted event through <c>Notify</c>.</summary>
/// <param name="message">The insert event for the organ.</param>
public void Handle(EntityInserted<Organ> message) => Notify(message);
|
|
|
|
/// <summary>Relays an organ-updated event through <c>Notify</c>.</summary>
/// <param name="message">The update event for the organ.</param>
public void Handle(EntityUpdated<Organ> message) => Notify(message);
|
|
|
|
/// <summary>Relays an organ-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the organ.</param>
public void Handle(EntityDeleted<Organ> message) => Notify(message);
|
|
|
|
//Building
|
|
/// <summary>Relays a building-inserted event through <c>Notify</c>.</summary>
/// <param name="message">The insert event for the building.</param>
public void Handle(EntityInserted<Building> message) => Notify(message);
|
|
|
|
/// <summary>Relays a building-updated event through <c>Notify</c>.</summary>
/// <param name="message">The update event for the building.</param>
public void Handle(EntityUpdated<Building> message) => Notify(message);
|
|
|
|
/// <summary>Relays a building-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the building.</param>
public void Handle(EntityDeleted<Building> message) => Notify(message);
|
|
|
|
//Product
|
|
/// <summary>
/// Handles a newly inserted product: refreshes its API definition via
/// <c>UpdateProduct</c>, then relays the event through <c>Notify</c>.
/// </summary>
/// <param name="message">The insert event carrying the product.</param>
public void Handle(EntityInserted<IoTProduct> message)
{
    var insertedProduct = message.Data;
    UpdateProduct(insertedProduct);
    Notify(message);
}
|
|
|
|
/// <summary>
/// Handles an updated product: refreshes its API definition via
/// <c>UpdateProduct</c>, then relays the event through <c>Notify</c>.
/// </summary>
/// <param name="message">The update event carrying the product.</param>
public void Handle(EntityUpdated<IoTProduct> message)
{
    var updatedProduct = message.Data;
    UpdateProduct(updatedProduct);
    Notify(message);
}
|
|
|
|
/// <summary>
/// Reloads the product identified by <paramref name="entity"/>'s number together with
/// its APIs and their parameters, passes it to <c>OpenApiService.UpdateApi</c> and
/// saves the repository. Does nothing when no product with that number is stored.
/// </summary>
/// <param name="entity">The product whose <c>Number</c> is used for the lookup.</param>
private void UpdateProduct(IoTProduct entity)
{
    var productsWithApis = _productrepo.Table()
        .Include(p => p.IoTApis)
        .ThenInclude(api => api.IoTParameters);

    var stored = productsWithApis.FirstOrDefault(p => p.Number == entity.Number);
    if (stored == null)
    {
        return;
    }

    OpenApiService.UpdateApi(stored);
    _productrepo.SaveChanges();
}
|
|
|
|
/// <summary>Relays a product-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the product.</param>
public void Handle(EntityDeleted<IoTProduct> message) => Notify(message);
|
|
|
|
/// <summary>
/// Registers a Hangfire recurring job for a newly inserted timer. The job is keyed by
/// the timer id, invokes <c>Handle(Guid)</c> and follows the timer's cron expression
/// evaluated in the local time zone.
/// </summary>
/// <param name="message">The insert event carrying the timer.</param>
public void Handle(EntityInserted<IoTTimer> message)
{
    var timerId = message.Data.Id;
    var cron = message.Data.Cron;
    RecurringJob.AddOrUpdate(timerId.ToString(), () => Handle(timerId), cron, TimeZoneInfo.Local);
}
|
|
|
|
/// <summary>
/// Re-registers the Hangfire recurring job of an updated timer. The job is keyed by
/// the timer id, so an existing registration is replaced with the current cron
/// expression (evaluated in the local time zone).
/// </summary>
/// <param name="message">The update event carrying the timer.</param>
public void Handle(EntityUpdated<IoTTimer> message)
{
    var timerId = message.Data.Id;
    var cron = message.Data.Cron;
    RecurringJob.AddOrUpdate(timerId.ToString(), () => Handle(timerId), cron, TimeZoneInfo.Local);
}
|
|
|
|
/// <summary>
/// Removes the Hangfire recurring job keyed by the deleted timer's id, if one exists.
/// </summary>
/// <param name="message">The delete event carrying the timer.</param>
public void Handle(EntityDeleted<IoTTimer> message)
    => RecurringJob.RemoveIfExists(message.Data.Id.ToString());
|
|
|
|
/// <summary>
/// Rebuilds the Hangfire recurring-job table from the stored timers: every recurring
/// job currently known to Hangfire is removed, then one job per timer is registered.
/// A timer that cannot be registered is logged and skipped so the remaining timers
/// are still scheduled.
/// </summary>
public void UpdateTimer()
{
    // NOTE(review): this removes ALL recurring jobs in the current job storage, not only
    // those created for timers - confirm no other component registers recurring jobs.
    // The storage connection is only needed for the listing, so release it before
    // re-registering instead of holding it for the whole method.
    using (var connection = JobStorage.Current.GetConnection())
    {
        foreach (var job in connection.GetRecurringJobs())
        {
            RecurringJob.RemoveIfExists(job.Id);
        }
    }

    var timers = _organSceneTimerRepo.ReadOnlyTable().ToList();
    foreach (var timer in timers)
    {
        try
        {
            RecurringJob.AddOrUpdate(timer.Id.ToString(), () => Handle(timer.Id), timer.Cron, TimeZoneInfo.Local);
        }
        catch (Exception ex)
        {
            ex.PrintStack();
            // Pass the exception object to the logger and include the timer id and cron
            // so a bad schedule can be traced back to its row.
            _logger.LogError(ex, "failed to register recurring job for timer {TimerId} with cron {Cron}", timer.Id, timer.Cron);
        }
    }
}
|
|
|
|
/// <summary>Relays a gateway-inserted event through <c>Notify</c>.</summary>
/// <param name="message">The insert event for the gateway.</param>
public void Handle(EntityInserted<IoTGateway> message) => Notify(message);
|
|
|
|
/// <summary>Relays a gateway-updated event through <c>Notify</c>.</summary>
/// <param name="message">The update event for the gateway.</param>
public void Handle(EntityUpdated<IoTGateway> message) => Notify(message);
|
|
|
|
/// <summary>Relays a gateway-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the gateway.</param>
public void Handle(EntityDeleted<IoTGateway> message) => Notify(message);
|
|
|
|
/// <summary>Relays a device-inserted event through <c>Notify</c>.</summary>
/// <param name="message">The insert event for the device.</param>
public void Handle(EntityInserted<IoTDevice> message) => Notify(message);
|
|
|
|
/// <summary>Relays a device-updated event through <c>Notify</c>.</summary>
/// <param name="message">The update event for the device.</param>
public void Handle(EntityUpdated<IoTDevice> message) => Notify(message);
|
|
|
|
/// <summary>Relays a device-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the device.</param>
public void Handle(EntityDeleted<IoTDevice> message) => Notify(message);
|
|
|
|
/// <summary>
/// Handles a newly inserted data point: attaches its device (with gateway) loaded from
/// the device repository, relays the event through <c>Notify</c>, evaluates scene
/// triggers and hands the data point to <c>LogData</c>.
/// </summary>
/// <param name="message">The insert event carrying the data point.</param>
public void Handle(EntityInserted<IoTData> message)
{
    var devicesWithGateway = _deviceRepo.ReadOnlyTable().Include(d => d.IoTGateway);
    message.Data.IoTDevice = devicesWithGateway.FirstOrDefault(d => d.Id == message.Data.IoTDeviceId);

    Notify(message);
    TiggerHandle(message);
    LogData(message.Data);
}
|
|
|
|
/// <summary>
/// Handles an updated data point: attaches its device (with gateway) loaded from the
/// device repository, relays the event through <c>Notify</c>, evaluates scene triggers
/// and hands the data point to <c>LogData</c>.
/// </summary>
/// <param name="message">The update event carrying the data point.</param>
public void Handle(EntityUpdated<IoTData> message)
{
    var devicesWithGateway = _deviceRepo.ReadOnlyTable().Include(d => d.IoTGateway);
    message.Data.IoTDevice = devicesWithGateway.FirstOrDefault(d => d.Id == message.Data.IoTDeviceId);

    Notify(message);
    TiggerHandle(message);
    LogData(message.Data);
}
|
|
|
|
/// <summary>Relays a data-point-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the data point.</param>
public void Handle(EntityDeleted<IoTData> message) => Notify(message);
|
|
|
|
/// <summary>Relays a command-inserted event through <c>Notify</c>.</summary>
/// <param name="message">The insert event for the command.</param>
public void Handle(EntityInserted<IoTCommand> message) => Notify(message);
|
|
|
|
/// <summary>Relays a command-updated event through <c>Notify</c>.</summary>
/// <param name="message">The update event for the command.</param>
public void Handle(EntityUpdated<IoTCommand> message) => Notify(message);
|
|
|
|
/// <summary>Relays a command-deleted event through <c>Notify</c>.</summary>
/// <param name="message">The delete event for the command.</param>
public void Handle(EntityDeleted<IoTCommand> message) => Notify(message);
|
|
|
|
/// <summary>
/// Evaluates every scene trigger bound to the changed data point and runs the trigger's
/// scene when its condition evaluates to <c>true</c> for the new value.
/// </summary>
/// <remarks>
/// Each trigger condition is a JavaScript expression over a variable named
/// <c>value</c>; it is wrapped in a <c>valid(value)</c> function and executed with Jint.
/// A failure in one trigger is logged and does not stop the remaining triggers.
/// </remarks>
/// <param name="message">The insert/update event carrying the data point.</param>
private void TiggerHandle(BaseEvent<IoTData> message)
{
    try
    {
        // The data point is the same for every trigger, so read it once.
        var data = message.Data;

        foreach (var tigger in _sceneTiggerService.GetSceneTiggers())
        {
            if (tigger.IoTDataId != data.Id)
            {
                continue;
            }

            try
            {
                // NOTE(review): the condition text is spliced into script source, so
                // whoever can edit a trigger can run arbitrary script in this engine.
                // The data value itself is passed as an argument, not spliced.
                var script = $"function valid(value){{return {tigger.Condition};}}";
                var engine = new Engine().Execute(script);
                var result = engine.Invoke("valid", data.Value);
                if (result == JsValue.True)
                {
                    TiggerHandle(tigger.Id);
                }
            }
            catch (Exception ex)
            {
                // Pass the exception object to the logger and identify the trigger so
                // a broken condition can be located.
                _logger.LogError(ex, "scene tigger {TiggerId} failed for data {DataId}", tigger.Id, data.Id);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "scene tigger evaluation failed");
    }
}
|
|
|
|
/// <summary>
/// Pushes an entity event to the hub on a background task without waiting for it.
/// The hub method name is the entity type name followed by the event's <c>Arg</c>;
/// the payload is the event's data and the target is <c>"page"</c>.
/// Any exception raised while pushing is handed to <c>PrintStack</c> and not rethrown.
/// </summary>
/// <typeparam name="T">Entity type carried by the event.</typeparam>
/// <param name="message">The event to relay.</param>
private void Notify<T>(BaseEvent<T> message)
{
    void Push()
    {
        try
        {
            _hub.ServerToClient($"{typeof(T).Name}{message.Arg}", message.Data, "page", null);
        }
        catch (Exception error)
        {
            error.PrintStack();
        }
    }

    // Explicit Action keeps the same Task.Run overload as the original statement lambda.
    Task.Run((Action)Push);
}
|
|
|
|
/// <summary>
/// Runs the scene attached to a trigger: loads the trigger with its scene, commands,
/// devices and gateways, then sends each command to its gateway in
/// <c>DisplayOrder</c>, pausing via <c>Delay</c> after each one.
/// </summary>
/// <remarks>
/// A trigger that no longer exists, or that has no scene commands loaded, is logged
/// and skipped instead of failing with a <see cref="NullReferenceException"/>.
/// A failure while sending one command does not stop the remaining commands.
/// </remarks>
/// <param name="tiggerId">Id of the trigger whose scene should run.</param>
private void TiggerHandle(Guid tiggerId)
{
    _logger.LogDebug("global tigger exec:{TiggerId}", tiggerId);

    var tigger = _sceneTiggerRepo.ReadOnlyTable()
        .Include(o => o.IoTScene).ThenInclude(o => o.IoTSceneIoTCommands).ThenInclude(o => o.IoTCommand).ThenInclude(o => o.IoTDevice).ThenInclude(o => o.IoTGateway)
        .FirstOrDefault(o => o.Id == tiggerId);

    // FirstOrDefault returns null when the trigger was deleted in the meantime.
    var sceneCommands = tigger?.IoTScene?.IoTSceneIoTCommands;
    if (sceneCommands == null)
    {
        _logger.LogError("tigger {TiggerId} does not exist or has no scene commands", tiggerId);
        return;
    }

    foreach (var sceneCommand in sceneCommands.OrderBy(o => o.IoTCommand.DisplayOrder))
    {
        try
        {
            _hub.ServerToClient(Methods.ExecCommand, sceneCommand.IoTCommandId, sceneCommand.IoTCommand.IoTDevice.IoTGateway.Number, null);
            Delay(_settingService, sceneCommand.IoTCommand.Delay);
        }
        catch (Exception ex)
        {
            ex.PrintStack();
            _logger.LogError(ex, "tigger {TiggerId} failed to send command {CommandId}", tiggerId, sceneCommand.IoTCommandId);
        }
    }
}
|
|
|
|
/// <summary>
/// Writes a data point to InfluxDB (database <c>iot</c>, measurement <c>data</c>) on a
/// background task without waiting for it. The row carries the gateway name/number,
/// the device name/number and the data value stored under the data point's key.
/// </summary>
/// <remarks>
/// Nothing is written when the value is empty or when the device or its gateway
/// cannot be loaded. Connection settings are read from the <c>influxdb:url</c>,
/// <c>influxdb:usr</c> and <c>influxdb:pwd</c> configuration keys. Write failures are
/// reported and swallowed; they never reach the caller.
/// </remarks>
/// <param name="data">The data point to record.</param>
private void LogData(IoTData data)
{
    // Check the cheap condition first so an empty value costs no database query.
    if (string.IsNullOrEmpty(data.Value))
    {
        return;
    }

    var url = _cfg["influxdb:url"];
    var usr = _cfg["influxdb:usr"];
    var pwd = _cfg["influxdb:pwd"];

    var device = _deviceRepo.ReadOnlyTable()
        .Include(o => o.IoTGateway)
        .FirstOrDefault(o => o.Id == data.IoTDeviceId);

    // Without the device and its gateway the row's tags cannot be filled in.
    if (device?.IoTGateway == null)
    {
        _logger.LogWarning("skip influxdb write: device {DeviceId} or its gateway was not found", data.IoTDeviceId);
        return;
    }

    Task.Run(async () =>
    {
        try
        {
            const string dbName = "iot";
            const string measurementName = "data";

            using var client = new InfluxClient(new Uri(url), usr, pwd);
            await client.CreateDatabaseAsync(dbName);

            var row = new DynamicInfluxRow
            {
                // UtcDateTime has the same ticks as DateTime but is marked as UTC, so the
                // instant cannot be reinterpreted as local time further down the line.
                Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(data.Timestamp).UtcDateTime
            };
            row.Fields.Add("NodeName", device.IoTGateway.Name);
            row.Fields.Add("NodeNumber", device.IoTGateway.Number);
            row.Fields.Add("DeviceNumber", device.Number);
            row.Fields.Add("DeviceName", device.Name);
            row.Fields.Add(data.Key, GetDataValue(data));

            await client.WriteAsync(dbName, measurementName, new List<DynamicInfluxRow> { row });
        }
        catch (Exception ex)
        {
            ex.PrintStack();
            _logger.LogError(ex, "influxdb write failed for device {DeviceId}, key {Key}", data.IoTDeviceId, data.Key);
        }
    });
}
|
|
|
|
/// <summary>
/// Converts a data point's textual value to the CLR type matching its declared value
/// type: <see cref="int"/> for <c>Int</c>, <see cref="double"/> for <c>Double</c>, and
/// the unchanged string for every other type.
/// </summary>
/// <remarks>
/// Numbers are parsed with the invariant culture. Parsing with the server's current
/// culture would misread values such as <c>3.14</c> on cultures that use <c>,</c> as
/// the decimal separator.
/// </remarks>
/// <param name="model">The data point whose <c>Value</c> is converted.</param>
/// <returns>The boxed number, or the original string.</returns>
/// <exception cref="System.FormatException">The value is not a valid number for its declared type.</exception>
/// <exception cref="System.OverflowException">The value does not fit the target numeric type.</exception>
private object GetDataValue(IoTData model)
{
    return model.ValueType switch
    {
        IoTValueType.Int => Convert.ToInt32(model.Value, System.Globalization.CultureInfo.InvariantCulture),

        IoTValueType.Double => Convert.ToDouble(model.Value, System.Globalization.CultureInfo.InvariantCulture),

        _ => model.Value,
    };
}
|
|
|
|
/// <summary>
/// Runs the scene attached to a timer; this is the method the timers' recurring jobs
/// invoke. Loads the timer with its scene, commands, devices and gateways, then sends
/// each command to its gateway in <c>DisplayOrder</c>, pausing via <c>Delay</c> after
/// each one.
/// </summary>
/// <remarks>
/// A missing timer, or a timer with no scene commands loaded, is logged and skipped.
/// A failure while sending one command does not stop the remaining commands, and no
/// exception leaves this method.
/// </remarks>
/// <param name="id">Id of the timer whose scene should run.</param>
public void Handle(Guid id)
{
    try
    {
        _logger.LogInformation("global timer exec at {Time}:{TimerId}", DateTime.Now.ToString("G"), id);

        var timer = _organSceneTimerRepo.ReadOnlyTable()
            .Include(o => o.IoTScene).ThenInclude(o => o.IoTSceneIoTCommands).ThenInclude(o => o.IoTCommand).ThenInclude(o => o.IoTDevice).ThenInclude(o => o.IoTGateway)
            .FirstOrDefault(o => o.Id == id);

        if (timer == null)
        {
            _logger.LogError("timer {TimerId} does not exist", id);
            return;
        }

        // A timer whose scene is missing has nothing to execute.
        var sceneCommands = timer.IoTScene?.IoTSceneIoTCommands;
        if (sceneCommands == null)
        {
            _logger.LogError("timer {TimerId} has no scene commands", id);
            return;
        }

        foreach (var sceneCommand in sceneCommands.OrderBy(o => o.IoTCommand.DisplayOrder))
        {
            try
            {
                _hub.ServerToClient(Methods.ExecCommand, sceneCommand.IoTCommandId, sceneCommand.IoTCommand.IoTDevice.IoTGateway.Number, null);
                Delay(_settingService, sceneCommand.IoTCommand.Delay);
            }
            catch (Exception ex)
            {
                // Pass the exception object to the logger and identify the command.
                _logger.LogError(ex, "timer {TimerId} failed to send command {CommandId}", id, sceneCommand.IoTCommandId);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "timer {TimerId} execution failed", id);
    }
}
|
|
|
|
/// <summary>
/// Blocks the calling thread for the command's delay plus the global delay read from
/// the <c>"delay"</c> setting. The global delay is added only when the setting parses
/// as a positive integer; nothing happens when the resulting total is not positive.
/// </summary>
/// <remarks>
/// The total is passed to <see cref="Thread.Sleep(int)"/>, so it is in milliseconds
/// and the caller's thread is held for the whole duration.
/// </remarks>
/// <param name="_settingService">Settings source for the global <c>"delay"</c> value.</param>
/// <param name="delay">Per-command delay to wait.</param>
public static void Delay(ISettingService _settingService, int delay)
{
    var hasGlobalDelay = int.TryParse(_settingService.GetValue("delay"), out int globalDelay);
    if (hasGlobalDelay && globalDelay > 0)
    {
        delay += globalDelay;
    }

    if (delay <= 0)
    {
        return;
    }

    Thread.Sleep(delay);
}
|
|
}
|
|
}
|