using System; using System.Linq; using System.Threading.Tasks; using System.Collections.Generic; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using AyaNova.Models; using AyaNova.Util; namespace AyaNova.Biz { internal static class JobsBiz { private static ILogger log = AyaNova.Util.ApplicationLogging.CreateLogger("JobsBiz"); #region JOB OPS /// /// Get a non tracked list of jobs for an object /// /// /// /// internal static async Task> GetJobsForObjectAsync(AyaTypeId ayObj, AyContext ct) { return await ct.OpsJob .AsNoTracking() .Where(c => c.ObjectId == ayObj.ObjectId && c.ObjectType == ayObj.ObjectType) .OrderBy(m => m.Created) .ToListAsync(); } /// /// Get a non tracked list of jobs that are ready to process and exclusive only /// /// internal static async Task> GetReadyJobsExclusiveOnlyAsync(AyContext ct) { return await GetReadyJobsAsync(true, ct); } /// /// Get a non tracked list of jobs that are ready to process and exclusive only /// /// internal static async Task> GetReadyJobsNotExlusiveOnlyAsync(AyContext ct) { return await GetReadyJobsAsync(false, ct); } /// /// Get a non tracked list of jobs filtered by exclusivity /// /// private static async Task> GetReadyJobsAsync(bool exclusiveOnly, AyContext ct) { var ret = await ct.OpsJob .AsNoTracking() .Where(c => c.StartAfter < System.DateTime.UtcNow && c.Exclusive == exclusiveOnly && c.JobStatus == JobStatus.Sleeping) .OrderBy(m => m.Created) .ToListAsync(); return ret; } /// /// Get a non tracked list of all jobs that are not completed /// could be running or sleeping /// /// internal static async Task> GetAllSleepingOrRunningJobsAsync(AyContext ct) { var ret = await ct.OpsJob .AsNoTracking() .Where(c => c.JobStatus == JobStatus.Sleeping || c.JobStatus == JobStatus.Running) .OrderBy(m => m.Created) .ToListAsync(); return ret; } /// /// Get a non tracked list of all jobs for a JobType /// /// internal static async Task> GetAllJobsForJobTypeAsync(AyContext ct, JobType jobType) { var ret = await ct.OpsJob .AsNoTracking() .Where(c => c.JobType == jobType) .OrderBy(m => m.Created) .ToListAsync(); return ret; } /// /// Get a non tracked list of all jobs that are status running but have no last activity for XX HOURS /// /// internal static async Task> GetPotentiallyDeadRunningJobsAsync(AyContext ct) { var ret = await ct.OpsJob .AsNoTracking() .Where(c => c.JobStatus == JobStatus.Sleeping || c.JobStatus == JobStatus.Running) .OrderBy(m => m.Created) .ToListAsync(); return ret; } /// /// Get a count of all jobs for a JobStatus /// /// internal static async Task GetCountForJobStatusAsync(AyContext ct, JobStatus jobStatus) { var ret = await ct.OpsJob .Where(c => c.JobStatus == jobStatus) .LongCountAsync(); return ret; } /// /// Add a new job to the database /// /// /// /// internal static async Task AddJobAsync(OpsJob newJob, AyContext ct) { //TODO: Does this need to create an event so we know which user created the job? await ct.OpsJob.AddAsync(newJob); await ct.SaveChangesAsync(); return newJob; } /// /// Remove any jobs or logs for the object in question /// /// /// internal static async Task DeleteJobsForObjectAsync(AyaTypeId ayObj, AyContext ct) { //Get a list of all jobid's for the object passed in List jobsForObject = await GetJobsForObjectAsync(ayObj, ct); //short circuit if (jobsForObject.Count == 0) return; using (var transaction = await ct.Database.BeginTransactionAsync()) { try { foreach (OpsJob jobToBeDeleted in jobsForObject) { await RemoveJobAndLogsAsync(ct, jobToBeDeleted.GId); } // Commit transaction if all commands succeed, transaction will auto-rollback // when disposed if either commands fails await transaction.CommitAsync(); } catch (Exception ex) { throw ex; } } } /// /// Remove job and logs for that job /// /// /// internal static async Task DeleteJobAndLogAsync(Guid jobId, AyContext ct) { using (var transaction = await ct.Database.BeginTransactionAsync()) { try { await RemoveJobAndLogsAsync(ct, jobId); // Commit transaction if all commands succeed, transaction will auto-rollback // when disposed if either commands fails await transaction.CommitAsync(); } catch (Exception ex) { throw ex; } } } /// /// REmove the job and it's logs /// /// /// private static async Task RemoveJobAndLogsAsync(AyContext ct, Guid jobIdToBeDeleted) { // //delete logs // await ct.Database.ExecuteSqlCommandAsync("delete from aopsjoblog where jobid = {0}", new object[] { jobIdToBeDeleted }); // //delete the job // await ct.Database.ExecuteSqlCommandAsync("delete from aopsjob where gid = {0}", new object[] { jobIdToBeDeleted }); //delete logs await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjoblog where jobid = {jobIdToBeDeleted}"); //delete the job await ct.Database.ExecuteSqlInterpolatedAsync($"delete from aopsjob where gid = {jobIdToBeDeleted}"); } /// /// Make a log entry for a job /// (no context version) /// /// /// internal static async Task LogJobAsync(Guid jobId, string statusText) { return await LogJobAsync(jobId, statusText, null); } /// /// Make a log entry for a job /// /// /// /// internal static async Task LogJobAsync(Guid jobId, string statusText, AyContext ct) { if (ct == null) ct = ServiceProviderProvider.DBContext; if (string.IsNullOrWhiteSpace(statusText)) statusText = "No status provided"; OpsJobLog newObj = new OpsJobLog(); newObj.JobId = jobId; newObj.StatusText = statusText; await ct.OpsJobLog.AddAsync(newObj); await ct.SaveChangesAsync(); return newObj; } /// /// Update the status of a job /// /// /// /// internal static async Task UpdateJobStatusAsync(Guid jobId, JobStatus newStatus, AyContext ct) { var oFromDb = await ct.OpsJob.SingleOrDefaultAsync(m => m.GId == jobId); if (oFromDb == null) return null; oFromDb.JobStatus = newStatus; await ct.SaveChangesAsync(); return oFromDb; } #endregion Job ops #region PROCESSOR /// /// Process all jobs (stock jobs and those found in operations table) /// /// internal static async Task ProcessJobsAsync(AyContext ct, AyaNova.Api.ControllerHelpers.ApiServerState serverState) { //Flush metrics report before anything else happens log.LogTrace("Flushing metrics to reporters"); await CoreJobMetricsReport.DoJobAsync(); //BIZOBJECT DYNAMIC JOBS //get a list of exclusive jobs that are due to happen //Call into each item in turn List exclusiveJobs = await GetReadyJobsExclusiveOnlyAsync(ct); foreach (OpsJob j in exclusiveJobs) { try { await ProcessJobAsync(j, ct); } catch (Exception ex) { log.LogError(ex, $"ProcessJobs::Exclusive -> job {j.Name} failed with exception"); await LogJobAsync(j.GId, "Job failed with errors:", ct); await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct); await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct); } } //Get a list of non-exlusive jobs that are due //LOOKAT: Parallelize / background this block //http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core //var backgroundTask = Task.Run(() => DoComplexCalculation(42)); //also have to deal with db object etc, I guess they'd have to instantiate themselves to avoid disposed object being used error //This area may turn out to need a re-write in future, but I think it might only involve this block and ProcessJobAsync //the actual individual objects that are responsible for jobs will likely not need a signature rewrite or anything (I hope) //For now I'm hoping that no job will be so slow that it can hold up all the other jobs indefinitely. List sharedJobs = await GetReadyJobsNotExlusiveOnlyAsync(ct); foreach (OpsJob j in sharedJobs) { try { await ProcessJobAsync(j, ct); } catch (Exception ex) { log.LogError(ex, $"ProcessJobs::Shared -> job {j.Name} failed with exception"); await LogJobAsync(j.GId, "Job failed with errors:", ct); await LogJobAsync(j.GId, ExceptionUtil.ExtractAllExceptionMessages(ex), ct); await UpdateJobStatusAsync(j.GId, JobStatus.Failed, ct); } } //STOCK JOBS //Sweep jobs table await CoreJobSweeper.DoSweepAsync(ct); //Health check / metrics await CoreJobMetricsSnapshot.DoJobAsync(ct); //License check long CurrentActiveCount = await UserBiz.ActiveCountAsync(); long LicensedUserCount = AyaNova.Core.License.ActiveKey.ActiveNumber; // log.LogInformation("JobsBiz::Checking license active count"); if (CurrentActiveCount > LicensedUserCount) { var msg = $"E1020 - Active count exceeded capacity"; serverState.SetSystemLock(msg); log.LogCritical(msg); return; } //Notifications } /// /// Process a job by calling into it's biz object /// /// /// /// internal static async Task ProcessJobAsync(OpsJob job, AyContext ct) { log.LogDebug($"ProcessJobAsync -> Processing job {job.Name} (type {job.JobType.ToString()})"); IJobObject o = null; switch (job.JobType) { case JobType.TestWidgetJob: o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.Widget, ct); break; case JobType.ImportV7Data: o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.AyaNova7Import, ct); break; case JobType.SeedTestData: o = (IJobObject)BizObjectFactory.GetBizObject(AyaType.TrialSeeder, ct); break; default: throw new System.NotSupportedException($"ProcessJobAsync type {job.JobType.ToString()} is not supported"); } await o.HandleJobAsync(job); log.LogDebug($"ProcessJobAsync -> Job completed {job.Name} (type {job.JobType.ToString()})"); } #endregion process jobs ///////////////////////////////////////////////////////////////////// }//eoc }//eons