Redis Lettuce plugin

Overview

The Redis Lettuce plugin connects QALIPSIS to Redis using the Lettuce Java client.

Technology addressed

Redis: https://redis.io/

Dependency

io.qalipsis.plugin:qalipsis-plugin-redis-lettuce
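
As a rough sketch, the dependency might be declared as follows in a Gradle Kotlin DSL build script. The build tool, the implementation configuration, and the x.y.z version are assumptions, since this page states neither a build tool nor a version.

```kotlin
// build.gradle.kts (sketch; x.y.z is a placeholder, not a real version)
dependencies {
    implementation("io.qalipsis.plugin:qalipsis-plugin-redis-lettuce:x.y.z")
}
```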

Namespace in scenario

redisLettuce()

Client library

Lettuce Core: refer to Lettuce.

Model separation
  • Core Redis (poll*, save) targets keys and Redis data structures.

  • Redis Streams (streamsConsume, streamsProduce) targets stream entries and consumer groups.

Supported steps

Core Redis examples

Poll step

The poll family in the Redis Lettuce plugin periodically scans Redis and forwards new records.

Ancestor

Scenario

Functionality

Use exactly one scan flavor depending on the targeted Redis structure:

  • pollScan: key pattern scan (SCAN)

  • pollSscan: set scan (SSCAN)

  • pollHscan: hash scan (HSCAN)

  • pollZscan: sorted set scan (ZSCAN)

Example (pollSscan)
redisLettuce().pollSscan {
    name = "poll-sscan" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    keyOrPattern("battery-state-devices") (3)
    pollDelay(Duration.ofSeconds(1)) (4)
    monitoring {
        events = true
        meters = true
    }
}.flatten() (5)
1 Create a poll step using the Redis SSCAN command.
2 Configure the Redis connection for this step.
3 Define the Redis key to scan for SSCAN.
4 Set the delay between two poll executions.
5 Emit one RedisRecord<String> at a time instead of a batched LettucePollResult<String>.
Tips
  • Use pollScan when you need wildcard key discovery.

  • Use pollSscan, pollHscan, or pollZscan when you target a known key of a specific Redis structure.
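
For comparison with the pollSscan example, a pollScan step with a wildcard pattern might look like the sketch below. It is assembled only from the properties documented on this page; the step name and the pattern are illustrative values.

```kotlin
// Sketch only: mirrors the pollSscan example, but discovers keys by pattern.
redisLettuce().pollScan {
    name = "poll-scan"
    connection {
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    keyOrPattern("battery_state_*") // wildcard key pattern handed to SCAN
    pollDelay(Duration.ofSeconds(1))
}
```

Without a trailing flatten(), the step forwards each poll as a batch rather than record by record.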

Reference Documentation

Refer to Redis and Lettuce for command and topology details.

Save step

The save step writes generated records to Redis.

Ancestor

Step

Functionality

For each incoming input, the records { } lambda returns a list of LettuceSaveRecord objects. The type of each record selects the Redis command that is executed.

Example
.redisLettuce()
.save {
    name = "save" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    records { stepContext, input -> (3)
        listOf(
            HashRecord(
                key = "battery:${input.deviceId}",
                value = mapOf(
                    "deviceId" to input.deviceId,
                    "batteryLevel" to input.batteryLevel.toString(),
                    "timestamp" to input.timestamp.toString()
                )
            )
        )
    }
}
1 Create a save step.
2 Configure the Redis connection for this step.
3 Build HSET records (via HashRecord) from the current input.
Tips
  • Use ValueRecord for SET, SetRecord for SADD, HashRecord for HSET, and SortedRecord for ZADD.

  • Use a collect step before save if you need to reduce command churn and write in larger logical batches.
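
The second tip can be sketched as follows. The collect step itself is not shown, and the sketch assumes it turns the flow into lists of readings with the same deviceId and batteryLevel fields as in the example above; the step name is illustrative.

```kotlin
// Sketch only: assumes a preceding collect step (not shown), so that the
// input of save is a list of readings rather than a single reading.
.redisLettuce()
.save {
    name = "save-batch"
    connection {
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    records { _, readings ->
        // Build one HSET record per collected reading.
        readings.map { reading ->
            HashRecord(
                key = "battery:${reading.deviceId}",
                value = mapOf(
                    "deviceId" to reading.deviceId,
                    "batteryLevel" to reading.batteryLevel.toString()
                )
            )
        }
    }
}
```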

Reference Documentation

Refer to Redis and Lettuce for command and topology details.

Redis Streams examples

Consume step

The streamsConsume step consumes records from a Redis stream using a consumer group.

Ancestor

Scenario

Functionality

The step reads entries from a stream key as part of a consumer group. It emits them as batches by default, or as single records when followed by flatten().

Example
redisLettuce().streamsConsume {
    name = "consume" (1)
    connection { (2)
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    streamKey("battery-state-stream") (3)
    group("battery-consumers") (4)
    offset(LettuceStreamsConsumerOffset.FROM_BEGINNING) (5)
    concurrency(3) (6)
}.flatten() (7)
1 Create a stream consumer step.
2 Configure the Redis connection for this step.
3 Define the stream to read.
4 Define the consumer group name.
5 Start from the beginning when no known state exists for the group.
6 Run three concurrent stream consumers.
7 Emit one LettuceStreamsConsumedRecord at a time instead of a batch.
Tip

Use offset(LettuceStreamsConsumerOffset.LAST_CONSUMED) when the group should resume from its pending/acknowledged state, and offset(LettuceStreamsConsumerOffset.LATEST) when you only want new records.
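
As an illustration, a consumer that resumes from the group's last consumed position might be configured like this. The step name is illustrative, and the connection block is omitted on the assumption that the documented defaults (localhost:6379, database 0, SINGLE) apply.

```kotlin
// Sketch only: same stream and group as the example above, different offset.
redisLettuce().streamsConsume {
    name = "consume-resume"
    streamKey("battery-state-stream")
    group("battery-consumers")
    offset(LettuceStreamsConsumerOffset.LAST_CONSUMED) // continue where the group left off
}
```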

Reference Documentation

Refer to Redis and Lettuce for stream and topology details.

Configuration

DSL configuration

Property Description

connection

Configures Redis connection parameters at the step level.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional (defaults can be used)
Data Type: Lambda with receiver RedisConnectionConfiguration
Default Value: nodes = listOf("localhost:6379"), database = 0, redisConnectionType = SINGLE, authUser = "", authPassword = "", masterId = ""

Example
connection {
    nodes = listOf("localhost:6379")
    database = 0
    redisConnectionType = RedisConnectionType.SINGLE
    authUser = ""
    authPassword = ""
}

connection.nodes

Redis node addresses formatted as host:port. For CLUSTER, provide all relevant nodes; for SENTINEL, provide sentinel nodes.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional
Data Type: List<String>
Default Value: listOf("localhost:6379")
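
For a CLUSTER topology, the node list might look like the following sketch. The host names are hypothetical placeholders.

```kotlin
// Sketch only: host names are placeholders for your own cluster nodes.
connection {
    nodes = listOf(
        "redis-node-1:6379",
        "redis-node-2:6379",
        "redis-node-3:6379"
    )
    redisConnectionType = RedisConnectionType.CLUSTER
}
```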

connection.database

Redis logical database index.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional
Data Type: Int
Default Value: 0

connection.redisConnectionType

Redis connection topology.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional
Data Type: RedisConnectionType (enum), one of SINGLE, CLUSTER, or SENTINEL
Default Value: SINGLE

connection.authUser

Username for ACL-based authentication (Redis 6+).
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: ""

connection.authPassword

Password for authenticated connections. For SENTINEL, this value is also used when configuring sentinel authentication.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: ""
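
A minimal sketch of an authenticated connection is shown below. Reading the credentials from environment variables is a suggestion rather than a plugin feature, and the variable names REDIS_USER and REDIS_PASSWORD are hypothetical.

```kotlin
// Sketch only: the environment variable names are hypothetical.
connection {
    nodes = listOf("localhost:6379")
    redisConnectionType = RedisConnectionType.SINGLE
    authUser = System.getenv("REDIS_USER") ?: ""         // falls back to the documented default
    authPassword = System.getenv("REDIS_PASSWORD") ?: "" // falls back to the documented default
}
```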

connection.masterId

Redis Sentinel master identifier. Required when connection.redisConnectionType = SENTINEL.
Applicable Steps: Poll, Save, Consume, Produce
Optional/Required: Optional in general, required for SENTINEL
Data Type: String
Default Value: ""

Example (SENTINEL)
connection {
    nodes = listOf("localhost:26379")
    redisConnectionType = RedisConnectionType.SENTINEL
    masterId = "mymaster"
}

Core Redis model properties (Poll/Save)

Property Description

keyOrPattern

Defines the target scanned by polling.
For pollScan, this is a key pattern.
For pollSscan, pollHscan, and pollZscan, this is the key to scan.
Applicable Step: Poll
Optional/Required: Required (@NotBlank)
Data Type: String
Default Value: ""

Example
keyOrPattern("battery_state_*")

pollDelay

Delay between two poll executions.
Applicable Step: Poll
Optional/Required: Optional (@PositiveOrZeroDuration)
Data Type: Function overload accepting Duration or Long
Default Value: Duration.ofSeconds(10)

Example
pollDelay(Duration.ofSeconds(1))
pollDelay(1000L)

records

Generates records to persist for each incoming input.
Supported record types: ValueRecord (SET), SetRecord (SADD), HashRecord (HSET), SortedRecord (ZADD).
Applicable Step: Save
Optional/Required: Required to effectively write data
Data Type: Lambda which takes the stepContext and the input and returns a list of LettuceSaveRecord
Default Value: { _, _ -> emptyList() }

Example
records { stepContext, input ->
    listOf(
        ValueRecord("device:42", "ok"),
        SetRecord("devices", "device:42"),
        HashRecord("device:42:state", mapOf("batteryLevel" to "98")),
        SortedRecord("device:42:history", 1712520900.0 to "98")
    )
}

Redis Streams model properties (Consume/Produce)

Property Description

streamKey

Name of the Redis stream to consume from.
Applicable Step: Consume
Optional/Required: Required (@NotBlank)
Data Type: String
Default Value: ""

Example
streamKey("battery-state-stream")

group

Redis consumer group name used by the consume step.
Applicable Step: Consume
Optional/Required: Required (@NotBlank)
Data Type: String
Default Value: ""

Example
group("battery-consumers")

offset

Offset strategy used when the consumer group state is not yet known.
Applicable Step: Consume
Optional/Required: Optional
Data Type: Function call offset(strategy) where strategy is one of LettuceStreamsConsumerOffset.FROM_BEGINNING, LettuceStreamsConsumerOffset.LAST_CONSUMED, or LettuceStreamsConsumerOffset.LATEST
Default Value: FROM_BEGINNING

Example
offset(LettuceStreamsConsumerOffset.FROM_BEGINNING)

concurrency

Number of concurrent consumers attached to the same stream/group. Must be a positive integer.
Applicable Step: Consume
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example
concurrency(5)

records

Generates stream entries to publish for each incoming input.
Each LettuceStreamsProduceRecord contains the destination stream key and the map payload.
Applicable Step: Produce
Optional/Required: Required to effectively publish data
Data Type: Lambda which takes the stepContext and the input and returns a list of LettuceStreamsProduceRecord.
Default Value: { _, _ -> emptyList() }

Example
records { stepContext, input ->
    listOf(
        LettuceStreamsProduceRecord(
            key = "battery-state-stream",
            value = mapOf(
                "deviceId" to "device-42",
                "batteryLevel" to "98",
                "timestamp" to "1712520900"
            )
        )
    )
}
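
Combined with a connection block, a complete produce step might look like the sketch below. It assumes that streamsProduce is chained after a previous step in the same way as save, and that the input exposes deviceId and batteryLevel fields; neither assumption is confirmed on this page.

```kotlin
// Sketch only: the chaining style and the input fields are assumptions.
.redisLettuce()
.streamsProduce {
    name = "produce"
    connection {
        nodes = listOf("localhost:6379")
        database = 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    records { _, input ->
        // Publish one stream entry per incoming input.
        listOf(
            LettuceStreamsProduceRecord(
                key = "battery-state-stream",
                value = mapOf(
                    "deviceId" to input.deviceId,
                    "batteryLevel" to input.batteryLevel.toString()
                )
            )
        )
    }
}
```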

Shared defaults for Redis Lettuce steps

You can define defaults once, inside the scenario block or immediately after it, so that all subsequent Redis Lettuce steps inherit them.

scenario {
    redisLettuce().defaults { (1)
        connection {
            nodes = listOf("localhost:6379")
            database = 0
            redisConnectionType = RedisConnectionType.SINGLE
        }
        monitoring {
            events = true
            meters = true
        }
    }
}
1 Defaults are applied to subsequent Redis Lettuce steps in the same scenario. Individual steps can still override values.
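
A step that overrides one of the shared values might look like the following sketch. The full connection block is restated because this page does not say whether a partial block is merged with the defaults; the step name and database index are illustrative.

```kotlin
// Sketch only: a step declared after the defaults block in the same scenario.
redisLettuce().pollSscan {
    name = "poll-sscan-db1"
    connection {
        nodes = listOf("localhost:6379")
        database = 1 // overrides the shared default of 0
        redisConnectionType = RedisConnectionType.SINGLE
    }
    keyOrPattern("battery-state-devices")
}
```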