Files
raven/server/AyaNova/biz/JobsBiz.cs
2020-05-25 20:58:43 +00:00

413 lines
16 KiB
C#

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using AyaNova.Models;
using AyaNova.Util;
namespace AyaNova.Biz
{
internal static class JobsBiz
{
private static ILogger log = AyaNova.Util.ApplicationLogging.CreateLogger("JobsBiz");
#region JOB OPS
/// <summary>
/// Get a non tracked list of jobs for an object
/// </summary>
/// <param name="ayObj"></param>
///
/// <returns></returns>
internal static async Task<List<OpsJob>> GetJobsForObjectAsync(AyaTypeId ayObj)
{
var ct = ServiceProviderProvider.DBContext;
return await ct.OpsJob
.AsNoTracking()
.Where(z => z.ObjectId == ayObj.ObjectId && z.ObjectType == ayObj.ObjectType)
.OrderBy(z => z.Created)
.ToListAsync();
}
/// <summary>
/// Get a non tracked list of jobs that are ready to process and exclusive only
/// </summary>
/// <returns></returns>
internal static async Task<List<OpsJob>> GetReadyJobsExclusiveOnlyAsync()
{
return await GetReadyJobsAsync(true);
}
/// <summary>
/// Get a non tracked list of jobs that are ready to process and exclusive only
/// </summary>
/// <returns></returns>
internal static async Task<List<OpsJob>> GetReadyJobsNotExlusiveOnlyAsync()
{
return await GetReadyJobsAsync(false);
}
/// <summary>
/// Get a non tracked list of jobs filtered by exclusivity
/// </summary>
/// <returns></returns>
private static async Task<List<OpsJob>> GetReadyJobsAsync(bool exclusiveOnly)
{
var ct = ServiceProviderProvider.DBContext;
var ret = await ct.OpsJob
.AsNoTracking()
.Where(z => z.StartAfter < System.DateTime.UtcNow && z.Exclusive == exclusiveOnly && z.JobStatus == JobStatus.Sleeping)
.OrderBy(z => z.Created)
.ToListAsync();
return ret;
}
/// <summary>
/// Get a non tracked list of all jobs that are not completed
/// could be running or sleeping
/// </summary>
/// <returns></returns>
internal static async Task<List<OpsJob>> GetAllSleepingOrRunningJobsAsync()
{
var ct = ServiceProviderProvider.DBContext;
var ret = await ct.OpsJob
.AsNoTracking()
.Where(z => z.JobStatus == JobStatus.Sleeping || z.JobStatus == JobStatus.Running)
.OrderBy(z => z.Created)
.ToListAsync();
return ret;
}
/// <summary>
/// Get a non tracked list of all jobs for a JobType
/// </summary>
/// <returns></returns>
internal static async Task<List<OpsJob>> GetAllJobsForJobTypeAsync(JobType jobType)
{
var ct = ServiceProviderProvider.DBContext;
var ret = await ct.OpsJob
.AsNoTracking()
.Where(z => z.JobType == jobType)
.OrderBy(z => z.Created)
.ToListAsync();
return ret;
}
/// <summary>
/// Get a non tracked list of all jobs that are status running but have no last activity for XX HOURS
/// </summary>
/// <returns></returns>
internal static async Task<List<OpsJob>> GetPotentiallyDeadRunningJobsAsync()
{
var ct = ServiceProviderProvider.DBContext;
var ret = await ct.OpsJob
.AsNoTracking()
.Where(z => z.JobStatus == JobStatus.Sleeping || z.JobStatus == JobStatus.Running)
.OrderBy(z => z.Created)
.ToListAsync();
return ret;
}
/// <summary>
/// Get a count of all jobs for a JobStatus
/// </summary>
/// <returns></returns>
internal static async Task<long> GetCountForJobStatusAsync(JobStatus jobStatus)
{
var ct = ServiceProviderProvider.DBContext;
var ret = await ct.OpsJob
.Where(z => z.JobStatus == jobStatus)
.LongCountAsync();
return ret;
}
/// <summary>
/// Add a new job to the database
/// </summary>
/// <param name="newJob"></param>
/// <returns></returns>
internal static async Task<OpsJob> AddJobAsync(OpsJob newJob)
{
var ct = ServiceProviderProvider.DBContext;
await ct.OpsJob.AddAsync(newJob);
await ct.SaveChangesAsync();
return newJob;
}
/// <summary>
/// Remove any jobs or logs for the object in question
/// </summary>
/// <param name="ayObj"></param>
///
internal static async Task DeleteJobsForObjectAsync(AyaTypeId ayObj)
{
//Get a list of all jobid's for the object passed in
List<OpsJob> jobsForObject = await GetJobsForObjectAsync(ayObj);
if (jobsForObject.Count == 0)
return;
foreach (OpsJob jobToBeDeleted in jobsForObject)
{
await RemoveJobAndLogsAsync(jobToBeDeleted.GId);
}
}
/// <summary>
/// REmove the job and it's logs
/// </summary>
/// <param name="jobIdToBeDeleted"></param>
internal static async Task RemoveJobAndLogsAsync(Guid jobIdToBeDeleted)
{
var ct = ServiceProviderProvider.DBContext;
using (var transaction = await ct.Database.BeginTransactionAsync())
{
try
{
//delete logs
await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}");
//delete the job
await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}");
// Commit transaction if all commands succeed, transaction will auto-rollback
// when disposed if either commands fails
await transaction.CommitAsync();
}
catch (Exception ex)
{
throw ex;
}
}
}
/// <summary>
/// Make a log entry for a job
/// </summary>
/// <param name="jobId">(NOTE: Guid.empty indicates internal job)</param>
/// <param name="statusText"></param>
///
internal static async Task<OpsJobLog> LogJobAsync(Guid jobId, string statusText)
{
var ct = ServiceProviderProvider.DBContext;
if (string.IsNullOrWhiteSpace(statusText))
statusText = "No status provided";
OpsJobLog newObj = new OpsJobLog();
newObj.JobId = jobId;
newObj.StatusText = statusText;
await ct.OpsJobLog.AddAsync(newObj);
await ct.SaveChangesAsync();
return newObj;
}
/// <summary>
/// Update the status of a job
/// </summary>
/// <param name="jobId"></param>
/// <param name="newStatus"></param>
internal static async Task<OpsJob> UpdateJobStatusAsync(Guid jobId, JobStatus newStatus)
{
var ct = ServiceProviderProvider.DBContext;
var oFromDb = await ct.OpsJob.SingleOrDefaultAsync(z => z.GId == jobId);
if (oFromDb == null) return null;
oFromDb.JobStatus = newStatus;
await ct.SaveChangesAsync();
return oFromDb;
}
#endregion Job ops
#region PROCESSOR
static bool ActivelyProcessing = false;
/// <summary>
/// Process all jobs (stock jobs and those found in operations table)
/// </summary>
/// <returns></returns>
internal static async Task ProcessJobsAsync(CancellationToken ctoken)
{
if (ActivelyProcessing)
{
System.Diagnostics.Debug.WriteLine("ProcessJobs called but actively processing other jobs so returning");
log.LogTrace("ProcessJobs called but actively processing other jobs so returning");
return;
}
ActivelyProcessing = true;
try
{
ctoken.ThrowIfCancellationRequested();
//Sweep jobs table
System.Diagnostics.Debug.WriteLine($"JobsBiz processing sweeper");
await CoreJobSweeper.DoSweepAsync(ctoken);//run exclusively
//BIZOBJECT DYNAMIC JOBS
//get a list of exclusive jobs that are due to happen
//Call into each item in turn
List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync();
foreach (OpsJob j in exclusiveJobs)
{
try
{
ctoken.ThrowIfCancellationRequested();
System.Diagnostics.Debug.WriteLine($"JobsBiz processing exclusive biz job {j.Name}");
await ProcessJobAsync(j);
}
catch (Exception ex)
{
log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception");
await LogJobAsync(j.GId, "Job failed with errors:");
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex));
await UpdateJobStatusAsync(j.GId, JobStatus.Failed);
}
}
ctoken.ThrowIfCancellationRequested();
System.Diagnostics.Debug.WriteLine($"JobsBiz processing exclusive license check");
//License check
long CurrentActiveCount = await UserBiz.ActiveCountAsync();
long LicensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber;
// log.LogInformation("JobsBiz::Checking license active count");
if (CurrentActiveCount > LicensedUserCount)
{
var msg = $"E1020 - Active count exceeded capacity";
ServiceProviderProvider.ServerState.SetSystemLock(msg);
log.LogCritical(msg);
return;
}
ctoken.ThrowIfCancellationRequested();
//backup
System.Diagnostics.Debug.WriteLine($"JobsBiz processing backup");
await CoreJobBackup.DoWorkAsync();//sb exclusive
System.Diagnostics.Debug.WriteLine($"JobsBiz processing metrics snapshotter");
///////////////////////////////////////
//NON-EXCLUSIVE JOBS
//
//LOOKAT: Parallelize / background this block
//http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core
//var backgroundTask = Task.Run(() => DoComplexCalculation(42));
//also have to deal with db object etc, I guess they'd have to instantiate themselves to avoid disposed object being used error
//This area may turn out to need a re-write in future, but I think it might only involve this block and ProcessJobAsync
//the actual individual objects that are responsible for jobs will likely not need a signature rewrite or anything (I hope)
//For now I'z hoping that no job will be so slow that it can hold up all the other jobs indefinitely.
ctoken.ThrowIfCancellationRequested();
List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync();
foreach (OpsJob j in sharedJobs)
{
try
{
ctoken.ThrowIfCancellationRequested();
System.Diagnostics.Debug.WriteLine($"JobsBiz processing NON-exclusive biz job {j.Name}");
// Task.Run(() => FireAway());
Task.Run(() => ProcessJobAsync(j), ctoken);
// await ProcessJobAsync(j, ct);
}
catch (Exception ex)
{
log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception");
await LogJobAsync(j.GId, "Job failed with errors:");
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex));
await UpdateJobStatusAsync(j.GId, JobStatus.Failed);
}
}
//STOCK JOBS
//Notifications
}
catch (Exception ex)
{
log.LogError(ex, "JobsBiz::ProcessJobsAsync unexpected error during processing");
//todo: alert OPS
}
finally
{
ActivelyProcessing = false;
System.Diagnostics.Debug.WriteLine($"JobsBiz in Finally - completed run");
}
}
/// <summary>
/// Process a job by calling into it's biz object
/// </summary>
/// <param name="job"></param>
/// <returns></returns>
internal static async Task ProcessJobAsync(OpsJob job)
{
var JobDescription = $"{job.Name} {job.JobType.ToString()}";
if (job.SubType != JobSubType.NotSet)
JobDescription += $":{job.SubType}";
await LogJobAsync(job.GId, $"Process job \"{JobDescription}\"");
log.LogDebug($"ProcessJobAsync -> Processing job {JobDescription}");
IJobObject o = null;
switch (job.JobType)
{
case JobType.Backup:
//This is called when on demand only, normal backups are processed above with normal system jobs
await CoreJobBackup.DoWorkAsync(true);
await UpdateJobStatusAsync(job.GId, JobStatus.Completed);
break;
case JobType.TestJob:
o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.ServerJob);
break;
case JobType.SeedTestData:
o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.TrialSeeder);
break;
case JobType.BulkCoreBizObjectOperation:
//bulk op, hand off to biz object to deal with
//note, convention is that there is an idList in job.jobinfo json if preselected else it's all objects of type
o = (IJobObject)BizObjectFactory.GetBizObject(job.ObjectType);
break;
default:
throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported");
}
if (o != null)
await o.HandleJobAsync(job);
log.LogDebug($"ProcessJobAsync -> Job completed {JobDescription}");
}
#endregion process jobs
/////////////////////////////////////////////////////////////////////
}//eoc
}//eons