Class: GenevaDrive::Workflow
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- GenevaDrive::Workflow
- Includes:
- FlowControl
- Defined in:
- lib/geneva_drive/workflow.rb
Overview
Base class for all durable workflows in GenevaDrive.
Provides a DSL for defining multi-step workflows that execute asynchronously, with strong guarantees around idempotency, concurrency control, and state management.
Class Method Summary collapse
-
.cancel_if(*conditions) { ... } ⇒ void
Defines a blanket cancellation condition for the workflow.
-
.may_proceed_without_hero! ⇒ void
Allows the workflow to continue even if the hero is deleted.
-
.on_exception(*args, action: nil, wait: nil, max_reattempts: nil, terminal_action: :pause!, report: :always, &block) ⇒ Object
Declares a class-level exception handling policy.
-
.resolve_exception_policy(error) ⇒ ExceptionPolicy?
Resolves the class-level exception policy for a given error.
-
.set_step_job_options(**options) ⇒ void
Sets job options for step execution jobs.
-
.step(name = nil, **options) { ... } ⇒ GenevaDrive::StepDefinition
Defines a step in the workflow.
-
.step_definitions ⇒ Array<StepDefinition>
Returns the step definitions for this workflow class.
-
.steps ⇒ StepCollection
Returns the step collection with proper ordering.
Instance Method Summary collapse
-
#after_step_execution(step_execution) ⇒ void
Hook called after step code completes, before finalization.
-
#around_step_execution(step_execution) { ... } ⇒ Object
Hook that wraps around the actual step code execution.
-
#before_step_execution(step_execution) ⇒ void
Hook called before step execution, after validation passes.
-
#current_execution ⇒ StepExecution?
Returns the current active step execution, if any.
-
#execution_history ⇒ ActiveRecord::Relation<StepExecution>
Returns all step executions in chronological order.
-
#logger ⇒ Logger
Returns the Logger properly tagged to this Workflow.
-
#ongoing? ⇒ Boolean
Returns whether this workflow is in an ongoing (non-terminal) state.
-
#previous_step_name ⇒ String?
Returns the name of the previously executed step.
-
#reschedule_current_step!(wait: nil) ⇒ StepExecution
Reschedules the current step for another attempt.
-
#resume! ⇒ StepExecution?
Resumes a paused workflow.
-
#schedule_next_step!(wait: nil) ⇒ StepExecution?
Schedules the next step in the workflow.
-
#steps ⇒ StepCollection
Returns the step collection for this workflow's class.
-
#transition_to!(new_state, **attributes) ⇒ void
Transitions the workflow to a new state.
-
#with_logger(logger) { ... } ⇒ Object
Temporarily overrides the workflow's logger for the duration of the block.
Methods included from FlowControl
#cancel!, #finished!, #pause!, #reattempt!, #skip!
Class Method Details
.cancel_if(*conditions) { ... } ⇒ void
This method returns an undefined value.
Defines a blanket cancellation condition for the workflow. Checked before every step execution.
166 167 168 169 170 171 172 |
# File 'lib/geneva_drive/workflow.rb', line 166 def cancel_if(*conditions, &block) # Duplicate parent's array to avoid mutation self._cancel_conditions = _cancel_conditions.dup _cancel_conditions.concat(conditions) _cancel_conditions << block if block_given? end |
.may_proceed_without_hero! ⇒ void
This method returns an undefined value.
Allows the workflow to continue even if the hero is deleted. By default, workflows cancel if their hero is missing.
200 201 202 |
# File 'lib/geneva_drive/workflow.rb', line 200 def may_proceed_without_hero! self._may_proceed_without_hero = true end |
.on_exception(action, *exception_matchers, wait: nil, max_reattempts: nil, report: :always) ⇒ Object .on_exception(*exception_matchers, action:, wait: nil, max_reattempts: nil, report: :always) ⇒ Object .on_exception(*exception_matchers, report: :always) {|error| ... } ⇒ Object
Declares a class-level exception handling policy.
Policies are checked when a step raises an exception and the step itself
does not have an explicit on_exception: override.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/geneva_drive/workflow.rb', line 248 def on_exception(*args, action: nil, wait: nil, max_reattempts: nil, terminal_action: :pause!, report: :always, &block) # Separate exception classes from a leading action symbol if args.first.is_a?(Symbol) raise ArgumentError, "Cannot pass both a positional action and action: keyword" if action action = args.shift end exception_matchers = args.map do |matcher| if matcher.is_a?(String) GenevaDrive::ExceptionPolicy::LazyExceptionMatcher.new(matcher) elsif matcher.is_a?(Class) unless matcher <= Exception raise GenevaDrive::StepConfigurationError, "Expected an Exception subclass, got #{matcher.inspect}" end matcher elsif matcher.respond_to?(:===) matcher else raise GenevaDrive::StepConfigurationError, "Expected an exception matcher (Exception subclass, String, or object responding to #===), got #{matcher.inspect}" end end policy = if block GenevaDrive::ExceptionPolicy.new(report: report, &block) else raise ArgumentError, "Either an action or a block is required" unless action GenevaDrive::ExceptionPolicy.new(action, wait: wait, max_reattempts: max_reattempts, terminal_action: terminal_action, report: report) end policy.exception_matchers.concat(exception_matchers) # Duplicate parent's array to avoid mutation if _exception_policies.equal?(superclass._exception_policies) self._exception_policies = _exception_policies.dup end _exception_policies << policy end |
.resolve_exception_policy(error) ⇒ ExceptionPolicy?
Resolves the class-level exception policy for a given error. Checks policies in reverse definition order (most recent first). Specific (exception class) policies are checked before blanket policies.
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/geneva_drive/workflow.rb', line 295 def resolve_exception_policy(error) # Walk in reverse order (most recently defined first). # Check specific policies (with exception class filters) first, # then blanket policies (no filters). blanket_policy = nil _exception_policies.reverse_each do |policy| if policy.specific? return policy if policy.matches?(error) else # Remember the first (most recent) blanket policy blanket_policy ||= policy end end blanket_policy end |
.set_step_job_options(**options) ⇒ void
This method returns an undefined value.
Sets job options for step execution jobs. Options are passed to ActiveJob's set method.
182 183 184 185 |
# File 'lib/geneva_drive/workflow.rb', line 182 def (**) # Merge with parent's options self. = .merge() end |
.step(name = nil, **options) { ... } ⇒ GenevaDrive::StepDefinition
Defines a step in the workflow.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/geneva_drive/workflow.rb', line 116 def step(name = nil, **, &block) # Capture source locations before any other operations caller_loc = caller_locations(1, 1).first call_location = caller_loc ? [caller_loc.path, caller_loc.lineno] : nil block_location = block&.source_location # Duplicate parent's array only if we haven't already (avoid mutating inherited definitions) if _step_definitions.equal?(superclass._step_definitions) self._step_definitions = _step_definitions.dup end # Invalidate cached step collection since we're adding a step @steps = nil step_name = (name || generate_step_name).to_s # Check for duplicate step names if _step_definitions.any? { |s| s.name == step_name } raise GenevaDrive::StepConfigurationError, "Step '#{step_name}' is already defined in #{self.name}" end # Validate positioning references exist validate_step_positioning_reference!(step_name, [:before_step], :before_step) validate_step_positioning_reference!(step_name, [:after_step], :after_step) step_def = GenevaDrive::StepDefinition.new( name: step_name, callable: block || name, call_location: call_location, block_location: block_location, ** ) _step_definitions << step_def step_def end |
.step_definitions ⇒ Array<StepDefinition>
Returns the step definitions for this workflow class.
316 317 318 |
# File 'lib/geneva_drive/workflow.rb', line 316 def step_definitions _step_definitions end |
.steps ⇒ StepCollection
Returns the step collection with proper ordering.
323 324 325 |
# File 'lib/geneva_drive/workflow.rb', line 323 def steps @steps ||= GenevaDrive::StepCollection.new(_step_definitions) end |
Instance Method Details
#after_step_execution(step_execution) ⇒ void
This method returns an undefined value.
Hook called after step code completes, before finalization. Called regardless of whether the step succeeded, failed, or used flow control.
515 516 517 |
# File 'lib/geneva_drive/workflow.rb', line 515 def after_step_execution(step_execution) # Override in subclasses end |
#around_step_execution(step_execution) { ... } ⇒ Object
Hook that wraps around the actual step code execution. Use this for APM instrumentation that requires wrapping a block.
IMPORTANT: Subclasses MUST call super when overriding this method, otherwise the step code will not execute.
536 537 538 |
# File 'lib/geneva_drive/workflow.rb', line 536 def around_step_execution(step_execution) yield end |
#before_step_execution(step_execution) ⇒ void
This method returns an undefined value.
Hook called before step execution, after validation passes. Use this for APM instrumentation like setting AppSignal action/params.
506 507 508 |
# File 'lib/geneva_drive/workflow.rb', line 506 def before_step_execution(step_execution) # Override in subclasses end |
#current_execution ⇒ StepExecution?
Returns the current active step execution, if any.
457 458 459 |
# File 'lib/geneva_drive/workflow.rb', line 457 def current_execution step_executions.where(state: %w[scheduled in_progress]).first end |
#execution_history ⇒ ActiveRecord::Relation<StepExecution>
Returns all step executions in chronological order.
464 465 466 |
# File 'lib/geneva_drive/workflow.rb', line 464 def execution_history step_executions.order(:created_at) end |
#logger ⇒ Logger
Returns the Logger properly tagged to this Workflow
755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 |
# File 'lib/geneva_drive/workflow.rb', line 755 public def logger @tagged_logger ||= begin base_logger = @injected_base_logger || super tagged_logger = ActiveSupport::TaggedLogging.new(base_logger) # Tag log entries with the workflow, including hero info if present. # Step name is logged separately via the StepExecution logger. tag_parts = [self.class.name, " id=", to_param] if hero_id.present? tag_parts.concat([" hero_type=", hero_type, " hero_id=", hero_id]) end workflow_tag = tag_parts.join tagged_logger.tagged(workflow_tag) end end |
#ongoing? ⇒ Boolean
Returns whether this workflow is in an ongoing (non-terminal) state.
369 370 371 |
# File 'lib/geneva_drive/workflow.rb', line 369 def ongoing? !finished? && !canceled? end |
#previous_step_name ⇒ String?
Returns the name of the previously executed step.
Logic:
- If
current_step_nameis set (currently executing), returns the step before it - If only
next_step_nameis set (waiting for next step), returns the step before it - If workflow is finished (no next step), returns the last step in the sequence
- Returns nil if this is the first step or no steps have been executed
484 485 486 487 488 489 490 491 492 493 |
# File 'lib/geneva_drive/workflow.rb', line 484 def previous_step_name reference_step = current_step_name || next_step_name if reference_step previous_step = steps.previous_before(reference_step) previous_step&.name elsif finished? steps.last&.name end end |
#reschedule_current_step!(wait: nil) ⇒ StepExecution
Reschedules the current step for another attempt.
Uses current_step_name if executing, otherwise next_step_name.
398 399 400 401 402 403 404 405 |
# File 'lib/geneva_drive/workflow.rb', line 398 def reschedule_current_step!(wait: nil) # Use current_step_name during execution, next_step_name otherwise step_name = current_step_name || next_step_name step_def = steps.named(step_name) wait_msg = wait ? " with wait #{wait.inspect}" : "" logger.info("Rescheduling step #{step_name.inspect}#{wait_msg}") create_step_execution(step_def, wait: wait) end |
#resume! ⇒ StepExecution?
Resumes a paused workflow.
Scheduling behavior
Since pause! leaves the scheduled execution intact, resume! re-enqueues a job for the existing execution:
- Scheduled time still in future: Enqueues job with remaining wait time
- Scheduled time has passed (overdue): Enqueues job to run immediately
- No scheduled execution exists: Creates a new execution for immediate run (This happens if the executor ran while paused and canceled the execution)
This approach provides better timeline visibility - you can see that a step was scheduled, became overdue during pause, and when it actually ran.
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/geneva_drive/workflow.rb', line 434 def resume! raise GenevaDrive::InvalidStateError, "Cannot resume a #{state} workflow" unless state == "paused" logger.info("Resuming paused workflow, next step: #{next_step_name.inspect}") with_lock do update!(state: "ready", transitioned_at: nil) end # Look for a scheduled execution to resume scheduled_execution = current_execution if scheduled_execution enqueue_scheduled_execution(scheduled_execution) else # No scheduled execution exists - create one for the next step step_def = steps.named(next_step_name) create_step_execution(step_def, wait: nil) end end |
#schedule_next_step!(wait: nil) ⇒ StepExecution?
Schedules the next step in the workflow.
Uses current_step_name as reference if executing, otherwise next_step_name.
379 380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/geneva_drive/workflow.rb', line 379 def schedule_next_step!(wait: nil) # Use current_step_name during execution, next_step_name otherwise reference_step = current_step_name || next_step_name next_step = steps.next_after(reference_step) unless next_step logger.info("No more steps after #{reference_step.inspect}, finishing workflow") return finish_workflow! end logger.info("Scheduling next step #{next_step.name.inspect} after #{reference_step.inspect}") create_step_execution(next_step, wait: wait || next_step.wait) end |
#steps ⇒ StepCollection
Returns the step collection for this workflow's class.
471 472 473 |
# File 'lib/geneva_drive/workflow.rb', line 471 def steps self.class.steps end |
#transition_to!(new_state, **attributes) ⇒ void
This method returns an undefined value.
Transitions the workflow to a new state.
545 546 547 548 549 550 551 552 553 |
# File 'lib/geneva_drive/workflow.rb', line 545 def transition_to!(new_state, **attributes) with_lock do attrs = attributes.merge(state: new_state) if %w[finished canceled paused].include?(new_state) attrs[:transitioned_at] = Time.current end update!(attrs) end end |
#with_logger(logger) { ... } ⇒ Object
Temporarily overrides the workflow's logger for the duration of the block.
This allows the Executor to inject the step execution's logger (which
includes both workflow and step-specific tags) so that step code calling
logger gets the fully-tagged step execution logger.
The passed logger is used directly - no additional tagging is applied. The original logger is restored after the block completes.
744 745 746 747 748 749 750 |
# File 'lib/geneva_drive/workflow.rb', line 744 public def with_logger(logger) previous_tagged_logger = @tagged_logger @tagged_logger = logger yield ensure @tagged_logger = previous_tagged_logger end |