This commit is contained in:
412
server/biz/JobsBiz.cs
Normal file
412
server/biz/JobsBiz.cs
Normal file
@@ -0,0 +1,412 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sockeye.Models;
|
||||
using Sockeye.Util;
|
||||
using Sockeye.Api.ControllerHelpers;
|
||||
|
||||
|
||||
namespace Sockeye.Biz
|
||||
{
|
||||
|
||||
|
||||
internal static class JobsBiz
|
||||
{
|
||||
private static ILogger log = Sockeye.Util.ApplicationLogging.CreateLogger("JobsBiz");
|
||||
|
||||
#region JOB OPS
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Get a non tracked list of jobs that are ready to process and exclusive only
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
internal static async Task<List<OpsJob>> GetReadyJobsExclusiveOnlyAsync()
|
||||
{
|
||||
return await GetReadyJobsAsync(true);
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Get a non tracked list of jobs that are ready to process and exclusive only
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
internal static async Task<List<OpsJob>> GetReadyJobsNotExlusiveOnlyAsync()
|
||||
{
|
||||
return await GetReadyJobsAsync(false);
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Get a non tracked list of jobs filtered by exclusivity
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private static async Task<List<OpsJob>> GetReadyJobsAsync(bool exclusiveOnly)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
var ret = await ct.OpsJob
|
||||
.AsNoTracking()
|
||||
.Where(z => z.StartAfter < System.DateTime.UtcNow && z.Exclusive == exclusiveOnly && z.JobStatus == JobStatus.Sleeping)
|
||||
.OrderBy(z => z.Created)
|
||||
.ToListAsync();
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Add a new job to the database
|
||||
/// </summary>
|
||||
/// <param name="newJob"></param>
|
||||
internal static async Task AddJobAsync(OpsJob newJob)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
log.LogDebug($"Adding new job:{newJob.ToString()}");
|
||||
await LogJobAsync(newJob.GId, $"LT:JobCreated \"{newJob.Name}\"");
|
||||
await ct.OpsJob.AddAsync(newJob);
|
||||
await ct.SaveChangesAsync();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Request the cancellation of a job, not all jobs honour this
|
||||
/// </summary>
|
||||
/// <param name="jobId"></param>
|
||||
internal static async Task RequestCancelAsync(Guid jobId)
|
||||
{
|
||||
await UpdateJobStatusAsync(jobId, JobStatus.CancelRequested);
|
||||
await LogJobAsync(jobId, "LT:Cancel");
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Remove the job and it's logs
|
||||
/// </summary>
|
||||
/// <param name="jobIdToBeDeleted"></param>
|
||||
internal static async Task RemoveJobAndLogsAsync(Guid jobIdToBeDeleted)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
using (var transaction = await ct.Database.BeginTransactionAsync())
|
||||
{
|
||||
try
|
||||
{
|
||||
log.LogDebug($"RemoveJobAndLogs for job id:{jobIdToBeDeleted}");
|
||||
//delete logs
|
||||
await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}");
|
||||
//delete the job
|
||||
await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}");
|
||||
// Commit transaction if all commands succeed, transaction will auto-rollback
|
||||
// when disposed if either commands fails
|
||||
await transaction.CommitAsync();
|
||||
}
|
||||
catch
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Make a log entry for a job
|
||||
/// </summary>
|
||||
/// <param name="jobId">(NOTE: Guid.empty indicates internal job)</param>
|
||||
/// <param name="statusText"></param>
|
||||
internal static async Task LogJobAsync(Guid jobId, string statusText)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(statusText))
|
||||
statusText = "No status provided";
|
||||
OpsJobLog newObj = new OpsJobLog();
|
||||
newObj.JobId = jobId;
|
||||
newObj.StatusText = statusText;
|
||||
await ct.OpsJobLog.AddAsync(newObj);
|
||||
await ct.SaveChangesAsync();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Update the status of a job
|
||||
/// </summary>
|
||||
/// <param name="jobId"></param>
|
||||
/// <param name="newStatus"></param>
|
||||
internal static async Task UpdateJobStatusAsync(Guid jobId, JobStatus newStatus)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
var oFromDb = await ct.OpsJob.SingleOrDefaultAsync(z => z.GId == jobId);
|
||||
if (oFromDb == null) return;
|
||||
oFromDb.JobStatus = newStatus;
|
||||
await ct.SaveChangesAsync();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the status of a job
|
||||
/// </summary>
|
||||
/// <param name="jobId"></param>
|
||||
internal static async Task<JobStatus> GetJobStatusAsync(Guid jobId)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
var o = await ct.OpsJob.AsNoTracking().SingleOrDefaultAsync(z => z.GId == jobId);
|
||||
if (o == null) return JobStatus.Absent;
|
||||
return o.JobStatus;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Update the progress of a job
|
||||
/// </summary>
|
||||
/// <param name="jobId"></param>
|
||||
/// <param name="progress"></param>
|
||||
internal static async Task UpdateJobProgressAsync(Guid jobId, string progress)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
var oFromDb = await ct.OpsJob.SingleOrDefaultAsync(z => z.GId == jobId);
|
||||
if (oFromDb == null) return;
|
||||
oFromDb.Progress = progress;
|
||||
await ct.SaveChangesAsync();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the progress and status of a job
|
||||
/// </summary>
|
||||
/// <param name="jobId"></param>
|
||||
internal static async Task<JobProgress> GetJobProgressAsync(Guid jobId)
|
||||
{
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
var o = await ct.OpsJob.AsNoTracking().SingleOrDefaultAsync(z => z.GId == jobId);
|
||||
if (o == null) return new JobProgress() { JobStatus = JobStatus.Absent, Progress = string.Empty };
|
||||
return new JobProgress() { JobStatus = o.JobStatus, Progress = o.Progress };
|
||||
}
|
||||
}
|
||||
#endregion Job ops
|
||||
|
||||
#region PROCESSOR
|
||||
|
||||
internal static bool KeepOnWorking()
|
||||
{
|
||||
ApiServerState serverState = ServiceProviderProvider.ServerState;
|
||||
|
||||
//system lock (no license) is a complete deal breaker for continuation beyond here
|
||||
if (serverState.IsSystemLocked) return false;
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static bool ActivelyProcessing = false;
|
||||
/// <summary>
|
||||
/// Process all jobs (stock jobs and those found in operations table)
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
internal static async Task ProcessJobsAsync()
|
||||
{
|
||||
if (ActivelyProcessing)
|
||||
{
|
||||
//System.Diagnostics.Debug.WriteLine("ProcessJobs called but actively processing other jobs so returning");
|
||||
//log.LogTrace("ProcessJobs called but actively processing other jobs so returning");
|
||||
return;
|
||||
}
|
||||
|
||||
//Do not process if there is no db, everything relies on it below here
|
||||
if (!ServerGlobalOpsSettingsCache.DBAVAILABLE)
|
||||
{
|
||||
//This will set dbavailable flag if it becomes available
|
||||
DbUtil.CheckDatabaseServerAvailable(log);
|
||||
return;
|
||||
}
|
||||
|
||||
ActivelyProcessing = true;
|
||||
log.LogTrace("Processing internal jobs");
|
||||
try
|
||||
{
|
||||
log.LogTrace("Processing level 1 internal jobs");
|
||||
|
||||
//######################################################################################
|
||||
//### Critical internal jobs
|
||||
|
||||
//METRICS
|
||||
CoreJobMetricsSnapshot.DoWork();
|
||||
|
||||
//######################################################################################
|
||||
//## JOBS that will not run in a license or import mode or other system lock scenario from here down
|
||||
|
||||
if (!KeepOnWorking()) return;
|
||||
log.LogTrace("Processing level 2 internal jobs");
|
||||
|
||||
//BACKUP
|
||||
await CoreJobBackup.DoWorkAsync();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
//NOTIFICATIONS
|
||||
await CoreJobNotify.DoWorkAsync();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
await CoreNotificationSweeper.DoWorkAsync();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
|
||||
|
||||
//JOB SWEEPER / AND USER COUNT CHECK
|
||||
await CoreJobSweeper.DoWorkAsync();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
//Cleanup temp folder
|
||||
CoreJobTempFolderCleanup.DoWork();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
//Check for and kill stuck report rendering engine processes
|
||||
await CoreJobReportRenderEngineProcessCleanup.DoWork();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
//CUSTOMER NOTIFICATIONS
|
||||
TaskUtil.Forget(Task.Run(() => CoreJobCustomerNotify.DoWorkAsync()));//must fire and forget as it will call a report render job. In fact probably all of these can be fire and forget
|
||||
|
||||
//INTEGRATION LOG SWEEP
|
||||
await CoreIntegrationLogSweeper.DoWorkAsync();
|
||||
if (!KeepOnWorking()) return;
|
||||
|
||||
|
||||
log.LogTrace("Processing exclusive dynamic jobs");
|
||||
|
||||
//BIZOBJECT DYNAMIC JOBS
|
||||
//get a list of exclusive jobs that are due to happen
|
||||
//Call into each item in turn
|
||||
List<OpsJob> exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync();
|
||||
foreach (OpsJob j in exclusiveJobs)
|
||||
{
|
||||
if (!KeepOnWorking()) return;
|
||||
try
|
||||
{
|
||||
await ProcessJobAsync(j);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception");
|
||||
await LogJobAsync(j.GId, "LT:JobFailed");
|
||||
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex));
|
||||
await UpdateJobStatusAsync(j.GId, JobStatus.Failed);
|
||||
}
|
||||
}
|
||||
|
||||
//### Server state dependent jobs
|
||||
ApiServerState serverState = ServiceProviderProvider.ServerState;
|
||||
|
||||
//### API Open only jobs
|
||||
if (!serverState.IsOpen)
|
||||
{
|
||||
log.LogDebug("Server state is NOT open, skipping processing non-exclusive dynamic jobs");
|
||||
return;
|
||||
}
|
||||
|
||||
///////////////////////////////////////
|
||||
//NON-EXCLUSIVE JOBS
|
||||
//
|
||||
log.LogTrace("Processing non-exclusive dynamic jobs");
|
||||
if (!KeepOnWorking()) return;
|
||||
//These fire and forget but use a technique to bubble up exceptions anyway
|
||||
List<OpsJob> sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync();
|
||||
foreach (OpsJob j in sharedJobs)
|
||||
{
|
||||
if (!KeepOnWorking()) return;
|
||||
try
|
||||
{
|
||||
//System.Diagnostics.Debug.WriteLine($"JobsBiz processing NON-exclusive biz job {j.Name}");
|
||||
TaskUtil.Forget(Task.Run(() => ProcessJobAsync(j)));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception");
|
||||
await LogJobAsync(j.GId, "LT:JobFailed");
|
||||
await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex));
|
||||
await UpdateJobStatusAsync(j.GId, JobStatus.Failed);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var msg = "Server::ProcessJobsAsync unexpected error during processing";
|
||||
log.LogError(ex, msg);
|
||||
DbUtil.HandleIfDatabaseUnavailableTypeException(ex);
|
||||
await NotifyEventHelper.AddOpsProblemEvent(msg, ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
ActivelyProcessing = false;
|
||||
//System.Diagnostics.Debug.WriteLine($"JobsBiz in Finally - completed run");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Process a job by calling into it's biz object
|
||||
/// </summary>
|
||||
/// <param name="job"></param>
|
||||
/// <returns></returns>
|
||||
internal static async Task ProcessJobAsync(OpsJob job)
|
||||
{
|
||||
var JobDescription = $"{job.Name} - {job.JobType.ToString()}";
|
||||
if (job.SubType != JobSubType.NotSet)
|
||||
JobDescription += $":{job.SubType}";
|
||||
await LogJobAsync(job.GId, $"LT:ProcessingJob \"{JobDescription}\"");
|
||||
log.LogDebug($"ProcessJobAsync -> Processing job {JobDescription}");
|
||||
IJobObject o = null;
|
||||
using (AyContext ct = ServiceProviderProvider.DBContext)
|
||||
{
|
||||
switch (job.JobType)
|
||||
{
|
||||
case JobType.Backup:
|
||||
//This is called when on demand only, normal backups are processed above with normal system jobs
|
||||
await CoreJobBackup.DoWorkAsync(true);
|
||||
await UpdateJobStatusAsync(job.GId, JobStatus.Completed);
|
||||
break;
|
||||
case JobType.TestJob:
|
||||
o = (IJobObject)BizObjectFactory.GetBizObject(SockType.ServerJob, ct, 1, AuthorizationRoles.BizAdmin);
|
||||
break;
|
||||
|
||||
case JobType.AttachmentMaintenance:
|
||||
o = (IJobObject)BizObjectFactory.GetBizObject(SockType.FileAttachment, ct, 1, AuthorizationRoles.BizAdmin);
|
||||
break;
|
||||
case JobType.BatchCoreObjectOperation:
|
||||
//batch op, hand off to biz object to deal with
|
||||
//note, convention is that there is an idList in job.jobinfo json if preselected else it's all objects of type
|
||||
o = (IJobObject)BizObjectFactory.GetBizObject(job.SockType, ct, 1, AuthorizationRoles.BizAdmin);
|
||||
break;
|
||||
case JobType.RenderReport:
|
||||
o = (IJobObject)BizObjectFactory.GetBizObject(SockType.Report, ct, 1, AuthorizationRoles.BizAdmin);
|
||||
break;
|
||||
default:
|
||||
throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported");
|
||||
}
|
||||
|
||||
if (o != null)
|
||||
await o.HandleJobAsync(job);
|
||||
}
|
||||
log.LogDebug($"ProcessJobAsync -> Job completed {JobDescription}");
|
||||
}
|
||||
|
||||
|
||||
#endregion process jobs
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
}//eoc
|
||||
|
||||
|
||||
}//eons
|
||||
|
||||
Reference in New Issue
Block a user