You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/projects/IoT.Shared/Services/IoTNode/IoTNodeClient.cs

523 lines
21 KiB

using Application.Domain.Entities;
using Application.Models;
using Infrastructure.Application.Services.Settings;
using Infrastructure.Data;
using Infrastructure.Domain;
using Infrastructure.Events;
using Infrastructure.Extensions;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace IoT.Shared.Services
{
public class IoTNodeClient : IHostedService, IDisposable
{
/// <summary>Set by <see cref="Dispose(bool)"/> so that a second disposal is a no-op.</summary>
private bool disposed;

/// <summary>The "notify:host" setting value that <see cref="Connection"/> was last built for.</summary>
private string _notifyHost;

/// <summary>The current hub connection; null until first built and again after <see cref="Close"/>.</summary>
private HubConnection Connection;

private readonly IHostApplicationLifetime _lifetime;
private readonly IServiceProvider _applicationServices;
private readonly ILogger<IoTNodeClient> _logger;

/// <summary>
/// Creates the client and stores its dependencies; no connection is opened here.
/// </summary>
/// <param name="lifetime">Host lifetime, used to stop the application on request.</param>
/// <param name="applicationServices">Root service provider from which per-operation scopes are created.</param>
/// <param name="logger">Logger for connection diagnostics.</param>
public IoTNodeClient(IHostApplicationLifetime lifetime, IServiceProvider applicationServices, ILogger<IoTNodeClient> logger)
{
    (_lifetime, _applicationServices, _logger) = (lifetime, applicationServices, logger);
}
/// <summary>
/// Looks up a setting by name through <see cref="ISettingService"/>, resolved from a
/// short-lived service scope, and returns its <c>Value</c>.
/// </summary>
/// <param name="name">Name of the setting to read, e.g. "notify:host".</param>
/// <returns>The value stored on the setting object returned by the service.</returns>
private string GetSetting(string name)
{
    using (var serviceScope = _applicationServices.CreateScope())
    {
        var settings = serviceScope.ServiceProvider.GetRequiredService<ISettingService>();
        var setting = settings.GetSetting(name);
        return setting.Value;
    }
}
/// <summary>
/// Starts a background loop that calls <see cref="Connect"/> every 10 seconds so the
/// connection is (re)established or closed according to the current settings.
/// </summary>
/// <param name="cancellationToken">
/// Token supplied by the host for the start phase. It is still honoured, but the loop is
/// additionally bound to <c>ApplicationStopping</c>, because the start token only signals
/// an aborted start-up and is not cancelled when the application shuts down.
/// </param>
/// <returns>A completed task; the loop keeps running in the background.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    var stopping = this._lifetime.ApplicationStopping;
    _ = Task.Run(async () =>
    {
        while (!cancellationToken.IsCancellationRequested && !stopping.IsCancellationRequested)
        {
            try
            {
                this.Connect();
            }
            catch (Exception ex)
            {
                // Connect reads settings outside its own try block; a failure there must
                // not terminate the reconnect loop for the rest of the process lifetime.
                this._logger.LogError(ex, "IoTNodeClient connect attempt failed");
            }

            try
            {
                await Task.Delay(10 * 1000, stopping).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested while waiting: leave the loop promptly.
                break;
            }
        }
    });
    return Task.CompletedTask;
}
/// <summary>
/// Host stop hook. Performs no work and completes immediately.
/// </summary>
/// <param name="cancellationToken">Unused.</param>
/// <returns>A completed task.</returns>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Brings the hub connection in line with the current settings.
/// When "notify:enabled" is not "true" the connection is closed. Otherwise a connection is
/// built if missing, started if disconnected (rebuilding it first when "notify:host" has
/// changed), or re-created when it is active but points at a stale host.
/// Exceptions raised while connecting are handed to <c>PrintStack</c> and not rethrown.
/// </summary>
public void Connect()
{
    if (GetSetting("notify:enabled") != "true")
    {
        _logger.LogDebug("notify is disabled");
        Close();
        return;
    }

    _logger.LogDebug("notify is enabled");
    try
    {
        var configuredHost = GetSetting("notify:host");
        if (Connection == null)
        {
            _logger.LogDebug("connection is null");
            InitConnection();
        }

        // Compared after the optional InitConnection above, which refreshes _notifyHost.
        var hostChanged = _notifyHost != configuredHost;

        if (Connection.State != HubConnectionState.Disconnected)
        {
            if (hostChanged)
            {
                ReConnect(null);
            }
            else
            {
                _logger.LogDebug("connection has connected");
            }
            return;
        }

        _logger.LogDebug("start connect");
        if (hostChanged)
        {
            InitConnection();
        }
        Connection.StartAsync().Wait();
        OnConnected();
    }
    catch (Exception ex)
    {
        ex.PrintStack();
    }
}
/// <summary>
/// Stops and disposes the current hub connection, if any, and clears the field.
/// </summary>
/// <remarks>
/// The <c>Closed</c> handler is detached before the connection is stopped. SignalR raises
/// <c>Closed</c> for intentional closes as well, so leaving <see cref="ReConnect"/> attached
/// would make a deliberate close trigger another Close/Connect cycle, which could tear down
/// a replacement connection created in the meantime.
/// </remarks>
public void Close()
{
    // Work on a snapshot so a concurrent caller cannot null the field between checks.
    var connection = this.Connection;
    if (connection == null)
    {
        return;
    }

    this.Connection = null;
    connection.Closed -= ReConnect;
    if (connection.State == HubConnectionState.Connected)
    {
        connection.StopAsync();
    }
    connection.DisposeAsync();
}
/// <summary>
/// Drops the current connection and runs <see cref="Connect"/> again. Subscribed to the
/// connection's <c>Closed</c> event and also called directly when the configured host changes.
/// </summary>
/// <param name="arg">The exception supplied by the <c>Closed</c> event; not inspected.</param>
/// <returns>A completed task, since all work is done synchronously.</returns>
private Task ReConnect(Exception arg)
{
    Close();
    Connect();

    return Task.CompletedTask;
}
/// <summary>
/// Builds a new, not yet started hub connection for the currently configured host.
/// Refreshes <c>_notifyHost</c> from the "notify:host" setting, disposes any previous
/// connection, and wires up the <c>Closed</c> handler and the server-to-client callback.
/// </summary>
private void InitConnection()
{
    _notifyHost = GetSetting("notify:host");
    var hubUrl = $"{_notifyHost}/hub?type=node&group={GetSetting("sn")}";
    _logger.LogDebug($"init connection for {hubUrl}");

    // Dispose is started but not awaited before the replacement is built.
    Connection?.DisposeAsync();

    Connection = new HubConnectionBuilder().WithUrl(hubUrl).Build();
    var hub = Connection;
    hub.Closed += ReConnect;
    hub.On(
        Methods.ServerToClient,
        (string invokedMethod, string payload, string target, string sender) =>
            OnServerToClient(invokedMethod, payload, target, sender));
}
/// <summary>
/// Public entry point that forwards a server message straight to <see cref="OnServerToClient"/>.
/// </summary>
/// <param name="method">Name of the operation requested by the server.</param>
/// <param name="message">JSON payload of the request.</param>
/// <param name="to">Recipient identifier carried with the message.</param>
/// <param name="from">Sender identifier carried with the message.</param>
public void ServerToClient(string method, string message, string to, string from)
    => OnServerToClient(method, message, to, from);
/// <summary>
/// Sends a message to the server on a background task. The call returns immediately;
/// when the hub is not connected a warning is logged and the message is dropped.
/// </summary>
/// <param name="method">Logical method name delivered to the server.</param>
/// <param name="data">Payload; serialized with <c>ToJson()</c> before sending.</param>
/// <param name="to">Recipient identifier, or null.</param>
/// <param name="from">Sender identifier; defaults to the "sn" setting when null.</param>
public void ClientToServer(string method, object data, string to, string from = null)
{
    _ = Task.Run(async () =>
    {
        try
        {
            // Snapshot the field: Close() may null it between the state check and the send.
            var connection = this.Connection;
            if (connection != null && connection.State == HubConnectionState.Connected)
            {
                // Await the send so a failure is observed by the catch below instead of
                // being lost on an unobserved task.
                await connection.SendAsync(Methods.ClientToServer, method, data.ToJson(), to, from ?? this.GetSetting("sn")).ConfigureAwait(false);
            }
            else
            {
                this._logger.LogWarning($"{_notifyHost} not connected");
            }
        }
        catch (Exception ex)
        {
            ex.PrintStack();
        }
    });
}
/// <summary>
/// Releases the client's resources and tells the GC that finalization is unnecessary.
/// </summary>
public void Dispose()
{
    this.Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Core disposal routine; runs its body at most once.
/// </summary>
/// <param name="disposing">
/// True when called from <see cref="Dispose()"/>; only then is the connection closed.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (!disposed)
    {
        if (disposing)
        {
            Close();
        }

        disposed = true;
    }
}
/// <summary>
/// Uploads the locally stored entities to the server, one entity type after another.
/// Called after the hub connection has been started and when the server sends
/// <c>Methods.UploadNode</c>. For some types an id list is sent before the entities.
/// </summary>
public void OnConnected()
{
    _logger.LogDebug($"{_notifyHost} OnConnected");

    // Upload nodes
    UpdateEntityIdList<Node>(updateIdListMethod: null, updateEntityMethod: Methods.UpdateNodeResponse);

    // Upload products
    UpdateEntityIdList<Product>(updateIdListMethod: null, updateEntityMethod: Methods.UpdateProductResponse);

    // Upload APIs
    UpdateEntityIdList<Api>(updateIdListMethod: null, updateEntityMethod: Methods.UpdateApiResponse);

    // Upload parameters
    UpdateEntityIdList<Parameter>(updateIdListMethod: null, updateEntityMethod: Methods.UpdateParameterResponse);

    // Upload the device id list, then the devices
    UpdateEntityIdList<Device>(updateIdListMethod: Methods.UpdateDeviceIdListResponse, updateEntityMethod: Methods.UpdateDeviceResponse);

    // Upload data entries, skipping those flagged as hidden
    UpdateEntityIdList<Data>(updateIdListMethod: null, updateEntityMethod: Methods.UpdateDataResponse, include: null, predicate: o => !o.Hidden);

    // Upload the command id list, then the commands
    UpdateEntityIdList<Command>(updateIdListMethod: Methods.UpdateCommandIdListResponse, updateEntityMethod: Methods.UpdateCommandResponse);

    // Upload the scene id list, then the scenes
    UpdateEntityIdList<Scene>(updateIdListMethod: Methods.UpdateSceneIdListResponse, updateEntityMethod: Methods.UpdateSceneResponse);

    // Upload the timer id list, then the timers
    UpdateEntityIdList<SceneTimer>(updateIdListMethod: Methods.UpdateIoTTimerIdListResponse, updateEntityMethod: Methods.UpdateIoTTimerResponse);

    // Upload the trigger id list, then the triggers
    UpdateEntityIdList<SceneTigger>(updateIdListMethod: Methods.UpdateIoTTiggerIdListResponse, updateEntityMethod: Methods.UpdateIoTTiggerResponse);

    // Upload the scene-command id list, then the scene commands
    UpdateEntityIdList<SceneCommand>(updateIdListMethod: Methods.UpdateSceneCommandIdListResponse, updateEntityMethod: Methods.UpdateSceneCommandResponse);
}
/// <summary>
/// Dispatches a message received from the server according to <paramref name="method"/>.
/// Edit/Delete requests are applied through <c>DataService</c> and the deserialized model is
/// echoed back with <see cref="ClientToServer"/>; the remaining methods stop the application,
/// re-upload all entities, or execute local HTTP APIs, scenes and commands.
/// Any exception is passed to <c>PrintStack</c> and not rethrown; unrecognized methods are ignored.
/// </summary>
/// <param name="method">Name of the requested operation.</param>
/// <param name="message">JSON payload; the type it is deserialized to depends on the method.</param>
/// <param name="to">Recipient identifier carried with the message.</param>
/// <param name="from">Sender identifier carried with the message; used to address replies.</param>
public void OnServerToClient(string method, string message, string to, string from)
{
try
{
using var scope = this._applicationServices.CreateScope();
var dataService = scope.ServiceProvider.GetService<DataService>();
// NOTE(review): eventPublisher is resolved here but not referenced again in this method.
var eventPublisher = scope.ServiceProvider.GetService<IEventPublisher>();
if (method == Methods.StopNode)
{
// Request shutdown of the whole host application.
this._lifetime.StopApplication();
}
else if (method == Methods.UploadNode)
{
// Re-run the full entity upload performed after connecting.
this.OnConnected();
}
else if (method == $"Edit{nameof(Node)}")// server edits a node
{
var model = message.FromJson<EditNodeModel>();
dataService.Edit<Node, EditNodeModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Edit{nameof(Device)}")// server edits a device
{
var model = message.FromJson<EditDeviceModel>();
// TODO (from original author): if the node does not have the category yet, fetch it from the server synchronously over HTTP. Not implemented here.
dataService.Edit<Device, EditDeviceModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Delete{nameof(Device)}")// server deletes a device
{
var model = message.FromJson<EditDeviceModel>();
dataService.Delete<Device, EditDeviceModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Edit{nameof(Data)}")// server edits a data entry
{
var model = message.FromJson<EditDataModel>();
dataService.Edit<Data, EditDataModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Delete{nameof(Data)}")// server deletes a data entry
{
var model = message.FromJson<EditDataModel>();
dataService.Delete<Data, EditDataModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Edit{nameof(Command)}")// server edits a command
{
var model = message.FromJson<EditCommandModel>();
dataService.Edit<Command, EditCommandModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Delete{nameof(Command)}")// server deletes a command
{
var model = message.FromJson<EditCommandModel>();
dataService.Delete<Command, EditCommandModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Edit{nameof(Scene)}")// server edits a scene
{
var model = message.FromJson<EditSceneModel>();
dataService.Edit<Scene, EditSceneModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == $"Delete{nameof(Scene)}")// server deletes a scene
{
var model = message.FromJson<EditSceneModel>();
dataService.Delete<Scene, EditSceneModel>(model);
this.ClientToServer(method, model, null);
}
// server edits a scene timer
else if (method == $"Edit{nameof(SceneTimer)}")
{
var model = message.FromJson<EditSceneTimerModel>();
dataService.Edit<SceneTimer, EditSceneTimerModel>(model);
this.ClientToServer(method, model, null);
}
// server deletes a scene timer
else if (method == $"Delete{nameof(SceneTimer)}")
{
var model = message.FromJson<EditSceneTimerModel>();
dataService.Delete<SceneTimer, EditSceneTimerModel>(model);
this.ClientToServer(method, model, null);
}
// server edits a scene trigger
else if (method == $"Edit{nameof(SceneTigger)}")
{
var model = message.FromJson<EditSceneTiggerModel>();
dataService.Edit<SceneTigger, EditSceneTiggerModel>(model);
this.ClientToServer(method, model, null);
}
// server deletes a scene trigger
else if (method == $"Delete{nameof(SceneTigger)}")
{
var model = message.FromJson<EditSceneTiggerModel>();
dataService.Delete<SceneTigger, EditSceneTiggerModel>(model);
this.ClientToServer(method, model, null);
}
// server edits a scene command
else if (method == $"Edit{nameof(SceneCommand)}")
{
var model = message.FromJson<EditSceneCommandModel>();
dataService.Edit<SceneCommand, EditSceneCommandModel>(model);
this.ClientToServer(method, model, null);
}
// server deletes a scene command
else if (method == $"Delete{nameof(SceneCommand)}")
{
var model = message.FromJson<EditSceneCommandModel>();
dataService.Delete<SceneCommand, EditSceneCommandModel>(model);
this.ClientToServer(method, model, null);
}
else if (method == Methods.ExecApiRequest)
{
// The payload is a relative path; it is requested from this process's own HTTP endpoint.
var cfg = scope.ServiceProvider.GetService<IConfiguration>();
// The port is taken as the third ':'-separated piece of the "server.urls" value.
var port = cfg["server.urls"].Split(':')[2];
var url = $"http://localhost:{port}{message.FromJson<string>()}";
var httpClient = scope.ServiceProvider.GetService<IHttpClientFactory>().CreateClient();
// Blocks the calling thread until the HTTP response body has been read.
var result = httpClient.GetStringAsync(url).Result;
// Reply with the addressing swapped: the original sender becomes the recipient.
this.ClientToServer(Methods.ExecApiResponse, result, from, to);
}
else if (method == Methods.ExecSceneRequest)
{
// Load the commands attached to the scene and run them.
var sceneId = message.FromJson<Guid>();
var sceneCommandRepo = scope.ServiceProvider.GetService<IRepository<SceneCommand>>();
var commands = sceneCommandRepo.ReadOnlyTable()
.Where(o => o.SceneId == sceneId)
.Include(o => o.Command).ThenInclude(o => o.Api)
.Include(o => o.Command).ThenInclude(o => o.Device)
.Select(o => o.Command)
.ToList();
this.ExecCommands(commands);
// Acknowledge to the original sender; no payload is attached.
this.ClientToServer(Methods.ExecSceneRsponse, null, from);
}
///////////////////////////////////////////////////////////
else if (method == Methods.ExecCommand)
{
// Run a single command by id; no reply is sent.
var id = message.FromJson<Guid>();
this.ExecCommand(id);
}
else if (method == Methods.UpdateCamera)
{
// Applied as a data edit and reported back under the fixed method name "EditData".
var model = message.FromJson<EditDataModel>();
dataService.Edit<Data, EditDataModel>(model);
this.ClientToServer("EditData", model, null);
}
}
catch (Exception ex)
{
ex.PrintStack();
}
}
/// <summary>
/// Executes a single stored command by issuing an HTTP GET against the URL built by
/// <see cref="GetCommandUrl"/>. Does nothing when no command with the given id exists.
/// Failures while building the URL or calling it are passed to <c>PrintStack</c>.
/// </summary>
/// <param name="id">Identifier of the command to execute.</param>
public void ExecCommand(Guid id)
{
    using var scope = this._applicationServices.CreateScope();
    var commandRepo = scope.ServiceProvider.GetService<IRepository<Command>>();
    var command = commandRepo.ReadOnlyTable().Include(o => o.Device).Include(o => o.Api).FirstOrDefault(o => o.Id == id);
    if (command == null)
    {
        return;
    }

    try
    {
        // GetCommandUrl resolves the configuration and port itself, so they are not
        // looked up again here.
        var url = this.GetCommandUrl(command);
        var httpClient = scope.ServiceProvider.GetService<IHttpClientFactory>().CreateClient();
        // The response body is not needed. GetAwaiter().GetResult() surfaces the real
        // exception rather than an AggregateException wrapper.
        httpClient.GetStringAsync(url).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        ex.PrintStack();
    }
}
/// <summary>
/// Executes the given commands one after another in ascending <c>DisplayOrder</c>, issuing
/// an HTTP GET for each and then pausing via <see cref="Delay"/> with the command's delay.
/// A failure of one command is passed to <c>PrintStack</c> and the next one is attempted.
/// </summary>
/// <param name="commands">Commands to run; a null list or null entries are skipped.</param>
public void ExecCommands(List<Command> commands)
{
    if (commands == null)
    {
        return;
    }

    using var scope = this._applicationServices.CreateScope();
    // Null entries are filtered out first: the ordering key would otherwise throw outside
    // the per-command try block and abort the whole batch.
    foreach (var command in commands.Where(o => o != null).OrderBy(o => o.DisplayOrder))
    {
        try
        {
            // GetCommandUrl resolves the configuration and port itself.
            var url = this.GetCommandUrl(command);
            var httpClient = scope.ServiceProvider.GetService<IHttpClientFactory>().CreateClient();
            // Response body is not needed; GetAwaiter().GetResult() surfaces the real exception.
            httpClient.GetStringAsync(url).GetAwaiter().GetResult();
            // The pause only happens after a successful request, as before.
            Delay(command.Delay);
        }
        catch (Exception ex)
        {
            ex.PrintStack();
        }
    }
}
/// <summary>
/// Builds the local HTTP URL that executes a command:
/// <c>http://localhost:{port}{Api.Path}{Api.Command}?number={Device.Number}</c>, followed by
/// <c>&amp;{QueryString}</c> when the command has a non-empty query string.
/// The port is the third ':'-separated piece of the "server.urls" configuration value.
/// </summary>
/// <param name="command">Command whose <c>Api</c> and <c>Device</c> are populated.</param>
/// <returns>The URL to request.</returns>
private string GetCommandUrl(Command command)
{
    using (var serviceScope = _applicationServices.CreateScope())
    {
        var configuration = serviceScope.ServiceProvider.GetService<IConfiguration>();
        var port = configuration["server.urls"].Split(':')[2];

        var endpoint = $"http://localhost:{port}{command.Api.Path}{command.Api.Command}?number={command.Device.Number}";
        var extraQuery = command.QueryString;
        return string.IsNullOrEmpty(extraQuery) ? endpoint : $"{endpoint}&{extraQuery}";
    }
}
/// <summary>
/// Loads all entities of type <typeparamref name="T"/> and sends them to the server:
/// first the list of ids (when <paramref name="updateIdListMethod"/> is set), then each
/// entity individually (when <paramref name="updateEntityMethod"/> is set).
/// </summary>
/// <param name="updateIdListMethod">Method name for the id-list message; null or empty to skip it.</param>
/// <param name="updateEntityMethod">Method name for the per-entity messages; null or empty to skip them.</param>
/// <param name="include">Optional transformation applied to the query before it is executed.</param>
/// <param name="predicate">
/// Optional filter. Because it is a delegate rather than an expression, it is applied by
/// enumerating the query in memory, not as part of the query itself.
/// </param>
private void UpdateEntityIdList<T>(string updateIdListMethod, string updateEntityMethod, Func<IQueryable<T>, IQueryable<T>> include = null, Func<T, bool> predicate = null) where T : BaseEntity
{
    using (var serviceScope = _applicationServices.CreateScope())
    {
        var repository = serviceScope.ServiceProvider.GetService<IRepository<T>>();
        IQueryable<T> source = repository.ReadOnlyTable();
        if (include != null)
        {
            source = include(source);
        }
        if (predicate != null)
        {
            source = source.Where(predicate).AsQueryable();
        }

        var snapshot = source.ToList();

        if (!string.IsNullOrEmpty(updateIdListMethod))
        {
            var ids = snapshot.ConvertAll(entity => entity.Id);
            ClientToServer(updateIdListMethod, ids, null);
        }

        if (string.IsNullOrEmpty(updateEntityMethod))
        {
            return;
        }

        snapshot.ForEach(entity => ClientToServer(updateEntityMethod, entity, null));
    }
}
/// <summary>
/// Downloads this process's own swagger document and returns a copy reduced to one API group:
/// only paths that start with <paramref name="prefix"/> and only the tag whose name equals
/// the prefix without surrounding slashes are kept.
/// </summary>
/// <param name="prefix">Path prefix of the API group, e.g. "/name/".</param>
/// <returns>
/// The filtered swagger JSON, or null when the document cannot be fetched or parsed
/// (the exception is passed to <c>PrintStack</c>).
/// </returns>
public string GetApiJson(string prefix)
{
    try
    {
        using var scope = _applicationServices.CreateScope();
        var serviceProvider = scope.ServiceProvider;
        var cfg = serviceProvider.GetService<IConfiguration>();
        // The port is the third ':'-separated piece of the "server.urls" value.
        var port = cfg["server.urls"].Split(':')[2];
        var url = $"http://localhost:{port}/swagger/v1/swagger.json";
        var hc = serviceProvider.GetService<IHttpClientFactory>().CreateClient();
        var result = hc.GetStringAsync(url).GetAwaiter().GetResult();

        if (!(JsonConvert.DeserializeObject(result) is JObject json))
        {
            return null;
        }

        // "paths" and "tags" are both treated as optional: a document without a top-level
        // "tags" array previously caused a null dereference and a null result.
        if (json["paths"] is JObject paths)
        {
            // Materialize the names first; removing while enumerating Properties() is not safe.
            var unrelated = paths.Properties()
                .Select(o => o.Name)
                .Where(name => !name.StartsWith(prefix, StringComparison.Ordinal))
                .ToList();
            foreach (var name in unrelated)
            {
                paths.Remove(name);
            }
        }

        if (json["tags"] is JArray tags)
        {
            var tagName = prefix.Trim('/');
            // Iterate over a copy so entries can be removed from the array itself.
            foreach (var tag in tags.ToList())
            {
                if ((tag as JObject)?["name"]?.ToString() != tagName)
                {
                    tags.Remove(tag);
                }
            }
        }

        return JsonConvert.SerializeObject(json);
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        return null;
    }
}
/// <summary>
/// Blocks the current thread for the globally configured "delay" setting plus the
/// command-specific delay. If the setting cannot be read or converted, the error is logged
/// and the global part counts as 0.
/// </summary>
/// <param name="commandDelay">Additional delay for this command; values of 0 or less add nothing.</param>
private void Delay(int commandDelay)
{
    using (var serviceScope = _applicationServices.CreateScope())
    {
        var settings = serviceScope.ServiceProvider.GetService<ISettingService>();

        var totalDelay = 0;
        try
        {
            totalDelay = Convert.ToInt32(settings.GetSetting("delay").Value);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex.ToString());
        }

        totalDelay += Math.Max(commandDelay, 0);

        if (totalDelay > 0)
        {
            Thread.Sleep(totalDelay);
        }
    }
}
}
}