Skip to content

Commit

Permalink
Merge pull request #331 from datayoga-io/96-redisread_stream-rediswri…
Browse files Browse the repository at this point in the history
…te-and-redislookup-blocks-support-tls

TLS support for Redis connections
  • Loading branch information
spicy-sauce authored Oct 31, 2023
2 parents 318dc6f + b90950e commit af9d793
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 80 deletions.
8 changes: 7 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,27 @@
"cSpell.words": [
"autopep",
"datayoga",
"hgetall",
"hset",
"isort",
"jdbc",
"jmespath",
"jsonschema",
"keyspace",
"loglevel",
"lpush",
"orjson",
"Pylance",
"pylintrc",
"pytest",
"PYTHONDEVMODE",
"PYTHONPATH",
"sadd",
"sqlalchemy",
"sqlserver",
"venv",
"xadd"
"xadd",
"zadd"
],
"python.analysis.typeCheckingMode": "basic",
"python.languageServer": "Default",
Expand Down
7 changes: 1 addition & 6 deletions core/src/datayoga_core/blocks/redis/lookup/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ def init(self, context: Optional[Context] = None):
# Dry mode is internal and used for validate the block without establishing a connection.
# This behavior should be implemented in a common way, see this issue: https://lnk.pw/eklj
if not self.properties.get("dry"):
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password")
)
self.redis_client = redis_utils.get_client(connection)

self.field_path = [utils.unescape_field(field) for field in utils.split_field(self.properties.get("field"))]

Expand Down
6 changes: 1 addition & 5 deletions core/src/datayoga_core/blocks/redis/read_stream/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection = get_connection_details(self.properties.get("connection"), context)
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password"))
self.redis_client = redis_utils.get_client(connection)

self.stream = self.properties["stream_name"]
self.snapshot = self.properties.get("snapshot", False)
Expand Down
41 changes: 37 additions & 4 deletions core/src/datayoga_core/blocks/redis/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,49 @@
from typing import Optional
import ssl
from typing import Any, Dict

import redis
from redis import Redis


def get_client(host: str, port: int, user: Optional[str] = None, password: Optional[str] = None) -> Redis:
def get_client(connection: Dict[str, Any]) -> Redis:
"""Establishes a connection to a Redis server with optional SSL/TLS encryption and authentication.
Args:
connection (Dict[str, Any]): A dictionary containing connection parameters:
- "host" (str): The Redis server hostname or IP address.
- "port" (int): The Redis server port number.
- "user" (Optional[str]): Redis username.
- "password" (Optional[str]): Redis password.
- "key" (Optional[str]): Path to the client private key file for SSL/TLS.
- "key_password" (Optional[str]): Password for the client private key file.
- "cert" (Optional[str]): Path to the client certificate file for SSL/TLS.
- "cacert" (Optional[str]): Path to the CA certificate file for SSL/TLS.
Returns:
Redis: Redis client instance.
Raises:
ValueError: If connection to Redis fails.
"""
host = connection["host"]
port = connection["port"]
key = connection.get("key")
key_password = connection.get("key_password")
cert = connection.get("cert")
cacert = connection.get("cacert")

try:
client = redis.Redis(
host=host,
port=port,
username=user,
password=password,
username=connection.get("user"),
password=connection.get("password"),
ssl=(key is not None and cert is not None) or cacert is not None,
ssl_keyfile=key,
ssl_password=key_password,
ssl_certfile=cert,
ssl_cert_reqs=ssl.CERT_REQUIRED if cacert else ssl.CERT_NONE,
ssl_ca_certs=cacert,
decode_responses=True,
client_name="datayoga"
)
Expand Down
6 changes: 1 addition & 5 deletions core/src/datayoga_core/blocks/redis/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection = get_connection_details(self.properties.get("connection"), context)
self.redis_client = redis_utils.get_client(
connection.get("host"),
connection.get("port"),
connection.get("user"),
connection.get("password"))
self.redis_client = redis_utils.get_client(connection)

self.command = self.properties.get("command", "HSET")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,31 @@
"password": {
"description": "Redis DB password",
"type": "string"
},
"key": {
"title": "Private key file to authenticate with",
"type": "string"
},
"key_password": {
"title": "Password for unlocking an encrypted private key",
"type": "string"
},
"cert": {
"title": "Client certificate file to authenticate with",
"type": "string"
},
"cacert": {
"title": "CA certificate file to verify with",
"type": "string"
}
},
"additionalProperties": false,
"required": ["type", "host", "port"],
"dependentSchemas": {
"user": {
"required": ["password"]
}
"dependentRequired": {
"key": ["cert"],
"cert": ["key"],
"key_password": ["key"],
"user": ["password"]
},
"examples": [
{
Expand Down
117 changes: 85 additions & 32 deletions integration-tests/common/redis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@


def get_redis_client(host: str, port: int, password: Optional[str] = None) -> Redis:
"""Establishes a connection to a Redis server.
Args:
host (str): The Redis server hostname or IP address.
port (int): The Redis server port number.
password (Optional[str], optional): Redis password. Defaults to None.
Returns:
Redis: Redis client instance.
Raises:
ValueError: If connection to Redis fails.
"""
try:
client = redis.Redis(
host=host,
Expand All @@ -25,39 +38,79 @@ def get_redis_client(host: str, port: int, password: Optional[str] = None) -> Re


def get_redis_oss_container(redis_port: int, redis_password: Optional[str] = None) -> RedisContainer:
"""Creates a Redis container for testing purposes.
Args:
redis_port (int): The port to bind the container's Redis server.
redis_password (Optional[str], optional): Redis password. Defaults to None.
Returns:
RedisContainer: Redis container instance.
"""
return RedisContainer(password=redis_password).with_bind_ports(6379, redis_port)


def add_to_emp_stream(redis_client: Redis):
redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 1, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [
{"id": 1, "country_code": "IL", "address": "my address 1"},
{"id": 2, "country_code": "US", "address": "my address 2"}
], "__$$opcode": "d"})})

redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 2, "fname": "jane", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1000-2000-3000-4000", "gender": "F", "addresses": [
{"id": 3, "country_code": "IL", "address": "my address 3"},
{"id": 4, "country_code": "US", "address": "my address 4"}
], "__$$opcode": "u"})})

redis_client.xadd(
"emp",
{"message":
json.dumps({
"_id": 12, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [
{"id": 5, "country_code": "IL", "address": "my address 5"}], "__$$opcode": "u"})})

# unsupported opcode
redis_client.xadd(
"emp",
{"message":
json.dumps({"_id": 99, "fname": "john", "lname": "doe", "country_code": 972, "country_name": "israel",
"credit_card": "1234-1234-1234-1234", "gender": "M", "addresses": [], "__$$opcode": "x"})})
"""Adds records to the 'emp' Redis stream using XADD command.
Args:
redis_client (Redis): Redis client instance.
"""
records = [
{
"_id": 1,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [
{"id": 1, "country_code": "IL", "address": "my address 1"},
{"id": 2, "country_code": "US", "address": "my address 2"}
],
"__$$opcode": "d"
},
{
"_id": 2,
"fname": "jane",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1000-2000-3000-4000",
"gender": "F",
"addresses": [
{"id": 3, "country_code": "IL", "address": "my address 3"},
{"id": 4, "country_code": "US", "address": "my address 4"}
],
"__$$opcode": "u"
},
{
"_id": 12,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [
{"id": 5, "country_code": "IL", "address": "my address 5"}
],
"__$$opcode": "u"
},
# unsupported opcode
{
"_id": 99,
"fname": "john",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1234-1234-1234-1234",
"gender": "M",
"addresses": [],
"__$$opcode": "x"
}
]

for record in records:
redis_client.xadd("emp", {"message": json.dumps(record)})
11 changes: 5 additions & 6 deletions integration-tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ def wait_program(process: Popen, sig: Optional[int] = signal.SIGTERM, ignore_err
raise ValueError("command failed")


def run_job(job: str, piped_from: Optional[str] = None, piped_to: Optional[str] = None,
def run_job(job_name: str, piped_from: Optional[str] = None, piped_to: Optional[str] = None,
background: bool = False) -> Optional[Popen]:
"""
Runs a job using the `datayoga` command-line tool.
"""Runs a job using the `datayoga` command-line tool.
Args:
job (str): The name or identifier of the job to run.
job_name (str): The name or identifier of the job to run.
piped_from (Optional[str], optional): The command or file to pipe input from. Defaults to None.
piped_to (Optional[str], optional): The file to redirect output to. Defaults to None.
background (bool, optional): If True, runs the job in the background. Defaults to False.
Expand All @@ -74,8 +73,8 @@ def run_job(job: str, piped_from: Optional[str] = None, piped_to: Optional[str]
piped_from_cmd = f"{piped_from} | " if piped_from else ""
piped_to_cmd = f" > {piped_to}" if piped_to else ""

command = f'{piped_from_cmd}datayoga run {job} ' \
f'--dir {path.join(os.path.dirname(os.path.realpath(__file__)), "..", "resources")} ' \
command = f'{piped_from_cmd}datayoga run {job_name} ' \
f'--dir {path.join(path.dirname(path.realpath(__file__)), "..", "resources")} ' \
f'--loglevel DEBUG{piped_to_cmd}'

return execute_program(command, background=background)
14 changes: 0 additions & 14 deletions integration-tests/resources/jobs/pg_to_redis.yaml

This file was deleted.

16 changes: 13 additions & 3 deletions integration-tests/test_redis_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@


@pytest.mark.parametrize(
"configuration, key, expected",
"job_name, key, expected",
[("tests.redis_lookup_string", "0", "None"),
("tests.redis_lookup_string", "1", "test_string"),
("tests.redis_lookup_hash", "2", "{'tf0': 'tv0', 'tf1': 'tv1'}"),
("tests.redis_lookup_set", "3", "{'tv0'}"),
("tests.redis_lookup_sorted_set", "4", "['tv0', '10', 'tv1', '20']"),
("tests.redis_lookup_list", "5", "['tv2', 'tv1', 'tv0']"),
("tests.redis_lookup_string_nested", "1", "{'a': {'b': {'c.d': 'test_string'}}}")])
def test_redis_lookup(configuration, key, expected):
def test_redis_lookup(job_name: str, key: str, expected: str):
"""Tests the functionality of Redis lookup operations.
Args:
job_name (str): The name of the job to run.
key (str): The Redis key to lookup.
expected (str): The expected value after the Redis lookup.
Raises:
AssertionError: If the retrieved value from Redis does not match the expected value.
"""
redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
redis_container.start()

Expand All @@ -25,7 +35,7 @@ def test_redis_lookup(configuration, key, expected):
redis_client.zadd("sorted_set_key", mapping={"tv0": "10", "tv1": "20"})
redis_client.lpush("list_key", "tv0", "tv1", "tv2")

run_job(configuration)
run_job(job_name)

val = redis_client.hgetall(key)
assert val["obj"] == expected
Expand Down

0 comments on commit af9d793

Please sign in to comment.