using System; using System.Linq; using System.Threading.Tasks; using System.Threading; using System.Collections.Generic; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using AyaNova.Models; using AyaNova.Util; namespace AyaNova.Biz { internal static class JobsBiz { private static ILogger log = AyaNova.Util.ApplicationLogging.CreateLogger("JobsBiz"); #region JOB OPS /// /// Get a non tracked list of jobs that are ready to process and exclusive only /// /// internal static async Task> GetReadyJobsExclusiveOnlyAsync() { return await GetReadyJobsAsync(true); } /// /// Get a non tracked list of jobs that are ready to process and exclusive only /// /// internal static async Task> GetReadyJobsNotExlusiveOnlyAsync() { return await GetReadyJobsAsync(false); } /// /// Get a non tracked list of jobs filtered by exclusivity /// /// private static async Task> GetReadyJobsAsync(bool exclusiveOnly) { using (AyContext ct = ServiceProviderProvider.DBContext) { var ret = await ct.OpsJob .AsNoTracking() .Where(z => z.StartAfter < System.DateTime.UtcNow && z.Exclusive == exclusiveOnly && z.JobStatus == JobStatus.Sleeping) .OrderBy(z => z.Created) .ToListAsync(); return ret; } } /// /// Add a new job to the database /// /// internal static async Task AddJobAsync(OpsJob newJob) { using (AyContext ct = ServiceProviderProvider.DBContext) { await ct.OpsJob.AddAsync(newJob); await ct.SaveChangesAsync(); } } /// /// Remove the job and it's logs /// /// internal static async Task RemoveJobAndLogsAsync(Guid jobIdToBeDeleted) { using (AyContext ct = ServiceProviderProvider.DBContext) using (var transaction = await ct.Database.BeginTransactionAsync()) { try { //delete logs await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}"); //delete the job await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}"); // Commit transaction if all commands succeed, transaction will auto-rollback // when disposed if either commands fails await transaction.CommitAsync(); } catch (Exception ex) { throw ex; } } } /// /// Make a log entry for a job /// /// (NOTE: Guid.empty indicates internal job) /// internal static async Task LogJobAsync(Guid jobId, string statusText) { using (AyContext ct = ServiceProviderProvider.DBContext) { if (string.IsNullOrWhiteSpace(statusText)) statusText = "No status provided"; OpsJobLog newObj = new OpsJobLog(); newObj.JobId = jobId; newObj.StatusText = statusText; await ct.OpsJobLog.AddAsync(newObj); await ct.SaveChangesAsync(); } } /// /// Update the status of a job /// /// /// internal static async Task UpdateJobStatusAsync(Guid jobId, JobStatus newStatus) { using (AyContext ct = ServiceProviderProvider.DBContext) { var oFromDb = await ct.OpsJob.SingleOrDefaultAsync(z => z.GId == jobId); if (oFromDb == null) return; oFromDb.JobStatus = newStatus; await ct.SaveChangesAsync(); } } #endregion Job ops #region PROCESSOR static bool ActivelyProcessing = false; /// /// Process all jobs (stock jobs and those found in operations table) /// /// internal static async Task ProcessJobsAsync() { if (ActivelyProcessing) { //System.Diagnostics.Debug.WriteLine("ProcessJobs called but actively processing other jobs so returning"); log.LogTrace("ProcessJobs called but actively processing other jobs so returning"); return; } ActivelyProcessing = true; try { //Sweep jobs table //System.Diagnostics.Debug.WriteLine($"JobsBiz processing sweeper"); await CoreJobSweeper.DoSweepAsync();//run exclusively //BIZOBJECT DYNAMIC JOBS //get a list of exclusive jobs that are due to happen //Call into each item in turn List exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync(); foreach (OpsJob j in exclusiveJobs) { try { //System.Diagnostics.Debug.WriteLine($"JobsBiz processing exclusive biz job {j.Name}"); await ProcessJobAsync(j); //Capture metrics CoreJobMetricsSnapshot.DoJob(); } catch (Exception ex) { log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception"); await LogJobAsync(j.GId, "Job failed with errors:"); await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex)); await UpdateJobStatusAsync(j.GId, JobStatus.Failed); } } //System.Diagnostics.Debug.WriteLine($"JobsBiz processing exclusive license check"); //License check long CurrentActiveCount = await UserBiz.ActiveCountAsync(); long LicensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber; // log.LogInformation("JobsBiz::Checking license active count"); if (CurrentActiveCount > LicensedUserCount) { var msg = $"E1020 - Active count exceeded capacity"; ServiceProviderProvider.ServerState.SetSystemLock(msg); log.LogCritical(msg); return; } //backup //System.Diagnostics.Debug.WriteLine($"JobsBiz processing backup"); await CoreJobBackup.DoWorkAsync();//sb exclusive //System.Diagnostics.Debug.WriteLine($"JobsBiz processing metrics snapshotter"); //Capture metrics CoreJobMetricsSnapshot.DoJob(); /////////////////////////////////////// //NON-EXCLUSIVE JOBS // //These fire and forget but use a technique to bubble up exceptions anyway List sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync(); foreach (OpsJob j in sharedJobs) { try { //System.Diagnostics.Debug.WriteLine($"JobsBiz processing NON-exclusive biz job {j.Name}"); TaskUtil.Forget(Task.Run(() => ProcessJobAsync(j))); } catch (Exception ex) { log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception"); await LogJobAsync(j.GId, "Job failed with errors:"); await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex)); await UpdateJobStatusAsync(j.GId, JobStatus.Failed); } } //Capture metrics CoreJobMetricsSnapshot.DoJob(); //TODO: NOTIFICATIONS } catch (Exception ex) { log.LogError(ex, "JobsBiz::ProcessJobsAsync unexpected error during processing"); //TODO:OPSNOTIFY } finally { ActivelyProcessing = false; //System.Diagnostics.Debug.WriteLine($"JobsBiz in Finally - completed run"); } } /// /// Process a job by calling into it's biz object /// /// /// internal static async Task ProcessJobAsync(OpsJob job) { var JobDescription = $"{job.Name} {job.JobType.ToString()}"; if (job.SubType != JobSubType.NotSet) JobDescription += $":{job.SubType}"; await LogJobAsync(job.GId, $"Process job \"{JobDescription}\""); log.LogDebug($"ProcessJobAsync -> Processing job {JobDescription}"); IJobObject o = null; switch (job.JobType) { case JobType.Backup: //This is called when on demand only, normal backups are processed above with normal system jobs await CoreJobBackup.DoWorkAsync(true); await UpdateJobStatusAsync(job.GId, JobStatus.Completed); break; case JobType.TestJob: o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.ServerJob); break; case JobType.SeedTestData: o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.TrialSeeder); break; case JobType.BulkCoreBizObjectOperation: //bulk op, hand off to biz object to deal with //note, convention is that there is an idList in job.jobinfo json if preselected else it's all objects of type o = (IJobObject)BizObjectFactory.GetBizObject(job.ObjectType); break; default: throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported"); } if (o != null) await o.HandleJobAsync(job); log.LogDebug($"ProcessJobAsync -> Job completed {JobDescription}"); } #endregion process jobs ///////////////////////////////////////////////////////////////////// }//eoc }//eons