Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from odoo.tools import config

from ..delay import chain, group
from ..exception import FailedJobError, RetryableJobError
from ..exception import FailedJobError, JobMethodNotFound, RetryableJobError
from ..job import ENQUEUED, Job

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,7 +78,30 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
ENQUEUED,
)
return None
job = Job.load(env, job_uuid)
try:
job = Job.load(env, job_uuid)
except JobMethodNotFound as exc:
# In case a job's method no longer exists, we don't want the runner
# to keep re-enqueuing it.
exc_name = f"{exc.__class__.__module__}.{exc.__class__.__name__}"
exc_message = str(exc)
failed_record = env["queue.job"].search([("uuid", "=", job_uuid)], limit=1)
if failed_record:
failed_record.write(
{
"state": "failed",
"exc_name": exc_name,
"exc_message": exc_message,
"exc_info": exc_message,
}
)
_logger.warning(
"Job %s references a non-existent method and was marked as failed",
job_uuid,
)
if not config["test_enable"]:
env.cr.commit()
Comment on lines +83 to +103

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both the raw SQL and the ORM are using the same cursor; the same savepoint?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps job.in_temporary_env() and then ORM methods instead of raw SQL?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's good point, I was overthinking it cause JobMethodNotFound wont cause any rollback, so it's safe using the same cursor I guess.

in_temporary_env won't work here because it's failing during loading the job as getattr fails

return None
assert job and job.state == ENQUEUED
job.set_started()
job.store()
Expand Down
13 changes: 13 additions & 0 deletions queue_job/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ class FailedJobError(JobError):
"""A job had an error having to be resolved."""


class JobMethodNotFound(FailedJobError):
"""The job's target method no longer exists on the model."""

def __init__(self, model_name, method_name):
self.model_name = model_name
self.method_name = method_name
super().__init__(
f"Method '{method_name}' does not exist on model '{model_name}'."
f" The job function may have been removed or the module providing"
f" it was uninstalled after this job was created."
)


class RetryableJobError(JobError):
"""A job had an error but can be retried.

Expand Down
25 changes: 21 additions & 4 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

import odoo

from .exception import FailedJobError, NoSuchJobError, RetryableJobError
from .exception import (
FailedJobError,
JobMethodNotFound,
NoSuchJobError,
RetryableJobError,
)

WAIT_DEPENDENCIES = "wait_dependencies"
PENDING = "pending"
Expand Down Expand Up @@ -217,10 +222,20 @@ def load(cls, env, job_uuid):
def load_many(cls, env, job_uuids):
"""Read jobs in batch from the Database

Jobs not found are ignored.
Jobs not found are ignored. Jobs whose method no longer exists are also
skipped with a warning.
"""
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}
jobs = set()
for record in recordset:
try:
jobs.add(cls._load_from_db_record(record))
except JobMethodNotFound:
_logger.warning(
"Skipping job %s as method no longer exists",
record.uuid,
)
return jobs

def add_lock_record(self) -> None:
"""
Expand Down Expand Up @@ -281,7 +296,9 @@ def _load_from_db_record(cls, job_db_record):
method_name = stored.method_name

recordset = stored.records
method = getattr(recordset, method_name)
method = getattr(recordset, method_name, None)
if method is None:
raise JobMethodNotFound(recordset._name, method_name)

eta = None
if stored.eta:
Expand Down
17 changes: 14 additions & 3 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from odoo.addons.base_sparse_field.models.fields import Serialized

from ..delay import Graph
from ..exception import JobError, RetryableJobError
from ..exception import JobError, JobMethodNotFound, RetryableJobError
from ..fields import JobSerialized
from ..job import (
CANCELLED,
Expand Down Expand Up @@ -279,7 +279,15 @@ def write(self, vals):
def open_related_action(self):
"""Open the related action associated to the job"""
self.ensure_one()
job = Job.load(self.env, self.uuid)
try:
job = Job.load(self.env, self.uuid)
except JobMethodNotFound as exc:
raise exceptions.UserError(
_(
"The job function is no longer available: %s",
exc,
)
) from exc
action = job.related_action()
if action is None:
raise exceptions.UserError(_("No action available for this job"))
Expand Down Expand Up @@ -309,7 +317,10 @@ def _change_job_state(self, state, result=None):
(date, result, ...).
"""
for record in self:
job_ = Job.load(record.env, record.uuid)
try:
job_ = Job.load(record.env, record.uuid)
except JobMethodNotFound:
continue
if state == DONE:
job_.set_done(result=result)
job_.store()
Expand Down
40 changes: 40 additions & 0 deletions test_queue_job/tests/test_acquire_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from unittest import mock

from odoo.tests import tagged
from odoo.tests.common import mute_logger

from odoo.addons.queue_job.controllers.main import RunJobController
from odoo.addons.queue_job.job import Job

from .common import JobCommonCase

Expand Down Expand Up @@ -49,3 +51,41 @@ def test_acquire_started_job(self):
"was requested to run job test_started_job, but it does not exist",
logs.output[0],
)

def _create_non_existing_method_job(self):
test_job = Job(self.method)
test_job.store()
self.env.cr.execute(
"""
UPDATE queue_job SET state = 'enqueued', method_name = %s
WHERE uuid = %s
""",
("nonexistent_xyz", test_job.uuid),
)
self.env["queue.job"].invalidate_model()
return test_job.uuid

def test_acquire_returns_none_when_method_missing(self):
uuid = self._create_non_existing_method_job()
with (
mock.patch.object(
self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
),
mute_logger("odoo.addons.queue_job.controllers.main"),
):
job = RunJobController._acquire_job(self.env, uuid)
self.assertIsNone(job)

def test_acquire_marks_job_failed_when_method_missing(self):
uuid = self._create_non_existing_method_job()
with (
mock.patch.object(
self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
),
mute_logger("odoo.addons.queue_job.controllers.main"),
):
RunJobController._acquire_job(self.env, uuid)
self.env["queue.job"].invalidate_model()
job_record = self.env["queue.job"].search([("uuid", "=", uuid)])
self.assertEqual(job_record.state, "failed")
self.assertIn("nonexistent_xyz", job_record.exc_message)
39 changes: 39 additions & 0 deletions test_queue_job/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from unittest import mock

import odoo.tests.common as common
from odoo.tests.common import mute_logger

from odoo.addons.queue_job import identity_exact
from odoo.addons.queue_job.delay import DelayableGraph
from odoo.addons.queue_job.exception import (
FailedJobError,
JobMethodNotFound,
NoSuchJobError,
RetryableJobError,
)
Expand Down Expand Up @@ -304,6 +306,33 @@ def test_job_unlinked(self):
with self.assertRaises(NoSuchJobError):
Job.load(self.env, test_job.uuid)

def _create_non_existing_method_job(self):
test_job = Job(self.method)
test_job.store()
self.env.cr.execute(
"UPDATE queue_job SET method_name = %s WHERE uuid = %s",
("nonexistent_xyz", test_job.uuid),
)
self.env["queue.job"].invalidate_model()
return test_job.uuid

def test_load_missing_method(self):
"""Job.load raises JobMethodNotFound when the method is missing."""
uuid = self._create_non_existing_method_job()
with self.assertRaises(JobMethodNotFound):
Job.load(self.env, uuid)

def test_load_many_skips_missing_method(self):
"""load_many skips a broken job — other jobs still load."""
good = Job(self.method)
good.store()
bad_uuid = self._create_non_existing_method_job()
with mute_logger("odoo.addons.queue_job.job"):
loaded = Job.load_many(self.env, [good.uuid, bad_uuid])
loaded_uuids = {j.uuid for j in loaded}
self.assertIn(good.uuid, loaded_uuids)
self.assertNotIn(bad_uuid, loaded_uuids)

def test_unicode(self):
test_job = Job(
self.method,
Expand Down Expand Up @@ -571,6 +600,16 @@ def test_requeue(self):
stored.requeue()
self.assertEqual(stored.state, PENDING)

def test_change_state_skips_missing_method(self):
"""_change_job_state does not crash when the method is missing."""
stored = self._create_job()
self.env.cr.execute(
"UPDATE queue_job SET method_name = %s WHERE uuid = %s",
("vanished_xyz", stored.uuid),
)
self.env["queue.job"].invalidate_model()
stored.button_done()

def test_requeue_wait_dependencies_not_touched(self):
job_root = Job(self.env["test.queue.job"].testing_method)
job_child = Job(self.env["test.queue.job"].testing_method)
Expand Down
11 changes: 11 additions & 0 deletions test_queue_job/tests/test_related_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,14 @@ def test_decorator(self):
"url": "https://en.wikipedia.org/wiki/Discworld",
}
self.assertEqual(job_.related_action(), expected)

def test_open_related_action_missing_method(self):
job_ = self.model.with_delay().testing_related_action__no()
stored = job_.db_record()
self.env.cr.execute(
"UPDATE queue_job SET method_name = %s WHERE uuid = %s",
("gone_xyz", job_.uuid),
)
self.env["queue.job"].invalidate_model()
with self.assertRaises(exceptions.UserError):
stored.open_related_action()
Loading