using Application.Domain.Entities; using Application.Models; using Hangfire; using Infrastructure.Data; using Infrastructure.Domain; using Infrastructure.Events; using Infrastructure.Extensions; using IoT.Shared.Services; using IoTNode.Services; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading; using System.Threading.Tasks; namespace IoT.Shared.Services { public class IoTNodeClient : IHostedService, IDisposable { private string _notifyHost; private HubConnection Connection; private readonly IServiceProvider applicationServices; private readonly IConfiguration _cfg; public string ConnectionId { get; private set; } public IoTNodeClient(IServiceProvider applicationServices, IConfiguration configuration) { this.applicationServices = applicationServices; this._cfg = configuration; this.ConnectionId = Guid.NewGuid().ToBase62(); } public Task StartAsync(CancellationToken cancellationToken) { Task.Run(async () => { using var scope = this.applicationServices.CreateScope(); var timerRepo = scope.ServiceProvider.GetService>(); var timers = timerRepo.ReadOnlyTable().ToList(); foreach (var timer in timers) { RecurringJob.AddOrUpdate(timer.Id.ToString(), o => o.TimerHanle(timer.Id), timer.Cron); } }); Task.Run(async () => { while (!cancellationToken.IsCancellationRequested) { this.Connect(); await Task.Delay(10 * 1000).ConfigureAwait(true); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public void Connect() { if (this._cfg.GetValue("notify:enabled", false)) { Console.WriteLine("notify is enabled"); try { if (Connection == null) { Console.WriteLine("connection is null"); InitConnection(); } if (Connection.State == HubConnectionState.Disconnected) { Console.WriteLine("start connect"); if (this._notifyHost != this._cfg["notify:host"]) { InitConnection(); } Connection.StartAsync().Wait(); this.OnConnected(); } else { if (this._notifyHost != this._cfg["notify:host"]) { this.ReConnect(null); } else { Console.WriteLine($"connection has connected"); } } } catch (Exception ex) { ex.PrintStack(); } } else { Console.WriteLine("notify is disabled"); this.Close(); } } public void Close() { if (this.Connection != null) { if (this.Connection.State == HubConnectionState.Connected) { this.Connection.StopAsync(); } this.Connection.DisposeAsync(); this.Connection = null; } } private Task ReConnect(Exception arg) { this.Close(); this.Connect(); return Task.CompletedTask; } private void InitConnection() { this._notifyHost = this._cfg["notify:host"]; var url = $"http://{this._notifyHost}/hub?group={this._cfg["sn"]}"; Console.WriteLine($"init connection for {url}"); if (this.Connection != null) { this.Connection.DisposeAsync(); } this.Connection = new HubConnectionBuilder().WithUrl(url).Build(); this.Connection.Closed += ReConnect; this.Connection.On(Methods.ServerToClient, (string method, string message, string fromConnectionId) => this.OnServerToClient(method, message, fromConnectionId)); } public void ServerToClient(string method, string message, string fromConnectionId) { this.OnServerToClient(method, method, fromConnectionId); } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public void ClientToServer(string method, string message, string fromConnectionId = null) { Task.Run(() => { try { if (this.Connection != null && this.Connection.State == HubConnectionState.Connected) { this.Connection.SendAsync(Methods.ClientToServer, method, message, fromConnectionId ?? this._cfg["sn"]); } else { Console.WriteLine($"{_notifyHost} not connected"); } } catch (Exception ex) { ex.PrintStack(); } }); } public void Dispose() { this.Close(); } ///////////////////////////// public void OnConnected() { Console.WriteLine($"{_notifyHost} OnConnected"); //上传节点 this.UpdateEntityIdList(null, Methods.UpdateNodeResponse); //上传产品 this.UpdateEntityIdList(null, Methods.UpdateProductResponse); //上传接口 this.UpdateEntityIdList(null, Methods.UpdateApiResponse); //上传参数 this.UpdateEntityIdList(null, Methods.UpdateParameterResponse); //上传设备Id列表、设备 this.UpdateEntityIdList(Methods.UpdateDeviceIdListResponse, Methods.UpdateDeviceResponse); //上传数据 this.UpdateEntityIdList(null, Methods.UpdateDataResponse, null, o => !o.Hidden); //上传命令Id列表、命令 this.UpdateEntityIdList(Methods.UpdateCommandIdListResponse, Methods.UpdateCommandResponse); //上传场景Id列表、场景 this.UpdateEntityIdList(Methods.UpdateSceneIdListResponse, Methods.UpdateSceneResponse); //上传定时器Id列表、定时器 this.UpdateEntityIdList(Methods.UpdateIoTTimerIdListResponse, Methods.UpdateIoTTimerResponse); //上传触发器Id列表、触发器 this.UpdateEntityIdList(Methods.UpdateIoTTiggerIdListResponse, Methods.UpdateIoTTiggerResponse); //上传场景命令Id列表、场景命令 this.UpdateEntityIdList(Methods.UpdateIoTTiggerIdListResponse, Methods.UpdateIoTTiggerResponse); //上传定时器命令Id列表、定时器命令 this.UpdateEntityIdList(Methods.UpdateIoTTiggerIdListResponse, Methods.UpdateIoTTiggerResponse); //上传触发器命令Id列表、触发器命令 this.UpdateEntityIdList(Methods.UpdateIoTTiggerIdListResponse, Methods.UpdateIoTTiggerResponse); } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public void OnServerToClient(string method, string message, string fromConnectionId) { try { using var scope = this.applicationServices.CreateScope(); var dataService = scope.ServiceProvider.GetService(); var eventPublisher = scope.ServiceProvider.GetService(); if (method == Methods.EditNodeRequest)//服务端编辑节点 { var model = message.FromJson(); dataService.Edit(model); this.ClientToServer(Methods.EditNodeResponse, message); } else if (method == Methods.EditDeviceRequest)//服务端编辑设备 { var model = message.FromJson(); dataService.Edit(model); this.ClientToServer(Methods.EditDeviceResponse, message); } else if (method == Methods.DeleteDeviceRequest)//服务端删除设备 { var model = message.FromJson(); dataService.Delete(model); this.ClientToServer(Methods.DeleteDeviceResponse, message); } else if (method == Methods.EditDataRequest) { var model = message.FromJson(); dataService.Edit(model); this.ClientToServer(Methods.EditDataResponse, message); } else if (method == Methods.DeleteDataRequest) { var model = message.FromJson(); dataService.Delete(model); this.ClientToServer(Methods.DeleteDataResponse, message); } /////////////////////////////////////////////////////////// else if (method == Methods.CallApi) { var cfg = scope.ServiceProvider.GetService(); var port = cfg["server.urls"].Split(':')[2]; var url = $"http://localhost:{port}{message}"; var httpClient = scope.ServiceProvider.GetService().CreateClient(); var result = httpClient.GetStringAsync(url).Result; this.Connection.SendAsync(Methods.ClientToServer, Methods.ApiCallback, result, cfg["sn"]); } else if (method == Methods.CallScene) { var commandRepo = scope.ServiceProvider.GetService>(); var commands = commandRepo.ReadOnlyTable() .Include(o => o.Api) .Include(o => o.Device) //.Where(o => o.Scene.Name == message) .ToList(); foreach (var command in commands) { try { var cfg = scope.ServiceProvider.GetService(); var port = cfg["server.urls"].Split(':')[2]; var url = $"http://localhost:{port}{command.Api.Path}{command.Api.Command}?number={command.Device.Number}{(string.IsNullOrEmpty(command.QueryString) ? "" : "&")}{command.QueryString}"; var httpClient = scope.ServiceProvider.GetService().CreateClient(); var result = httpClient.GetStringAsync(url).Result; } catch (Exception ex) { ex.PrintStack(); } } } else if (method == Methods.EditSceneRequest) { var model = message.FromJson(); var nodeRepo = scope.ServiceProvider.GetService>(); var sceneRepo = scope.ServiceProvider.GetService>(); var scene = sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (scene == null) { scene = new Scene { Id = model.Id }; sceneRepo.Add(scene); } scene.FromDto(model); scene.NodeId = nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; sceneRepo.SaveChanges(); this.ClientToServer(Methods.EditSceneResponse, message); } else if (method == Methods.DeleteSceneRequest) { var model = message.FromJson(); var sceneRepo = scope.ServiceProvider.GetService>(); var scene = sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (scene != null) { sceneRepo.Delete(scene); sceneRepo.SaveChanges(); } this.ClientToServer(Methods.DeleteSceneResponse, message); } else if (method == Methods.EditCommandRequest) { var model = message.FromJson(); var commandRepo = scope.ServiceProvider.GetService>(); var apiRepo = scope.ServiceProvider.GetService>(); var deviceRepo = scope.ServiceProvider.GetService>(); var device = deviceRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.DeviceNumber); var command = commandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (command == null) { command = new Command { Id = model.Id }; commandRepo.Add(command); } command.FromDto(model); command.DeviceId = device.Id; command.ApiId = apiRepo.ReadOnlyTable().Where(o => o.ProductId == device.ProductId && o.Name == model.ApiName).FirstOrDefault().Id; commandRepo.SaveChanges(); this.ClientToServer(Methods.EditCommandResponse, message); } else if (method == Methods.DeleteCommandRequest) { var model = message.FromJson(); var commandRepo = scope.ServiceProvider.GetService>(); var command = commandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (command != null) { commandRepo.Delete(command); commandRepo.SaveChanges(); } this.ClientToServer(Methods.DeleteCommandResponse, message); } else if (method == Methods.EditSceneCommandRequest) { var model = message.FromJson(); var sceneRepo = scope.ServiceProvider.GetService>(); var commandRepo = scope.ServiceProvider.GetService>(); var scene = sceneRepo.Table().FirstOrDefault(o => o.Id == model.Id); var command = commandRepo.Table().FirstOrDefault(o => o.Id == model.Id); var sceneCommandRepo = scope.ServiceProvider.GetService>(); var sceneCommand = sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand == null) { sceneCommand = new SceneCommand { Id = model.Id }; sceneCommandRepo.Add(sceneCommand); } sceneCommand.SceneId = model.SceneId.Value; sceneCommand.CommandId = model.CommandId.Value; sceneCommandRepo.SaveChanges(); this.ClientToServer(Methods.EditSceneCommandResponse, message); } else if (method == Methods.DeleteSceneCommandRequest) { var model = message.FromJson(); var sceneCommandRepo = scope.ServiceProvider.GetService>(); var sceneCommand = sceneCommandRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (sceneCommand != null) { sceneCommandRepo.Delete(sceneCommand); sceneCommandRepo.SaveChanges(); } this.ClientToServer(Methods.DeleteSceneCommandResponse, message); } else if (method == Methods.EditIoTTimerRequest) { var model = message.FromJson(); var nodeRepo = scope.ServiceProvider.GetService>(); var timerRepo = scope.ServiceProvider.GetService>(); var timer = timerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (timer == null) { timer = new IoTTimer { Id = model.Id }; timerRepo.Add(timer); } timer.FromDto(model); timer.NodeId = nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; timerRepo.SaveChanges(); var list = scope.ServiceProvider.GetServices>(); eventPublisher.Publish(new EntityUpdatedEvent(timer)); this.ClientToServer(Methods.EditIoTTimerResponse, message); } else if (method == Methods.DeleteIoTTimerRequest) { var model = message.FromJson(); var timerRepo = scope.ServiceProvider.GetService>(); var timer = timerRepo.Table().FirstOrDefault(o => o.Id == model.Id); if (timer != null) { timerRepo.Delete(timer); timerRepo.SaveChanges(); eventPublisher.Publish(new EntityDeletedEvent(timer)); } this.ClientToServer(Methods.DeleteIoTTimerResponse, message); } else if (method == Methods.EditIoTTiggerRequest) { var model = message.FromJson(); var nodeRepo = scope.ServiceProvider.GetService>(); var dataRepo = scope.ServiceProvider.GetService>(); var repo = scope.ServiceProvider.GetService>(); var entity = repo.Table().FirstOrDefault(o => o.Id == model.Id); if (entity == null) { entity = new IoTTigger { Id = model.Id }; repo.Add(entity); } entity.FromDto(model); entity.NodeId = nodeRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == model.NodeNumber).Id; entity.DataId = dataRepo.ReadOnlyTable().FirstOrDefault(o => o.Device.Number == model.DeviceNumber && o.Key == model.DataKey).Id; repo.SaveChanges(); eventPublisher.Publish(new EntityUpdatedEvent(entity)); this.ClientToServer(Methods.EditIoTTiggerResponse, message); } else if (method == Methods.DeleteIoTTiggerRequest) { var model = message.FromJson(); var repo = scope.ServiceProvider.GetService>(); var entity = repo.Table().FirstOrDefault(o => o.Id == model.Id); if (entity != null) { repo.Delete(entity); repo.SaveChanges(); scope.ServiceProvider.GetService().Publish(new EntityDeletedEvent(entity)); } this.ClientToServer(Methods.DeleteIoTTiggerResponse, message); } } catch (Exception ex) { ex.PrintStack(); } } private void UpdateEntityIdList(string updateIdListMethod, string updateEntityMethod, Func, IQueryable> include = null, Func predicate = null) where T : BaseEntity { using var scope = this.applicationServices.CreateScope(); var repo = scope.ServiceProvider.GetService>(); var query = repo.ReadOnlyTable(); if (include != null) { query = include(query); } if (predicate != null) { query = query.Where(predicate).AsQueryable(); } var entities = query.ToList(); if (!string.IsNullOrEmpty(updateIdListMethod)) { this.SendToServer(updateIdListMethod, entities.Select(o => o.Id).ToList()); } if (!string.IsNullOrEmpty(updateEntityMethod)) { foreach (var entity in entities) { this.SendToServer(updateEntityMethod, entity); } } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public void SendToServer(string method, object data) { Task.Run(() => { try { Console.WriteLine($"send node to server {_notifyHost}"); this.ClientToServer(method, data.ToJson()); } catch (Exception ex) { ex.PrintStack(); } }); } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:不捕获常规异常类型", Justification = "<挂起>")] public string GetApiJson(string prefix) { try { using var scope = applicationServices.CreateScope(); var serviceProvider = scope.ServiceProvider; var cfg = serviceProvider.GetService(); var port = cfg["server.urls"].Split(':')[2]; var url = $"http://localhost:{port}/swagger/v1/swagger.json"; var hc = serviceProvider.GetService().CreateClient(); var result = hc.GetStringAsync(url).Result; var json = JsonConvert.DeserializeObject(result) as JObject; var paths = json.Properties().FirstOrDefault(o => o.Name == "paths").Value as JObject; var names = paths.Properties().Select(o => o.Name).ToList(); foreach (var item in names) { if (!item.StartsWith(prefix)) { paths.Remove(item); } } var tags = json.Properties().FirstOrDefault(o => o.Name == "tags").Value as JArray; var names2 = tags.Select(o => (o as JObject).Properties().FirstOrDefault(o => o.Name == "name").Value.ToString()).ToList(); foreach (var item in names2) { if (item != prefix.Trim('/')) { tags.Remove(tags.FirstOrDefault(o => (o as JObject).Properties().FirstOrDefault(o => o.Name == "name").Value.ToString() != prefix.Trim('/'))); } } var realResult = JsonConvert.SerializeObject(json); return realResult; } catch (Exception ex) { ex.PrintStack(); return null; } } } }