Files
raven/server/AyaNova/biz/JobsBiz.cs
2020-06-15 18:52:15 +00:00

300 lines
12 KiB
C#

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using AyaNova.Models;
using AyaNova.Util;
using AyaNova.Api.ControllerHelpers;
using AyaNova.Biz;
using AyaNova.Util;
namespace AyaNova.Biz
{
internal static class JobsBiz
{
//Logger shared by every member of this class; created once under the "JobsBiz" category and never reassigned
private static readonly ILogger log = AyaNova.Util.ApplicationLogging.CreateLogger("JobsBiz");
#region JOB OPS
/// <summary>
/// Fetch the non tracked list of ready to process jobs that are flagged as exclusive.
/// Simply forwards to GetReadyJobsAsync with the exclusivity filter set to true.
/// </summary>
/// <returns>Ready jobs whose Exclusive flag is true</returns>
internal static async Task<List<OpsJob>> GetReadyJobsExclusiveOnlyAsync()
    => await GetReadyJobsAsync(true);
/// <summary>
/// Fetch the non tracked list of ready to process jobs that are NOT flagged as exclusive.
/// Simply forwards to GetReadyJobsAsync with the exclusivity filter set to false.
/// </summary>
/// <returns>Ready jobs whose Exclusive flag is false</returns>
internal static async Task<List<OpsJob>> GetReadyJobsNotExlusiveOnlyAsync()
    => await GetReadyJobsAsync(false);
/// <summary>
/// Query the jobs that are ready to process, restricted to a single exclusivity setting.
/// A job is ready when its StartAfter is earlier than the current UTC time and its status is Sleeping.
/// Results are not change tracked and are ordered by their Created value.
/// </summary>
/// <param name="exclusiveOnly">Value the job's Exclusive flag must equal (true = exclusive jobs only, false = non exclusive jobs only)</param>
/// <returns>The matching jobs ordered by Created ascending</returns>
private static async Task<List<OpsJob>> GetReadyJobsAsync(bool exclusiveOnly)
{
    using (AyContext db = ServiceProviderProvider.DBContext)
    {
        //compose the query first, it is only executed by ToListAsync below
        IQueryable<OpsJob> readyJobs = db.OpsJob
            .AsNoTracking()
            .Where(job => job.StartAfter < System.DateTime.UtcNow
                && job.Exclusive == exclusiveOnly
                && job.JobStatus == JobStatus.Sleeping)
            .OrderBy(job => job.Created);

        return await readyJobs.ToListAsync();
    }
}
/// <summary>
/// Persist a new job: the job is added to the OpsJob set of a fresh db context and saved immediately
/// </summary>
/// <param name="newJob">The job record to insert</param>
internal static async Task AddJobAsync(OpsJob newJob)
{
    using (AyContext db = ServiceProviderProvider.DBContext)
    {
        //stage the insert
        await db.OpsJob.AddAsync(newJob);

        //write it to the database
        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Remove the job and its logs inside a single database transaction
/// </summary>
/// <param name="jobIdToBeDeleted">GId of the job to delete; its log rows are matched on their jobid column</param>
internal static async Task RemoveJobAndLogsAsync(Guid jobIdToBeDeleted)
{
    using (AyContext ct = ServiceProviderProvider.DBContext)
    using (var transaction = await ct.Database.BeginTransactionAsync())
    {
        //delete logs first, then the job they belong to
        await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}");
        await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}");

        // Commit transaction if all commands succeed.
        // Deliberately no try/catch here: if either command fails the exception propagates to the caller
        // with its original stack trace intact and the uncommitted transaction is rolled back when it is disposed
        await transaction.CommitAsync();
    }
}
/// <summary>
/// Write one entry to the job log.
/// A null, empty or whitespace only status is stored as "No status provided".
/// </summary>
/// <param name="jobId">Job the entry belongs to (NOTE: Guid.empty indicates internal job)</param>
/// <param name="statusText">Text to record for the job</param>
internal static async Task LogJobAsync(Guid jobId, string statusText)
{
    using (AyContext db = ServiceProviderProvider.DBContext)
    {
        OpsJobLog entry = new OpsJobLog
        {
            JobId = jobId,
            //never store a blank status
            StatusText = string.IsNullOrWhiteSpace(statusText) ? "No status provided" : statusText
        };

        await db.OpsJobLog.AddAsync(entry);
        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Set the status of the job with the given id.
/// Does nothing when no job with that id exists.
/// </summary>
/// <param name="jobId">GId of the job to update</param>
/// <param name="newStatus">Status to store on the job</param>
internal static async Task UpdateJobStatusAsync(Guid jobId, JobStatus newStatus)
{
    using (AyContext db = ServiceProviderProvider.DBContext)
    {
        OpsJob job = await db.OpsJob.SingleOrDefaultAsync(z => z.GId == jobId);

        //only touch the database again if the job was actually found
        if (job != null)
        {
            job.JobStatus = newStatus;
            await db.SaveChangesAsync();
        }
    }
}
#endregion Job ops
#region PROCESSOR
//Re-entrancy guard for ProcessJobsAsync: 0 = idle, 1 = a processing pass is currently running.
//Held as an int so it can be claimed and released atomically with Interlocked
private static int activelyProcessing = 0;

/// <summary>
/// Process all jobs (stock jobs and those found in operations table).
/// Only one pass runs at a time: a call made while another pass is still running logs a trace message and returns.
/// Order of a pass: license fetch, active count vs license check (system lock and stop if exceeded), metrics,
/// then (unless system locked) backup and job sweeper, then (only if the api is open) the ready exclusive jobs
/// one after another, and finally the ready non exclusive jobs which are started fire and forget.
/// Unexpected exceptions are logged here and not rethrown.
/// </summary>
/// <returns></returns>
internal static async Task ProcessJobsAsync()
{
    //Atomically claim the processing slot so two overlapping callers can never both get past this point
    if (Interlocked.CompareExchange(ref activelyProcessing, 1, 0) != 0)
    {
        log.LogTrace("ProcessJobs called but actively processing other jobs so returning");
        return;
    }

    try
    {
        //### Critical internal jobs, these run even if there is a license related serverlock

        //LICENSE FETCH
        await CoreJobLicense.DoWorkAsync();

        //"SHENANIGAN" CHECK
        //more active users than the active license key allows locks the system and ends this pass
        if (await UserBiz.ActiveCountAsync() > AyaNova.Core.License.ActiveKey.ActiveNumber)
        {
            var msg = $"E1020 - Active count exceeded capacity";
            ServiceProviderProvider.ServerState.SetSystemLock(msg);
            log.LogCritical(msg);
            return;
        }

        //METRICS
        CoreJobMetricsSnapshot.DoWork();

        //### Server state dependent jobs
        ApiServerState serverState = ServiceProviderProvider.ServerState;

        //system lock (no license) is a complete deal breaker for continuation beyond here
        if (serverState.IsSystemLocked) return;

        //TODO: NOTIFICATIONS
        //await CoreJobNotify.DoWorkAsync();

        //BACKUP
        await CoreJobBackup.DoWorkAsync();

        //JOB SWEEPER
        await CoreJobSweeper.DoWorkAsync();

        //### API Open only jobs
        if (!serverState.IsOpen) return;

        //BIZOBJECT DYNAMIC JOBS
        //get a list of exclusive jobs that are due to happen
        //Call into each item in turn, each one is awaited before the next starts
        List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync();
        foreach (OpsJob j in exclusiveJobs)
        {
            try
            {
                await ProcessJobAsync(j);
            }
            catch (Exception ex)
            {
                await RecordJobFailureAsync(j, ex, "Exclusive");
            }
        }

        ///////////////////////////////////////
        //NON-EXCLUSIVE JOBS
        //
        //These fire and forget. Because nothing awaits them here, a failure inside the job can never reach
        //a catch block in this loop, so the failure handling lives inside the background task itself (RunSharedJobAsync)
        List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync();
        foreach (OpsJob j in sharedJobs)
        {
            try
            {
                TaskUtil.Forget(Task.Run(() => RunSharedJobAsync(j)));
            }
            catch (Exception ex)
            {
                //only reached if the background task could not even be started
                await RecordJobFailureAsync(j, ex, "Shared");
            }
        }
    }
    catch (Exception ex)
    {
        log.LogError(ex, "JobsBiz::ProcessJobsAsync unexpected error during processing");
        //TODO:OPSNOTIFY
    }
    finally
    {
        //release the slot on every exit path (early return, success or failure) so the next pass can run
        Interlocked.Exchange(ref activelyProcessing, 0);
    }
}

/// <summary>
/// Body of a fire and forget non exclusive job: process it and, if it throws, record the failure against the job
/// </summary>
/// <param name="j">The job to process</param>
private static async Task RunSharedJobAsync(OpsJob j)
{
    try
    {
        await ProcessJobAsync(j);
    }
    catch (Exception ex)
    {
        await RecordJobFailureAsync(j, ex, "Shared");
    }
}

/// <summary>
/// Record a failed job: server log entry, two job log entries (header and the extracted exception messages)
/// and the job status set to Failed
/// </summary>
/// <param name="j">The job that failed</param>
/// <param name="ex">The exception the job failed with</param>
/// <param name="scope">"Exclusive" or "Shared", only used in the server log message</param>
private static async Task RecordJobFailureAsync(OpsJob j, Exception ex, string scope)
{
    log.LogError(ex, $"ProcessJobs::{scope} -> job {j.Name} failed with exception");
    await LogJobAsync(j.GId, "Job failed with errors:");
    await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex));
    await UpdateJobStatusAsync(j.GId, JobStatus.Failed);
}
/// <summary>
/// Process a single job by dispatching on its JobType.
/// Backup jobs are run right here and then marked Completed; TestJob, SeedTestData and
/// BulkCoreBizObjectOperation jobs are handed to the biz object obtained from BizObjectFactory.
/// A start entry is written to the job log and start / completion are written to the debug log.
/// </summary>
/// <param name="job">The job to process</param>
/// <returns></returns>
/// <exception cref="System.NotSupportedException">The job's JobType is not one of the types handled here</exception>
internal static async Task ProcessJobAsync(OpsJob job)
{
    //description is "name type" with ":subtype" appended only when a subtype is set
    string description = $"{job.Name} {job.JobType.ToString()}";
    if (job.SubType != JobSubType.NotSet)
        description = $"{description}:{job.SubType}";

    await LogJobAsync(job.GId, $"Process job \"{description}\"");
    log.LogDebug($"ProcessJobAsync -> Processing job {description}");

    using (AyContext db = ServiceProviderProvider.DBContext)
    {
        //stays null for job types that are fully handled in this method
        IJobObject handler = null;

        if (job.JobType == JobType.Backup)
        {
            //This is called when on demand only, normal backups are processed with the normal system jobs
            await CoreJobBackup.DoWorkAsync(true);
            await UpdateJobStatusAsync(job.GId, JobStatus.Completed);
        }
        else if (job.JobType == JobType.TestJob)
        {
            handler = (IJobObject)BizObjectFactory.GetBizObject(AyaType.ServerJob, db);
        }
        else if (job.JobType == JobType.SeedTestData)
        {
            handler = (IJobObject)BizObjectFactory.GetBizObject(AyaType.TrialSeeder, db);
        }
        else if (job.JobType == JobType.BulkCoreBizObjectOperation)
        {
            //bulk op, hand off to biz object to deal with
            //note, convention is that there is an idList in job.jobinfo json if preselected else it's all objects of type
            handler = (IJobObject)BizObjectFactory.GetBizObject(job.ObjectType, db);
        }
        else
        {
            throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported");
        }

        if (handler != null)
            await handler.HandleJobAsync(job);
    }

    log.LogDebug($"ProcessJobAsync -> Job completed {description}");
}
#endregion process jobs
/////////////////////////////////////////////////////////////////////
}//eoc
}//eons