// iot/projects/IoTCenter/Services/IoTCenterEventHandler.cs

using Application.Domain.Entities;
using Application.Models;
using CSScriptLib;
using Infrastructure.Data;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Vibrant.InfluxDB.Client;
using Vibrant.InfluxDB.Client.Rows;
namespace IoTCenter.Services
{
public class IoTCenterEventHandler :
IEventHander<EntityInsertedEvent<SceneTimer>>,
IEventHander<EntityUpdatedEvent<SceneTimer>>,
IEventHander<EntityDeletedEvent<SceneTimer>>,
IEventHander<EntityInsertedEvent<SceneTigger>>,
IEventHander<EntityUpdatedEvent<SceneTigger>>,
IEventHander<EntityDeletedEvent<SceneTigger>>,
IEventHander<EntityInsertedEvent<Node>>,
IEventHander<EntityUpdatedEvent<Node>>,
IEventHander<EntityDeletedEvent<Node>>,
IEventHander<EntityInsertedEvent<Product>>,
IEventHander<EntityUpdatedEvent<Product>>,
IEventHander<EntityDeletedEvent<Product>>,
IEventHander<EntityInsertedEvent<Device>>,
IEventHander<EntityUpdatedEvent<Device>>,
IEventHander<EntityDeletedEvent<Device>>,
IEventHander<EntityInsertedEvent<Data>>,
IEventHander<EntityUpdatedEvent<Data>>,
IEventHander<EntityDeletedEvent<Data>>,
IEventHander<EntityInsertedEvent<Command>>,
IEventHander<EntityUpdatedEvent<Command>>,
IEventHander<EntityDeletedEvent<Command>>,
IEventHander<EntityInsertedEvent<Scene>>,
IEventHander<EntityUpdatedEvent<Scene>>,
IEventHander<EntityDeletedEvent<Scene>>
{
/// <summary>
/// Process-wide cache of scene triggers keyed by trigger id. The SceneTigger handlers in this class
/// add, replace and remove entries; the Data handlers enumerate it to evaluate trigger conditions.
/// </summary>
public static ConcurrentDictionary<Guid, SceneTigger> Tiggers = new ConcurrentDictionary<Guid, SceneTigger>();
// Configuration: connection strings "JobServer" / "JobCallBack" and the "influxdb:*" keys are read from here.
private readonly IConfiguration _cfg;
private readonly ILogger<IoTCenterEventHandler> _logger;
// Used to check whether a timer's/trigger's scene exists with NodeId == null.
private readonly IRepository<Scene> _sceneRepo;
// Used to load the device (and its node) that a data point belongs to.
private readonly IRepository<Device> _deviceRepo;
// Used to load a trigger together with its scene commands when the trigger fires.
private readonly IRepository<SceneTigger> _sceneTiggerRepo;
// SignalR hub context used to push events and commands to connected clients.
private readonly IHubContext<IoTCenterHub> _hub;
// Factory for the HttpClient used to call the job server.
private readonly IHttpClientFactory _httpClientFactory;
/// <summary>
/// Creates the handler and stores the injected services for later use.
/// </summary>
/// <param name="cfg">Application configuration (connection strings and influxdb settings).</param>
/// <param name="logger">Logger for this handler.</param>
/// <param name="sceneRepo">Repository for scenes.</param>
/// <param name="deviceRepo">Repository for devices.</param>
/// <param name="sceneTiggerRepo">Repository for scene triggers.</param>
/// <param name="hub">SignalR hub context used to reach clients.</param>
/// <param name="httpClientFactory">Factory for HTTP clients used to call the job server.</param>
public IoTCenterEventHandler(
    IConfiguration cfg,
    ILogger<IoTCenterEventHandler> logger,
    IRepository<Scene> sceneRepo,
    IRepository<Device> deviceRepo,
    IRepository<SceneTigger> sceneTiggerRepo,
    IHubContext<IoTCenterHub> hub,
    IHttpClientFactory httpClientFactory)
{
    (_cfg, _logger) = (cfg, logger);
    (_sceneRepo, _deviceRepo, _sceneTiggerRepo) = (sceneRepo, deviceRepo, sceneTiggerRepo);
    (_hub, _httpClientFactory) = (hub, httpClientFactory);
}
#region timer
/// <summary>
/// A scene timer was inserted: register its recurring job on the job server.
/// </summary>
/// <param name="message">Event carrying the inserted timer.</param>
public void Handle(EntityInsertedEvent<SceneTimer> message)
{
    this.AddOrUpdateRecurringJob(message.Data);
}

/// <summary>
/// A scene timer was updated: re-register its recurring job so the new cron expression takes effect.
/// </summary>
/// <param name="message">Event carrying the updated timer.</param>
public void Handle(EntityUpdatedEvent<SceneTimer> message)
{
    this.AddOrUpdateRecurringJob(message.Data);
}

/// <summary>
/// Posts an add-or-update request for the timer's recurring job to the job server.
/// Shared by the inserted and updated handlers, which previously duplicated this logic.
/// </summary>
/// <param name="timer">The timer whose job should be (re)registered.</param>
private void AddOrUpdateRecurringJob(SceneTimer timer)
{
    // Only timers whose scene exists with NodeId == null are scheduled from here.
    if (!_sceneRepo.ReadOnlyTable().Any(o => o.Id == timer.SceneId && o.NodeId == null))
    {
        return;
    }

    var endpoint = _cfg.GetConnectionString("JobServer") + "/RecurringJob/AddOrUpdate";
    var id = timer.Id.ToString();
    this.UpdateJobServer(endpoint, new
    {
        id,
        // Callback address the job server is given for this job id.
        url = $"{_cfg.GetConnectionString("JobCallBack")}/{id}",
        cron = timer.Cron
    });
}
/// <summary>
/// A scene timer was deleted: ask the job server to remove the matching recurring job.
/// </summary>
/// <param name="message">Event carrying the deleted timer.</param>
public void Handle(EntityDeletedEvent<SceneTimer> message)
{
    var deleted = message.Data;
    var sceneMatches = _sceneRepo.ReadOnlyTable().Any(o => o.Id == deleted.SceneId && o.NodeId == null);
    if (!sceneMatches)
    {
        return;
    }

    // No payload is passed; the job id travels in the URL.
    UpdateJobServer($"{_cfg.GetConnectionString("JobServer")}/RecurringJob/Remove/{deleted.Id}");
}
#endregion timer
#region tigger
/// <summary>
/// A scene trigger was inserted: cache it in <see cref="Tiggers"/> when its scene exists with NodeId == null.
/// </summary>
/// <param name="message">Event carrying the inserted trigger.</param>
public void Handle(EntityInsertedEvent<SceneTigger> message)
{
    var tigger = message.Data;
    if (_sceneRepo.ReadOnlyTable().Any(o => o.Id == tigger.SceneId && o.NodeId == null))
    {
        // The indexer is a single atomic add-or-replace. The previous TryRemove + TryAdd pair left a
        // window in which the entry was missing and could keep a stale value when two events raced.
        Tiggers[tigger.Id] = tigger;
    }
}
/// <summary>
/// A scene trigger was updated: replace the cached copy in <see cref="Tiggers"/> when its scene exists
/// with NodeId == null.
/// </summary>
/// <param name="message">Event carrying the updated trigger.</param>
public void Handle(EntityUpdatedEvent<SceneTigger> message)
{
    var tigger = message.Data;
    if (_sceneRepo.ReadOnlyTable().Any(o => o.Id == tigger.SceneId && o.NodeId == null))
    {
        // The indexer is a single atomic add-or-replace. The previous TryRemove + TryAdd pair left a
        // window in which the entry was missing and could keep a stale value when two events raced.
        Tiggers[tigger.Id] = tigger;
    }
}
/// <summary>
/// A scene trigger was deleted: evict it from <see cref="Tiggers"/>.
/// </summary>
/// <param name="message">Event carrying the deleted trigger.</param>
public void Handle(EntityDeletedEvent<SceneTigger> message)
{
    // Evict unconditionally. The previous version only evicted when the scene could still be found
    // with NodeId == null, so a trigger whose scene was already deleted or reassigned stayed cached
    // and kept being evaluated. TryRemove is a no-op when the id is not cached, so no lookup is needed.
    Tiggers.TryRemove(message.Data.Id, out _);
}
#endregion tigger
#region Product
/// <summary>Forwards a product-inserted event to <c>Notify</c>.</summary>
public void Handle(EntityInsertedEvent<Product> message) => Notify(message);

/// <summary>Forwards a product-updated event to <c>Notify</c>.</summary>
public void Handle(EntityUpdatedEvent<Product> message) => Notify(message);

/// <summary>Forwards a product-deleted event to <c>Notify</c>.</summary>
public void Handle(EntityDeletedEvent<Product> message) => Notify(message);
#endregion Product
#region Node
/// <summary>Forwards a node-inserted event to <c>Notify</c>.</summary>
public void Handle(EntityInsertedEvent<Node> message) => Notify(message);

/// <summary>Forwards a node-updated event to <c>Notify</c>.</summary>
public void Handle(EntityUpdatedEvent<Node> message) => Notify(message);

/// <summary>Forwards a node-deleted event to <c>Notify</c>.</summary>
public void Handle(EntityDeletedEvent<Node> message) => Notify(message);
#endregion Node
#region Device
/// <summary>Forwards a device-inserted event to <c>Notify</c>.</summary>
public void Handle(EntityInsertedEvent<Device> message) => Notify(message);

/// <summary>Forwards a device-updated event to <c>Notify</c>.</summary>
public void Handle(EntityUpdatedEvent<Device> message) => Notify(message);

/// <summary>Forwards a device-deleted event to <c>Notify</c>.</summary>
public void Handle(EntityDeletedEvent<Device> message) => Notify(message);
#endregion Device
#region Data
/// <summary>
/// A data point was inserted: notify clients, evaluate the cached triggers against it,
/// then hand it to <c>LogData</c>.
/// </summary>
/// <param name="message">Event carrying the inserted data point.</param>
public void Handle(EntityInsertedEvent<Data> message)
{
    Notify(message);
    TiggerHandle(message);

    var inserted = message.Data;
    LogData(inserted);
}
/// <summary>
/// A data point was updated: attach its device (with node), notify clients, evaluate the cached
/// triggers against it, then hand it to <c>LogData</c>.
/// </summary>
/// <param name="message">Event carrying the updated data point.</param>
public void Handle(EntityUpdatedEvent<Data> message)
{
    // Load the owning device together with its node before the event is pushed to clients.
    var owner = _deviceRepo.ReadOnlyTable()
        .Include(o => o.Node)
        .FirstOrDefault(o => o.Id == message.Data.DeviceId);
    message.Data.Device = owner;

    Notify(message);
    TiggerHandle(message);
    LogData(message.Data);
}
/// <summary>Forwards a data-deleted event to <c>Notify</c>; no trigger evaluation or logging happens on delete.</summary>
public void Handle(EntityDeletedEvent<Data> message) => Notify(message);
#endregion Data
#region Command
/// <summary>Forwards a command-inserted event to <c>Notify</c>.</summary>
public void Handle(EntityInsertedEvent<Command> message) => Notify(message);

/// <summary>Forwards a command-updated event to <c>Notify</c>.</summary>
public void Handle(EntityUpdatedEvent<Command> message) => Notify(message);

/// <summary>Forwards a command-deleted event to <c>Notify</c>.</summary>
public void Handle(EntityDeletedEvent<Command> message) => Notify(message);
#endregion Command
#region Scene
/// <summary>Forwards a scene-inserted event to <c>Notify</c>.</summary>
public void Handle(EntityInsertedEvent<Scene> message) => Notify(message);

/// <summary>Forwards a scene-updated event to <c>Notify</c>.</summary>
public void Handle(EntityUpdatedEvent<Scene> message) => Notify(message);

/// <summary>Forwards a scene-deleted event to <c>Notify</c>.</summary>
public void Handle(EntityDeletedEvent<Scene> message) => Notify(message);
#endregion Scene
/// <summary>
/// Sends a JSON POST to the job server and logs the outcome. Best-effort: failures are logged,
/// never thrown to the caller.
/// </summary>
/// <param name="url">Absolute URL of the job-server endpoint.</param>
/// <param name="data">
/// Optional payload serialized as JSON. When omitted, the JSON literal <c>null</c> is sent as the body.
/// </param>
private void UpdateJobServer(string url, object data = null)
{
    try
    {
        this._logger.LogDebug("update job server");
        var client = _httpClientFactory.CreateClient();
        using var content = new StringContent(JsonConvert.SerializeObject(data), Encoding.UTF8, "application/json");
        // The handlers calling this are synchronous, so the call has to block. GetAwaiter().GetResult()
        // surfaces the real exception instead of an AggregateException wrapper, and the response is
        // now disposed so its content stream is released promptly.
        using var response = client.PostAsync(url, content).GetAwaiter().GetResult();
        var body = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        this._logger.LogDebug("{StatusCode}:{Body}", response.StatusCode, body);
        if (!response.IsSuccessStatusCode)
        {
            // A rejected request used to be visible only at debug level.
            this._logger.LogWarning("job server returned {StatusCode} for {Url}", response.StatusCode, url);
        }
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        // Pass the exception object so logging providers capture the stack trace in structured form.
        this._logger.LogError(ex, "update job server failed: {Url}", url);
    }
}
/// <summary>
/// Evaluates every cached trigger bound to the incoming data point and fires those whose
/// condition evaluates to true.
/// </summary>
/// <param name="message">Event carrying the data point that was inserted or updated.</param>
/// <remarks>
/// SECURITY: <c>tigger.Condition</c> is compiled and executed as C# source through CS-Script.
/// Anyone able to edit a trigger condition can run arbitrary code in this process, so trigger
/// editing must stay restricted to trusted users.
/// </remarks>
private void TiggerHandle(BaseEvent<Data> message)
{
    try
    {
        var data = message.Data;
        foreach (var item in Tiggers)
        {
            var tigger = item.Value;
            if (tigger.DataId != data.Id)
            {
                continue;
            }

            // The enum name becomes the C# parameter type keyword in the generated method.
            // ToLowerInvariant is required: culture-sensitive ToLower() turns "Int" into "ınt"
            // under Turkish cultures, which is not a valid type name and breaks compilation.
            var valueType = data.Type.ToString().ToLowerInvariant();
            var methodText = $"bool Valid(string name,string key,{valueType} value,string description){{ return {tigger.Condition};}}";
            try
            {
                dynamic method = CSScript.Evaluator.LoadMethod(methodText);
                dynamic value = data.GetValue();
                var result = method.Valid(data.Name, data.Key, value, data.Description);
                if (result)
                {
                    this.TiggerHandle(tigger.Id);
                }
            }
            catch (Exception ex)
            {
                // One broken condition must not stop the remaining triggers from being evaluated.
                ex.PrintStack();
                this._logger.LogError(ex, "tigger {TiggerId} evaluation failed", tigger.Id);
            }
        }
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        this._logger.LogError(ex, "tigger handling failed");
    }
}
/// <summary>
/// Pushes an entity event through the hub on a background task; failures are printed and swallowed.
/// </summary>
/// <typeparam name="T">Entity type; its type name plus <c>message.Arg</c> forms the method name sent.</typeparam>
/// <param name="message">The event whose <c>Data</c> is sent as the payload.</param>
private void Notify<T>(BaseEvent<T> message)
{
    void Push()
    {
        try
        {
            var method = $"{typeof(T).Name}{message.Arg}";
            _hub.ServerToClient(method, message.Data, "page", null);
        }
        catch (Exception pushError)
        {
            pushError.PrintStack();
        }
    }

    // Fire-and-forget: the calling handler does not wait for the push to complete.
    Task.Run(() => Push());
}
/// <summary>
/// Fires a trigger: loads it with its scene's commands and sends each command to the node of the
/// command's device through the hub.
/// </summary>
/// <param name="tiggerId">Id of the trigger whose condition was satisfied.</param>
private void TiggerHandle(Guid tiggerId)
{
    this._logger.LogDebug("global tigger exec:{TiggerId}", tiggerId);
    var tigger = this._sceneTiggerRepo.ReadOnlyTable()
        .Include(o => o.Scene).ThenInclude(o => o.SceneCommands).ThenInclude(o => o.Command).ThenInclude(o => o.Device).ThenInclude(o => o.Node)
        .FirstOrDefault(o => o.Id == tiggerId);

    // FirstOrDefault returns null when the trigger is cached but no longer in the database;
    // previously that caused a NullReferenceException on tigger.Scene.
    var sceneCommands = tigger?.Scene?.SceneCommands;
    if (sceneCommands == null)
    {
        this._logger.LogWarning("global tigger {TiggerId} not found or has no scene commands; skipped", tiggerId);
        return;
    }

    foreach (var sceneCommand in sceneCommands)
    {
        try
        {
            this._hub.ServerToClient(Methods.ExecCommand, sceneCommand.CommandId, sceneCommand.Command.Device.Node.Number, null);
        }
        catch (Exception ex)
        {
            // Keep dispatching the remaining commands even if one of them cannot be sent.
            ex.PrintStack();
            this._logger.LogError(ex, "global tigger {TiggerId}: command {CommandId} dispatch failed", tiggerId, sceneCommand.CommandId);
        }
    }
}
/// <summary>
/// Writes a numeric data point to InfluxDB (database "iot", measurement "data") on a background task.
/// Data with an empty value or a type other than Int/Float is ignored. Best-effort: failures are
/// logged, never thrown to the caller.
/// </summary>
/// <param name="data">The data point to record.</param>
private void LogData(Data data)
{
    // Filter first so that ignored data does not cost a database round trip for the device.
    if (string.IsNullOrEmpty(data.Value))
    {
        return;
    }
    if (data.Type != DeviceDataType.Int && data.Type != DeviceDataType.Float)
    {
        return;
    }

    var url = _cfg["influxdb:url"];
    var usr = _cfg["influxdb:usr"];
    var pwd = _cfg["influxdb:pwd"];
    var device = _deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Id == data.DeviceId);
    if (device == null)
    {
        // Previously this surfaced as a NullReferenceException inside the background task.
        this._logger.LogWarning("device {DeviceId} not found; data {DataKey} not written to influxdb", data.DeviceId, data.Key);
        return;
    }

    Task.Run(async () =>
    {
        try
        {
            var dbName = "iot";
            var measurementName = "data";
            using var client = new InfluxClient(new Uri(url), usr, pwd);
            await client.CreateDatabaseAsync(dbName);
            var row = new DynamicInfluxRow
            {
                // UtcDateTime carries Kind=Utc; the former .DateTime had Kind=Unspecified, which a
                // consumer may interpret as local time. The tick value is the same.
                Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(data.Timestamp).UtcDateTime
            };
            row.Fields.Add("DeviceNumber", device.Number);
            row.Fields.Add("DeviceName", device.Name);
            row.Fields.Add(data.Key, this.GetDataValue(data));
            await client.WriteAsync(dbName, measurementName, new List<DynamicInfluxRow> { row });
        }
        catch (Exception ex)
        {
            ex.PrintStack();
            this._logger.LogError(ex, "writing data {DataKey} of device {DeviceId} to influxdb failed", data.Key, data.DeviceId);
        }
    });
}
/// <summary>
/// Converts the textual value of a data point to a boxed CLR value according to its declared type.
/// </summary>
/// <param name="model">The data point whose <c>Value</c> is converted.</param>
/// <returns>
/// A boxed <see cref="int"/> for Int, a boxed <see cref="float"/> for Float, otherwise the original string.
/// </returns>
/// <exception cref="FormatException">The value is not a valid number for its declared type.</exception>
/// <exception cref="OverflowException">The value is outside the range of the target type.</exception>
private object GetDataValue(Data model)
{
    // Parse with the invariant culture. The culture-less Convert overloads use the current thread
    // culture, so on a comma-decimal locale "1.5" would be misread (for example as 15).
    // NOTE(review): assumes values are stored in invariant format ('.' as decimal separator); confirm against the producer.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    return model.Type switch
    {
        DeviceDataType.Int => Convert.ToInt32(model.Value, invariant),
        DeviceDataType.Float => Convert.ToSingle(model.Value, invariant),
        _ => model.Value,
    };
}
}
}