Step specifications
Defining and configuring a step specification
A step specification is defined, depending on the nature and function of the step, as either:
- The root of a scenario (the first step after .start()), or
- The successor of a previous step.
In the example below, my-tcp is the root step; reuse-tcp is a successor step.
scenario {
    minionsCount = 150
    profile {
        regular(150, minionsCount) (1)
    }
    .start()
    .netty()
    .tcp { (2)
        name = "my-tcp"
        connect {
            address("localhost", 3000)
            noDelay = true
        }
        request { tcpRequest1 }
        metrics { connectTime = true }
    }
    .reuseTcp("my-tcp") { (3)
        name = "reuse-tcp"
        iterate(20, Duration.ofMillis(100))
        request { tcpRequest2 }
    }
}
| 1 | All 150 minions are deployed simultaneously. |
| 2 | The .tcp step ("my-tcp") is the root of the scenario. |
| 3 | The .reuseTcp step ("reuse-tcp") is a successor of the previous step ("my-tcp") and receives the previous step’s output as its input. |
A step specification is configured in one of two ways:
- The step receives the configuration closure {} as a parameter (as in .tcp and .reuseTcp in the example above), or
- .configure is called after the step, as in the example below:
.reuseTcp("my-tcp") {
    request { tcpRequest2 }
}.configure {
    name = "reuse-tcp"
    iterate(20, Duration.ofMillis(100))
    retryPolicy(instanceOfRetryPolicy)
    timeout(2000)
}
Step context
A step’s context is the mechanism by which:
- Data received from the previous step is transported as input to the next steps.
- The next steps transform the data and provide it as output.
- The output is forwarded to the following steps as input.
Additionally, a step’s context determines how prior errors are transported through the step.
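The data flow described above can be pictured with a minimal plain-Kotlin sketch. It is a conceptual model only: ContextModel and forward are hypothetical names invented for this illustration and are not QALIPSIS classes or functions.

```kotlin
// Hypothetical model for illustration; this is not a QALIPSIS class.
data class ContextModel<T>(
    val minionId: String,
    val input: T,
    val errors: List<String> = emptyList(),
)

// Applies a step: the step's output becomes the input of the next context,
// and the errors recorded so far are carried along unchanged.
fun <I, O> ContextModel<I>.forward(step: (I) -> O): ContextModel<O> =
    ContextModel(minionId, step(input), errors)

fun main() {
    val result = ContextModel("minion-1", 2)
        .forward { it * 3 }       // output 6 becomes the next input
        .forward { "value=$it" }  // output "value=6"
    println(result.input) // value=6
}
```

The sketch only shows how output becomes input while errors travel with the context; retries, timeouts, and concurrency are left out.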
Common load distribution steps
Fan-out
To split a branch and add two or more concurrent steps as successors, execute a .split.
In the example below, anyStep receives the output of the previous step as input. anyStep transforms its input per its step configuration. The transformed data is provided as input to both the map and the flatten steps.
If the entity returned by the step that provides the data before the .split (anyStep in the example) is mutable, do not modify it in one branch, because the change would also be visible in the other branch. To avoid this issue, create a deep clone of the record before changing it.
The map and the flatten steps each do something different with the data.
.anyStep { }
.split {
    .map { ... }
    .validate { ... }
}
.flatten()
.filterNotNull()
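The side-effect risk mentioned for split branches can be reproduced in plain Kotlin. The sketch below is independent of QALIPSIS: Reading and deepClone are hypothetical names used only to show why a deep clone protects the other branch.

```kotlin
// Hypothetical record type used only for illustration.
data class Reading(val id: String, val tags: MutableList<String>)

// Deep clone: copies the record and the mutable list it holds.
fun Reading.deepClone(): Reading = copy(tags = tags.toMutableList())

fun main() {
    // Unsafe: both branches hold the same instance, so a change in one is seen by the other.
    val shared = Reading("r1", mutableListOf("raw"))
    val branchA = shared
    val branchB = shared
    branchA.tags.add("changed-in-A")
    println(branchB.tags) // [raw, changed-in-A]

    // Safer: the branch that needs to change the record works on its own deep clone.
    val original = Reading("r2", mutableListOf("raw"))
    val cloned = original.deepClone()
    cloned.tags.add("changed-in-clone")
    println(original.tags) // [raw]
}
```

Note that the data class copy() on its own is shallow; the nested list has to be copied explicitly, as deepClone does here.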
Fan-in/joins
Fan-in/Join functions allow the user to re-join split branches of a scenario or to verify data from different sources.
You can join the branches in one of two ways:
- Use the name of a concurrent step.
- Create a step directly in the join.
The following example uses the name of a concurrent step:
.anyStep { }
.innerJoin<EntityWithKey, EntityWithId>(
    on = { "${it.key}" },
    with = "my-datasource-poll-step", (1)
    having = { it.id } (2)
)
.map { } (3)
| 1 | The step my-datasource-poll-step is declared in a parallel branch (or earlier in the same branch) and provides an output of type EntityWithId, whose field id is a string in this example. |
| 2 | Closure to extract the key from the right side, which should match the left side value. |
| 3 | The result is a Pair<EntityWithKey, EntityWithId> in which the string representation of the key of the EntityWithKey equals the field id of the EntityWithId. |
Records coming from the right side are kept in a cache until one comes from the left side with the same key. Once used, they are evicted from the cache.
When a flow comes from the left side (from anyStep) but no corresponding record has yet arrived from the right side (my-datasource-poll-step), it waits until the timeout defined on the step expires. Refer to Defining and configuring a step specification for more details.
The step context after the join inherits from the one coming from the left.
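The caching behaviour of the join can be sketched in plain Kotlin as follows. This is a simplified model under stated assumptions, not the QALIPSIS implementation: JoinCacheModel, onRight, and onLeft are hypothetical names, the entity classes are minimal stand-ins, and the waiting and timeout on the left side are not modelled (an unmatched left record simply yields null).

```kotlin
// Minimal stand-ins for the entities of the example above (field types are assumptions).
data class EntityWithKey(val key: Int)
data class EntityWithId(val id: String)

// Hypothetical model of the join cache; not a QALIPSIS class.
class JoinCacheModel<L, R : Any>(
    private val leftKey: (L) -> String,
    private val rightKey: (R) -> String,
) {
    private val rightRecords = mutableMapOf<String, R>()

    // Records from the right side are kept until a left record with the same key arrives.
    fun onRight(record: R) {
        rightRecords[rightKey(record)] = record
    }

    // Returns the joined pair and evicts the right record, or null when there is no match yet.
    fun onLeft(record: L): Pair<L, R>? {
        val match = rightRecords.remove(leftKey(record)) ?: return null
        return record to match
    }
}

fun main() {
    val cache = JoinCacheModel<EntityWithKey, EntityWithId>(
        leftKey = { "${it.key}" },
        rightKey = { it.id },
    )
    cache.onRight(EntityWithId("1"))
    println(cache.onLeft(EntityWithKey(1))) // (EntityWithKey(key=1), EntityWithId(id=1))
    println(cache.onLeft(EntityWithKey(1))) // null, because the right record was evicted
}
```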
The following example creates a step directly in the join:
.anyStep { }
.innerJoin<EntityWithKey, EntityWithId>(
    on = { "${it.key}" },
    with = {
        r2dbc().poll {
        }
    },
    having = { it.id }
)
.map { }
This method is similar to naming a concurrent step except that a step is created in the join. The receiver of the with is the scenario.
Error processing steps
When an error occurs while executing a step and all potential retries have failed, the step context being carried from top to bottom is marked as "exhausted".
From that point of the process on, only the steps relevant for error processing are executed; the others are simply bypassed.
The steps relevant for error processing include:
- catchErrors takes the collection of errors as a parameter to process them. In the example, the number of previous errors to process is 10.
  .catchErrors(10)
- catchExhaustedContext detects when a coroutine context has been exhausted. It takes the full exhausted step context as a parameter. Available parameters for .catchExhaustedContext include:
  - minionId = ""
  - scenarioName = ""
  - stepName = ""
  - isExhausted = ""; available values are true (do not execute next step) or false (execute next step).
  In the example, when a coroutine is exhausted, an error is returned indicating that the minion ID of the exhausted step is 250.
  .catchExhaustedContext(minionID = 250)
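The bypass rule for exhausted contexts can be illustrated with a small plain-Kotlin model. It is a sketch under assumptions, not QALIPSIS code: FlowState, ModelStep, and runPipeline are hypothetical names, retries are not modelled, and any exception thrown by a step is treated as the error that exhausts the context.

```kotlin
// Hypothetical types for illustration; these are not QALIPSIS classes.
data class FlowState(
    val value: Int,
    val errors: List<String> = emptyList(),
    val exhausted: Boolean = false,
)

class ModelStep(
    val name: String,
    val handlesErrors: Boolean = false,
    val action: (FlowState) -> FlowState,
)

// Runs the steps in order and returns the final state with the names of the executed steps.
fun runPipeline(steps: List<ModelStep>, initial: FlowState): Pair<FlowState, List<String>> {
    var state = initial
    val executed = mutableListOf<String>()
    for (step in steps) {
        // Once the context is exhausted, only error-processing steps run; others are bypassed.
        if (state.exhausted && !step.handlesErrors) continue
        executed.add(step.name)
        state = try {
            step.action(state)
        } catch (e: Exception) {
            state.copy(errors = state.errors + (e.message ?: "unknown error"), exhausted = true)
        }
    }
    return state to executed
}

fun main() {
    val steps = listOf(
        ModelStep("double") { it.copy(value = it.value * 2) },
        ModelStep("fail") { error("boom") },
        ModelStep("bypassed") { it.copy(value = -1) },
        ModelStep("catch", handlesErrors = true) { it },
    )
    val (finalState, executed) = runPipeline(steps, FlowState(21))
    println(executed)         // [double, fail, catch]
    println(finalState.value) // 42
}
```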
Operators
QALIPSIS provides default operators that tweak the execution of your scenario and do not require adding plugins to your classpath.
Out-of-the-box QALIPSIS operators perform a variety of functions, including data transformation, data filtering, data verification, and altering the behavior of the data. Combined with user-defined operators, the set of available operators is effectively unlimited.
Some of the commonly used operators are defined in this section.
Timing operators
This section provides an explanation and examples of the delay, pace, acceleratingPace, and constantPace timing operators.
- delay adds a constant delay before executing the next step, keeping the same execution pace as the previous step. In the example, the delay between steps is set to 123 ms.
  .delay(Duration.ofMillis(123))
- pace ensures that the next step is executed at an expected pace, whatever the pace of the input is. The calculation applies independently to each minion. In the following examples, the pace is set to 10 ms.
  .pace { pastPeriodMs -> pastPeriodMs + 10 }
  or
  .pace(Duration.ofMillis(10))
- acceleratingPace reduces the waiting interval for each iteration; to slow down the pace, use an accelerator lower than 1. In the example, the start pace is set to 20 ms, the acceleration to 2.0, and the end pace to 4 ms.
  .acceleratingPace(20, 2.0, 4)
- constantPace applies a constant pace to execute the successor steps. The constant pace is set to 20 ms in the example.
  .constantPace(20)
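To make the effect of the acceleratingPace parameters more concrete, the following plain-Kotlin sketch computes a sequence of waiting periods. It rests on an assumption that is not confirmed by the text above: each period is the previous one divided by the accelerator, never going below the end pace. The function acceleratingPeriods is a hypothetical helper, not part of QALIPSIS.

```kotlin
// Assumption: next period = previous period / accelerator, bounded below by the end pace.
// This is an illustrative helper, not a QALIPSIS function.
fun acceleratingPeriods(startMs: Long, accelerator: Double, minMs: Long, iterations: Int): List<Long> {
    val periods = mutableListOf<Long>()
    var current = startMs.toDouble()
    repeat(iterations) {
        periods.add(current.toLong())
        current = maxOf(current / accelerator, minMs.toDouble())
    }
    return periods
}

fun main() {
    // Same values as .acceleratingPace(20, 2.0, 4)
    println(acceleratingPeriods(20, 2.0, 4, 5)) // [20, 10, 5, 4, 4]
}
```

Under the same assumption, an accelerator lower than 1 (for example 0.5) makes each period longer than the previous one, which corresponds to slowing the pace down.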
Filter operators
This section provides an explanation and examples of the validate, filter, and verify filter operators.
- validate validates the input. If errors are returned, the step context is marked as exhausted and can then only be processed by error processing steps. In the example, step contexts whose input is not an even integer return an error and are marked as exhausted.
  .validate { if (it % 2 != 0) listOf(StepError("The input should be even")) else emptyList() }
- filter removes the step contexts whose input does not match the filter specification. In the example, step contexts whose input is not an even integer are filtered out.
  .filter { it % 2 == 0 }
  filter is a silent equivalent of validate; it does not generate an error and does not mark the step context as exhausted.
- verify validates assertions rather than the input data quality/safety that is validated by validate. In the example, .verify confirms that the batteryLevel property of the foundedBatteryState is exactly the same as the batteryLevel property of the savedBatteryState. If the two values are not the same, an error is returned and the step context is marked as exhausted.
  .verify { result ->
      val savedBatteryState = result.first
      val foundedBatteryState = result.second
      foundedBatteryState.batteryLevel shouldBeExactly savedBatteryState.batteryLevel
  }
Transformation operators
This section provides an explanation and examples of the map, mapWithContext, flatMap, flatten, and onEach transformation operators.
- map converts each input element into another element in the output. In the example, .map multiplies each of its input values (it) by 3.
  .map { it * 3 }
- mapWithContext provides the same capability as map but with the step context as an additional argument. In the example, .mapWithContext returns the minion ID taken from the context, followed by a hyphen and the input value.
  .mapWithContext { context, input -> context.minionId + "-" + input }
- flatMap converts a collective input into individual elements in the output, with a user-defined strategy. In the example, flatMap converts a collection from the property members of the input into a flow. Each record of members is individually provided to the next steps.
  .flatMap { input -> flowOf(*input.members.toTypedArray()) }
- flatten converts a collective input into individual elements in the output for a known collective type (array, iterable, etc.), keeping each item unchanged. In the example, .flatten converts the arrays produced by the .map steps into a single series of integers, where each integer is either the original integer multiplied by 3 or that product plus 100. For the inputs 1, 2, and 3, the result is [3, 103, 6, 106, 9, 109].
  .map { it * 3 }
  .map { arrayOf(it, it + 100) }
  .flatten()
- onEach executes one or several statements on each element of the input and forwards them unchanged to the output. The example prints the input of the .onEach step to the console.
  .onEach { input -> println(input) }
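The transformation operators behave much like their namesakes in the Kotlin standard library, which makes it possible to check the examples above on plain collections. The sketch below uses only standard-library functions on lists; it does not call QALIPSIS. Team and its members property are hypothetical, and lists replace the arrays of the flatten example.

```kotlin
// Hypothetical type used to illustrate flatMap on a "members" property.
data class Team(val members: List<String>)

// Mirrors the flatten example: multiply by 3, expand to (x, x + 100), then flatten.
fun tripleAndExpand(inputs: List<Int>): List<Int> =
    inputs
        .map { it * 3 }
        .map { listOf(it, it + 100) }
        .flatten()

// Mirrors the flatMap example: every member is emitted individually.
fun allMembers(teams: List<Team>): List<String> =
    teams.flatMap { it.members }

fun main() {
    println(tripleAndExpand(listOf(1, 2, 3))) // [3, 103, 6, 106, 9, 109]

    val teams = listOf(Team(listOf("ann", "bob")), Team(listOf("cy")))
    println(allMembers(teams)) // [ann, bob, cy]

    // onEach runs a statement per element and returns the elements unchanged.
    val unchanged = listOf(1, 2).onEach { println(it) }
    println(unchanged) // [1, 2]
}
```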
Caching operators
This section provides an explanation and examples of the shelve and unshelve caching operators.
- shelve caches the provided values in a shared cache. In the example, .shelve maps each input to a key-value pair ("value-1" to input + 1) and stores it in the cache.
  .shelve { input -> mapOf("value-1" to input + 1) }
  When using a distributed architecture, make sure the cached value can be serialized by the implementation of the shared state registry in use. The ID of the minion is automatically added to the key to avoid collisions.
- unshelve extracts a specific value from the cache using the keys passed as arguments. In the example, unshelve<Int, Double>("value-1") takes two type arguments, <Int, Double>, which specify the type of the step input and the type of the value stored under the key value-1. It then extracts the values associated with the key "value-1", which is of type Double. The extracted values are collected into a new collection of type List<Double>.
  .unshelve<Int, Double>("value-1")
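The role of the minion ID in the cache key can be illustrated with a minimal plain-Kotlin model. Everything in it is an assumption made for the illustration: SharedCacheModel is a hypothetical class, the "minionId:key" format is invented, and an in-memory map stands in for the shared state registry.

```kotlin
// Hypothetical in-memory model of a shared cache; not the QALIPSIS shared state registry.
class SharedCacheModel {
    private val values = mutableMapOf<String, Any?>()

    // Stores each entry under a key prefixed with the minion ID (the key format is an assumption).
    fun shelve(minionId: String, entries: Map<String, Any?>) {
        entries.forEach { (key, value) -> values["$minionId:$key"] = value }
    }

    // Reads the value stored by the same minion under the given key, or null if absent.
    @Suppress("UNCHECKED_CAST")
    fun <T> unshelve(minionId: String, key: String): T? = values["$minionId:$key"] as T?
}

fun main() {
    val cache = SharedCacheModel()
    val input = 1
    cache.shelve("minion-1", mapOf("value-1" to input + 1))

    println(cache.unshelve<Int>("minion-1", "value-1")) // 2
    println(cache.unshelve<Int>("minion-2", "value-1")) // null, another minion has its own key space
}
```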
Flow operators
This section provides an explanation and examples of the stage flow operator.
- stage groups a linear graph of steps. Stages are not mandatory in a scenario, but provide a convenient way to iterate steps. In the example, after flatten() is completed, the stage repeats 5 times, starting at the first map.
  .stage("my-stage") {
      .map { it * 3 }
      .map { arrayOf(it, it + 100) }
      .flatten()
  }.configure {
      iterate(5)
  }
Combining operators
The most often used combining operator is collect.
collect collects inputs across all the minions and releases them as a list within the latest received step context. Its function is the opposite of flatten.
In the example, .collect collects the elements of a workflow with a timeout of 1 second or until there are 10 items in the collection. After one second or when 10 items are collected, the collection is provided to the next steps.
.collect(Duration.ofSeconds(1), 10)
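The release rule of collect (size limit or timeout, whichever comes first) can be sketched in plain Kotlin. This is a simplified model, not the QALIPSIS implementation: CollectModel and onItem are hypothetical names, timestamps are passed in explicitly instead of using a real clock, and the timeout is only checked when an item arrives, whereas a real implementation would presumably also release on a timer.

```kotlin
// Hypothetical model of size-or-timeout batching; not a QALIPSIS class.
class CollectModel<T>(private val timeoutMs: Long, private val maxSize: Int) {
    private val buffer = mutableListOf<T>()
    private var windowStartMs = 0L

    // Adds an item received at nowMs; returns the batch when it is released, otherwise null.
    fun onItem(item: T, nowMs: Long): List<T>? {
        if (buffer.isEmpty()) windowStartMs = nowMs
        buffer.add(item)
        val timedOut = nowMs - windowStartMs >= timeoutMs
        if (buffer.size < maxSize && !timedOut) return null
        val batch = buffer.toList()
        buffer.clear()
        return batch
    }
}

fun main() {
    // A smaller size limit than in .collect(Duration.ofSeconds(1), 10), to keep the example short.
    val collector = CollectModel<Int>(timeoutMs = 1000, maxSize = 3)
    println(collector.onItem(1, nowMs = 0))    // null
    println(collector.onItem(2, nowMs = 10))   // null
    println(collector.onItem(3, nowMs = 20))   // [1, 2, 3], size limit reached
    println(collector.onItem(4, nowMs = 100))  // null
    println(collector.onItem(5, nowMs = 1200)) // [4, 5], timeout elapsed
}
```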