Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS support for Redis connections #331

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,27 @@
"cSpell.words": [
"autopep",
"datayoga",
"hgetall",
"hset",
"isort",
"jdbc",
"jmespath",
"jsonschema",
"keyspace",
"loglevel",
"lpush",
"orjson",
"Pylance",
"pylintrc",
"pytest",
"PYTHONDEVMODE",
"PYTHONPATH",
"sadd",
"sqlalchemy",
"sqlserver",
"venv",
"xadd"
"xadd",
"zadd"
],
"python.analysis.typeCheckingMode": "basic",
"python.languageServer": "Default",
Expand Down
7 changes: 1 addition & 6 deletions core/src/datayoga_core/blocks/redis/lookup/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ def init(self, context: Optional[Context] = None):
# Dry mode is internal and used for validate the block without establishing a connection.
# This behavior should be implemented in a common way, see this issue: https://lnk.pw/eklj
if not self.properties.get("dry"):
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password")
)
self.redis_client = redis_utils.get_client(connection)

self.field_path = [utils.unescape_field(field) for field in utils.split_field(self.properties.get("field"))]

Expand Down
6 changes: 1 addition & 5 deletions core/src/datayoga_core/blocks/redis/read_stream/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection = get_connection_details(self.properties.get("connection"), context)
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password"))
self.redis_client = redis_utils.get_client(connection)

self.stream = self.properties["stream_name"]
self.snapshot = self.properties.get("snapshot", False)
Expand Down
41 changes: 37 additions & 4 deletions core/src/datayoga_core/blocks/redis/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,49 @@
from typing import Optional
import ssl
from typing import Any, Dict

import redis
from redis import Redis


def get_client(host: str, port: int, user: Optional[str] = None, password: Optional[str] = None) -> Redis:
def get_client(connection: Dict[str, Any]) -> Redis:
"""Establishes a connection to a Redis server with optional SSL/TLS encryption and authentication.

Args:
connection (Dict[str, Any]): A dictionary containing connection parameters:
- "host" (str): The Redis server hostname or IP address.
- "port" (int): The Redis server port number.
- "user" (Optional[str]): Redis username.
- "password" (Optional[str]): Redis password.
- "key" (Optional[str]): Path to the client private key file for SSL/TLS.
- "key_password" (Optional[str]): Password for the client private key file.
- "cert" (Optional[str]): Path to the client certificate file for SSL/TLS.
- "cacert" (Optional[str]): Path to the CA certificate file for SSL/TLS.

Returns:
Redis: Redis client instance.

Raises:
ValueError: If connection to Redis fails.
"""
host = connection["host"]
port = connection["port"]
key = connection.get("key")
key_password = connection.get("key_password")
cert = connection.get("cert")
cacert = connection.get("cacert")

try:
client = redis.Redis(
host=host,
port=port,
username=user,
password=password,
username=connection.get("user"),
password=connection.get("password"),
ssl=(key is not None and cert is not None) or cacert is not None,
ssl_keyfile=key,
ssl_password=key_password,
ssl_certfile=cert,
ssl_cert_reqs=ssl.CERT_REQUIRED if cacert else ssl.CERT_NONE,
ssl_ca_certs=cacert,
decode_responses=True,
client_name="datayoga"
)
Expand Down
6 changes: 1 addition & 5 deletions core/src/datayoga_core/blocks/redis/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection = get_connection_details(self.properties.get("connection"), context)
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password"))
self.redis_client = redis_utils.get_client(connection)

self.command = self.properties.get("command", "HSET")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,31 @@
"password": {
"description": "Redis DB password",
"type": "string"
},
"key": {
"title": "Private key file to authenticate with",
"type": "string"
},
"key_password": {
"title": "Password for unlocking an encrypted private key",
"type": "string"
},
"cert": {
"title": "Client certificate file to authenticate with",
"type": "string"
},
"cacert": {
"title": "CA certificate file to verify with",
"type": "string"
}
},
"additionalProperties": false,
"required": ["type", "host", "port"],
"dependentSchemas": {
"user": {
"required": ["password"]
}
"dependentRequired": {
"key": ["cert"],
"cert": ["key"],
"key_password": ["key"],
"user": ["password"]
},
"examples": [
{
Expand Down
117 changes: 85 additions & 32 deletions integration-tests/common/redis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@


def get_redis_client(host: str, port: int, password: Optional[str] = None) -> Redis:
"""Establishes a connection to a Redis server.

Args:
host (str): The Redis server hostname or IP address.
port (int): The Redis server port number.
password (Optional[str], optional): Redis password. Defaults to None.

Returns:
Redis: Redis client instance.

Raises:
ValueError: If connection to Redis fails.
"""
try:
client = redis.Redis(
host=host,
Expand All @@ -25,39 +38,79 @@ def get_redis_client(host: str, port: int, password: Optional[str] = None) -> Re


def get_redis_oss_container(redis_port: int, redis_password: Optional[str] = None) -> RedisContainer:
"""Creates a Redis container for testing purposes.

Args:
redis_port (int): The port to bind the container's Redis server.
redis_password (Optional[str], optional): Redis password. Defaults to None.

Returns:
RedisContainer: Redis container instance.
"""
return RedisContainer(password=redis_password).with_bind_ports(6379, redis_port)


def add_to_emp_stream(redis_client: Redis):
redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 1, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [
{"id": 1, "country_code": "IL", "address": "my address 1"},
{"id": 2, "country_code": "US", "address": "my address 2"}
], "__$$opcode": "d"})})

redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 2, "fname": "jane", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1000-2000-3000-4000", "gender": "F", "addresses": [
{"id": 3, "country_code": "IL", "address": "my address 3"},
{"id": 4, "country_code": "US", "address": "my address 4"}
], "__$$opcode": "u"})})

redis_client.xadd(
"emp",
{"message":
json.dumps({
"_id": 12, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [
{"id": 5, "country_code": "IL", "address": "my address 5"}], "__$$opcode": "u"})})

# unsupported opcode
redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 99, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [], "__$$opcode": "x"})})
"""Adds records to the 'emp' Redis stream using XADD command.

Args:
redis_client (Redis): Redis client instance.
"""
records = [
{
"_id": 1,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [
{"id": 1, "country_code": "IL", "address": "my address 1"},
{"id": 2, "country_code": "US", "address": "my address 2"}
],
"__$$opcode": "d"
},
{
"_id": 2,
"fname": "jane",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1000-2000-3000-4000",
"gender": "F",
"addresses": [
{"id": 3, "country_code": "IL", "address": "my address 3"},
{"id": 4, "country_code": "US", "address": "my address 4"}
],
"__$$opcode": "u"
},
{
"_id": 12,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [
{"id": 5, "country_code": "IL", "address": "my address 5"}
],
"__$$opcode": "u"
},
# unsupported opcode
{
"_id": 99,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [],
"__$$opcode": "x"
}
]

for record in records:
redis_client.xadd("emp", {"message": json.dumps(record)})
11 changes: 5 additions & 6 deletions integration-tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ def wait_program(process: Popen, sig: Optional[int] = signal.SIGTERM, ignore_err
raise ValueError("command failed")


def run_job(job: str, piped_from: Optional[str] = None, piped_to: Optional[str] = None,
def run_job(job_name: str, piped_from: Optional[str] = None, piped_to: Optional[str] = None,
background: bool = False) -> Optional[Popen]:
"""
Runs a job using the `datayoga` command-line tool.
"""Runs a job using the `datayoga` command-line tool.

Args:
job (str): The name or identifier of the job to run.
job_name (str): The name or identifier of the job to run.
piped_from (Optional[str], optional): The command or file to pipe input from. Defaults to None.
piped_to (Optional[str], optional): The file to redirect output to. Defaults to None.
background (bool, optional): If True, runs the job in the background. Defaults to False.
Expand All @@ -74,8 +73,8 @@ def run_job(job: str, piped_from: Optional[str] = None, piped_to: Optional[str]
piped_from_cmd = f"{piped_from} | " if piped_from else ""
piped_to_cmd = f" > {piped_to}" if piped_to else ""

command = f'{piped_from_cmd}datayoga run {job} ' \
f'--dir {path.join(os.path.dirname(os.path.realpath(__file__)), "..", "resources")} ' \
command = f'{piped_from_cmd}datayoga run {job_name} ' \
f'--dir {path.join(path.dirname(path.realpath(__file__)), "..", "resources")} ' \
f'--loglevel DEBUG{piped_to_cmd}'

return execute_program(command, background=background)
14 changes: 0 additions & 14 deletions integration-tests/resources/jobs/pg_to_redis.yaml

This file was deleted.

16 changes: 13 additions & 3 deletions integration-tests/test_redis_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@


@pytest.mark.parametrize(
"configuration, key, expected",
"job_name, key, expected",
[("tests.redis_lookup_string", "0", "None"),
("tests.redis_lookup_string", "1", "test_string"),
("tests.redis_lookup_hash", "2", "{'tf0': 'tv0', 'tf1': 'tv1'}"),
("tests.redis_lookup_set", "3", "{'tv0'}"),
("tests.redis_lookup_sorted_set", "4", "['tv0', '10', 'tv1', '20']"),
("tests.redis_lookup_list", "5", "['tv2', 'tv1', 'tv0']"),
("tests.redis_lookup_string_nested", "1", "{'a': {'b': {'c.d': 'test_string'}}}")])
def test_redis_lookup(configuration, key, expected):
def test_redis_lookup(job_name: str, key: str, expected: str):
"""Tests the functionality of Redis lookup operations.

Args:
job_name (str): The name of the job to run.
key (str): The Redis key to lookup.
expected (str): The expected value after the Redis lookup.

Raises:
AssertionError: If the retrieved value from Redis does not match the expected value.
"""
redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
redis_container.start()

Expand All @@ -25,7 +35,7 @@ def test_redis_lookup(configuration, key, expected):
redis_client.zadd("sorted_set_key", mapping={"tv0": "10", "tv1": "20"})
redis_client.lpush("list_key", "tv0", "tv1", "tv2")

run_job(configuration)
run_job(job_name)

val = redis_client.hgetall(key)
assert val["obj"] == expected
Expand Down