This commit is contained in:
@@ -285,87 +285,106 @@ namespace AyaNova.Biz
|
|||||||
|
|
||||||
#region PROCESSOR
|
#region PROCESSOR
|
||||||
|
|
||||||
|
static bool ActivelyProcessing = false;
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Process all jobs (stock jobs and those found in operations table)
|
/// Process all jobs (stock jobs and those found in operations table)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
internal static async Task ProcessJobsAsync(AyContext ct, AyaNova.Api.ControllerHelpers.ApiServerState serverState)
|
internal static async Task ProcessJobsAsync(AyContext ct, AyaNova.Api.ControllerHelpers.ApiServerState serverState)
|
||||||
{
|
{
|
||||||
//Flush metrics report before anything else happens
|
|
||||||
log.LogTrace("Flushing metrics to reporters");
|
|
||||||
await CoreJobMetricsReport.DoJobAsync();
|
|
||||||
|
|
||||||
|
if (ActivelyProcessing)
|
||||||
//BIZOBJECT DYNAMIC JOBS
|
|
||||||
//get a list of exclusive jobs that are due to happen
|
|
||||||
//Call into each item in turn
|
|
||||||
List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync(ct);
|
|
||||||
foreach (OpsJob j in exclusiveJobs)
|
|
||||||
{
|
{
|
||||||
try
|
log.LogTrace("ProcessJobs called but actively processing other jobs so returning");
|
||||||
{
|
|
||||||
await ProcessJobAsync(j, ct);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception");
|
|
||||||
await LogJobAsync(j.GId, "Job failed with errors:", ct);
|
|
||||||
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct);
|
|
||||||
await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//Get a list of non-exlusive jobs that are due
|
|
||||||
|
|
||||||
//LOOKAT: Parallelize / background this block
|
|
||||||
//http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core
|
|
||||||
|
|
||||||
//var backgroundTask = Task.Run(() => DoComplexCalculation(42));
|
|
||||||
//also have to deal with db object etc, I guess they'd have to instantiate themselves to avoid disposed object being used error
|
|
||||||
//This area may turn out to need a re-write in future, but I think it might only involve this block and ProcessJobAsync
|
|
||||||
//the actual individual objects that are responsible for jobs will likely not need a signature rewrite or anything (I hope)
|
|
||||||
//For now I'm hoping that no job will be so slow that it can hold up all the other jobs indefinitely.
|
|
||||||
|
|
||||||
List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync(ct);
|
|
||||||
foreach (OpsJob j in sharedJobs)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await ProcessJobAsync(j, ct);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception");
|
|
||||||
await LogJobAsync(j.GId, "Job failed with errors:", ct);
|
|
||||||
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct);
|
|
||||||
await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//STOCK JOBS
|
|
||||||
|
|
||||||
//Sweep jobs table
|
|
||||||
await CoreJobSweeper.DoSweepAsync(ct);
|
|
||||||
|
|
||||||
//Health check / metrics
|
|
||||||
await CoreJobMetricsSnapshot.DoJobAsync(ct);
|
|
||||||
|
|
||||||
//License check
|
|
||||||
long CurrentActiveCount = await UserBiz.ActiveCountAsync();
|
|
||||||
long LicensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber;
|
|
||||||
// log.LogInformation("JobsBiz::Checking license active count");
|
|
||||||
if (CurrentActiveCount > LicensedUserCount)
|
|
||||||
{
|
|
||||||
var msg = $"E1020 - Active count exceeded capacity";
|
|
||||||
serverState.SetSystemLock(msg);
|
|
||||||
log.LogCritical(msg);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
ActivelyProcessing = true;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
//Flush metrics report before anything else happens
|
||||||
|
log.LogTrace("Flushing metrics to reporters");
|
||||||
|
await CoreJobMetricsReport.DoJobAsync();
|
||||||
|
|
||||||
|
|
||||||
//Notifications
|
//BIZOBJECT DYNAMIC JOBS
|
||||||
|
//get a list of exclusive jobs that are due to happen
|
||||||
|
//Call into each item in turn
|
||||||
|
List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync(ct);
|
||||||
|
foreach (OpsJob j in exclusiveJobs)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await ProcessJobAsync(j, ct);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception");
|
||||||
|
await LogJobAsync(j.GId, "Job failed with errors:", ct);
|
||||||
|
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct);
|
||||||
|
await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Get a list of non-exlusive jobs that are due
|
||||||
|
|
||||||
|
//LOOKAT: Parallelize / background this block
|
||||||
|
//http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core
|
||||||
|
|
||||||
|
//var backgroundTask = Task.Run(() => DoComplexCalculation(42));
|
||||||
|
//also have to deal with db object etc, I guess they'd have to instantiate themselves to avoid disposed object being used error
|
||||||
|
//This area may turn out to need a re-write in future, but I think it might only involve this block and ProcessJobAsync
|
||||||
|
//the actual individual objects that are responsible for jobs will likely not need a signature rewrite or anything (I hope)
|
||||||
|
//For now I'm hoping that no job will be so slow that it can hold up all the other jobs indefinitely.
|
||||||
|
|
||||||
|
List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync(ct);
|
||||||
|
foreach (OpsJob j in sharedJobs)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await ProcessJobAsync(j, ct);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception");
|
||||||
|
await LogJobAsync(j.GId, "Job failed with errors:", ct);
|
||||||
|
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct);
|
||||||
|
await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//STOCK JOBS
|
||||||
|
|
||||||
|
//Sweep jobs table
|
||||||
|
await CoreJobSweeper.DoSweepAsync(ct);
|
||||||
|
|
||||||
|
//Health check / metrics
|
||||||
|
await CoreJobMetricsSnapshot.DoJobAsync(ct);
|
||||||
|
|
||||||
|
//License check
|
||||||
|
long CurrentActiveCount = await UserBiz.ActiveCountAsync();
|
||||||
|
long LicensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber;
|
||||||
|
// log.LogInformation("JobsBiz::Checking license active count");
|
||||||
|
if (CurrentActiveCount > LicensedUserCount)
|
||||||
|
{
|
||||||
|
var msg = $"E1020 - Active count exceeded capacity";
|
||||||
|
serverState.SetSystemLock(msg);
|
||||||
|
log.LogCritical(msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//Notifications
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
log.LogError(ex, "JobsBiz::ProcessJobsAsync unexpected error during processing");
|
||||||
|
//todo: alert OPS
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
ActivelyProcessing = false;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user