Class: GenevaDrive::Workflow

Inherits:
ActiveRecord::Base
  • Object
Includes:
FlowControl
Defined in:
lib/geneva_drive/workflow.rb

Overview

Base class for all durable workflows in GenevaDrive.

Provides a DSL for defining multi-step workflows that execute asynchronously, with strong guarantees around idempotency, concurrency control, and state management.

Examples:

Basic workflow definition

class SignupWorkflow < GenevaDrive::Workflow
  step :send_welcome_email do
    WelcomeMailer.welcome(hero).deliver_later
  end

  step :send_reminder, wait: 2.days do
    ReminderMailer.remind(hero).deliver_later
  end
end

Creating and starting a workflow

SignupWorkflow.create!(hero: current_user)


Methods included from FlowControl

#cancel!, #finished!, #pause!, #reattempt!, #skip!

Class Method Details

.cancel_if(*conditions) { ... } ⇒ void

This method returns an undefined value.

Defines a blanket cancellation condition for the workflow. Checked before every step execution.

Examples:

Cancel if hero is deactivated

cancel_if { hero.deactivated? }

Cancel using a method

cancel_if :hero_deactivated?

Parameters:

  • conditions (Array<Symbol, Proc>)

    condition methods or procs

Yields:

  • an optional block condition



# File 'lib/geneva_drive/workflow.rb', line 166

def cancel_if(*conditions, &block)
  # Duplicate parent's array to avoid mutation
  self._cancel_conditions = _cancel_conditions.dup

  _cancel_conditions.concat(conditions)
  _cancel_conditions << block if block_given?
end
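
The `.dup` before appending matters because a subclass shares its parent's array until it declares conditions of its own. The following is a minimal plain-Ruby sketch of that copy-then-append pattern, not GenevaDrive's implementation: `SketchWorkflow`, `should_cancel?`, and the way conditions are evaluated (Symbols via `send`, blocks via `instance_exec`) are assumptions made for illustration.

```ruby
# Hypothetical stand-in for the real base class; all names are illustrative.
class SketchWorkflow
  class << self
    attr_writer :_cancel_conditions

    # Fall back to the parent's array until this class sets its own.
    def _cancel_conditions
      return @_cancel_conditions if instance_variable_defined?(:@_cancel_conditions)

      superclass.respond_to?(:_cancel_conditions) ? superclass._cancel_conditions : []
    end

    def cancel_if(*conditions, &block)
      # Copy first so the parent's array is never mutated.
      self._cancel_conditions = _cancel_conditions.dup
      _cancel_conditions.concat(conditions)
      _cancel_conditions << block if block
    end
  end

  # Assumed evaluation rule: any truthy condition cancels the workflow.
  def should_cancel?
    self.class._cancel_conditions.any? do |condition|
      condition.is_a?(Symbol) ? send(condition) : instance_exec(&condition)
    end
  end
end

class ParentSketch < SketchWorkflow
  cancel_if :deactivated?

  def deactivated?
    false
  end
end

class ChildSketch < ParentSketch
  cancel_if { true }
end
```

In this sketch `ChildSketch` ends up with both conditions while `ParentSketch` keeps only its own, which is the isolation the duplicate-before-append step is meant to provide.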

.may_proceed_without_hero! ⇒ void

This method returns an undefined value.

Allows the workflow to continue even if the hero is deleted. By default, workflows cancel if their hero is missing.

Examples:

Allow cleanup workflows to run without hero

class CleanupWorkflow < GenevaDrive::Workflow
  may_proceed_without_hero!

  step :cleanup do
    DataArchive.cleanup_for_hero_id(hero_id)
  end
end


# File 'lib/geneva_drive/workflow.rb', line 200

def may_proceed_without_hero!
  self._may_proceed_without_hero = true
end

.on_exception(action, *exception_matchers, wait: nil, max_reattempts: nil, report: :always) ⇒ Object
.on_exception(*exception_matchers, action:, wait: nil, max_reattempts: nil, report: :always) ⇒ Object
.on_exception(*exception_matchers, report: :always) {|error| ... } ⇒ Object

Declares a class-level exception handling policy. Policies are checked when a step raises an exception and the step itself does not have an explicit on_exception: override.

Examples:

Blanket default for all exceptions

on_exception :reattempt!, wait: 15.seconds, max_reattempts: 3

Match specific exception classes

on_exception OAuth2::Error, action: :reattempt!, wait: 15.seconds
on_exception Google::Apis::ClientError, action: :cancel!

Imperative block handler

on_exception RateLimitError do |error|
  reattempt! wait: error.retry_after.seconds
end

Suppress reporting for expected rate limits

on_exception RateLimitError, report: :never do |error|
  reattempt! wait: error.retry_after.seconds
end

Report only when reattempts are exhausted

on_exception Timeout::Error, action: :reattempt!, max_reattempts: 5, report: :terminal_only

Overloads:

  • .on_exception(action, *exception_matchers, wait: nil, max_reattempts: nil, report: :always) ⇒ Object

    Declarative mode — specify an action symbol.

    Parameters:

    • action (Symbol)

      :pause!, :cancel!, :reattempt!, or :skip!

    • exception_matchers (Array<Class>)

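      optional exception classes to match; the method source also accepts class-name Strings (resolved lazily) and any object responding to #===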

    • wait (ActiveSupport::Duration, nil) (defaults to: nil)

      wait before reattempt

    • max_reattempts (Integer, nil) (defaults to: nil)

      max consecutive reattempts

    • report (Symbol) (defaults to: :always)

      when to report the exception to Rails.error.report (:always, :never, or :terminal_only). See ExceptionPolicy for details.

  • .on_exception(*exception_matchers, action:, wait: nil, max_reattempts: nil, report: :always) ⇒ Object

    Declarative mode with exception classes as leading args and action as keyword.

    Parameters:

    • exception_matchers (Array<Class>)

      exception classes to match

    • action (Symbol)

      :pause!, :cancel!, :reattempt!, or :skip!

    • report (Symbol) (defaults to: :always)

      when to report the exception (:always, :never, or :terminal_only)

  • .on_exception(*exception_matchers, report: :always) {|error| ... } ⇒ Object

    Imperative mode — block receives exception, runs in workflow context.

    Parameters:

    • exception_matchers (Array<Class>)

      optional exception classes to match

    • report (Symbol) (defaults to: :always)

      when to report the exception (:always, :never, or :terminal_only)

    Yields:

    • (error)

      the exception that was raised



# File 'lib/geneva_drive/workflow.rb', line 248

def on_exception(*args, action: nil, wait: nil, max_reattempts: nil, terminal_action: :pause!, report: :always, &block)
  # Separate exception classes from a leading action symbol
  if args.first.is_a?(Symbol)
    raise ArgumentError, "Cannot pass both a positional action and action: keyword" if action
    action = args.shift
  end

  exception_matchers = args.map do |matcher|
    if matcher.is_a?(String)
      GenevaDrive::ExceptionPolicy::LazyExceptionMatcher.new(matcher)
    elsif matcher.is_a?(Class)
      unless matcher <= Exception
        raise GenevaDrive::StepConfigurationError,
          "Expected an Exception subclass, got #{matcher.inspect}"
      end
      matcher
    elsif matcher.respond_to?(:===)
      matcher
    else
      raise GenevaDrive::StepConfigurationError,
        "Expected an exception matcher (Exception subclass, String, or object responding to #===), got #{matcher.inspect}"
    end
  end

  policy = if block
    GenevaDrive::ExceptionPolicy.new(report: report, &block)
  else
    raise ArgumentError, "Either an action or a block is required" unless action
    GenevaDrive::ExceptionPolicy.new(action, wait: wait, max_reattempts: max_reattempts, terminal_action: terminal_action, report: report)
  end

  policy.exception_matchers.concat(exception_matchers)

  # Duplicate parent's array to avoid mutation
  if _exception_policies.equal?(superclass._exception_policies)
    self._exception_policies = _exception_policies.dup
  end

  _exception_policies << policy
end
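
The first few lines of the method decide whether the action arrived positionally or as a keyword. A small plain-Ruby sketch of that split is shown here; `split_action_and_matchers` and `matches_any?` are hypothetical helpers, and the rule that a matcher accepts an error when `matcher === error` is an assumption inferred from the `respond_to?(:===)` check rather than confirmed behavior of `ExceptionPolicy`.

```ruby
# Hypothetical helper mirroring the leading-Symbol handling in the source above.
def split_action_and_matchers(*args, action: nil)
  if args.first.is_a?(Symbol)
    raise ArgumentError, "Cannot pass both a positional action and action: keyword" if action

    action = args.shift
  end
  [action, args]
end

# Assumption: a matcher accepts an error when `matcher === error` is truthy.
# Classes and Procs both implement #===, so either can act as a matcher here.
def matches_any?(matchers, error)
  matchers.any? { |matcher| matcher === error }
end

positional = split_action_and_matchers(:reattempt!, IOError)
keyword    = split_action_and_matchers(KeyError, action: :cancel!)
rate_limited = ->(error) { error.message.include?("429") }
```

Both call styles produce the same pair of action and matcher list, which is why the two declarative overloads can share one code path.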

.resolve_exception_policy(error) ⇒ ExceptionPolicy?

Resolves the class-level exception policy for a given error. Checks policies in reverse definition order (most recent first). Specific (exception class) policies are checked before blanket policies.

Parameters:

  • error (Exception)

    the exception to match

Returns:

  • (ExceptionPolicy, nil)

    the matching policy, or nil if no policy applies



# File 'lib/geneva_drive/workflow.rb', line 295

def resolve_exception_policy(error)
  # Walk in reverse order (most recently defined first).
  # Check specific policies (with exception class filters) first,
  # then blanket policies (no filters).
  blanket_policy = nil

  _exception_policies.reverse_each do |policy|
    if policy.specific?
      return policy if policy.matches?(error)
    else
      # Remember the first (most recent) blanket policy
      blanket_policy ||= policy
    end
  end

  blanket_policy
end
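
The resolution order can be tried out without Rails. The sketch below reuses the same loop on a stand-in policy object; `SketchPolicy` and `resolve_policy` are hypothetical names, and `matches?` is assumed to compare with `===`.

```ruby
# Hypothetical policy object; the real ExceptionPolicy carries more options.
SketchPolicy = Struct.new(:action, :matchers) do
  def specific?
    !matchers.empty?
  end

  def matches?(error)
    matchers.any? { |matcher| matcher === error }
  end
end

def resolve_policy(policies, error)
  blanket = nil
  policies.reverse_each do |policy|
    if policy.specific?
      return policy if policy.matches?(error)
    else
      # Keep only the most recently defined blanket policy.
      blanket ||= policy
    end
  end
  blanket
end

policies = [
  SketchPolicy.new(:pause!, []),            # oldest blanket policy
  SketchPolicy.new(:reattempt!, [IOError]), # specific policy
  SketchPolicy.new(:cancel!, [])            # newest blanket policy
]
```

With these three policies an `IOError` resolves to the specific `:reattempt!` policy even though a blanket policy was defined after it, and any other error falls back to the newest blanket policy, `:cancel!`.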

.set_step_job_options(**options) ⇒ void

This method returns an undefined value.

Sets job options for step execution jobs. Options are passed to ActiveJob's set method.

Examples:

Set queue for workflow steps

set_step_job_options queue: :workflows, priority: 10

Parameters:

  • options (Hash)

    job options (queue, priority, etc.)



# File 'lib/geneva_drive/workflow.rb', line 182

def set_step_job_options(**options)
  # Merge with parent's options
  self._step_job_options = _step_job_options.merge(options)
end
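
Because the method uses the non-mutating `Hash#merge`, a subclass that calls `set_step_job_options` again starts from the inherited options and overrides only the keys it names. A short plain-Ruby illustration of that merge behavior, with made-up option values:

```ruby
# Illustrative values only; this is plain Hash#merge, no GenevaDrive involved.
parent_options = { queue: :workflows, priority: 10 }.freeze

# A subclass call such as `set_step_job_options priority: 5` would merge like this:
child_options = parent_options.merge(priority: 5)
```

The parent hash is left untouched, so sibling workflow classes do not see each other's overrides.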

.step(name = nil, **options) { ... } ⇒ GenevaDrive::StepDefinition

Defines a step in the workflow.

Examples:

Named step with block

step :send_email do
  Mailer.send(hero).deliver_later
end

Step with wait time

step :send_reminder, wait: 2.days do
  ReminderMailer.remind(hero).deliver_later
end

Step with skip condition

step :charge, skip_if: -> { hero.free_tier? } do
  PaymentGateway.charge(hero)
end

Step with simple exception handling

step :external_api, on_exception: :reattempt! do
  ExternalApi.call(hero)
end

Step with composable exception policies

step :sync_calendar, on_exception: [
  GenevaDrive::ExceptionPolicy.new(:reattempt!, matching: Timeout::Error, max_reattempts: 5),
  GenevaDrive::ExceptionPolicy.new(:cancel!, matching: OAuth2::Error),
  GenevaDrive::ExceptionPolicy.new(:skip!)  # blanket fallback
] do
  GoogleCalendar.sync(hero)
end

Parameters:

  • name (String, Symbol, nil) (defaults to: nil)

    the step name (auto-generated if nil)

  • options (Hash)

    step options

Options Hash (**options):

  • :wait (ActiveSupport::Duration, nil)

    delay before execution

  • :skip_if (Proc, Symbol, Boolean, nil)

    condition for skipping

  • :on_exception (Symbol, GenevaDrive::ExceptionPolicy, Proc, Array<GenevaDrive::ExceptionPolicy>)

    exception handling policy. Accepts:

    • A Symbol (:pause!, :cancel!, :reattempt!, :skip!) for simple actions
    • An ExceptionPolicy object for reusable, configurable policies
    • A Proc/lambda that receives the exception and calls a flow control method
    • An Array of ExceptionPolicy objects for composable per-exception-type handling. Specific policies (those with matching:) are checked first; the first blanket policy acts as a fallback. If nothing matches, class-level policies are consulted.
  • :max_reattempts (Integer, nil)

    max consecutive reattempts (only with symbol form)

  • :terminal_action (Symbol)

    what to do when max_reattempts is exceeded (:pause! or :cancel!, only with symbol form)

  • :before_step (String, Symbol, nil)

    position before this step

  • :after_step (String, Symbol, nil)

    position after this step

Yields:

  • the step implementation

Returns:

  • (GenevaDrive::StepDefinition)

    the newly created step definition



# File 'lib/geneva_drive/workflow.rb', line 116

def step(name = nil, **options, &block)
  # Capture source locations before any other operations
  caller_loc = caller_locations(1, 1).first
  call_location = caller_loc ? [caller_loc.path, caller_loc.lineno] : nil
  block_location = block&.source_location

  # Duplicate parent's array only if we haven't already (avoid mutating inherited definitions)
  if _step_definitions.equal?(superclass._step_definitions)
    self._step_definitions = _step_definitions.dup
  end
  # Invalidate cached step collection since we're adding a step
  @steps = nil

  step_name = (name || generate_step_name).to_s

  # Check for duplicate step names
  if _step_definitions.any? { |s| s.name == step_name }
    raise GenevaDrive::StepConfigurationError,
      "Step '#{step_name}' is already defined in #{self.name}"
  end

  # Validate positioning references exist
  validate_step_positioning_reference!(step_name, options[:before_step], :before_step)
  validate_step_positioning_reference!(step_name, options[:after_step], :after_step)

  step_def = GenevaDrive::StepDefinition.new(
    name: step_name,
    callable: block || name,
    call_location: call_location,
    block_location: block_location,
    **options
  )

  _step_definitions << step_def

  step_def
end
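
Two details of the method are easy to miss: the inherited array is copied only while the subclass still shares the parent's object (the `equal?` check), and names are compared as Strings. The sketch below models just those two behaviors in plain Ruby. `StepSketchBase`, `StepSketchError`, and `_step_names` are hypothetical, and the real method stores `StepDefinition` objects rather than bare names.

```ruby
class StepSketchError < StandardError; end

# Hypothetical base class that tracks step names only.
class StepSketchBase
  class << self
    attr_writer :_step_names

    def _step_names
      return @_step_names if instance_variable_defined?(:@_step_names)

      superclass._step_names
    end

    def step(name)
      # Copy only while this class still shares its parent's array object.
      self._step_names = _step_names.dup if _step_names.equal?(superclass._step_names)

      step_name = name.to_s
      if _step_names.include?(step_name)
        raise StepSketchError, "Step '#{step_name}' is already defined in #{self.name}"
      end

      _step_names << step_name
      step_name
    end
  end

  self._step_names = []
end

class SignupSketch < StepSketchBase
  step :send_welcome_email
  step "send_reminder"
end

class ExtendedSketch < SignupSketch
  step :survey
end
```

Here `ExtendedSketch` sees all three steps while `SignupSketch` keeps two, and redefining `:send_reminder` in the subclass raises because the Symbol and the String normalize to the same name.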

.step_definitions ⇒ Array<StepDefinition>

Returns the step definitions for this workflow class.

Returns:

  • (Array<StepDefinition>)



# File 'lib/geneva_drive/workflow.rb', line 316

def step_definitions
  _step_definitions
end

.steps ⇒ StepCollection

Returns the step collection with proper ordering.

Returns:

  • (StepCollection)



# File 'lib/geneva_drive/workflow.rb', line 323

def steps
  @steps ||= GenevaDrive::StepCollection.new(_step_definitions)
end

Instance Method Details

#after_step_execution(step_execution) ⇒ void

This method returns an undefined value.

Hook called after step code completes, before finalization. Called regardless of whether the step succeeded, failed, or used flow control.

Parameters:

  • step_execution (StepExecution)



# File 'lib/geneva_drive/workflow.rb', line 515

def after_step_execution(step_execution)
  # Override in subclasses
end

#around_step_execution(step_execution) { ... } ⇒ Object

Hook that wraps around the actual step code execution. Use this for APM instrumentation that requires wrapping a block.

IMPORTANT: Subclasses MUST call super when overriding this method, otherwise the step code will not execute.

Examples:

Wrap with AppSignal transaction

def around_step_execution(step_execution)
  Appsignal.monitor(
    namespace: "workflow",
    action: "#{self.class.name}##{step_execution.step_name}"
  ) { super }
end

Parameters:

  • step_execution (StepExecution)

Yields:

  • the step code block

Returns:

  • (Object)

    the result of the block



# File 'lib/geneva_drive/workflow.rb', line 536

def around_step_execution(step_execution)
  yield
end
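
The requirement to call super follows directly from the default implementation being a bare `yield`. The plain-Ruby sketch below shows both outcomes; `HookSketchBase`, `run_step`, `TimedSketch`, and `ForgetfulSketch` are hypothetical stand-ins, with `run_step` playing the role of whatever component invokes the hook.

```ruby
class HookSketchBase
  # Default hook: just run the step code.
  def around_step_execution(_step_name)
    yield
  end

  # Stand-in for the caller that wraps the step code in the hook.
  def run_step(step_name, &step_code)
    around_step_execution(step_name, &step_code)
  end
end

class TimedSketch < HookSketchBase
  attr_reader :durations

  def around_step_execution(step_name)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = super # bare super forwards the argument and the block
    (@durations ||= {})[step_name] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    result
  end
end

class ForgetfulSketch < HookSketchBase
  def around_step_execution(_step_name)
    # Neither super nor yield: the step code is silently skipped.
    :skipped
  end
end
```

`TimedSketch` runs the block and returns its result, while `ForgetfulSketch` returns without ever executing the step code, which is the failure mode the warning above describes.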

#before_step_execution(step_execution) ⇒ void

This method returns an undefined value.

Hook called before step execution, after validation passes. Use this for APM instrumentation like setting AppSignal action/params.

Examples:

Set AppSignal transaction metadata

def before_step_execution(step_execution)
  Appsignal.set_action("#{self.class.name}##{step_execution.step_name}")
  Appsignal.set_params("hero" => { "type" => hero_type, "id" => hero_id })
end

Parameters:

  • step_execution (StepExecution)



# File 'lib/geneva_drive/workflow.rb', line 506

def before_step_execution(step_execution)
  # Override in subclasses
end

#current_execution ⇒ StepExecution?

Returns the current active step execution, if any.

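Returns:

  • (StepExecution, nil)

    the first step execution in the scheduled or in_progress state, or nil if there is none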



# File 'lib/geneva_drive/workflow.rb', line 457

def current_execution
  step_executions.where(state: %w[scheduled in_progress]).first
end

#execution_history ⇒ ActiveRecord::Relation<StepExecution>

Returns all step executions in chronological order.

Returns:

  • (ActiveRecord::Relation<StepExecution>)

    the execution history



# File 'lib/geneva_drive/workflow.rb', line 464

def execution_history
  step_executions.order(:created_at)
end

#logger ⇒ Logger

Returns a logger tagged with this workflow's class name and id, plus the hero type and id when a hero is present.

Returns:

  • (Logger)


# File 'lib/geneva_drive/workflow.rb', line 755

public def logger
  @tagged_logger ||= begin
    base_logger = @injected_base_logger || super
    tagged_logger = ActiveSupport::TaggedLogging.new(base_logger)

    # Tag log entries with the workflow, including hero info if present.
    # Step name is logged separately via the StepExecution logger.
    tag_parts = [self.class.name, " id=", to_param]
    if hero_id.present?
      tag_parts.concat([" hero_type=", hero_type, " hero_id=", hero_id])
    end
    workflow_tag = tag_parts.join

    tagged_logger.tagged(workflow_tag)
  end
end
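
The tag itself is a simple string join. The sketch below reproduces only that step so the resulting format is easy to see; `workflow_log_tag` is a hypothetical helper, and it uses `nil?` where the source uses ActiveSupport's `present?`.

```ruby
# Hypothetical helper reproducing only the tag-building step from the source above.
def workflow_log_tag(class_name:, id:, hero_type: nil, hero_id: nil)
  tag_parts = [class_name, " id=", id]
  tag_parts.concat([" hero_type=", hero_type, " hero_id=", hero_id]) unless hero_id.nil?
  tag_parts.join
end

with_hero    = workflow_log_tag(class_name: "SignupWorkflow", id: 42, hero_type: "User", hero_id: 7)
without_hero = workflow_log_tag(class_name: "CleanupWorkflow", id: 43)
# with_hero    => "SignupWorkflow id=42 hero_type=User hero_id=7"
# without_hero => "CleanupWorkflow id=43"
```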

#ongoing? ⇒ Boolean

Returns whether this workflow is in an ongoing (non-terminal) state.

Returns:

  • (Boolean)


# File 'lib/geneva_drive/workflow.rb', line 369

def ongoing?
  !finished? && !canceled?
end

#previous_step_name ⇒ String?

Returns the name of the previously executed step.

Logic:

  • If current_step_name is set (currently executing), returns the step before it
  • If only next_step_name is set (waiting for next step), returns the step before it
  • If workflow is finished (no next step), returns the last step in the sequence
  • Returns nil if this is the first step or no steps have been executed

Returns:

  • (String, nil)

    the previous step name or nil



# File 'lib/geneva_drive/workflow.rb', line 484

def previous_step_name
  reference_step = current_step_name || next_step_name

  if reference_step
    previous_step = steps.previous_before(reference_step)
    previous_step&.name
  elsif finished?
    steps.last&.name
  end
end
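
The four cases listed under Logic can be modeled on a plain array of step names. This is a sketch under assumptions: `previous_step_name_sketch` is a hypothetical function, and `previous_before` is assumed to return the element preceding the reference, or nil when the reference is first or unknown.

```ruby
# Plain-Ruby model: steps are an ordered array of names.
def previous_step_name_sketch(step_names, current: nil, upcoming: nil, finished: false)
  reference = current || upcoming
  if reference
    index = step_names.index(reference)
    index && index > 0 ? step_names[index - 1] : nil
  elsif finished
    step_names.last
  end
end

step_names = %w[send_welcome_email send_reminder survey]
```

For example, while `send_reminder` is executing the sketch returns `send_welcome_email`, and once the workflow has finished it returns `survey`.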

#reschedule_current_step!(wait: nil) ⇒ StepExecution

Reschedules the current step for another attempt.

Uses current_step_name if executing, otherwise next_step_name.

Parameters:

  • wait (ActiveSupport::Duration, nil) (defaults to: nil)

    delay before retry

Returns:

  • (StepExecution)

    the newly created step execution



# File 'lib/geneva_drive/workflow.rb', line 398

def reschedule_current_step!(wait: nil)
  # Use current_step_name during execution, next_step_name otherwise
  step_name = current_step_name || next_step_name
  step_def = steps.named(step_name)
  wait_msg = wait ? " with wait #{wait.inspect}" : ""
  logger.info("Rescheduling step #{step_name.inspect}#{wait_msg}")
  create_step_execution(step_def, wait: wait)
end

#resume! ⇒ StepExecution?

Resumes a paused workflow.

Scheduling behavior

Since pause! leaves the scheduled execution intact, resume! re-enqueues a job for the existing execution:

  • Scheduled time still in future: Enqueues job with remaining wait time
  • Scheduled time has passed (overdue): Enqueues job to run immediately
  • No scheduled execution exists: Creates a new execution for immediate run (This happens if the executor ran while paused and canceled the execution)

This approach keeps the timeline readable: you can see that a step was scheduled, that it became overdue during the pause, and when it actually ran.

Examples:

Resuming while step is still scheduled for future

# step_two has wait: 2.days, scheduled for tomorrow
workflow.pause!         # paused today
workflow.resume!        # step_two still scheduled for tomorrow

Resuming after scheduled time passed (overdue)

# step_two was scheduled for yesterday
workflow.pause!         # paused last week
workflow.resume!        # step_two runs immediately (overdue)

Returns:

  • (StepExecution, nil)

    the step execution that will run, or nil if none

Raises:

  • (GenevaDrive::InvalidStateError)

    if the workflow is not in the paused state



# File 'lib/geneva_drive/workflow.rb', line 434

def resume!
  raise GenevaDrive::InvalidStateError, "Cannot resume a #{state} workflow" unless state == "paused"

  logger.info("Resuming paused workflow, next step: #{next_step_name.inspect}")

  with_lock do
    update!(state: "ready", transitioned_at: nil)
  end

  # Look for a scheduled execution to resume
  scheduled_execution = current_execution
  if scheduled_execution
    enqueue_scheduled_execution(scheduled_execution)
  else
    # No scheduled execution exists - create one for the next step
    step_def = steps.named(next_step_name)
    create_step_execution(step_def, wait: nil)
  end
end
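
The first two scheduling cases described above (still in the future versus overdue) come down to clamping the remaining wait at zero. The body of `enqueue_scheduled_execution` is not shown on this page, so the following is only a sketch of that arithmetic; `remaining_wait` and its `scheduled_for` argument are hypothetical names.

```ruby
# Hypothetical helper: seconds to wait when re-enqueuing an existing execution.
# `scheduled_for` and `now` are Time objects.
def remaining_wait(scheduled_for, now: Time.now)
  [scheduled_for - now, 0].max
end

now = Time.at(1_000)
future_wait  = remaining_wait(Time.at(1_600), now: now) # scheduled 600 seconds ahead
overdue_wait = remaining_wait(Time.at(400), now: now)   # scheduled 600 seconds ago
```

A future execution keeps its remaining delay, while an overdue one gets a zero wait and would therefore run immediately.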

#schedule_next_step!(wait: nil) ⇒ StepExecution?

Schedules the next step in the workflow.

Uses current_step_name as reference if executing, otherwise next_step_name.

Parameters:

  • wait (ActiveSupport::Duration, nil) (defaults to: nil)

    override wait time

Returns:

  • (StepExecution, nil)

    the created step execution or nil if finished



# File 'lib/geneva_drive/workflow.rb', line 379

def schedule_next_step!(wait: nil)
  # Use current_step_name during execution, next_step_name otherwise
  reference_step = current_step_name || next_step_name
  next_step = steps.next_after(reference_step)
  unless next_step
    logger.info("No more steps after #{reference_step.inspect}, finishing workflow")
    return finish_workflow!
  end

  logger.info("Scheduling next step #{next_step.name.inspect} after #{reference_step.inspect}")
  create_step_execution(next_step, wait: wait || next_step.wait)
end
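
The last line of the method gives an explicit `wait:` argument priority over the wait declared on the step. The sketch below isolates that decision on plain data. `StepSketch` and `plan_next_step` are hypothetical, `next_after` is modeled as "the element following the reference", and returning nil for an unknown reference is a simplification of this sketch, not documented behavior.

```ruby
StepSketch = Struct.new(:name, :wait)

# Returns [next_step, effective_wait], or nil when there is no following step
# (the point at which the real method finishes the workflow).
def plan_next_step(steps, reference_name, wait: nil)
  index = steps.index { |step| step.name == reference_name }
  next_step = index ? steps[index + 1] : nil
  return nil unless next_step

  [next_step, wait || next_step.wait]
end

steps = [
  StepSketch.new("send_welcome_email", nil),
  StepSketch.new("send_reminder", 172_800) # 2 days, in seconds
]
```

Calling `plan_next_step(steps, "send_welcome_email")` uses the declared two-day wait, whereas passing `wait: 60` overrides it for that one scheduling call.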

#steps ⇒ StepCollection

Returns the step collection for this workflow's class.

Returns:

  • (StepCollection)



# File 'lib/geneva_drive/workflow.rb', line 471

def steps
  self.class.steps
end

#transition_to!(new_state, **attributes) ⇒ void

This method returns an undefined value.

Transitions the workflow to a new state.

Parameters:

  • new_state (String)

    the target state

  • attributes (Hash)

    additional attributes to update



# File 'lib/geneva_drive/workflow.rb', line 545

def transition_to!(new_state, **attributes)
  with_lock do
    attrs = attributes.merge(state: new_state)
    if %w[finished canceled paused].include?(new_state)
      attrs[:transitioned_at] = Time.current
    end
    update!(attrs)
  end
end
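
Only three target states receive a `transitioned_at` timestamp. The attribute-building part of the method can be sketched as a pure function, leaving out the row lock and the database write; `transition_attributes` and `TIMESTAMPED_STATES` are hypothetical names.

```ruby
TIMESTAMPED_STATES = %w[finished canceled paused].freeze

# Hypothetical pure function: builds the attribute hash without touching a database.
def transition_attributes(new_state, now: Time.now, **attributes)
  attrs = attributes.merge(state: new_state)
  attrs[:transitioned_at] = now if TIMESTAMPED_STATES.include?(new_state)
  attrs
end

stamp  = Time.at(0)
paused = transition_attributes("paused", now: stamp)
ready  = transition_attributes("ready", now: stamp, next_step_name: "send_reminder")
```

The membership test compares Strings, so in this sketch a Symbol such as `:paused` would not be timestamped; this is consistent with `new_state` being documented as a String.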

#with_logger(logger) { ... } ⇒ Object

Temporarily overrides the workflow's logger for the duration of the block. This allows the Executor to inject the step execution's logger (which includes both workflow and step-specific tags) so that step code calling logger gets the fully-tagged step execution logger.

The passed logger is used directly - no additional tagging is applied. The original logger is restored after the block completes.

Examples:

Use step execution logger during step code

workflow.with_logger(step_execution.logger) do
  step_def.execute_in_context(workflow)
end

Parameters:

  • logger (Logger)

    the logger to use (typically the step execution's logger)

Yields:

  • the block to execute with the injected logger

Returns:

  • (Object)

    the result of the block



# File 'lib/geneva_drive/workflow.rb', line 744

public def with_logger(logger)
  previous_tagged_logger = @tagged_logger
  @tagged_logger = logger
  yield
ensure
  @tagged_logger = previous_tagged_logger
end
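
The `ensure` clause is what guarantees the original logger comes back even when the block raises. A self-contained sketch of the same swap-and-restore pattern, using the standard library Logger, follows; `LoggerSwapSketch` is a hypothetical class that keeps only the logger-related state.

```ruby
require "logger"
require "stringio"

# Hypothetical class holding only the logger-related state.
class LoggerSwapSketch
  def initialize(default_logger)
    @tagged_logger = default_logger
  end

  def logger
    @tagged_logger
  end

  def with_logger(logger)
    previous_tagged_logger = @tagged_logger
    @tagged_logger = logger
    yield
  ensure
    # Runs on normal return and on exceptions alike.
    @tagged_logger = previous_tagged_logger
  end
end

default_logger = Logger.new(StringIO.new)
step_logger    = Logger.new(StringIO.new)
workflow       = LoggerSwapSketch.new(default_logger)
```

Inside the block `workflow.logger` is the injected logger; after the block, whether it returned or raised, `workflow.logger` is the default one again.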