You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/projects/Platform/Services/DataSync/DataSyncService.cs

305 lines
12 KiB

using Infrastructure.Application.Services.Settings;
using Infrastructure.Data;
using Infrastructure.Extensions;
using Application.Domain.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using TangShanKaiPing;
using static TangShanKaiPing.WS_DataEX_ShareClient;
namespace Platform.Services
{
public class DataSyncService : BackgroundService
{
private readonly IServiceProvider _sp;
private readonly ILogger<DataSyncService> _logger;
/// <summary>
/// Creates the background synchronization service.
/// </summary>
/// <param name="applicationServices">Service provider kept so the sync methods can create their own scopes from it.</param>
/// <param name="logger">Logger that receives the progress and error messages of this service.</param>
public DataSyncService(IServiceProvider applicationServices, ILogger<DataSyncService> logger)
{
    // Both dependencies are stored as-is; nothing is resolved at construction time.
    (_sp, _logger) = (applicationServices, logger);
}
/// <summary>
/// Runs <see cref="Handler"/> every ten minutes until <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
/// <returns>
/// The task that represents the whole polling loop. Returning it (instead of an already
/// completed task) lets the host track the loop and wait for it during shutdown.
/// </returns>
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Handler() is synchronous and blocking, so the loop is moved to the thread pool
    // to avoid holding up host start-up.
    return Task.Run(async () =>
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                Handler();
            }
            catch (Exception ex)
            {
                // A failed pass must not end the loop; log it and try again after the delay.
                ex.PrintStack();
                this._logger.LogError("ExecuteAsync:" + ex.ToString());
            }
            try
            {
                await Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation during the wait is the normal shutdown path, not an error.
                break;
            }
        }
    }, stoppingToken);
}
/// <summary>
/// Performs one synchronization pass: organizations first, then the rows of the
/// "T_BASE_USER" table, each of which is handed to <see cref="UserSync"/>.
/// </summary>
private void Handler()
{
    _logger.LogInformation("data sync:start");

    OrganSync();
    // Users come second because UserSync looks up each user's organ by its number.
    DataSync("T_BASE_USER", "", UserSync);

    _logger.LogInformation("data sync:end");
}
/// <summary>
/// Queries <paramref name="table"/> from the remote data-exchange web service batch by batch,
/// passes every returned row to <paramref name="sync"/> and confirms each row for which the
/// callback returned <c>true</c>.
/// </summary>
/// <param name="table">Remote object name, sent as <c>dataEX_QueryObject</c>.</param>
/// <param name="where">Query condition, sent as <c>dataEX_QueryCondition</c>; may be empty.</param>
/// <param name="sync">
/// Callback that receives one row as a column-name to value dictionary and returns
/// <c>true</c> when the row was handled and may be confirmed.
/// </param>
/// <remarks>
/// Returns immediately when the "data_sync_url" setting is empty. Any exception is logged
/// and ends the pass; it is not rethrown.
/// </remarks>
private void DataSync(string table, string where, Func<Dictionary<string, string>, bool> sync)
{
    try
    {
        using var scope = _sp.CreateScope();
        var settingService = scope.ServiceProvider.GetService<ISettingService>();
        var url = settingService.GetValue("data_sync_url")?.Trim();
        if (string.IsNullOrEmpty(url))
        {
            return;
        }
        // Endpoint, client and token are identical for every batch, so they are built once
        // instead of once per loop iteration.
        var ws = new WS_DataEX_ShareClient(EndpointConfiguration.WS_DataEX_ShareImplPort, url);
        var token = GetToken();
        while (true)
        {
            var data = ws.queryDataAsync(new dataEXQueryRequest
            {
                dataEX_Token = token,
                dataEX_QueryObject = table,
                dataEX_QueryCondition = where
            }, 1).Result.@return;
            // Treat a missing result set like an empty one instead of failing on a null reference.
            var values = data?.dataEX_ResultSet?.dataResult;
            var names = data?.dataEX_ResultSet?.dataStructure;
            var total = values?.Length ?? 0;
            this._logger.LogInformation($"data sync:{table}:{total}");
            if (total == 0 || names == null)
            {
                break;
            }
            var confirmed = 0;
            for (int i = 0; i < values.Length; i++)
            {
                var list = new Dictionary<string, string>();
                for (int j = 0; j < names.Length; j++)
                {
                    list.Add(names[j], values[i][j]);
                }
                var sn = list["DATA_SN"];
                if (sync(list))
                {
                    this._logger.LogInformation($"data sync:{table} confirm {sn}");
                    ws.queryDataConfirmAsync(new dataEXConfirmRequest
                    {
                        dataEX_Token = token,
                        data_sn = new string[] { sn }
                    }).Wait();
                    confirmed++;
                }
            }
            if (confirmed == 0)
            {
                // NOTE(review): presumably only confirmed rows drop out of later queries; with
                // nothing confirmed the next query would return the same batch and this loop
                // would never end. Stop and leave the rows for the next scheduled pass.
                this._logger.LogWarning($"data sync:{table} no rows confirmed, stopping this pass");
                break;
            }
        }
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        this._logger.LogError("DataSync:" + ex.ToString());
    }
}
/// <summary>
/// Synchronizes organizations from the remote "T_BASE_ORGTREE" object (rows with
/// ORG_TYPE 0 or 1) into the local <see cref="Organ"/> repository, links each organ to its
/// parent, and then confirms all fetched rows with the remote service.
/// </summary>
/// <remarks>
/// Returns immediately when the "data_sync_url" setting is empty or no rows are returned.
/// Any exception is logged and ends the pass; it is not rethrown.
/// </remarks>
private void OrganSync()
{
    try
    {
        using var scope = _sp.CreateScope();
        var settingService = scope.ServiceProvider.GetService<ISettingService>();
        var url = settingService.GetValue("data_sync_url")?.Trim();
        if (string.IsNullOrEmpty(url))
        {
            return;
        }
        var ws = new WS_DataEX_ShareClient(EndpointConfiguration.WS_DataEX_ShareImplPort, url);
        var data = this.QueryData(ws, "T_BASE_ORGTREE", "ORG_TYPE = 0 or ORG_TYPE = 1", true);
        // Treat a missing result set like an empty one instead of failing on a null reference.
        var values = data?.dataEX_ResultSet?.dataResult;
        var names = data?.dataEX_ResultSet?.dataStructure;
        var total = values?.Length ?? 0;
        this._logger.LogInformation($"data sync:T_BASE_ORGTREE:{total}");
        if (total == 0 || names == null)
        {
            return;
        }
        var organRepo = scope.ServiceProvider.GetService<IRepository<Organ>>();
        var list = new List<(string Name, string Number, string ParentNumber, string Sn)>();
        for (int i = 0; i < values.Length; i++)
        {
            var dict = new Dictionary<string, string>();
            for (int j = 0; j < names.Length; j++)
            {
                dict.Add(names[j], values[i][j]);
            }
            list.Add((dict["ORG_NAME"], dict["ORG_ID"], dict["PARENT_ID"], dict["DATA_SN"]));
        }
        list = list.OrderBy(o => o.ParentNumber).ToList();

        // Pass 1: create or update every organ, matching by number first and by name second.
        foreach (var (name, number, _, _) in list)
        {
            var organ = organRepo.Table().FirstOrDefault(o => o.Number == number);
            if (organ is null)
            {
                organ = organRepo.Table().FirstOrDefault(o => o.Name == name);
            }
            if (organ == null)
            {
                organ = new Organ
                {
                    Name = name,
                    Number = number
                };
                organRepo.Add(organ);
            }
            else
            {
                organ.Name = name;
                organ.Number = number;
            }
            organRepo.SaveChanges();
        }

        // Pass 2: link parents once every organ exists. "0" and non-GUID parent ids are skipped.
        foreach (var (_, number, parentNumber, _) in list)
        {
            if (parentNumber == "0" || !Guid.TryParse(parentNumber, out _))
            {
                continue;
            }
            var organ = organRepo.Table().FirstOrDefault(o => o.Number == number);
            if (organ == null)
            {
                // Can happen when a later row matched this organ by name in pass 1 and
                // overwrote its number; skip it instead of aborting the whole pass.
                this._logger.LogError($"organ:{number} does not exist");
                continue;
            }
            var parent = organRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == parentNumber);
            if (parent != null)
            {
                organ.ParentId = parent.Id;
                organRepo.SaveChanges();
            }
            else
            {
                this._logger.LogError($"organ:{parentNumber} does not exist");
            }
        }
        this.Confrim(ws, list.Select(o => o.Sn).ToArray());
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        this._logger.LogError("OrganSync:" + ex.ToString());
    }
}
/// <summary>
/// Logs the given serial numbers and sends them to the remote service in a single
/// confirm request, blocking until the call has finished.
/// </summary>
/// <param name="ws">Web-service client used to send the confirmation.</param>
/// <param name="sns">Serial numbers (<c>data_sn</c>) of the rows to confirm.</param>
private void Confrim(WS_DataEX_ShareClient ws, string[] sns)
{
    var joined = string.Join(',', sns);
    _logger.LogInformation($"data sync:confirm {joined}");

    var request = new dataEXConfirmRequest
    {
        dataEX_Token = GetToken(),
        data_sn = sns
    };
    ws.queryDataConfirmAsync(request).Wait();
}
/// <summary>
/// Sends a query for <paramref name="table"/> to the remote service and returns the
/// <c>@return</c> payload of the response, blocking until the call has finished.
/// </summary>
/// <param name="ws">Web-service client used for the call.</param>
/// <param name="table">Remote object name, sent as <c>dataEX_QueryObject</c>.</param>
/// <param name="where">Query condition, sent as <c>dataEX_QueryCondition</c>.</param>
/// <param name="fetchAll">When true, 0 is passed as the second call argument; otherwise 1.</param>
/// <returns>The query result carried by the service response.</returns>
private dataEXQueryResult QueryData(WS_DataEX_ShareClient ws, string table, string where, bool fetchAll)
{
    var request = new dataEXQueryRequest
    {
        dataEX_Token = GetToken(),
        dataEX_QueryObject = table,
        dataEX_QueryCondition = where
    };

    int mode;
    if (fetchAll)
    {
        mode = 0;
    }
    else
    {
        mode = 1;
    }

    var response = ws.queryDataAsync(request, mode).Result;
    return response.@return;
}
/// <summary>
/// Parameterless placeholder overload; it has no implementation.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
private object QueryData() => throw new NotImplementedException();
/// <summary>
/// Builds the token object that is attached to every request sent to the remote service.
/// </summary>
/// <returns>A new <c>dataEXToken</c> filled with fixed values.</returns>
private dataEXToken GetToken()
{
    // NOTE(review): user name, login time and user token are hard-coded in source;
    // consider moving them to configuration or a secret store.
    var token = new dataEXToken();
    token.userName = "wulianwang";
    token.loginTime = "2020-11-16";
    token.userToken = "ImEWl9l5vSYMjth6YcgxoQ==";
    return token;
}
/// <summary>
/// Creates or updates the local user described by one remote row and makes sure the user
/// is linked to exactly the organ named by the row's "ORG_ID".
/// </summary>
/// <param name="list">
/// One remote row as column-name to value pairs; "ORG_ID", "LOGIN_NAME" and
/// "USER_REALNAME" are read.
/// </param>
/// <returns>
/// <c>true</c> when the row was processed, including the case where the organ is unknown
/// and the row is only logged; <c>false</c> when an exception occurred.
/// </returns>
private bool UserSync(Dictionary<string, string> list)
{
    try
    {
        using var scope = _sp.CreateScope();
        var organRepo = scope.ServiceProvider.GetService<IRepository<Organ>>();
        var userRepo = scope.ServiceProvider.GetService<IRepository<User>>();
        var organUserRepo = scope.ServiceProvider.GetService<IRepository<OrganUser>>();
        var organNumber = list["ORG_ID"];
        var organ = organRepo.ReadOnlyTable().FirstOrDefault(o => o.Number == organNumber);
        if (organ == null)
        {
            this._logger.LogError($"organ:{organNumber} does not exist");
            return true;
        }

        var username = list["LOGIN_NAME"];
        var realName = list["USER_REALNAME"];
        var user = userRepo.Table().FirstOrDefault(o => o.UserName == username);
        if (user == null)
        {
            user = new User
            {
                UserName = username
            };
            userRepo.Add(user);
        }
        user.NickName = realName;
        // Save the user before its Id is read: if the key is generated by the store, a new
        // user has no usable Id until it has been persisted.
        userRepo.SaveChanges();

        var userId = user.Id;
        var organId = organ.Id;

        // Materialize first so rows are not deleted while the query is still being enumerated.
        var existingLinks = organUserRepo.ReadOnlyTable().Where(o => o.UserId == userId).ToList();
        foreach (var item in existingLinks)
        {
            if (item.OrganId != organId)
            {
                organUserRepo.Delete(item);
            }
        }
        var organUser = organUserRepo.ReadOnlyTable().FirstOrDefault(o => o.UserId == userId && o.OrganId == organId);
        if (organUser == null)
        {
            organUser = new OrganUser
            {
                UserId = userId,
                OrganId = organId
            };
            organUserRepo.Add(organUser);
        }
        // Save through the repository that was actually modified.
        organUserRepo.SaveChanges();
        return true;
    }
    catch (Exception ex)
    {
        ex.PrintStack();
        this._logger.LogError("UserSync:" + ex.ToString());
    }
    return false;
}
}
}