Skip to content
Snippets Groups Projects

Enforce assignment timeouts from the core

5 files
+ 113
23
Compare changes
  • Side-by-side
  • Inline

Files

@@ -18,6 +18,7 @@ import nl.tudelft.ewi.auta.common.communication.ServerCommunicationException;
import nl.tudelft.ewi.auta.common.threads.Threads;
import nl.tudelft.ewi.auta.core.database.EntityContainer;
import nl.tudelft.ewi.auta.core.database.SubmissionRepository;
import nl.tudelft.ewi.auta.core.model.Job;
import nl.tudelft.ewi.auta.core.settings.GlobalSettings;
import nl.tudelft.ewi.auta.core.settings.SettingKey;
import org.slf4j.Logger;
@@ -266,34 +267,27 @@ public class MessageReceiver {
return;
}
@Nullable
final var job = worker.getCurrentJob();
switch (message.getType()) {
case "bye":
logger.info("Worker {} shutting down", worker);
if (job != null) {
this.cancelJob(worker, job, message);
}
this.pool.eject(worker);
break;
case "exception":
final var job = worker.getCurrentJob();
if (job != null) {
logger.warn("Submission caused an uncaught exception in worker {}", worker);
logger.warn("{}", message.getData());
var exceptionMessage = (String) message.getData();
final var submission = job.getSubmission();
final var fakeResults = new EntityContainer(
new ProjectEntity(),
submission.getId(),
true,
exceptionMessage,
submission.getAssignmentId()
);
submission.getPipelineLog().setAnalysisDone(Instant.now());
this.submissionRepository.save(submission);
this.cancelJob(worker, job, message);
this.resultsProcessor.process(fakeResults);
worker.setCurrentJob(null);
this.pool.dismiss(worker);
} else {
logger.warn("Idle worker {} reported exception {}", worker, message.getData());
@@ -327,6 +321,34 @@ public class MessageReceiver {
}
}
/**
* Cancels a job via a message.
*
* @param worker the worker the job was assigned to
* @param message the message that triggered the cancelation
* @param job the job to cancel
*/
private void cancelJob(final WorkerConnection worker, final Job job, final Message message) {
worker.setCurrentJob(null);
@Nullable
var exceptionMessage = (String) message.getData();
final var submission = job.getSubmission();
final var fakeResults = new EntityContainer(
new ProjectEntity(),
submission.getId(),
true,
exceptionMessage,
submission.getAssignmentId()
);
submission.getPipelineLog().setAnalysisDone(Instant.now());
this.submissionRepository.save(submission);
this.resultsProcessor.process(fakeResults);
}
/**
* Handles a ping response command ("polo").
*
Loading