Add Process Hooks #27

Open · wants to merge 15 commits into master
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,16 @@
v?.?.? - ?? ??? ????
---

v0.6.0 - ?? ??? 2024
---

* Add `before_started`, `after_completed` and `after_failed` functionality.
* Add `logger` helper method, available within task methods.
* Use `SecureRandom.hex(10)` instead of `uuid` for shorter process and task IDs.
* Bug fix for options on `sequential`, `concurrent`, `for_each` and `sub_process` methods.
* Bug fix for instrumentation payload.
* Documentation updates.

v0.5.2 - 04 Oct 2024
---
* Time arguments fix for Redis 5.0. Fixes #28
6 changes: 3 additions & 3 deletions Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
taskinator (0.5.2)
taskinator (0.6.0)
builder (>= 3.2.2)
connection_pool (>= 2.2.0)
globalid (>= 0.3)
@@ -115,10 +115,10 @@ GEM
rspec-mocks (~> 3.12.0)
rspec-core (3.12.0)
rspec-support (~> 3.12.0)
rspec-expectations (3.12.1)
rspec-expectations (3.12.2)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0)
rspec-mocks (3.12.1)
rspec-mocks (3.12.2)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0)
rspec-rails (5.1.2)
File renamed without changes.
53 changes: 49 additions & 4 deletions README.md
@@ -113,7 +113,7 @@ end
process = MyProcess.create_process Date.today, :option_1 => true
```

_NOTE:_ The current implementation performs a naive check on the count of arguments.
_NOTE:_ The current implementation performs a naïve check on the count of arguments.

Next, specify the tasks that make up the process, along with their corresponding implementation
methods, using the `task` method and providing the `method` to execute for the task.
@@ -291,6 +291,51 @@ module MyProcess
end
```

#### Before Process Started and After Process Completion or Failure

You may want to run further tasks asynchronously before a process has started, or after it has
completed or failed. These tasks provide a way to execute logic independently of the process.

Specify these tasks using the `before_started`, `after_completed` or `after_failed` methods.

For example, use `after_completed` to kick off another business process, or `after_failed` to
send an email to an operator.

```ruby
module MyProcess
extend Taskinator::Definition

# defines a process
define_process do

# define task to execute before the process starts
before_started :slack_notification

# usual tasks, sub-process, etc.

# define task to execute on completion
after_completed :further_process

# define task to execute on failure
after_failed :email_operations

end

def slack_notification
# ...
end

def further_process
# ...
end

def email_operations
# ...
end

end
```
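
Creating and starting the process then works as usual. A rough usage sketch, assuming the hook
tasks are picked up automatically as part of the normal process lifecycle (`enqueue!` is the same
call `Taskinator::CreateProcessWorker` uses to start a created process):

```ruby
# create the process as usual; this definition's task methods take no arguments
process = MyProcess.create_process

# start it; the before_started task runs as the process starts, and the
# after_completed or after_failed task runs once it reaches a final state
process.enqueue!
```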

#### Complex Process Definitions

Any combination or nesting of `task`, `sequential`, `concurrent` and `for_each` steps is
@@ -363,12 +408,12 @@ MyProcess.create_process(1, 2, 3, :send_notification => true)

```

#### Reusing ActiveJob jobs
#### Reusing `ActiveJob` jobs

It is likely that you already have one or more [jobs](https://guides.rubyonrails.org/active_job_basics.html)
and want to reuse them within the process definition.

Define a `job` step, providing the class of the Active Job to run and then taskinator will
Define a `job` step, providing the class of the `ActiveJob` to run and then taskinator will
invoke that job as part of the process.
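
As an illustrative sketch (`SendNotificationJob` below is a hypothetical stand-in for one of your
existing jobs, not something taskinator provides):

```ruby
# an existing ActiveJob job, reused as-is (hypothetical example)
class SendNotificationJob < ApplicationJob
  queue_as :default

  def perform(*args)
    # ... existing notification logic ...
  end
end

module MyProcess
  extend Taskinator::Definition

  define_process do

    # usual tasks, sub-processes, etc.

    # reuse the existing job as a step; taskinator will invoke
    # SendNotificationJob as part of the process
    job SendNotificationJob

  end
end
```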

The `job` step will be queued and executed on the same queue as
@@ -425,7 +470,7 @@ _This may be something that gets refactored down the line_.
To best understand how arguments are handled, you need to break it down into 3 phases. Namely:

* Definition,
* Creation and
* Creation, and
* Execution

Firstly, a process definition is declarative in that the `define_process` and a mix of
6 changes: 4 additions & 2 deletions lib/taskinator.rb
@@ -3,13 +3,15 @@
require 'securerandom'
require 'benchmark'
require 'delegate'
require 'builder'

require 'taskinator/version'

require 'taskinator/complete_on'
require 'taskinator/redis_connection'
require 'taskinator/logger'

require 'taskinator/builder'
require 'taskinator/definition'

require 'taskinator/workflow'
@@ -33,7 +35,7 @@
module Taskinator

NAME = "Taskinator"
LICENSE = 'See LICENSE.txt for licensing details.'
LICENSE = 'See LICENSE for licensing details.'

DEFAULTS = {
# none for now...
@@ -48,7 +50,7 @@ def options=(opts)
end

def generate_uuid
SecureRandom.uuid
SecureRandom.hex(10)
end

##
64 changes: 54 additions & 10 deletions lib/taskinator/builder.rb
@@ -15,6 +15,9 @@ def initialize(process, definition, *args)
end

def option?(key, &block)
# raise ArgumentError instead of LocalJumpError when no block is given
raise ArgumentError, 'block' unless block_given?

yield if builder_options[key]
end

@@ -24,7 +27,7 @@ def sequential(options={}, &block)

sub_process = Process.define_sequential_process_for(@definition, options)
task = define_sub_process_task(@process, sub_process, options)
Builder.new(sub_process, @definition, *@args).instance_eval(&block)
Builder.new(sub_process, @definition, *@args, @builder_options).instance_eval(&block)
@process.tasks << task if sub_process.tasks.any?
nil
end
@@ -35,7 +38,7 @@ def concurrent(complete_on=CompleteOn::Default, options={}, &block)

sub_process = Process.define_concurrent_process_for(@definition, complete_on, options)
task = define_sub_process_task(@process, sub_process, options)
Builder.new(sub_process, @definition, *@args).instance_eval(&block)
Builder.new(sub_process, @definition, *@args, @builder_options).instance_eval(&block)
@process.tasks << task if sub_process.tasks.any?
nil
end
@@ -54,7 +57,7 @@ def for_each(method, options={}, &block)
#
method_args = options.any? ? [*@args, options] : @args
@executor.send(method, *method_args) do |*args|
Builder.new(@process, @definition, *args).instance_eval(&block)
Builder.new(@process, @definition, *args, @builder_options).instance_eval(&block)
end
nil
end
@@ -80,9 +83,32 @@ def job(job, options={})
nil
end

# TODO: add mailer
# TODO: add complete!
# TODO: add fail!
# defines a task which executes the given @method before the process has started
def before_started(method, options={})
raise ArgumentError, 'method' if method.nil?
raise NoMethodError, method unless @executor.respond_to?(method)

define_before_started_task(@process, method, @args, options)
nil
end

# defines a task which executes the given @method after the process has completed
def after_completed(method, options={})
raise ArgumentError, 'method' if method.nil?
raise NoMethodError, method unless @executor.respond_to?(method)

define_after_completed_task(@process, method, @args, options)
nil
end

# defines a task which executes the given @method after the process has failed
def after_failed(method, options={})
raise ArgumentError, 'method' if method.nil?
raise NoMethodError, method unless @executor.respond_to?(method)

define_after_failed_task(@process, method, @args, options)
nil
end

# defines a sub process task, for the given @definition
# the definition specified must have input compatible arguments
@@ -101,13 +127,31 @@ def sub_process(definition, options={})
private

def define_step_task(process, method, args, options={})
define_task(process) {
add_task(process.tasks) {
Task.define_step_task(process, method, args, combine_options(options))
}
end

def define_before_started_task(process, method, args, options={})
add_task(process.before_started_tasks) {
Task.define_hook_task(process, method, args, combine_options(options))
}
end

def define_after_completed_task(process, method, args, options={})
add_task(process.after_completed_tasks) {
Task.define_hook_task(process, method, args, combine_options(options))
}
end

def define_after_failed_task(process, method, args, options={})
add_task(process.after_failed_tasks) {
Task.define_hook_task(process, method, args, combine_options(options))
}
end

def define_job_task(process, job, args, options={})
define_task(process) {
add_task(process.tasks) {
Task.define_job_task(process, job, args, combine_options(options))
}
end
@@ -116,8 +160,8 @@ def define_sub_process_task(process, sub_process, options={})
Task.define_sub_process_task(process, sub_process, combine_options(options))
end

def define_task(process)
process.tasks << task = yield
def add_task(list)
list << task = yield
task
end

4 changes: 4 additions & 0 deletions lib/taskinator/create_process_worker.rb
@@ -31,6 +31,10 @@ def perform
process_args << { :uuid => uuid }
end

# generate the process for the given definition and arguments
# and enqueue the process's tasks
# -> sequential processes - enqueues the first task
# -> concurrent processes - enqueues all the tasks
@definition._create_process_(false, *process_args).enqueue!

end
2 changes: 0 additions & 2 deletions lib/taskinator/definition.rb
@@ -1,5 +1,3 @@
require 'taskinator/builder'

module Taskinator
module Definition

11 changes: 11 additions & 0 deletions lib/taskinator/executor.rb
@@ -25,5 +25,16 @@ def options
task.options if task
end

# helpers

def logger
Taskinator.logger
end

def error
# task.process.error
raise NoMethodError
end

end
end
14 changes: 7 additions & 7 deletions lib/taskinator/instrumentation.rb
@@ -44,7 +44,7 @@ def payload_for(state, additional={})
# need to cache here, since this method hits redis, so it can't be part of the multi statement that follows
process_key = self.process_key

tasks_count, processing_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
count, processing, completed, cancelled, failed = Taskinator.redis do |conn|
conn.hmget process_key,
:tasks_count,
:tasks_processing,
@@ -53,7 +53,7 @@ def payload_for(state, additional={})
:tasks_failed
end

tasks_count = tasks_count.to_f
count = count.to_f

return OpenStruct.new(
{
@@ -64,12 +64,12 @@
:uuid => uuid,
:options => options.dup,
:state => state,
:percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_processing => (tasks_count > 0) ? (processing_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_failed => (count > 0) ? (failed.to_i / count) * 100.0 : 0.0,
:percentage_cancelled => (count > 0) ? (cancelled.to_i / count) * 100.0 : 0.0,
:percentage_processing => (count > 0) ? (processing.to_i / count) * 100.0 : 0.0,
:percentage_completed => (count > 0) ? (completed.to_i / count) * 100.0 : 0.0,
}.merge(additional)
).freeze
)

end

Expand Down
Loading
Loading