Files
raven/server/AyaNova/biz/JobsBiz.cs
2020-01-27 22:37:47 +00:00

413 lines
15 KiB
C#

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using AyaNova.Models;
using AyaNova.Util;
namespace AyaNova.Biz
{
internal static class JobsBiz
{
// Logger shared by every method in this class; readonly because it is assigned once here and never replaced
private static readonly ILogger log = AyaNova.Util.ApplicationLogging.CreateLogger("JobsBiz");
#region JOB OPS
/// <summary>
/// Fetch the jobs attached to a single object, read with AsNoTracking
/// </summary>
/// <param name="ayObj">Object type / object id pair the jobs must match</param>
/// <param name="ct">Database context to query</param>
/// <returns>Matching jobs ordered by Created</returns>
internal static async Task<List<OpsJob>> GetJobsForObjectAsync(AyaTypeId ayObj, AyContext ct)
{
    IQueryable<OpsJob> jobsForObject = ct.OpsJob
        .AsNoTracking()
        .Where(job => job.ObjectId == ayObj.ObjectId && job.ObjectType == ayObj.ObjectType)
        .OrderBy(job => job.Created);

    return await jobsForObject.ToListAsync();
}
/// <summary>
/// Get a non tracked list of the ready-to-process jobs that are flagged exclusive
/// </summary>
/// <param name="ct">Database context to query</param>
/// <returns>Ready jobs whose Exclusive flag is true</returns>
internal static async Task<List<OpsJob>> GetReadyJobsExclusiveOnlyAsync(AyContext ct)
    => await GetReadyJobsAsync(exclusiveOnly: true, ct: ct);
/// <summary>
/// Get a non tracked list of the ready-to-process jobs that are NOT flagged exclusive
/// </summary>
/// <param name="ct">Database context to query</param>
/// <returns>Ready jobs whose Exclusive flag is false</returns>
internal static async Task<List<OpsJob>> GetReadyJobsNotExlusiveOnlyAsync(AyContext ct)
    => await GetReadyJobsAsync(exclusiveOnly: false, ct: ct);
/// <summary>
/// Get a non tracked list of jobs that are ready to process, filtered by exclusivity.
/// A job is ready when its status is Sleeping and its StartAfter is earlier than the current UTC time.
/// </summary>
/// <param name="exclusiveOnly">Value the job's Exclusive flag must equal</param>
/// <param name="ct">Database context to query</param>
/// <returns>Matching jobs ordered by Created</returns>
private static async Task<List<OpsJob>> GetReadyJobsAsync(bool exclusiveOnly, AyContext ct)
{
    return await ct.OpsJob
        .AsNoTracking()
        .Where(job => job.StartAfter < System.DateTime.UtcNow
            && job.Exclusive == exclusiveOnly
            && job.JobStatus == JobStatus.Sleeping)
        .OrderBy(job => job.Created)
        .ToListAsync();
}
/// <summary>
/// Get a non tracked list of every job whose status is Sleeping or Running
/// </summary>
/// <param name="ct">Database context to query</param>
/// <returns>Matching jobs ordered by Created</returns>
internal static async Task<List<OpsJob>> GetAllSleepingOrRunningJobsAsync(AyContext ct)
{
    IQueryable<OpsJob> sleepingOrRunning = ct.OpsJob
        .AsNoTracking()
        .Where(job => job.JobStatus == JobStatus.Sleeping || job.JobStatus == JobStatus.Running)
        .OrderBy(job => job.Created);

    return await sleepingOrRunning.ToListAsync();
}
/// <summary>
/// Get a non tracked list of every job of the requested JobType
/// </summary>
/// <param name="ct">Database context to query</param>
/// <param name="jobType">Job type the jobs must match</param>
/// <returns>Matching jobs ordered by Created</returns>
internal static async Task<List<OpsJob>> GetAllJobsForJobTypeAsync(AyContext ct, JobType jobType)
{
    IQueryable<OpsJob> jobsOfType = ct.OpsJob
        .AsNoTracking()
        .Where(job => job.JobType == jobType)
        .OrderBy(job => job.Created);

    return await jobsOfType.ToListAsync();
}
/// <summary>
/// Get a non tracked list of jobs that might be dead.
/// NOTE(review): the stated intent is "status running but no last activity for XX HOURS",
/// however the filter below only checks the status (Sleeping or Running) and applies no
/// inactivity cut-off - confirm whether a time based filter still needs to be added.
/// </summary>
/// <param name="ct">Database context to query</param>
/// <returns>Jobs with status Sleeping or Running, ordered by Created</returns>
internal static async Task<List<OpsJob>> GetPotentiallyDeadRunningJobsAsync(AyContext ct)
{
    IQueryable<OpsJob> candidates = ct.OpsJob
        .AsNoTracking()
        .Where(job => job.JobStatus == JobStatus.Sleeping || job.JobStatus == JobStatus.Running)
        .OrderBy(job => job.Created);

    return await candidates.ToListAsync();
}
/// <summary>
/// Count the jobs that currently have the given JobStatus
/// </summary>
/// <param name="ct">Database context to query</param>
/// <param name="jobStatus">Status the counted jobs must have</param>
/// <returns>Number of matching jobs</returns>
internal static async Task<long> GetCountForJobStatusAsync(AyContext ct, JobStatus jobStatus)
    => await ct.OpsJob.LongCountAsync(job => job.JobStatus == jobStatus);
/// <summary>
/// Insert a new job and save it to the database
/// </summary>
/// <param name="newJob">Job to insert</param>
/// <param name="ct">Database context used for the insert</param>
/// <returns>The same job instance that was passed in, after saving</returns>
internal static async Task<OpsJob> AddJobAsync(OpsJob newJob, AyContext ct)
{
    //TODO: Does this need to create an event so we know which user created the job?
    var jobSet = ct.OpsJob;
    await jobSet.AddAsync(newJob);

    await ct.SaveChangesAsync();

    return newJob;
}
/// <summary>
/// Remove any jobs and their logs for the object in question.
/// All deletes run inside a single database transaction.
/// </summary>
/// <param name="ayObj">Object type / object id pair whose jobs are removed</param>
/// <param name="ct">Database context used for the lookup, the deletes and the transaction</param>
internal static async Task DeleteJobsForObjectAsync(AyaTypeId ayObj, AyContext ct)
{
    //Get a list of all jobs for the object passed in
    List<OpsJob> jobsForObject = await GetJobsForObjectAsync(ayObj, ct);

    //short circuit: nothing to delete so no transaction is opened
    if (jobsForObject.Count == 0)
    {
        return;
    }

    using (var transaction = await ct.Database.BeginTransactionAsync())
    {
        foreach (OpsJob jobToBeDeleted in jobsForObject)
        {
            await RemoveJobAndLogsAsync(ct, jobToBeDeleted.GId);
        }

        // Commit transaction if all commands succeed, transaction will auto-rollback
        // when disposed if any command fails. No catch block here: a failure propagates
        // to the caller unchanged, with its original stack trace intact.
        await transaction.CommitAsync();
    }
}
/// <summary>
/// Remove a job and the logs for that job inside a single database transaction
/// </summary>
/// <param name="jobId">Id (GId) of the job to remove</param>
/// <param name="ct">Database context used for the deletes and the transaction</param>
internal static async Task DeleteJobAndLogAsync(Guid jobId, AyContext ct)
{
    using (var transaction = await ct.Database.BeginTransactionAsync())
    {
        await RemoveJobAndLogsAsync(ct, jobId);

        // Commit transaction if all commands succeed, transaction will auto-rollback
        // when disposed if either command fails. No catch block here: a failure propagates
        // to the caller unchanged, with its original stack trace intact.
        await transaction.CommitAsync();
    }
}
/// <summary>
/// Remove the job and its logs.
/// Issues two delete statements through ExecuteSqlInterpolatedAsync: first against aopsjoblog, then against aopsjob.
/// Does not open a transaction itself; the callers in this class wrap it in one.
/// </summary>
/// <param name="ct">Database context the statements are executed on</param>
/// <param name="jobIdToBeDeleted">Id of the job whose log rows and job row are deleted</param>
private static async Task RemoveJobAndLogsAsync(AyContext ct, Guid jobIdToBeDeleted)
{
    var database = ct.Database;

    //logs go first
    _ = await database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}");

    //then the job itself
    _ = await database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}");
}
/// <summary>
/// Make a log entry for a job
/// (no context version: forwards to the three parameter overload with a null context)
/// </summary>
/// <param name="jobId">Id of the job the entry belongs to</param>
/// <param name="statusText">Text to record</param>
/// <returns>The log entry that was saved</returns>
internal static async Task<OpsJobLog> LogJobAsync(Guid jobId, string statusText)
    => await LogJobAsync(jobId, statusText, ct: null);
/// <summary>
/// Make a log entry for a job and save it
/// </summary>
/// <param name="jobId">Id of the job the entry belongs to</param>
/// <param name="statusText">Text to record; "No status provided" is stored when this is null, empty or whitespace</param>
/// <param name="ct">Database context to use; when null ServiceProviderProvider.DBContext is used instead</param>
/// <returns>The log entry that was saved</returns>
internal static async Task<OpsJobLog> LogJobAsync(Guid jobId, string statusText, AyContext ct)
{
    AyContext db = ct ?? ServiceProviderProvider.DBContext;

    var entry = new OpsJobLog
    {
        JobId = jobId,
        StatusText = string.IsNullOrWhiteSpace(statusText) ? "No status provided" : statusText
    };

    await db.OpsJobLog.AddAsync(entry);
    await db.SaveChangesAsync();

    return entry;
}
/// <summary>
/// Set a new status on a job and save the change
/// </summary>
/// <param name="jobId">Id (GId) of the job to update</param>
/// <param name="newStatus">Status to store on the job</param>
/// <param name="ct">Database context used for the lookup and the save</param>
/// <returns>The updated job, or null when no job has that id</returns>
internal static async Task<OpsJob> UpdateJobStatusAsync(Guid jobId, JobStatus newStatus, AyContext ct)
{
    OpsJob job = await ct.OpsJob
        .Where(m => m.GId == jobId)
        .SingleOrDefaultAsync();

    if (job != null)
    {
        job.JobStatus = newStatus;
        await ct.SaveChangesAsync();
    }

    return job;
}
#endregion Job ops
#region PROCESSOR
/// <summary>
/// Process all jobs (stock jobs and those found in operations table).
/// Order of work: flush the metrics report, run the ready exclusive jobs, run the ready
/// non-exclusive jobs, sweep the jobs table, take the metrics snapshot, then check the license.
/// When the active user count exceeds the licensed count the server state is system locked
/// with an E1020 message.
/// </summary>
/// <param name="ct">Database context used for job queries, job logging and the stock jobs</param>
/// <param name="serverState">Server state that is system locked when the license check fails</param>
/// <returns></returns>
internal static async Task ProcessJobsAsync(AyContext ct, AyaNova.Api.ControllerHelpers.ApiServerState serverState)
{
    //Flush metrics report before anything else happens
    log.LogTrace("Flushing metrics to reporters");
    await CoreJobMetricsReport.DoJobAsync();

    //BIZOBJECT DYNAMIC JOBS
    //get a list of exclusive jobs that are due to happen
    //Call into each item in turn
    List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync(ct);
    await RunReadyJobsAsync(exclusiveJobs, "Exclusive", ct);

    //Get a list of non-exlusive jobs that are due
    //(queried only after the exclusive jobs have been run)
    //LOOKAT: Parallelize / background this block
    //http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core
    //var backgroundTask = Task.Run(() => DoComplexCalculation(42));
    //also have to deal with db object etc, I guess they'd have to instantiate themselves to avoid disposed object being used error
    //This area may turn out to need a re-write in future, but I think it might only involve this block and ProcessJobAsync
    //the actual individual objects that are responsible for jobs will likely not need a signature rewrite or anything (I hope)
    //For now I'm hoping that no job will be so slow that it can hold up all the other jobs indefinitely.
    List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync(ct);
    await RunReadyJobsAsync(sharedJobs, "Shared", ct);

    //STOCK JOBS

    //Sweep jobs table
    await CoreJobSweeper.DoSweepAsync(ct);

    //Health check / metrics
    await CoreJobMetricsSnapshot.DoJobAsync(ct);

    //License check
    long currentActiveCount = await UserBiz.ActiveCountAsync();
    long licensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber;
    // log.LogInformation("JobsBiz::Checking license active count");
    if (currentActiveCount > licensedUserCount)
    {
        const string msg = "E1020 - Active count exceeded capacity";
        serverState.SetSystemLock(msg);
        log.LogCritical(msg);
    }

    //Notifications
}

/// <summary>
/// Run each job in turn. A job that throws is logged, gets two job log entries describing
/// the failure and is marked Failed; the remaining jobs in the list are still run.
/// </summary>
/// <param name="jobs">Jobs to run, in the order given</param>
/// <param name="category">Label placed in the error log message ("Exclusive" or "Shared")</param>
/// <param name="ct">Database context handed to the job and used for failure logging</param>
private static async Task RunReadyJobsAsync(List<OpsJob> jobs, string category, AyContext ct)
{
    foreach (OpsJob j in jobs)
    {
        try
        {
            await ProcessJobAsync(j, ct);
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"ProcessJobs::{category} -> job {j.Name} failed with exception");
            await LogJobAsync(j.GId, "Job failed with errors:", ct);
            await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct);
            await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct);
        }
    }
}
/// <summary>
/// Process a job by calling into its biz object.
/// The job type selects which biz object is requested from BizObjectFactory; that object
/// is cast to IJobObject and asked to handle the job.
/// </summary>
/// <param name="job">Job to process</param>
/// <param name="ct">Database context passed to BizObjectFactory</param>
/// <returns></returns>
/// <exception cref="System.NotSupportedException">The job's JobType has no biz object mapped here</exception>
internal static async Task ProcessJobAsync(OpsJob job, AyContext ct)
{
    log.LogDebug($"ProcessJobAsync -> Processing job {job.Name} (type {job.JobType})");

    //Work out which biz object is responsible for this kind of job
    AyaType handlerType;
    switch (job.JobType)
    {
        case JobType.TestWidgetJob:
            handlerType = AyaType.Widget;
            break;
        case JobType.ImportV7Data:
            handlerType = AyaType.AyaNova7Import;
            break;
        case JobType.SeedTestData:
            handlerType = AyaType.TrialSeeder;
            break;
        default:
            throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported");
    }

    var handler = (IJobObject)BizObjectFactory.GetBizObject(handlerType, ct);
    await handler.HandleJobAsync(job);

    log.LogDebug($"ProcessJobAsync -> Job completed {job.Name} (type {job.JobType})");
}
#endregion process jobs
/////////////////////////////////////////////////////////////////////
}//eoc
}//eons