You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iot/projects/Platform/Services/IoTCenterEventHandler.cs

445 lines
15 KiB

using Hangfire;
using Hangfire.Storage;
using Infrastructure.Application.Services.Settings;
using Infrastructure.Data;
using Infrastructure.Events;
using Infrastructure.Extensions;
using IoT.Shared.Application.Domain.Entities;
using IoT.Shared.Application.Models;
using Jint;
using Jint.Native;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Vibrant.InfluxDB.Client;
using Vibrant.InfluxDB.Client.Rows;
namespace Platform.Services
{
public class IoTCenterEventHandler :
IEventHander<EntityInsertedEvent<Organ>>,
IEventHander<EntityUpdatedEvent<Organ>>,
IEventHander<EntityDeletedEvent<Organ>>,
IEventHander<EntityInsertedEvent<IoTTimer>>,
IEventHander<EntityUpdatedEvent<IoTTimer>>,
IEventHander<EntityDeletedEvent<IoTTimer>>,
IEventHander<EntityInsertedEvent<IoTGateway>>,
IEventHander<EntityUpdatedEvent<IoTGateway>>,
IEventHander<EntityDeletedEvent<IoTGateway>>,
IEventHander<EntityInsertedEvent<IoTProduct>>,
IEventHander<EntityUpdatedEvent<IoTProduct>>,
IEventHander<EntityDeletedEvent<IoTProduct>>,
IEventHander<EntityInsertedEvent<IoTDevice>>,
IEventHander<EntityUpdatedEvent<IoTDevice>>,
IEventHander<EntityDeletedEvent<IoTDevice>>,
IEventHander<EntityInsertedEvent<IoTData>>,
IEventHander<EntityUpdatedEvent<IoTData>>,
IEventHander<EntityDeletedEvent<IoTData>>,
IEventHander<EntityInsertedEvent<IoTCommand>>,
IEventHander<EntityUpdatedEvent<IoTCommand>>,
IEventHander<EntityDeletedEvent<IoTCommand>>
{
private readonly IConfiguration _cfg;
private readonly ISettingService _settingService;
private readonly ILogger<IoTCenterEventHandler> _logger;
private readonly IRepository<IoTGateway> _nodeRepo;
private readonly IRepository<Organ> _organRepo;
private readonly IRepository<BuildingIoTGateway> _buildingIoTGatewayRepo;
private readonly IRepository<IoTDevice> _deviceRepo;
private readonly IRepository<IoTTimer> _organSceneTimerRepo;
private readonly IRepository<IoTTigger> _sceneTiggerRepo;
private readonly ISceneTiggerService _sceneTiggerService;
private readonly IHubContext<IoTCenterHub> _hub;
private readonly IHttpClientFactory _httpClientFactory;
public IoTCenterEventHandler(IConfiguration cfg,
ISettingService settingService,
ILogger<IoTCenterEventHandler> logger,
IRepository<IoTGateway> nodeRepo,
IRepository<Organ> organRepo,
IRepository<BuildingIoTGateway> buildingIoTGatewayRepo,
IRepository<IoTDevice> deviceRepo,
IRepository<IoTTimer> organSceneTimerRepo,
IRepository<IoTTigger> sceneTiggerRepo,
ISceneTiggerService sceneTiggerService,
IHubContext<IoTCenterHub> hub,
IHttpClientFactory httpClientFactory)
{
this._cfg = cfg;
this._settingService = settingService;
this._logger = logger;
this._nodeRepo = nodeRepo;
this._organRepo = organRepo;
this._buildingIoTGatewayRepo = buildingIoTGatewayRepo;
this._deviceRepo = deviceRepo;
this._organSceneTimerRepo = organSceneTimerRepo;
this._sceneTiggerRepo = sceneTiggerRepo;
this._sceneTiggerService = sceneTiggerService;
this._hub = hub;
this._httpClientFactory = httpClientFactory;
}
#region timer
public void Handle(EntityInsertedEvent<IoTTimer> message)
{
var timer = message.Data;
RecurringJob.AddOrUpdate(timer.Id.ToString(), () => Handle(timer.Id), timer.Cron, TimeZoneInfo.Local);
}
public void Handle(EntityUpdatedEvent<IoTTimer> message)
{
var timer = message.Data;
RecurringJob.AddOrUpdate(timer.Id.ToString(), () => Handle(timer.Id), timer.Cron, TimeZoneInfo.Local);
}
public void Handle(EntityDeletedEvent<IoTTimer> message)
{
var timer = message.Data;
RecurringJob.RemoveIfExists(timer.Id.ToString());
}
public void UpdateTimer()
{
using var conn = JobStorage.Current.GetConnection();
var jobs = conn.GetRecurringJobs();
foreach (var job in jobs)
{
RecurringJob.RemoveIfExists(job.Id);
}
var list = this._organSceneTimerRepo.ReadOnlyTable().ToList();
foreach (var timer in list)
{
try
{
RecurringJob.AddOrUpdate(timer.Id.ToString(), () => Handle(timer.Id), timer.Cron, TimeZoneInfo.Local);
}
catch (Exception ex)
{
ex.PrintStack();
this._logger.LogError(ex.ToString());
}
}
}
#endregion timer
#region Product
public void Handle(EntityInsertedEvent<IoTProduct> message)
{
this.Notify(message);
}
public void Handle(EntityUpdatedEvent<IoTProduct> message)
{
this.Notify(message);
}
public void Handle(EntityDeletedEvent<IoTProduct> message)
{
this.Notify(message);
}
#endregion Product
#region Node
public void Handle(EntityInsertedEvent<IoTGateway> message)
{
this.Notify(message);
//this.UpdateOrganNode(message.Data);
}
public void Handle(EntityUpdatedEvent<IoTGateway> message)
{
this.Notify(message);
//this.UpdateOrganNode(message.Data);
}
public void Handle(EntityDeletedEvent<IoTGateway> message)
{
this.Notify(message);
}
#endregion Node
#region organ
public void Handle(EntityInsertedEvent<Organ> message)
{
this.Notify(message);
//this.UpdateOrganNode(message.Data);
}
public void Handle(EntityUpdatedEvent<Organ> message)
{
this.Notify(message);
//this.UpdateOrganNode(message.Data);
}
public void Handle(EntityDeletedEvent<Organ> message)
{
this.Notify(message);
}
#endregion organ
#region Device
public void Handle(EntityInsertedEvent<IoTDevice> message)
{
this.Notify(message);
}
public void Handle(EntityUpdatedEvent<IoTDevice> message)
{
this.Notify(message);
}
public void Handle(EntityDeletedEvent<IoTDevice> message)
{
this.Notify(message);
}
#endregion Device
#region Data
public void Handle(EntityInsertedEvent<IoTData> message)
{
message.Data.Device = _deviceRepo.ReadOnlyTable().Include(o => o.IoTGateway).Where(o => o.Id == message.Data.DeviceId).FirstOrDefault();
this.Notify(message);
this.TiggerHandle(message);
this.LogData(message.Data);
}
public void Handle(EntityUpdatedEvent<IoTData> message)
{
message.Data.Device = _deviceRepo.ReadOnlyTable().Include(o => o.IoTGateway).Where(o => o.Id == message.Data.DeviceId).FirstOrDefault();
this.Notify(message);
this.TiggerHandle(message);
this.LogData(message.Data);
}
public void Handle(EntityDeletedEvent<IoTData> message)
{
this.Notify(message);
}
#endregion Data
#region Command
public void Handle(EntityInsertedEvent<IoTCommand> message)
{
this.Notify(message);
}
public void Handle(EntityUpdatedEvent<IoTCommand> message)
{
this.Notify(message);
}
public void Handle(EntityDeletedEvent<IoTCommand> message)
{
this.Notify(message);
}
#endregion Command
private void TiggerHandle(BaseEvent<IoTData> message)
{
try
{
foreach (var tigger in this._sceneTiggerService.GetSceneTiggers())
{
var data = message.Data;
if (tigger.DataId == data.Id)
{
try
{
var condition = tigger.Condition;
var value = data.Value;
var engine = new Engine().Execute($"function valid(value){{return {condition};}}");
var result = engine.Invoke("valid", value);
if (result == JsValue.True)
{
this.TiggerHandle(tigger.Id);
}
}
catch (Exception ex)
{
this._logger.LogError(ex.ToString());
}
}
}
}
catch (Exception ex)
{
this._logger.LogError(ex.ToString());
}
}
private void Notify<T>(BaseEvent<T> message)
{
Task.Run(() =>
{
try
{
_hub.ServerToClient($"{typeof(T).Name}{message.Arg}", message.Data, "page", null);
}
catch (Exception ex)
{
ex.PrintStack();
}
});
}
private void TiggerHandle(Guid tiggerId)
{
this._logger.LogDebug($"global tigger exec:{tiggerId}");
var tigger = this._sceneTiggerRepo.ReadOnlyTable()
.Include(o => o.IoTScene).ThenInclude(o => o.IoTSceneIoTCommands).ThenInclude(o => o.IoTCommand).ThenInclude(o => o.IoTDevice).ThenInclude(o => o.IoTGateway)
.FirstOrDefault(o => o.Id == tiggerId);
foreach (var sceneCommand in tigger.IoTScene.IoTSceneIoTCommands.OrderBy(o => o.IoTCommand.DisplayOrder))
{
try
{
this._hub.ServerToClient(Methods.ExecCommand, sceneCommand.IoTCommandId, sceneCommand.IoTCommand.IoTDevice.IoTGateway.Number, null);
Delay(_settingService, sceneCommand.IoTCommand.Delay);
}
catch (Exception ex)
{
ex.PrintStack();
}
}
}
private void LogData(IoTData data)
{
var url = _cfg["influxdb:url"];
var usr = _cfg["influxdb:usr"];
var pwd = _cfg["influxdb:pwd"];
var device = _deviceRepo.ReadOnlyTable()
.Include(o => o.IoTGateway)
//.ThenInclude(o => o.BuildingIoTGateways)
//.ThenInclude(o => o.Building.Organ)
.FirstOrDefault(o => o.Id == data.DeviceId);
//var organ = device.Node.BuildingIoTGateways.FirstOrDefault()?.Building.Organ;
Task.Run(async () =>
{
try
{
if (string.IsNullOrEmpty(data.Value))
{
return;
}
if (data.Type != IoTDataType.Int && data.Type != IoTDataType.Float && data.Type != IoTDataType.String)
{
return;
}
try
{
var dbName = "iot";
var measurementName = "data";
using var client = new InfluxClient(new Uri(url), usr, pwd);
await client.CreateDatabaseAsync(dbName);
var row = new DynamicInfluxRow
{
Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(data.Timestamp).DateTime
};
//if (organ != null)
//{
// row.Fields.Add("OrganName", organ.Name);
// row.Fields.Add("OrganNumber", organ.Number);
//}
row.Fields.Add("NodeName", device.IoTGateway.Name);
row.Fields.Add("NodeNumber", device.IoTGateway.Number);
row.Fields.Add("DeviceNumber", device.Number);
row.Fields.Add("DeviceName", device.Name);
row.Fields.Add(data.Key, this.GetDataValue(data));
await client.WriteAsync(dbName, measurementName, new List<DynamicInfluxRow> { row });
}
catch (Exception ex)
{
ex.PrintStack();
}
}
catch (Exception ex)
{
ex.PrintStack();
}
});
}
private object GetDataValue(IoTData model)
{
return model.Type switch
{
IoTDataType.Int => Convert.ToInt32(model.Value),
IoTDataType.Float => Convert.ToSingle(model.Value),
_ => model.Value,
};
}
public void Handle(Guid id)
{
try
{
_logger.LogInformation($"global timer exec at {DateTime.Now.ToString("G")}:{id}");
var timer = _organSceneTimerRepo.ReadOnlyTable()
.Include(o => o.IoTScene).ThenInclude(o => o.IoTSceneIoTCommands).ThenInclude(o => o.IoTCommand).ThenInclude(o => o.IoTDevice).ThenInclude(o => o.IoTGateway)
.FirstOrDefault(o => o.Id == id);
if (timer != null)
{
foreach (var sceneCommand in timer.IoTScene.IoTSceneIoTCommands.OrderBy(o => o.IoTCommand.DisplayOrder))
{
try
{
_hub.ServerToClient(Methods.ExecCommand, sceneCommand.IoTCommandId, sceneCommand.IoTCommand.IoTDevice.IoTGateway.Number, null);
Delay(_settingService, sceneCommand.IoTCommand.Delay);
}
catch (Exception ex)
{
_logger.LogError(ex.ToString());
}
}
}
else
{
_logger.LogError($"timer {id} does not exist");
}
}
catch (Exception ex)
{
_logger.LogError(ex.ToString());
}
}
public static void Delay(ISettingService _settingService, int delay)
{
if (int.TryParse(_settingService.GetValue("delay"), out int commandDelay))
{
if (commandDelay > 0)
{
delay += commandDelay;
}
}
if (delay > 0)
{
Thread.Sleep(delay);
}
}
}
}