Step specifications

Defining and configuring a step specification

A step specification is defined, depending on the step's nature and function, as either:

  • The root of a scenario (the first step after .start()).

    or

  • The successor of a previous step.

In the example below, my-tcp is the root step; reuse-tcp is a successor step.

scenario {
    minionsCount = 150
    profile {
        regular(150, minionsCount) (1)
    }
}
    .start()
    .netty()
    .tcp { (2)
        name = "my-tcp"
        connect {
            address("localhost", 3000)
            noDelay = true
        }
        request { tcpRequest1 }
        metrics { connectTime = true }
    }
    .reuseTcp("my-tcp") { (3)
        name = "reuse-tcp"
        iterate(20, Duration.ofMillis(100))
        request { tcpRequest2 }
    }
1 All 150 minions are deployed simultaneously.
2 The .tcp step ("my-tcp") is the root of the scenario.
3 The .reuseTcp step ("reuse-tcp") is a successor of the previous step ("my-tcp") and receives the previous step’s output as its input.

A step specification is configured in one of two ways:

  • By passing the configuration closure {} as a parameter of the step (as with .tcp and .reuseTcp in the example above).

    or

  • By calling .configure after the step, as in the example below:

    .reuseTcp("my-tcp") {
        request { tcpRequest2 }
    }.configure {
        name = "reuse-tcp"
        iterate(20, Duration.ofMillis(100))
        retryPolicy(instanceOfRetryPolicy)
        timeout(2000)
    }
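Both configuration styles work because the closure and .configure run against the same specification object. The following sketch illustrates that idea in plain Kotlin; SpecSketch, its properties, and its methods are hypothetical stand-ins, not the QALIPSIS implementation.

```kotlin
// Hypothetical, stripped-down specification used only to show why the two styles
// are equivalent: the constructor closure and configure { } share the same receiver.
class SpecSketch(setup: SpecSketch.() -> Unit = {}) {
    var name: String = ""
    var iterations: Long = 1

    init {
        // Apply the closure passed at creation time.
        this.setup()
    }

    fun iterate(times: Long) {
        iterations = times
    }

    // Applies additional settings after creation and returns the same instance,
    // so calls can keep chaining.
    fun configure(block: SpecSketch.() -> Unit): SpecSketch = apply(block)
}
```

For example, SpecSketch { name = "reuse-tcp"; iterate(20) } and SpecSketch().configure { name = "reuse-tcp"; iterate(20) } end up with identical settings.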

Step context

A step’s context is the mechanism by which:

  1. Data received from the previous step is transported as input to the current step.

  2. The step transforms the data and provides it as output.

  3. The output is forwarded to the next steps as input.

Additionally, a step’s context determines how prior errors are transported through the step.
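The transport mechanism described above can be modeled as an immutable value that each step turns into a new one. This is a minimal sketch under the assumption that a context only needs a minion ID, an input, and the errors carried so far; SketchContext and forward are hypothetical names, not the QALIPSIS API.

```kotlin
// Illustrative only: SketchContext and forward are hypothetical, not QALIPSIS types.
data class SketchContext<T>(
    val minionId: String,
    val input: T,
    val errors: List<String> = emptyList(), // errors transported from prior steps
)

// A step receives a context, transforms its input, and forwards the output as the
// input of the next context; the minion ID and prior errors travel along unchanged.
fun <I, O> SketchContext<I>.forward(transform: (I) -> O): SketchContext<O> =
    SketchContext(minionId, transform(input), errors)
```

Chaining SketchContext("minion-1", 21).forward { it * 2 }.forward { "value=$it" } yields a context for the same minion whose input is "value=42".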

Common load distribution steps

Fan-out

To split a branch and add two or more concurrent steps as successors, execute a .split.

In the example below, anyStep receives the output of the previous step as input and transforms it per its step configuration. The transformed data is provided as input to both the map and the flatten steps, which each do something different with the data.

If the entity returned by the step preceding the .split (anyStep in the example) is mutable, do not modify it in one branch: the change would have side effects on the other branch. To avoid this, create a deep clone of the record before changing it.

.anyStep { }
    .split {
        map { ... }
            .validate { ... }
    }
    .flatten()
    .filterNotNull()
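The deep-clone advice can be illustrated in plain Kotlin, independently of QALIPSIS. The sketch assumes a hypothetical Record type holding a mutable list; it shows that a data class copy() is shallow, so both branches would still share the nested list, while a deep copy isolates them.

```kotlin
// Plain-Kotlin illustration of the shared-record pitfall in a fan-out.
// Record and deepCopy are hypothetical names, not QALIPSIS types.
data class Record(val id: String, val tags: MutableList<String>)

// copy() alone is shallow: the copy would still point to the same tags list.
// Copying the mutable list as well gives each branch an independent record.
fun Record.deepCopy(): Record = copy(tags = tags.toMutableList())
```

In a split, each branch that intends to modify the record would work on its own deepCopy() rather than on the shared instance.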

Fan-in/joins

Fan-in/Join functions allow the user to re-join split branches of a scenario or to verify data from different sources.

You can join the branches in one of two ways:

  • Use the name of a concurrent step.

  • Create a step directly in the join.

The following example uses the name of a concurrent step:

.anyStep { }
    .innerJoin<EntityWithKey, EntityWithId>(
        on = { "${it.key}" },
        with = "my-datasource-poll-step", (1)
        having = { it.id } (2)
    )
    .map { } (3)
1 The step my-datasource-poll-step is declared in a parallel branch (or earlier in the same branch) and provides an output of type EntityWithId, whose field id is a string in this example.
2 Closure to extract the key from the right side, which should match the value extracted from the left side by on.
3 The result is a Pair<EntityWithKey, EntityWithId> in which the string representation of the key of the EntityWithKey equals the field id of the EntityWithId.

Records coming from the right side are kept in a cache until one comes from the left side with the same key. Once used, they are evicted from the cache.

When a record comes from the left side (from anyStep) but no corresponding record has yet arrived from the right side (my-datasource-poll-step), it waits up to the timeout defined on the step. Refer to Defining and configuring a step specification for more details.

The step context after the join inherits from the one coming from the left.
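The caching behavior of the join can be modeled with a simple map. This single-threaded sketch is an assumption-laden illustration, not the QALIPSIS implementation: InnerJoinSketch is a hypothetical name, timeouts are not modeled (a missing match simply returns null), and a later right-side record with the same key replaces an earlier one.

```kotlin
// Hypothetical shapes matching the example above.
data class EntityWithKey(val key: Int)
data class EntityWithId(val id: String)

// Simplified model of the join cache: right-side records wait in a map until a
// left-side record with the same key arrives, and are evicted once used.
class InnerJoinSketch<L, R : Any, K>(
    private val leftKey: (L) -> K,  // plays the role of `on`
    private val rightKey: (R) -> K, // plays the role of `having`
) {
    private val rightCache = mutableMapOf<K, R>()

    // A record from the right side is cached under its key.
    fun offerRight(record: R) {
        rightCache[rightKey(record)] = record
    }

    // A record from the left side consumes the matching right record, if any.
    // Returns null when nothing matches yet; the real step would wait up to its timeout.
    fun joinLeft(record: L): Pair<L, R>? =
        rightCache.remove(leftKey(record))?.let { record to it }
}
```

Creating InnerJoinSketch<EntityWithKey, EntityWithId, String>(leftKey = { "${it.key}" }, rightKey = { it.id }) mirrors the on and having closures of the example.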

The following example creates a step directly in the join:

.anyStep { }
    .innerJoin<EntityWithKey, EntityWithId>(
        on = { "${it.key}" },
        with = {
            r2dbc().poll {
                // poll configuration
            }
        },
        having = { it.id }
    )
    .map { }

This method is similar to naming a concurrent step, except that the step is created inside the join. The receiver of the with closure is the scenario.

Error processing steps

When an error occurs while executing a step, and all potential retries have failed, the step context carried from top to bottom is marked as "exhausted".

From that point on, only the steps relevant for error processing are executed; the others are bypassed.

The steps relevant for error processing include:

  • catchErrors takes the collection of errors as a parameter to process them. In the example, the number of previous errors to process is 10.

    .catchErrors(10)
  • catchExhaustedContext detects when a coroutine context has been exhausted. It takes the full exhausted step context as a parameter. Available parameters for .catchExhaustedContext include:

    • minionId = ""

    • scenarioName = ""

    • stepName = ""

    • isExhausted = ""; available values are true (do not execute next step) or false (execute next step).

      In the example, when a coroutine is exhausted, an error is returned indicating that the minion ID of the exhausted step is 250.

      .catchExhaustedContext(minionID = 250)
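The bypass rule can be sketched as a small pipeline runner. This is a plain-Kotlin model under stated assumptions, not QALIPSIS code: ExhaustibleContext, SketchStep, Regular, ErrorHandler, and runPipeline are hypothetical names; a context counts as exhausted as soon as it carries an error; and error-processing steps always run and receive the (possibly empty) error list.

```kotlin
// Hypothetical model: a context is exhausted as soon as it carries an error.
class ExhaustibleContext(var value: Int, val errors: MutableList<String> = mutableListOf()) {
    val isExhausted: Boolean
        get() = errors.isNotEmpty()
}

sealed interface SketchStep

// A regular step returns null on success, or an error message on failure.
class Regular(val name: String, val action: (ExhaustibleContext) -> String?) : SketchStep

// An error-processing step, in the spirit of catchErrors: it only sees the errors.
class ErrorHandler(val name: String, val handle: (List<String>) -> Unit) : SketchStep

// Runs the steps in order and returns the names of the steps actually executed.
fun runPipeline(ctx: ExhaustibleContext, steps: List<SketchStep>): List<String> {
    val executed = mutableListOf<String>()
    for (step in steps) {
        when (step) {
            // Regular steps are bypassed once the context is exhausted.
            is Regular -> if (!ctx.isExhausted) {
                executed += step.name
                step.action(ctx)?.let { ctx.errors += it }
            }
            // Error-processing steps always run in this sketch.
            is ErrorHandler -> {
                executed += step.name
                step.handle(ctx.errors.toList())
            }
        }
    }
    return executed
}
```

With the steps "double", "check" (fails above 5), "increment", and a final error handler, an input of 3 fails at "check", so "increment" is skipped while the handler still receives the error.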

Operators

QALIPSIS provides default operators to tweak the execution of your scenario; they do not require adding plugins to your classpath.

Out-of-the-box QALIPSIS operators perform a variety of functions, including data transformation, data filtering, data verification, execution timing, and caching. Because you can also define your own operators, the set of available operators is open-ended.

Some of the commonly used operators are defined in this section.

Timing operators

This section provides an explanation and examples of the delay, pace, acceleratingPace, and constantPace timing operators.

  • delay adds a constant delay before executing the next step, keeping the same execution pace as the previous step. In the example, the delay is set to 123 ms.

    .delay(Duration.ofMillis(123))
  • pace ensures that the next step is executed at an expected pace, whatever the pace of the input is. The calculation applies independently to each minion. In the first example, each period is 10 ms longer than the previous one; in the second example, the pace is a constant 10 ms.

    .pace { pastPeriodMs -> pastPeriodMs + 10 }

    or

    .pace(Duration.ofMillis(10))
  • acceleratingPace reduces the waiting interval at each iteration; to slow down the pace instead, use an accelerator lower than 1. In the example, the start pace is 20 ms, the accelerator is 2.0, and the end pace (the shortest interval) is 4 ms.

    .acceleratingPace(20, 2.0, 4)
  • constantPace applies a constant pace to execute the successor steps. In the example, the constant pace is set to 20 ms.

    .constantPace(20)
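To make the timing operators more concrete, the following sketch computes the sequence of waiting periods they describe. It assumes that acceleratingPace divides the previous period by the accelerator and never goes below the end pace, and that pace applies its closure to the previous period; acceleratingPeriods and pacePeriods are hypothetical helpers that mirror the description above, not the QALIPSIS source code.

```kotlin
// Assumption: each new period is the previous one divided by the accelerator,
// floored at the minimum (end) period.
fun acceleratingPeriods(startMs: Long, accelerator: Double, minMs: Long, count: Int): List<Long> {
    require(accelerator > 0) { "accelerator must be positive" }
    var period = startMs
    return List(count) {
        val current = period
        period = maxOf(minMs, (period / accelerator).toLong())
        current
    }
}

// Assumption: like pace { pastPeriodMs -> ... }, each period derives from the previous one.
fun pacePeriods(firstMs: Long, count: Int, next: (Long) -> Long): List<Long> =
    generateSequence(firstMs, next).take(count).toList()
```

Under these assumptions, acceleratingPeriods(20, 2.0, 4, 5) gives 20, 10, 5, 4, 4 ms, while an accelerator of 0.5 doubles the period at each iteration instead.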

Filter operators

This section provides an explanation and examples of the validate, filter, and verify filter operators.

  • validate validates the input. If errors are returned, the step context is marked as exhausted and can then only be processed by error processing steps. In the example, each odd input produces an error, and its step context is marked as exhausted.

    .validate{ if(it % 2 != 0) listOf(StepError("The input should be even")) else emptyList() }
  • filter removes the step contexts whose input does not match the filter specification. In the example, odd inputs are filtered out.

    .filter { it % 2 == 0 }

    filter is a silent equivalent of validate; it does not generate an error and does not mark the step context as exhausted.

  • verify validates assertions, rather than the input data quality/safety that is validated by validate. In the example, .verify confirms that the batteryLevel property of foundedBatteryState is exactly equal to the batteryLevel property of savedBatteryState. If the two values differ, an error is returned and the step context is marked as exhausted.

    .verify { result ->
        val savedBatteryState = result.first
        val foundedBatteryState = result.second
        foundedBatteryState.batteryLevel shouldBeExactly savedBatteryState.batteryLevel
    }
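The difference between validate and filter can be replayed on a plain list of integers. In this sketch, Outcome, validateAll, and filterAll are hypothetical names used only for illustration: validate keeps every input but flags failing ones as exhausted, whereas filter drops failing inputs silently.

```kotlin
// Hypothetical result of validation: an input with its errors.
data class Outcome(val value: Int, val errors: List<String>) {
    val isExhausted: Boolean
        get() = errors.isNotEmpty()
}

// validate: every input is kept, but failing ones carry errors and become exhausted.
fun validateAll(inputs: List<Int>, rule: (Int) -> List<String>): List<Outcome> =
    inputs.map { Outcome(it, rule(it)) }

// filter: failing inputs silently disappear; no error is produced.
fun filterAll(inputs: List<Int>, predicate: (Int) -> Boolean): List<Int> =
    inputs.filter(predicate)
```

With the inputs 1 to 4 and the "even" rule, validateAll yields two exhausted outcomes (1 and 3), whereas filterAll simply returns [2, 4].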

Transformation operators

This section provides an explanation and examples of the map, mapWithContext, flatMap, flatten, and onEach transformation operators.

  • map converts each input element into another element in the output. In the example, .map multiplies each of its input values (it) by 3.

    .map { it * 3 }
  • mapWithContext provides the same capability as map but with the step context as an additional argument. In the example, .mapWithContext prefixes each input with the ID of the minion, separated by a dash.

    .mapWithContext{ context, input -> context.minionId + "-" + input }
  • flatMap converts a collective input into individual elements in the output, with a user-defined strategy. In the example, .flatMap converts the collection held by the members property of the input into a flow. Each record of members is individually provided to the next steps.

    .flatMap { input -> flowOf(*input.members.toTypedArray()) }
  • flatten converts a collective input of a known collective type (array, iterable, etc.) into individual elements in the output, keeping each item unchanged.

    In the example, assuming the inputs are 1, 2, and 3, the first .map multiplies each of them by 3, the second .map turns each result into an array containing the value and the value plus 100, and .flatten emits each item of these arrays individually: 3, 103, 6, 106, 9, 109.

    .map { it * 3 }
    .map { arrayOf(it, it + 100) }
    .flatten()
  • onEach executes one or several statements on each element of the input and forwards them unchanged to the output. The example prints the input of the .onEach step to the console.

    .onEach { input -> println(input) }
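The transformation chain from the flatten example can be checked with the Kotlin standard library collection functions, which behave analogously on a list. These are plain collection operations, not QALIPSIS steps; the function names are hypothetical, and lists replace arrays because the standard library's flatten is defined for iterables of iterables.

```kotlin
// map { it * 3 } -> map { listOf(it, it + 100) } -> flatten(), replayed on a plain list.
fun tripleAndExpand(inputs: List<Int>): List<Int> =
    inputs
        .map { it * 3 }
        .map { listOf(it, it + 100) } // each element becomes a collection
        .flatten()                     // each item of each collection is emitted individually

// flatMap fuses the last two steps with a user-defined expansion.
fun tripleAndExpandWithFlatMap(inputs: List<Int>): List<Int> =
    inputs.flatMap { listOf(it * 3, it * 3 + 100) }

// onEach runs a side effect on every element and returns the elements unchanged.
fun <T> tapEach(inputs: List<T>, seen: MutableList<T>): List<T> =
    inputs.onEach { seen += it }
```

Both tripleAndExpand(listOf(1, 2, 3)) and tripleAndExpandWithFlatMap(listOf(1, 2, 3)) produce 3, 103, 6, 106, 9, 109.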

Caching operators

This section provides an explanation and examples of the shelve and unshelve caching operators.

  • shelve caches the provided values in a shared cache. In the example, .shelve provides a map of key-value pairs in which the key "value-1" is associated with the input plus 1; that entry is stored in the cache.

    .shelve { input -> mapOf("value-1" to input + 1) }

    When using a distributed architecture, make sure the cached value can be serialized by the used implementation of shared state registry. The ID of the minion is automatically added to the key to avoid collision.

  • unshelve extracts a specific value from the cache using the keys passed as arguments. In the example, unshelve<Int, Double>("value-1") takes two type arguments, <Int, Double>, which specify the type of the step's input and the type of the value retrieved from the cache under the key value-1.

    The value associated with the key "value-1", of type Double, is then extracted and provided to the next steps.

    .unshelve<Int, Double>("value-1")
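The remark that the minion ID is added to the key can be illustrated with an in-memory map. ShelfSketch is a hypothetical, single-process stand-in, not the QALIPSIS shared state registry; a distributed registry would additionally require the stored values to be serializable.

```kotlin
// Hypothetical in-memory shelf: scoping keys by minion ID avoids collisions
// between minions that shelve values under the same key.
class ShelfSketch {
    private val entries = mutableMapOf<Pair<String, String>, Any?>()

    // Like shelve: store each value under (minionId, key).
    fun shelve(minionId: String, values: Map<String, Any?>) {
        values.forEach { (key, value) -> entries[minionId to key] = value }
    }

    // Like unshelve: read back the value of one key for the same minion.
    @Suppress("UNCHECKED_CAST")
    fun <T> unshelve(minionId: String, key: String): T? = entries[minionId to key] as T?
}
```

Two minions can both shelve "value-1" without overwriting each other, because each read and write is scoped to the calling minion.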

Flow operators

This section provides an explanation and examples of the stage flow operator.

  • stage groups a linear graph of steps.

    Stages are not mandatory in a scenario, but they provide a convenient way to iterate over a group of steps. In the example, the steps of the stage are iterated 5 times: after flatten() completes, execution restarts at the first map.

    .stage("my-stage") {
        map { it * 3 }
            .map { arrayOf(it, it + 100) }
            .flatten()
    }.configure {
        iterate(5)
    }

Combining operators

The most often used combining operator is collect.

collect collects inputs across all the minions and releases them as a list within the latest received step context. Its function is the opposite of flatten.

In the example, .collect gathers the incoming records until 10 items have been collected or 1 second has elapsed, whichever comes first. The collection is then provided to the next steps.

.collect(Duration.ofSeconds(1), 10)
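The release rule of collect (size limit or timeout, whichever comes first) can be sketched as a small buffer with an explicit clock. CollectSketch, offer, and tick are hypothetical names; the sketch assumes the timeout is measured from the first buffered item, and it ignores concurrency and step contexts.

```kotlin
// Single-threaded sketch of a collect-style buffer: releases its content when it
// reaches maxSize, or when timeoutMs has elapsed since the first buffered item.
class CollectSketch<T>(private val timeoutMs: Long, private val maxSize: Int) {
    private val buffer = mutableListOf<T>()
    private var firstArrivalMs: Long? = null

    // Adds an item; returns the released batch when the size limit is reached.
    fun offer(item: T, nowMs: Long): List<T>? {
        if (buffer.isEmpty()) firstArrivalMs = nowMs
        buffer += item
        return if (buffer.size >= maxSize) release() else null
    }

    // Called periodically; returns the released batch once the timeout has elapsed.
    fun tick(nowMs: Long): List<T>? {
        val start = firstArrivalMs ?: return null
        return if (nowMs - start >= timeoutMs) release() else null
    }

    private fun release(): List<T> {
        val batch = buffer.toList()
        buffer.clear()
        firstArrivalMs = null
        return batch
    }
}
```

With CollectSketch<Int>(timeoutMs = 1000, maxSize = 10), ten quick offers release a full batch immediately, while a single late item is released only by a tick at least 1 second after it arrived.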