Redis Lettuce plugin
Overview
- Technology addressed: Redis (https://redis.io/)
- Dependency: io.qalipsis.plugin:qalipsis-plugin-redis-lettuce
- Namespace in scenario: redisLettuce()
- Client library: Lettuce Core (refer to Lettuce)
- Model separation:
  - Core Redis (poll*, save) targets keys and Redis data structures.
  - Redis Streams (streamsConsume, streamsProduce) targets stream entries and consumer groups.
Supported steps
Core Redis examples
Poll step
The poll family in the Redis Lettuce plugin periodically scans Redis and forwards new records.
- Ancestor: Scenario
- Functionality: Use exactly one scan flavor depending on the targeted Redis structure:
  - pollScan: key pattern scan (SCAN)
  - pollSscan: set scan (SSCAN)
  - pollHscan: hash scan (HSCAN)
  - pollZscan: sorted set scan (ZSCAN)
- Example (pollSscan):
redisLettuce().pollSscan {
    name = "poll-sscan" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    keyOrPattern("battery-state-devices") (3)
    pollDelay(Duration.ofSeconds(1)) (4)
    monitoring {
        events = true
        meters = true
    }
}.flatten() (5)
| 1 | Create a poll step using the Redis SSCAN command. |
| 2 | Configure the Redis connection for this step. |
| 3 | Define the Redis key to scan for SSCAN. |
| 4 | Set the delay between two poll executions. |
| 5 | Emits one RedisRecord<String> at a time instead of a batched LettucePollResult<String>. |
- Tips
  - Use pollScan when you need wildcard key discovery.
  - Use pollSscan, pollHscan, or pollZscan when you target a known key of a specific Redis structure.
- Reference Documentation: Refer to Redis and Lettuce for command and topology details.
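The effect of .flatten() described in callout 5 can be pictured with plain Kotlin collections. The sketch below is only a conceptual model: Record and PollBatch are hypothetical stand-ins, not the plugin's RedisRecord or LettucePollResult classes, and a real step emits records over time rather than from an in-memory list.

```kotlin
// Hypothetical stand-ins for illustration only; these are NOT the plugin's classes.
data class Record(val value: String)
data class PollBatch(val records: List<Record>)

// Turns a list of batches into a list of single records, preserving order.
fun flattenBatches(batches: List<PollBatch>): List<Record> =
    batches.flatMap { it.records }

fun main() {
    val batches = listOf(
        PollBatch(listOf(Record("device-1"), Record("device-2"))),
        PollBatch(listOf(Record("device-3")))
    )
    // Without flattening, a downstream step would see one item per poll: 2
    println(batches.size)
    // With flattening, it sees one item per record: [device-1, device-2, device-3]
    println(flattenBatches(batches).map { it.value })
}
```

Keeping the batch can be preferable when the next step processes all records of one poll together; flattening suits per-record processing.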
Save step
The save step writes generated records to Redis.
- Ancestor: Step
- Functionality: For each incoming input, records {} generates one or more LettuceSaveRecord objects. The record type selects the Redis command.
- Example:
.redisLettuce()
.save {
    name = "save" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    records { stepContext, input -> (3)
        listOf(
            HashRecord(
                key = "battery:${input.deviceId}",
                value = mapOf(
                    "deviceId" to input.deviceId,
                    "batteryLevel" to input.batteryLevel.toString(),
                    "timestamp" to input.timestamp.toString()
                )
            )
        )
    }
}
| 1 | Create a save step. |
| 2 | Configure the Redis connection for this step. |
| 3 | Build HSET records (via HashRecord) from the current input. |
- Tips
  - Use ValueRecord for SET, SetRecord for SADD, HashRecord for HSET, and SortedRecord for ZADD.
  - Use a collect step before save if you need to reduce command churn and write in larger logical batches.
- Reference Documentation: Refer to Redis and Lettuce for command and topology details.
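The save example reads deviceId, batteryLevel, and timestamp from its input without showing the input type. As a minimal sketch, assuming a hypothetical BatteryState class (the Int and Instant property types are guesses), the key and field map passed to HashRecord could be built and checked outside of any scenario like this:

```kotlin
import java.time.Instant

// Hypothetical input type: the example only shows that the input exposes these three properties.
data class BatteryState(val deviceId: String, val batteryLevel: Int, val timestamp: Instant)

// Builds the hash key used in the example: "battery:<deviceId>".
fun hashKey(state: BatteryState): String = "battery:${state.deviceId}"

// Builds the field/value map; Redis hash values are strings, hence the toString() calls.
fun hashFields(state: BatteryState): Map<String, String> = mapOf(
    "deviceId" to state.deviceId,
    "batteryLevel" to state.batteryLevel.toString(),
    "timestamp" to state.timestamp.toString()
)

fun main() {
    val state = BatteryState("dev-42", 87, Instant.parse("2024-01-01T00:00:00Z"))
    println(hashKey(state)) // battery:dev-42
    println(hashFields(state))
}
```

Extracting the mapping into small functions like these keeps the records {} block short and makes the key naming easy to unit test.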
Redis streams examples
Consume step
The streamsConsume step consumes records from a Redis stream using a consumer group.
- Ancestor: Scenario
- Functionality: The step reads from a stream key and consumer group and can emit data as batches or single records.
- Example:
redisLettuce().streamsConsume {
    name = "consume" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    streamKey("battery-state-stream") (3)
    group("battery-consumers") (4)
    offset(LettuceStreamsConsumerOffset.FROM_BEGINNING) (5)
    concurrency(3) (6)
}.flatten() (7)
| 1 | Create a stream consumer step. |
| 2 | Configure the Redis connection for this step. |
| 3 | Define the stream to read. |
| 4 | Define the consumer group name. |
| 5 | Start from the beginning when no known state exists for the group. |
| 6 | Run three concurrent stream consumers. |
| 7 | Emits one LettuceStreamsConsumedRecord at a time. |
- Tip
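  - Use offset(LettuceStreamsConsumerOffset.LAST_CONSUMED) when you want the group to continue from its pending/acked state, and offset(LettuceStreamsConsumerOffset.LATEST) when you only want new records.
- Reference Documentation: Refer to Redis and Lettuce for command and topology details.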
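The three offset names can be related to the special stream IDs that Redis itself defines: 0 reads a stream from its first entry, $ designates only entries added afterwards, and > (in XREADGROUP) requests entries not yet delivered to any consumer of the group. The sketch below assumes that the plugin's offsets correspond to these IDs; that mapping is an assumption for illustration, and ConsumerOffset is a hypothetical enum, not LettuceStreamsConsumerOffset itself.

```kotlin
// Hypothetical mirror of the three offset names used on this page.
enum class ConsumerOffset { FROM_BEGINNING, LAST_CONSUMED, LATEST }

// Assumed correspondence with the special Redis stream IDs (not confirmed by the plugin documentation).
fun redisStreamId(offset: ConsumerOffset): String = when (offset) {
    ConsumerOffset.FROM_BEGINNING -> "0" // every entry already in the stream
    ConsumerOffset.LAST_CONSUMED -> ">"  // entries never delivered to the group
    ConsumerOffset.LATEST -> "\$"        // only entries added from now on
}

fun main() {
    for (offset in ConsumerOffset.values()) {
        println("$offset -> ${redisStreamId(offset)}")
    }
}
```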
Configuration
DSL configuration
| Property | Description |
|---|---|
| connection | Configures Redis connection parameters at the step level. |
| nodes | Redis node addresses, such as localhost:6379. |
| database | Redis logical database index. |
| redisConnectionType | Redis connection topology. |
| | Username for ACL-based authentication (Redis 6+). |
| | Password for authenticated connections. |
| | Redis Sentinel master identifier. Required when the connection topology is SENTINEL. |
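The examples on this page pass node addresses such as localhost:6379. As a minimal sketch, assuming every address follows that host:port shape, a scenario could validate its node list before handing it to the connection block. NodeAddress and parseNode are hypothetical helpers, not part of the plugin.

```kotlin
// Hypothetical helper type for illustration; not part of the plugin.
data class NodeAddress(val host: String, val port: Int)

// Splits "host:port" on the last colon and validates both parts.
fun parseNode(address: String): NodeAddress {
    val separator = address.lastIndexOf(':')
    require(separator > 0) { "Expected host:port but got '$address'" }
    val host = address.substring(0, separator)
    val port = address.substring(separator + 1).toIntOrNull()
        ?: throw IllegalArgumentException("Port is not a number in '$address'")
    require(port in 1..65535) { "Port out of range in '$address'" }
    return NodeAddress(host, port)
}

fun main() {
    println(parseNode("localhost:6379")) // NodeAddress(host=localhost, port=6379)
}
```

Failing fast on a malformed address gives a clearer error than a connection failure in the middle of a campaign.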
Core Redis model properties (Poll/Save)
| Property | Description |
|---|---|
| keyOrPattern | Defines the target scanned by polling. |
| pollDelay | Delay between two poll executions. |
| records | Generates records to persist for each incoming input. |
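How the polling target is interpreted depends on the scan flavor described in the Poll step section: pollScan takes a key pattern, while the other flavors take a known key. The lookup below restates that rule as code; RedisStructure and ScanChoice are hypothetical helper types introduced for illustration only.

```kotlin
// Hypothetical helper types for illustration; they are not part of the plugin.
enum class RedisStructure { KEYSPACE, SET, HASH, SORTED_SET }

data class ScanChoice(val step: String, val command: String, val targetIsPattern: Boolean)

// Maps the targeted structure to the poll step and Redis command listed in the Poll step section.
fun scanChoice(structure: RedisStructure): ScanChoice = when (structure) {
    RedisStructure.KEYSPACE -> ScanChoice("pollScan", "SCAN", targetIsPattern = true)
    RedisStructure.SET -> ScanChoice("pollSscan", "SSCAN", targetIsPattern = false)
    RedisStructure.HASH -> ScanChoice("pollHscan", "HSCAN", targetIsPattern = false)
    RedisStructure.SORTED_SET -> ScanChoice("pollZscan", "ZSCAN", targetIsPattern = false)
}

fun main() {
    for (structure in RedisStructure.values()) {
        println("$structure -> ${scanChoice(structure)}")
    }
}
```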
Redis streams model properties (Consume/Produce)
| Property | Description |
|---|---|
| streamKey | Name of the Redis stream to consume from. |
| group | Redis consumer group name used by the consume step. |
| offset | Offset strategy used when the consumer group state is not yet known. |
| concurrency | Number of concurrent consumers attached to the same stream/group. Must be a positive integer. |
| | Generates stream entries to publish for each incoming input. |
Shared defaults for Redis Lettuce steps
You can define defaults once in the scenario section or just after, and let all following Redis Lettuce steps inherit them.
scenario {
redisLettuce().defaults { (1)
connection {
nodes = listOf("localhost:6379")
database = 0
redisConnectionType = RedisConnectionType.SINGLE
}
monitoring {
events = true
meters = true
}
}
}
| 1 | Defaults are applied to subsequent Redis Lettuce steps in the same scenario. Individual steps can still override values. |
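The inheritance rule in callout 1 can be modeled in a few lines of plain Kotlin. This is a conceptual sketch only, not the plugin's implementation: ConnectionSettings is a hypothetical type in which null means that the step did not set the value.

```kotlin
// Hypothetical, simplified model of step settings; null means "not set on the step".
data class ConnectionSettings(val nodes: List<String>? = null, val database: Int? = null)

// A value set on the step wins; otherwise the shared default applies.
fun effective(defaults: ConnectionSettings, step: ConnectionSettings) = ConnectionSettings(
    nodes = step.nodes ?: defaults.nodes,
    database = step.database ?: defaults.database
)

fun main() {
    val defaults = ConnectionSettings(nodes = listOf("localhost:6379"), database = 0)
    val stepOverride = ConnectionSettings(database = 2)
    // Prints: ConnectionSettings(nodes=[localhost:6379], database=2)
    println(effective(defaults, stepOverride))
}
```

In this model a step that overrides only the database still inherits the node list, which matches the behavior described for shared defaults.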