Adding links to Nexus signals#1593
Conversation
|
|
| # If this signal-with-start is issued from inside a Nexus operation handler (but not as the | ||
| # nexus-backing workflow, whose links are handled separately by | ||
| # WorkflowRunOperationContext.start_workflow), capture the signal backlink the server | ||
| # returned so the caller workflow's Nexus history event links to the signaled event. A | ||
| # plain start does not capture a backlink: it only forwards the inbound links onto the | ||
| # start request. | ||
| nexus_ctx = self._try_nexus_start_operation_context() | ||
| if ( | ||
| nexus_ctx is not None | ||
| and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context() | ||
| and isinstance( | ||
| resp, | ||
| temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse, | ||
| ) | ||
| ): | ||
| # Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at | ||
| # the WorkflowExecutionSignaled event; older servers leave it unset. | ||
| if resp.HasField("signal_link"): | ||
| nexus_ctx._add_backlink(resp.signal_link) |
There was a problem hiding this comment.
I'm wondering if nexus_ctx._add_outbound_links() should be renamed to reflect that it's from workflows (e.g. nexus_ctx._add_workflow_backlinks() or something similar) and updated to handle this and be called here? It would also require removing the call in _start_nexus_backing_workflow
Seems like it might be more correct to just attach any back links to the nexus context if it exists, regardless of if the target workflow will complete the nexus operation.
There was a problem hiding this comment.
Renamed it to _add_backing_workflow_response_link. _add_backing_workflow_response_link has to construct a link if it's not set, but _add_response_link doesn't - so I'm not sure about combining them though.
| @@ -241,6 +260,14 @@ async def _build_start_workflow_execution_request( | |||
| req.on_conflict_options.attach_request_id = True | |||
| req.on_conflict_options.attach_completion_callbacks = True | |||
| req.on_conflict_options.attach_links = True | |||
| else: | |||
| # If this is a plain start_workflow issued from inside a Nexus operation handler | |||
| # (not the nexus-backing workflow, which already carries inbound links via | |||
| # input.links), forward the inbound Nexus task links so the started callee's | |||
| # WorkflowExecutionStarted event links back to the caller. | |||
| nexus_ctx = self._try_nexus_start_operation_context() | |||
| if nexus_ctx is not None: | |||
| req.links.extend(nexus_ctx._get_outgoing_request_links()) | |||
There was a problem hiding this comment.
I like this change overall, but wonder if the flags are not set correctly. In the backing_workflow_start_context case, we definitely want the callbacks attacked.
In any Nexus case, I think we want the request ID and the links attached.
There was a problem hiding this comment.
If I understand this right - the plain start_workflow runs under the user's own conflict policy, so those flags should already be set. The backing worflow start doesn't use the user's confict policy, so those have to be set. So I think this is correct.
There was a problem hiding this comment.
Users supply the conflict policy in both cases. These flags indicate behavior for when that policy is WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING and a conflict is detected.
What separates a 'backing' workflow from a 'plain' workflow is that the backing workflow has callbacks associated that will complete the nexus operation with the result of the workflow.
In this new path, we want to attach the links and request ID. Since the TemporalNexusClient and WorkflowRunOperationContext should be in charge of correctly setting the callbacks, it's safe to set all the flags to true when this is invoked from an operation handler.
There was a problem hiding this comment.
Please add tests that exercise this via standalone nexus operations.
There was a problem hiding this comment.
So this did actually uncover a problem - _link_conversion.py was out of date. That's why I had trouble with this test.
Standalone Nexus operations (invoked via client.create_nexus_client(...)) receive a NexusOperation-variant inbound link in the server's canonical format …/nexus-operations/{op_id}/{run_id}/details. The SDK's parser only matched an older …/nexus-operations/{op_id}?runID=… form, so it silently dropped the link — no forward link propagated to the signaled callee.
Updated the regex/parser/formatter in _link_conversion.py to the canonical /{run_id}/details format so the link parses and the forward link now lands on the callee.
| @service_handler(service=SignalingService) | ||
| class SignalingServiceHandler: | ||
| @sync_operation | ||
| async def op(self, _ctx: StartOperationContext, input: str) -> str: |
There was a problem hiding this comment.
Please use a dataclass rather than a string split by ':' for input
This adds links to Nexus signals - the receiving workflow has a link to the workflow sending the signal. and the sending workflow has a link to the receiving workflow.