blueprint for a message broker bridge NApp #457

Open
viniarck opened this issue Feb 19, 2024 · 10 comments · Fixed by atlanticwave-sdx/sdx-oxp-integrator#6
Labels
future_release Planned for the next release

Comments

@viniarck
Member

There are use cases where a NApp subscribes to Kytos events via event handlers and publishes them to an external message broker. This would potentially allow integrating with other platforms that already use a message broker and/or with new external applications.

Use cases will be further refined and discussed (future BAPM integration, SDX integration, and so on), but the idea is a bare-minimum message broker bridge NApp that can forward events to a supported message broker.

Note: One day the Kytos-ng event bus might also natively support other message brokers, but strategically it has always been part of the plan to have a lightweight, simplified event bus, as we currently have in Python land, without external dependencies.

@viniarck viniarck added the future_release Planned for the next release label Feb 19, 2024
@viniarck
Member Author

Potentially the message broker will be RabbitMQ. This will be double-checked once it gets prioritized.

@viniarck viniarck removed the future_release Planned for the next release label Feb 27, 2024
@lmarinve

lmarinve commented Mar 13, 2024

:EP: 38
:Title: Broadcasting events
:Authors:
- Luis;
- Italo;
- Vinicius;
- Jeronimo Bezerra [email protected];
:Issued Date: to be defined
:Status: Pending
:Type: Standards Track


EP03X - Broadcasting events


########
Abstract
########

This blueprint details how Kytos should export/broadcast its events
to external applications in a safe and scalable way.

##########
Motivation
##########

With Kytos running in production, we learned that Kytos events can be
leveraged for multiple purposes, from capacity planning to topology evaluation,
from troubleshooting to accounting. However, creating Kytos napps solely to monitor the environment should be avoided, to minimize possible impacts on the running Kytos instance. Moreover, a broadcasting napp has the potential to expedite the prototyping of new applications and frameworks that need an understanding of the
network topology and control plane, such as the AtlanticWave-SDX project.

############
Requirements
############

- This napp should not be named after the message broker system, nor delegate part of its functions to it.
- This napp has to be an event broadcaster only. No input via message_bus should be supported or allowed.
- This napp has to support multiple queues: one queue per napp and a queue for all napps (verbose_queue).
- This napp is optional, meaning that if there is any issue with it, Kytos shouldn't be impacted.
- This napp must use asyncio for all asynchronous calls, for instance, alisten_to instead of listen_to.
- This napp has to keep consistency after reloading (re-read settings.py, close and reopen RabbitMQ sessions), with tests for these functionalities.
- This napp must support filtering messages by a list of regular expressions defined in settings.py.
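
The filtering and per-queue requirements could look roughly like the following minimal sketch. It assumes hypothetical settings names (`EVENT_FILTERS`, `VERBOSE_QUEUE`), treats the regex list as an allow-list, and assumes the per-napp queue name is the event-name prefix before the first dot (e.g. `kytos/topology` for `kytos/topology.link_up`); none of these are defined by the blueprint.

```python
import re

# Hypothetical settings.py values: event-name patterns to broadcast
# and the name of the queue that receives every broadcast event.
EVENT_FILTERS = [r"^kytos/topology\.link_(up|down)$", r"^kytos/of_core\."]
VERBOSE_QUEUE = "verbose_queue"


class EventRouter:
    """Decide whether an event is broadcast and to which queues."""

    def __init__(self, patterns: list[str], verbose_queue: str = VERBOSE_QUEUE) -> None:
        self.verbose_queue = verbose_queue
        self.reload(patterns)

    def reload(self, patterns: list[str]) -> None:
        # Compile once (on setup or after settings.py is re-read), not per event.
        self._compiled = [re.compile(p) for p in patterns]

    def matches(self, event_name: str) -> bool:
        return any(p.search(event_name) for p in self._compiled)

    def queues_for(self, event_name: str) -> list[str]:
        """Return [per-napp queue, verbose queue], or [] if the event is filtered out."""
        if not self.matches(event_name):
            return []
        # Assumption: "kytos/topology.link_up" -> per-napp queue "kytos/topology".
        napp_queue = event_name.split(".", 1)[0]
        return [napp_queue, self.verbose_queue]
```

Calling `reload()` after settings.py is re-read keeps the compiled patterns consistent with the reload requirement above.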

The Broadcasting Events Napp (application) must be able to use any interchangeable message transport system. This requirement underscores the need for flexibility and adaptability in the system's architecture. Consequently, the Broadcasting Events Napp and the Events Consumer Napp should be isolated into microservice napps, distinct from the Message Queue Producer and Consumer Napps. This architectural decision offers several benefits:

##########################
Modularity and Scalability
##########################

Microservices allow for modular design, enabling each component to be developed, deployed, and scaled independently. This modularity facilitates easier management and scaling of the system as a whole.

######################
Technology Agnosticism
######################

By isolating the Broadcasting Events Napp and the Events Consumer Napp, the system becomes agnostic to the specific message transportation system. This means that the choice of a message queue or transportation system can be easily swapped out or upgraded without affecting the core functionality of the microservices.
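
To illustrate the agnosticism argument, here is a minimal sketch in which the broadcasting logic depends only on a small `publish` interface. `MessageTransport`, `InMemoryTransport` and `Broadcaster` are hypothetical names; a RabbitMQ or Kafka adapter would be another class implementing the same coroutine.

```python
import asyncio
from typing import Protocol


class MessageTransport(Protocol):
    """Anything able to publish serialized bytes to a named queue or topic."""

    async def publish(self, destination: str, body: bytes) -> None: ...


class InMemoryTransport:
    """Stand-in transport (e.g. for tests) that records what was published."""

    def __init__(self) -> None:
        self.sent: dict[str, list[bytes]] = {}

    async def publish(self, destination: str, body: bytes) -> None:
        self.sent.setdefault(destination, []).append(body)


class Broadcaster:
    """Broadcasting logic that only knows about the MessageTransport interface."""

    def __init__(self, transport: MessageTransport) -> None:
        self._transport = transport

    async def broadcast(self, destinations: list[str], body: bytes) -> None:
        for destination in destinations:
            await self._transport.publish(destination, body)


async def main() -> InMemoryTransport:
    # A hypothetical RabbitMQ/Kafka adapter could be passed here instead.
    transport = InMemoryTransport()
    await Broadcaster(transport).broadcast(["kytos/topology", "verbose_queue"], b"{}")
    return transport
```

Because `Broadcaster` never imports a broker library, swapping the message system only means providing another transport class.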

###############################
Ease of Maintenance and Updates
###############################

Separating the microservices reduces the complexity of each component, making maintenance and updates more straightforward. Changes to one microservice can be made without impacting the others, allowing for faster iteration and deployment cycles.

##############################
Resilience and Fault Isolation
##############################

Isolating microservices enhances fault isolation. If one microservice experiences issues or failures, they are less likely to impact the overall system's performance, ensuring greater resilience and stability of the application.

########################################
Scalability and Performance Optimization
########################################

Microservices architecture facilitates resource optimization by allowing individual components to be scaled independently based on demand. This scalability ensures optimal performance even during peak loads.

#################
Enhanced Security
#################

Isolating components into microservices can enhance security by reducing the attack surface. Each microservice can have its own security measures, such as access controls and authentication mechanisms, ensuring that security breaches are contained and mitigated effectively.

The message queue receives messages from the application where events originate and distributes them to one or more recipient applications in first-in-first-out (FIFO) order. This architecture enables communication between different parts of a system without direct coupling.

Events Napp can establish separate message queues for SDX Napp and BAPM Napp to disseminate updates or commands. Events Napp would then dispatch distinct messages to each queue, and the relevant applications would retrieve messages from their designated queues.

Upon retrieval (and acknowledgement), the broker removes the message from the queue so that it is not delivered again to that consumer.

Message queues decouple components within the system. Events Napp can transmit updates without SDX Napp or BAPM applications being continuously available. Moreover, when queues and messages are persistent, an application that restarts can resume processing messages from its designated queue once it is back online.
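
A rough in-process analogy of this decoupling, using one `asyncio.Queue` per consumer (queue names are illustrative). Unlike a broker's durable queues, these live only in memory, so the sketch shows buffering and FIFO delivery but not survival across restarts.

```python
import asyncio


async def main() -> tuple[list[str], int]:
    # One FIFO queue per consumer application (names are illustrative).
    queues = {"sdx": asyncio.Queue(), "bapm": asyncio.Queue()}

    # The producer publishes while no consumer is running; messages wait in the queues.
    for update in ("link_down:A", "link_up:A"):
        for q in queues.values():
            await q.put(update)

    # Later the SDX consumer comes online and drains only its own queue, in FIFO order.
    received = []
    while not queues["sdx"].empty():
        received.append(await queues["sdx"].get())  # get() removes the message

    # The BAPM queue is untouched and still holds both updates.
    return received, queues["bapm"].qsize()


print(asyncio.run(main()))  # (['link_down:A', 'link_up:A'], 2)
```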

This approach enhances scalability and fault tolerance in applications by reducing dependencies between interconnected systems. Additionally, it facilitates better handling of system failures or temporary unavailability, thereby strengthening the overall robustness of the architecture.

RabbitMQ, a widely used message broker, implements AMQP 0.9.1, an efficient and versatile protocol that lets it communicate with clients across different systems and languages. This combination provides reliable communication between the components of a distributed system.

aio-pika is an asynchronous AMQP client library designed for Python applications. It enables asynchronous and efficient interaction with RabbitMQ, making it well-suited for high-performance applications or systems that require non-blocking I/O operations.

By leveraging aio-pika, we can integrate RabbitMQ's messaging capabilities into our Kytos Napps. Combining RabbitMQ's robustness with aio-pika's asynchronous design supports building scalable and responsive distributed systems.
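
A minimal aio-pika publishing sketch, assuming a reachable RabbitMQ URL and one durable queue per consumer. The JSON envelope (`name`/`content`) and the helper names are assumptions, not a format defined in this blueprint. aio-pika is imported inside the coroutine so the encoding helper can be used without it.

```python
import json
from typing import Any


def encode_event(name: str, content: dict[str, Any]) -> bytes:
    """Serialize an event name and content to JSON bytes (non-JSON values via str())."""
    return json.dumps({"name": name, "content": content}, default=str).encode()


async def publish_event(amqp_url: str, queue_name: str, body: bytes) -> None:
    """Publish one message to a durable queue via RabbitMQ's default exchange."""
    import aio_pika  # third-party; imported here so encode_event works without it

    connection = await aio_pika.connect_robust(amqp_url)  # reconnects automatically
    async with connection:
        channel = await connection.channel()
        # Durable queue + persistent delivery mode so messages survive a broker restart.
        await channel.declare_queue(queue_name, durable=True)
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=body,
                content_type="application/json",
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            ),
            routing_key=queue_name,
        )
```

Opening a connection per message keeps the sketch short; a napp would more likely open the robust connection once in `setup` and reuse the channel.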

########
Benefits
########

Real-time monitoring of interdomain link status.

Scalable solution with dynamically created queues.

Fault-tolerant design ensures persistent handling of link-down events.

Flexibility in queue management, allowing for dynamic addition or configuration based on settings.

Overall, this use case demonstrates how message queues can be effectively utilized for monitoring and managing interdomain link status changes in an SDX network infrastructure, ensuring timely detection and response to connectivity issues.

In conclusion, isolating the Broadcasting Events Napp and the Events Consumer Napp into separate microservices from the Message Queue Producer and Consumer Napps promotes flexibility, modularity, scalability, and resilience in the system's architecture. This design approach enables the system to adapt to changing requirements and technologies while maintaining robustness and efficiency in event broadcasting and consumption.

#############################################
Use Case: Interdomain Link Up/Down Monitoring
#############################################

Scenario:

In SDX, monitoring the status of interdomain links for connectivity and reliability is crucial. This use case involves setting up a system to detect link status changes (up/down) and notify consumers about these events through message queues.

Components:

Producer: Generates events based on link status changes.

Consumer: Monitors link status by consuming messages from the appropriate queues.

Implementation:

Link Status Queue Initialization:

Each interdomain link has its own dedicated queue.

Queues are either dynamically created or configured based on settings.

Producer Functionality:

Upon detecting a link-down event:

If the link-down event is persistent:
The producer checks if the corresponding queue is empty.

If it is empty, it adds a new message "[Link Down]" to the single-element link status queue.

If the link-down event is not persistent:
The producer keeps checking until the consumer reads and deletes the message.

Upon detecting a link-up event:
The producer sends a message "[Link Up]" that replaces the existing message in the link status queue.

Consumer Functionality:

Consumers must be aware of the queues they need to monitor for link status changes.

Consumers continuously monitor the designated queues for new messages indicating link status changes.

Upon receiving a message:

If the message is "[Link Down]", the consumer processes the link-down event.

If the message is "[Link Up]", the consumer processes the link-up event.
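
The single-element queue semantics above can be sketched in memory as follows. `LinkStatusSlot` and `poll_once` are hypothetical names, and the "not persistent" branch (the producer waiting until the consumer reads the message) is not modeled.

```python
from typing import Callable, Optional

LINK_DOWN = "[Link Down]"
LINK_UP = "[Link Up]"


class LinkStatusSlot:
    """Per-link status 'queue' that holds at most one message."""

    def __init__(self) -> None:
        self._message: Optional[str] = None

    def is_empty(self) -> bool:
        return self._message is None

    # Producer side
    def on_link_down(self) -> None:
        # Enqueue only if nothing is pending, so the slot never holds two messages.
        if self.is_empty():
            self._message = LINK_DOWN

    def on_link_up(self) -> None:
        # Link up replaces whatever message is pending.
        self._message = LINK_UP

    # Consumer side
    def consume(self) -> Optional[str]:
        """Read and delete the pending message (None if the slot is empty)."""
        message, self._message = self._message, None
        return message


def poll_once(slots: dict[str, LinkStatusSlot],
              handlers: dict[str, Callable[[str], None]]) -> None:
    """One pass of a consumer loop over the links it monitors."""
    for link_id, slot in slots.items():
        message = slot.consume()
        if message is not None:
            handlers[message](link_id)
```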

@italovalcy

Hi everybody, it seems like PR #465 didn't make it, but I see many important comments and discussions there! Very nice.

I'm also CCing @Auwate for his comments here! Austin recently started working on a prototype for integrating Kytos and Kafka, and I suggested he read the blueprint proposal Luis started a while ago. After reading it, Austin has some comments. Austin, the floor is yours!

@Auwate

Auwate commented Nov 8, 2024

Hey everybody, good afternoon. I've worked on prototypes with Kytos and Kafka, focusing on areas such as serialization, SSL/TLS transmission, and Kafka deployments. Here's the overall analysis:

Strengths

  • Modular: Using two different NApps to facilitate message brokering allows the development of both to be streamlined without negatively affecting the other
  • Scalable: Switching RabbitMQ out for Kafka allows Kafka's horizontal scalability to shine, especially for peak data throughput
  • Fault-tolerant: Using two NApps allows one to go offline without negatively affecting the other, as it waits for the NApp to come online before continuing data brokering.

There are many more strengths to talk about, as it's a very well-structured application. However, my analysis focuses more on optimizing what's available and highlighting potential bottlenecks and weaknesses:

Analysis summary

The role of mq_producer is somewhat abstract, but assuming listen_events directly sends events to Kafka, the application's event path looks like this:

  • 1: Event is created in Kytos
  • 2: Event is consumed, filtered, serialized, and pushed to Kafka
  • 3: Operational tasks are handled by mq_producer, which includes operating topics, increasing/decreasing partitions, etc.
  • 4: Kafka load balances partitions onto different instances, as well as other internal processes
  • 5: Consumer applications poll from Kafka, handling retries and other necessities.

This is a solid approach, and provides a scalable way to stream events to consumer applications. However, there are some important tradeoffs to consider:

  • 1: Consumer NApps would have to install and use a Kafka library like kafka-python-ng or aiokafka.

Because they are polling directly from Kafka, they would need to have the necessary installations to do so which may increase operational complexity (such as specifying the version to install, issues against Python versions, etc.).

A separate issue is standardizing the way consumer applications poll data. There are several ways we can approach this, starting with direct communication between Kafka and consumers. This is likely the most scalable way, as we can increase the number of Kafka instances in the cluster to support increased demand. However, it would also require us to provide a standardized way of communicating with Kafka. This can be solved by publishing a library to PyPI, which consumers install and use as an object or function. In addition, direct communication between a Kafka cluster and consumers may introduce security concerns if any application can just connect and receive data.

The second way we could do this is through another NApp that acts as a proxy, sending data to consumers via its REST API. This continues Kytos's REST API design, following a simple and straightforward approach. In addition, we could optimize it for network throughput via compression (like gzip) and possibly increase scalability. This would require much more development as it's an in-house option, but it could be worth it if security and ease of use for consumers are important factors in the overall architecture. For an example of what I'm talking about, here's a link to Walmart's MPS proxy system: https://blog.bytebytego.com/p/the-trillion-message-kafka-setup

  • 2: listen_events may need careful consideration in its development

Depending on whether or not we want high throughput of events, keeping the components of listen_events loosely coupled is highly desirable. Currently, the listen_events NApp listens for events, filters them, serializes each message, and sends it to Kafka. If the idea is to keep it to a single NApp, then making sure each portion (consuming, filtering, and serializing events) is multithreaded, with asynchronous messaging between them, is vital.

Filtering is likely the easiest of the three to implement, since a set of event names can be created and each event checked against it.

Sending data, on the other hand, will likely need to be developed carefully. Kafka works best with batched messages, but getting acknowledgements may take some work depending on the framework. kafka-python-ng is a traditional synchronous library, so using it from async code might not be a good idea. Further research into aiokafka would be worthwhile, although it is a less widely used framework.
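
Kafka clients usually batch internally (kafka-python-ng's `KafkaProducer`, for instance, exposes `linger_ms` and `batch_size`). The sketch below only illustrates the batching idea with the standard library: queued messages are grouped into batches bounded by size and by a linger time. `batch_sender`, its parameters, and the `None` shutdown sentinel are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


async def batch_sender(
    inbox: asyncio.Queue,
    send_batch: Callable[[list[bytes]], Awaitable[None]],
    max_batch: int = 100,
    linger: float = 0.05,
) -> None:
    """Send batches of up to `max_batch` messages, waiting at most `linger` seconds."""
    loop = asyncio.get_running_loop()
    while True:
        first = await inbox.get()
        if first is None:  # None is the shutdown sentinel
            return
        batch = [first]
        deadline = loop.time() + linger
        while len(batch) < max_batch:
            timeout = deadline - loop.time()
            if timeout <= 0:
                break
            try:
                item = await asyncio.wait_for(inbox.get(), timeout)
            except asyncio.TimeoutError:
                break  # linger expired: ship a partial batch
            if item is None:
                await send_batch(batch)  # flush what we have before stopping
                return
            batch.append(item)
        await send_batch(batch)
```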

  • 3: listen_events may need careful consideration on serialization

A massive bottleneck in listen_events will likely be serialization. Possible serialization methods include Protobuf, Avro, and MessagePack. MessagePack looks very promising: it's as simple as JSON but encodes to binary instead of text. In addition, it's schema-less, just like JSON, which makes updating the serializer for new classes much easier.

  • 4: The Kafka cluster would likely need to be deployed on a platform like Docker-Compose

A major point of the application is that it should be able to temporarily store data if a NApp goes down. However, if the Kafka instance goes down itself, then it would need to come back up and continue sending data. Something like Docker-Compose works great with specifying restarts and attaching volumes when necessary, while also being much more lightweight than something like Kubernetes.


Additional comments:

I understand that the NApp would need to be written in Python to connect with Kytos. However, if possible, I believe Java is a really good option for connecting to Kafka. It has access to the Kafka Streams API, which enables fast, readable code that can do things like filtering with ease and supports exactly-once semantics. Here's a link to more information (it's based on Confluent, but the concepts remain the same): https://docs.confluent.io/platform/current/streams/introduction.html

@Auwate

Auwate commented Nov 8, 2024

Hello again all, I took a look at the PR that this references and wanted to address some issues regarding the blueprint:

1: Why Kafka

In the above comment I assumed that we would be using Kafka (specifically Apache Kafka) but I wanted to explain why:

Kafka and RabbitMQ, while both message brokers, are fundamentally different and therefore serve different needs. RabbitMQ (which supports the AMQP protocol) is very good at providing complex asynchronous message queues between applications, especially through its "exchange" feature. It also provides several features that allow for more complex business logic, such as administrative tools (like a web UI), message priorities, message ordering, etc. However, it's not necessarily built for high throughput, which is exactly what Kafka is good at. Kafka gives up many of RabbitMQ's features in exchange for being able to send out millions of messages per second, whereas RabbitMQ, according to https://aws.amazon.com/compare/the-difference-between-rabbitmq-and-kafka/, is limited to around thousands of messages per second on a single instance.

2: Serialization

In the above comment I mentioned that serialization would be a major concern for the application. A serializing algorithm like json.dumps() would cause massive bottlenecks if throughput went too high, which led me to investigate various other algorithms.

A promising serializing algorithm was MessagePack, which has a syntax similar to JSON serialization. It promises increased speed and efficiency since it encodes directly to binary rather than to strings, decreasing overall network traffic and speeding up serialization. In addition, deserializing would be just as easy and much faster than with JSON. There are even faster options like Protobuf, but they would require us to create and compile proto files, which may add complexity for a moderate tangible benefit. Overall, MessagePack is a good option and works great with Kafka.

3: Retries

Vinicius mentioned concerns on what the NApp should do in case of outages where sending data fails. I like the idea of storing the data on the NApp, and sending once the cluster comes back online, but this requires some nuance to make it scalable and maintainable.

A simple option that maintains a level of fault tolerance would be to add unsent data to a queue that supports size limits. Arguably, if a component goes down for too long, that would be catastrophic and would require manual intervention, so keeping a buffer equal to the buffer size of aiokafka or kafka-python-ng would suffice. This would allow us to send data out to Kafka immediately once the cluster comes back online.
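
A minimal sketch of such a bounded buffer, assuming a drop-oldest policy when the limit is reached and a `send` callable that raises `ConnectionError` while the broker is unreachable (both assumptions). `BoundedRetryBuffer` is a hypothetical name, and `maxlen` would be sized to match the client library's own buffer.

```python
from collections import deque
from typing import Callable


class BoundedRetryBuffer:
    """Keep at most `maxlen` unsent messages while the broker is unreachable.

    When full, deque(maxlen=...) drops the oldest message, so memory stays
    bounded during long outages.
    """

    def __init__(self, maxlen: int) -> None:
        self._pending: deque[bytes] = deque(maxlen=maxlen)

    def add(self, message: bytes) -> None:
        self._pending.append(message)

    def flush(self, send: Callable[[bytes], None]) -> int:
        """Send buffered messages oldest-first; stop at the first failure."""
        sent = 0
        while self._pending:
            try:
                send(self._pending[0])
            except ConnectionError:
                break  # broker still down; keep the message for the next flush
            self._pending.popleft()
            sent += 1
        return sent

    def __len__(self) -> int:
        return len(self._pending)
```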

However, if the NApp itself goes down, it may be a good idea to just let the events go unprocessed. This could be amended based on key requirements, but a possible alternative could be a different NApp that listen_events would pull from in case it goes down. However, pushing every incoming event to the disk to prevent data loss would possibly slow down the NApp significantly, which may go against the NApp's objective of high-throughput, real-time streaming.

4: NApp possible design:

Here's a potential idea:

Because the logic is split into 4 separate steps, it may make logical sense to have a thread for each stage. Here's an example:

1: Getting data: A thread is used to continuously run async functions that listen for events. It would then pass any new events through a pipeline/queue to the filter

2: Filtering: A thread is used to await new data from the pipeline/queue and filter it based on preset configurations. It would then move the data into the next pipeline to the serializer

3: Serializing: A thread is used to await new data from the pipeline/queue and run serializing algorithms on it. This would likely be a bottleneck, so optimizing this stage would pay off. After serializing, it puts the data onto the next pipeline/queue to be sent out

4: Publishing data: A thread is used to await new data from the incoming pipeline/queue and send it out to Kafka. I believe having multiple asynchronous functions going at the same time would be a good idea to maximize the amount of data sent and to limit I/O bottlenecks.
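
A minimal standard-library sketch of this four-stage design, with one thread per stage and `queue.Queue` pipelines between them. The event source is a plain iterable standing in for Kytos events, and the set-based filter and sentinel-based shutdown are assumptions.

```python
import json
import queue
import threading
from typing import Any, Callable, Iterable, Optional

_STOP = object()  # sentinel that tells a stage to shut down


def start_stage(inbox: queue.Queue, outbox: Optional[queue.Queue],
                work: Callable[[Any], Any]) -> threading.Thread:
    """Run `work` on each item of `inbox` in its own thread; None results are dropped."""
    def run() -> None:
        while True:
            item = inbox.get()
            if item is _STOP:
                if outbox is not None:
                    outbox.put(_STOP)  # propagate shutdown downstream
                return
            result = work(item)
            if result is not None and outbox is not None:
                outbox.put(result)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def run_pipeline(events: Iterable[dict], allowed: set[str],
                 publish: Callable[[bytes], None]) -> None:
    raw: queue.Queue = queue.Queue()
    filtered: queue.Queue = queue.Queue()
    serialized: queue.Queue = queue.Queue()
    threads = [
        # 2: filtering by event name
        start_stage(raw, filtered, lambda e: e if e["name"] in allowed else None),
        # 3: serializing
        start_stage(filtered, serialized, lambda e: json.dumps(e).encode()),
        # 4: publishing (last stage, no outbox)
        start_stage(serialized, None, publish),
    ]
    # 1: getting data; a plain iterable stands in for Kytos events here
    for event in events:
        raw.put(event)
    raw.put(_STOP)
    for thread in threads:
        thread.join()
```

In CPython the GIL keeps CPU-bound stages such as serialization from running in parallel across threads, so this layout mainly helps overlap I/O.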

@viniarck
Member Author

@Auwate, great start. Here's a partial review:

1: Why Kafka

However, it's not necessarily built for high-throughput, which is exactly what Kafka is good at.

Kafka is generally a solid choice, and it has a slight edge over RabbitMQ when it comes to throughput. But what is high throughput for the use case we're going for?

  • a) In a scenario with 300 EVCs converging (a scenario not too far from prod-like scale at AmLight), which tends to have many events being published, it tends to produce approximately 1300 epm - events per minute - (or 78k eps), which is a local peak situation that doesn't typically last too long.

  • b) In a theoretical unbounded scenario, on my machine, with async tasks publishing events as fast as they can, it can generate up to 187k epm (or 11.22M eps).

Scenario b) is very far from any typical NApp currently used in prod. So a message broker with a sustained throughput of around 250k eps for average-sized messages, as in current usage, should operate relatively well without queuing too much. Which brings me back to the initial question: what counts as high throughput for the intended supported cases, are we talking about a) or b)? And, equally important, how will it be used, and consumed by which other applications? Kafka can certainly bridge the events in case b), but at what cost: how many nodes, how much storage and SSD would be needed for that kind of on-premises setup? Kafka is somewhat infamous for having high OpEx.

Do we need to be ready for scenario b) out of the gate? This probably won't impact the NApp since it'll be written with performance in mind, but in terms of infrastructure, if we're aiming for high throughput, what kind of infrastructure needs to be in place? And will AmLight be able to afford it, if not right away, then in the future? Let's also highlight the overall costs of an initial Kafka cluster capable of sustaining 500k average-sized messages per second (double the amount of the current peak scenario for a typical AmLight topology) without too much delay. How many servers, CPUs, SSDs and NICs would be needed for this case and for case b)? Scaling out on-premises is harder than in the cloud, but if we're aiming for high throughput then we should also have the infrastructure to support it; otherwise it's picking a bazooka to kill a fly. So, let's exercise some scenarios here. The beauty of it is that it can scale out gradually, but how expensive would it be to maintain it at the highest throughput? If the Operations team has the resources and is ready to maintain a Kafka cluster, let's go for it.

In the future, Kafka also has the potential to be used internally on Kytos-ng core.

2: Serialization

A serializing algorithm like json.dumps() would cause massive bottlenecks if throughput went too high, which led me to investigate various other algorithms. A promising serializing algorithm was MessagePack

Many event contents currently carry Python objects. They haven't been standardized to always have a standard format; that was acceptable because any consumer was guaranteed to be consuming in Python land within the same runtime. So the first step is to make sure the format can be easily consumed, which brings me back to the use case questions:

  • Which kinds of consumers will we have: will they still be Python only, or other languages too? If other languages, this NApp would also need to standardize and encode the objects accordingly, since changing the original events would require too much refactoring.

MessagePack is a good choice. It can be harder to debug when encoding and decoding, but it can be worth it to save some bytes and gain a bit more throughput on the wire, which is pivotal for the hot path. But let's also measure whether MessagePack is still needed when broker-supported compression is enabled; otherwise, we can potentially stick with JSON using a library with C extensions plus broker compression, which would be easier to debug. So, let's measure both and make a decision.
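
A small standard-library harness for that measurement, comparing compact JSON with and without zlib compression on a hypothetical sample event (real captured events would give more representative numbers). MessagePack (`msgpack.packb`) or a C-extension JSON library such as `orjson` (`orjson.dumps`) could be plugged in as additional `encode` callables. Broker-side compression (e.g. `compression_type` on kafka-python-ng's producer) compresses whole batches, so per-message zlib is only an approximation.

```python
import json
import time
import zlib
from typing import Any, Callable


def json_bytes(obj: Any) -> bytes:
    # Compact separators: no spaces after ',' and ':'.
    return json.dumps(obj, separators=(",", ":")).encode()


def json_zlib(obj: Any) -> bytes:
    return zlib.compress(json_bytes(obj))


def measure(encode: Callable[[Any], bytes], payload: Any, rounds: int = 1000) -> dict[str, float]:
    """Encoded size in bytes and mean encoding time in microseconds."""
    start = time.perf_counter()
    for _ in range(rounds):
        encoded = encode(payload)
    elapsed = time.perf_counter() - start
    return {"bytes": len(encoded), "avg_us": elapsed / rounds * 1e6}


# Hypothetical topology-like event; real captured events would be more representative.
sample = {
    "name": "kytos/topology.updated",
    "content": {"links": [{"id": f"link-{i}", "active": True, "metadata": {}} for i in range(50)]},
}

for label, codec in (("json", json_bytes), ("json+zlib", json_zlib)):
    print(label, measure(codec, sample))
```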

3: Retries

However, if the NApp itself goes down, it may be a good idea to just let the events go unprocessed

It's fair to only have retries and do nothing or discard if the broker isn't available, and oftentimes the message broker's client library provides this functionality. But, again, this goes back to the use case question:

  • For the current consumers and how it'll be used, how critical is it to potentially lose some messages?

If the application doesn't have any other fallback, which is fair, then we should make it clear that if consumers can't afford to lose messages, it's on network operators to ensure that the cluster is close, with low-latency connectivity, and has enough nodes to greatly minimize the chance of a full outage. That way it's crystal clear to whoever uses the NApp what price they'll pay infrastructure-wise, depending on how critical losing an event is.

This can also be covered later if such extra guarantees are needed, but either way, it needs to be clear from the start which guarantees it provides. At the end of the day, you can only do so much locally: it won't save events indefinitely, but it can temporarily buffer them while the broker is down.

4: NApp possible design:

Because the logic is split into 4 separate steps, it may make logical sense to have a thread for each stage. Here's an example:

Newer kytos-ng org NApps, especially IO-bound ones like this one, are recommended to be entirely based on asyncio, not only because it's more lightweight, but also to avoid out-of-order messages when subscribing to Kytos events. You can implement it all in this async handler; it'll run in a task, and for any subsequent IO you need to make sure it's also asyncio compatible. For serialization, if it's heavy you might need a custom threadpool, but I'd be surprised if a typical payload needed it. This handler results in a concurrent task being executed in the event loop:

    from kytos.core import KytosEvent, KytosNApp
    from kytos.core.helpers import alisten_to


    class Main(KytosNApp):

        def setup(self):
            # bootstrap producer connection or let it crash
            ...

        def shutdown(self):
            # stop producer
            ...

        @alisten_to(".*")
        async def handle_events(self, event: KytosEvent):
            # filter
            # potentially reconnect or autoreconnect
            # await serialize
            # await publish
            ...

Overall, this is expected to suffice. From here, you could also consider bootstrapping new producers in different processes if you ever start to hit CPU limits, but since this is supposed to be IO-bound, I'd be surprised if that's ever needed for this case.
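
A standard-library sketch of that handler body, written as a plain coroutine (the hypothetical `handle_event`, taking the event name and content instead of a `KytosEvent`) so it runs outside Kytos; in the NApp it would be called from the `@alisten_to` handler. Serialization runs inline by default and is offloaded to a thread pool only when an executor is passed. The event names in the demo are illustrative.

```python
import asyncio
import functools
import json
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Awaitable, Callable, Optional


async def serialize(content: dict[str, Any], executor: Optional[Executor] = None) -> bytes:
    """Encode inline by default; offload to `executor` only for heavy payloads."""
    encode = functools.partial(json.dumps, content, default=str)
    if executor is None:
        return encode().encode()
    loop = asyncio.get_running_loop()
    text = await loop.run_in_executor(executor, encode)
    return text.encode()


async def handle_event(
    name: str,
    content: dict[str, Any],
    allowed: Callable[[str], bool],
    publish: Callable[[str, bytes], Awaitable[None]],
    executor: Optional[Executor] = None,
) -> None:
    """What an @alisten_to handler body might do: filter, serialize, publish."""
    if not allowed(name):  # filter
        return
    body = await serialize(content, executor)  # serialize
    await publish(name, body)  # publish (reconnects left to the client library)


async def demo() -> list[tuple[str, bytes]]:
    sent: list[tuple[str, bytes]] = []

    async def publish(name: str, body: bytes) -> None:
        sent.append((name, body))

    def allowed(name: str) -> bool:
        return name.startswith("kytos/topology.")

    await handle_event("kytos/topology.link_up", {"link": "abc"}, allowed, publish)
    await handle_event("kytos/of_core.v0x04.messages.in.ofpt_echo_request", {}, allowed, publish)
    with ThreadPoolExecutor(max_workers=1) as pool:  # heavy-payload path
        await handle_event("kytos/topology.updated", {"links": list(range(3))}, allowed, publish, pool)
    return sent
```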

5: Configuration

How would it be configured, and which APIs, if any, would it expose? Reading this as it is, if I were to have a Kafka consumer, for example, I wouldn't know how to configure it to start streaming certain events to a given topic and partition (if needed).

@Auwate

Auwate commented Nov 15, 2024

Performance benchmarks

I agree that we must set a standard for "high throughput", and the scenarios you provided are quite insightful. I believe having a minimum baseline of 1300 epm (or 78k events per hour) is crucial if we plan to put this into production. Taking this into account, I did some trial runs of simple producer NApps that send data to Kafka, using JSON vs MessagePack serialization. The runs used a single Kafka instance in the cluster:

JSON Serialization Performance

The total messages sent were 3727 EPM, which is 223k EPH. This is the code used:

import json
from typing import Any

from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import UnknownTopicOrPartitionError
from kytos.core import KytosNApp, log
from kytos.core.helpers import listen_to

# Assumed to be defined in this NApp's settings.py
from napps.italovalcy.testnapp.settings import (
    ACKS,
    BOOTSTRAP_SERVERS,
    DEFAULT_NUM_PARTITIONS,
    IGNORED_EVENTS,
    REPLICATION_FACTOR,
    TOPIC_NAME,
)
# ComplexEncoder (shown below); hypothetical module path
from napps.italovalcy.testnapp.utils import ComplexEncoder


class Main(KytosNApp):
    """
    Main class of the italovalcy/testnapp NApp.
    This class is the entry point for this NApp.
    """

    def setup(self):
        log.info('SETUP testnapp')
        self._send_ops = KafkaSendOperations(BOOTSTRAP_SERVERS, ACKS)
        if not self._send_ops.checkForTopic(TOPIC_NAME):
            self._send_ops.createTopic(TOPIC_NAME, DEFAULT_NUM_PARTITIONS)
        self._event_count: dict[str, int] = {}

    def shutdown(self):
        self._send_ops.shutdown()

    @listen_to('.*')
    def handle_new_switch(self, event):
        """Forward every event that is not explicitly ignored to Kafka."""
        if event.name in IGNORED_EVENTS:
            log.info(f'{event.name} ignored')
            return
        self._send_ops.sendMessage(TOPIC_NAME, event.name, event.name, event.content)


class KafkaSendOperations:
    """Thin wrapper around a kafka-python-ng producer and admin client."""

    def __init__(self, bootstrap_servers: str, acks: int | str) -> None:
        self._producer = KafkaProducer(bootstrap_servers=bootstrap_servers, acks=acks)
        self._admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    def createTopic(self, topic_name: str, num_partitions: int) -> None:
        """Create a topic with a provided number of partitions."""
        # create_topics expects a list of NewTopic objects
        self._admin.create_topics([
            NewTopic(name=topic_name, num_partitions=num_partitions,
                     replication_factor=REPLICATION_FACTOR)
        ])

    def checkForTopic(self, topic_name: str) -> bool:
        """Check whether a topic exists."""
        try:
            self._admin.describe_topics([topic_name])
            return True
        except UnknownTopicOrPartitionError:
            return False

    def shutdown(self) -> None:
        """Shut down the admin and producer."""
        self._admin.close()
        self._producer.close()

    def sendMessage(self, topic_name: str, event: str, key: str, message: Any) -> None:
        """Send a message to the specified topic name."""
        json_message = json.dumps({"event": event, "type": key, "message": message}, cls=ComplexEncoder).encode()
        self._producer.send(topic=topic_name, value=json_message)

ComplexEncoder looks like this:

import json

class ComplexEncoder(json.JSONEncoder):

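    def default(self, o):
        """
        Args:
            o (object): An arbitrary object that json cannot encode natively
        Returns:
            A JSON-serializable value (dict or str) representing o
        """
        try:
            # Check if the object has the "as_dict" function
            if callable(getattr(o, "as_dict", None)):
                return o.as_dict()
            # Check if the object has an overridden "__repr__"
            if type(o).__repr__ != object.__repr__:
                return repr(o)
            # Return the attributes as-is: the encoder recurses into them and calls
            # default() again for nested custom objects (self.encode() here would
            # double-encode them as JSON strings)
            return {key: element for key, element in vars(o).items()}
        except Exception:
            # e.g. vars() fails on objects without __dict__; let JSONEncoder raise TypeError
            return super().default(o)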

Overall not a bad choice.

MessagePack Serialization Performance

The total throughput was 3679 EPM, which is about 220k EPH. This is the code that was used:

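import msgpack
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic
from kytos.core import KytosNApp, log
from kytos.core.helpers import listen_to

# BOOTSTRAP_SERVERS, ACKS, TOPIC_NAME, DEFAULT_NUM_PARTITIONS, REPLICATION_FACTOR
# and IGNORED_EVENTS are assumed to come from the NApp's settings.


class Main(KytosNApp):
    """
    Main class of italovalcy/testnapp NApp.
    This class is the entry point for this NApp.
    """

    def setup(self):
        log.info('SETUP testnapp')
        self._send_ops = KafkaSendOperations(BOOTSTRAP_SERVERS, ACKS)
        if not self._send_ops.checkForTopic(TOPIC_NAME):
            self._send_ops.createTopic(TOPIC_NAME, DEFAULT_NUM_PARTITIONS)
        self._event_count: dict[str, int] = {}

    def shutdown(self):
        """Close the Kafka admin client and producer"""
        self._send_ops.shutdown()

    @listen_to('.*')
    def handle_new_switch(self, event):
        """Forward every Kytos event (except ignored ones) to Kafka"""
        # log.info(f'handle_new_switch event={event} content={event.content}')
        if event.name in IGNORED_EVENTS:
            log.info(f'{event.name} ignored')
            return
        self._send_ops.sendMessage(TOPIC_NAME, event.name, event.name, event.content)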

class KafkaSendOperations:

    def __init__(self, bootstrap_servers: str, acks: int | str) -> None:
        self._producer = KafkaProducer(bootstrap_servers=bootstrap_servers, acks=acks)
        self._admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    def createTopic(self, topic_name: str, num_partitions: int) -> None:
        """ Create a topic with a provided number of partitions """
        # create_topics() expects a list of NewTopic objects
        self._admin.create_topics([NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=REPLICATION_FACTOR)])

    def checkForTopic(self, topic_name: str) -> bool:
        """ Checks if a topic exists """
        # list_topics() returns the names of all topics known to the cluster
        return topic_name in self._admin.list_topics()

    def shutdown(self) -> None:
        """ Shut down the admin and producer """
        self._admin.close()
        self._producer.close()

    def sendMessage(self, topic_name: str, event: str, key: str, message: object) -> None:
        """ Send a message to the specified topic name """
        msgpack_message = msgpack.packb({"event": event, "type": key, "payload": message}, default=custom_serializer, use_bin_type=True)
        self._producer.send(topic=topic_name, value=msgpack_message)

custom_serializer looks like this:

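def custom_serializer(o: object) -> object:
    """
    The default hook msgpack calls when it cannot encode an object natively.
    Returns a msgpack-serializable value (dict or str) representing o.
    """
    try:
        # Check if the object has the "as_dict" function
        if callable(getattr(o, "as_dict", None)):
            return o.as_dict()
        # Check if the object has an overridden "__repr__"
        if type(o).__repr__ != object.__repr__:
            return repr(o)
        return {type(o).__name__: {key: element for key, element in vars(o).items()}}
    except Exception as exc:
        # Returning o unchanged would make msgpack fail anyway; raise a clear TypeError
        raise TypeError(f"Cannot serialize object of type {type(o).__name__}") from exc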

Findings

Overall, the findings suggest that, currently, the algorithm for encoding data does not seem to affect the performance of the NApp; the bottleneck is more likely a combination of total network load and time-to-serialize. Because the NApp runs as a single instance, the results may also be limited by the fact that this design cannot be scaled horizontally, which makes both algorithms rather slow.

In addition, although both libraries use C implementations, each falls back to its "default" hook (ComplexEncoder.default and custom_serializer above) for objects it cannot encode natively. Those hooks are plain Python, so serialization spends a lot of time in Python code, which slows it down substantially.
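To illustrate the point about fallback hooks, the sketch below times json.dumps on a payload made only of native types against the same data wrapped in a plain object that has to go through a default hook. The Switch class and the fallback function are hypothetical stand-ins for whatever event.content carries; absolute timings depend on the machine.

```python
import json
import timeit


class Switch:
    """Hypothetical stand-in for a custom object carried in event.content."""

    def __init__(self, dpid: str, enabled: bool) -> None:
        self.dpid = dpid
        self.enabled = enabled


def fallback(o: object) -> dict:
    """Pure-Python hook, called only for objects json cannot encode natively."""
    return vars(o)


native = {"switch": {"dpid": "00:00:00:00:00:00:00:01", "enabled": True}}
wrapped = {"switch": Switch("00:00:00:00:00:00:00:01", True)}

# Both payloads serialize to the same JSON text...
same_output = json.dumps(native) == json.dumps(wrapped, default=fallback)

# ...but the wrapped one pays for a Python-level call per custom object.
n = 20_000
t_native = timeit.timeit(lambda: json.dumps(native), number=n)
t_hook = timeit.timeit(lambda: json.dumps(wrapped, default=fallback), number=n)
print(f"same output: {same_output}")
print(f"native: {t_native / n * 1e6:.2f} µs/msg, with hook: {t_hook / n * 1e6:.2f} µs/msg")
```

The more custom objects an event payload contains, the more of these Python-level calls each message incurs.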

Conclusion

Currently, the NApp can handle approximately 3-4k EPM, well above scenario A (1300 EPM). It therefore seems to fit our current needs, especially considering that this is a single NApp and a single-instance Kafka cluster with a relatively low memory footprint.

However, a point you brought up that I agree with is the operational complexity of Kafka. If we go forward with this project, I believe training the operations team on Kafka's shell commands, as well as on the Docker configurations, is critically important. This may or may not be a major blow to the project, as it depends on the individuals' willingness to learn.

Limitations

This test could have been improved by reducing network load through compression techniques (like gzip). This could be replicated with gzip included, which I hypothesize would help MessagePack more (since it's encoded directly to bytes).
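Before re-running the Kafka benchmark, the gzip hypothesis can be sanity-checked offline by comparing payload sizes. The sketch below uses only the standard library and a hypothetical batch of events shaped like the dicts sendMessage() builds. Kafka producers compress whole record batches (kafka-python exposes this through the KafkaProducer compression_type="gzip" setting), so compressing a batch here is closer to what the broker stores than compressing messages one by one.

```python
import gzip
import json

# Hypothetical events shaped like the dicts sendMessage() serializes.
events = [
    {
        "event": "kytos/example.port_status",
        "type": "kytos/example.port_status",
        "message": {"switch": "00:00:00:00:00:00:00:01", "port": i % 48, "up": i % 2 == 0},
    }
    for i in range(500)
]
messages = [json.dumps(e).encode() for e in events]

raw_bytes = sum(len(m) for m in messages)
# Compressing each message separately: every message pays gzip's ~18-byte
# header/trailer and has little internal repetition to exploit.
per_message_gzip = sum(len(gzip.compress(m)) for m in messages)
# Compressing them together, closer to how Kafka producers compress record batches.
batch_gzip = len(gzip.compress(b"".join(messages)))

print(f"raw={raw_bytes} B  gzip per message={per_message_gzip} B  gzip per batch={batch_gzip} B")
```

The same comparison can be repeated with MessagePack-encoded bytes to test whether gzip helps one format more than the other.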

Retries

I believe this application would be considered "highly critical", so making sure it stays fault tolerant at all times is crucial. This is especially true if consumer applications receive event notifications only through the new listen_events NApp and the Kafka cluster. Because of this, I retract my earlier statement about letting events go unprocessed, and I believe some level of redesign of the NApp might be necessary.

Specifically, the listen_events NApp would run inside Kytos, so preparing for downtime would necessarily involve Kytos. Because of this, I don't believe making the NApp fault tolerant is possible, since that would require making Kytos fault tolerant (which is beyond the scope of this application). Thus, we would have to set our sights on making Kafka as fault tolerant as possible, which is much easier. This can be achieved with a multi-node cluster, where partitions are replicated and balanced across the brokers (making it highly redundant).
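How much a multi-node cluster buys depends on the usual Kafka durability settings: the topic's replication factor (the REPLICATION_FACTOR passed to NewTopic above), the min.insync.replicas topic setting (settable through NewTopic's topic_configs in kafka-python), and a producer using acks="all". The helper below is a hypothetical back-of-the-envelope calculator, not part of any Kafka client, assuming unclean leader election is disabled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Durability:
    write_failures_tolerated: int  # brokers that can be down while produce requests still succeed
    loss_failures_tolerated: int   # brokers that can be lost without losing acknowledged records


def kafka_durability(replication_factor: int, min_insync_replicas: int, acks: str = "all") -> Durability:
    """Rough guarantees for one partition whose replicas sit on distinct brokers."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    if acks != "all":
        # With acks=0/1 the leader can acknowledge before any follower has the record,
        # and min.insync.replicas is only enforced for acks=all producers.
        return Durability(write_failures_tolerated=replication_factor - 1, loss_failures_tolerated=0)
    # With acks=all, every acknowledged record is on at least min_insync_replicas brokers.
    return Durability(
        write_failures_tolerated=replication_factor - min_insync_replicas,
        loss_failures_tolerated=min_insync_replicas - 1,
    )


print(kafka_durability(1, 1))  # single-node cluster, as in the benchmarks
print(kafka_durability(3, 2))  # a common multi-node setting
```

Under these assumptions a single-node cluster tolerates no broker failure at all, while three replicas with min.insync.replicas=2 keep accepting writes and keep acknowledged data through the loss of one broker.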

I've worked with multi-node clusters before, so I can run another benchmark in the future, perhaps using Netflix's Chaos Monkey or Pumba to simulate downtime or network delays.

@viniarck
Member Author

> 3727 EPM, which is 223k EPH
> 3679 EPM, which is 220k EPH

It's good to know, @Auwate, that this is achievable with one Kafka node and even with a non-asyncio library in the producer; that's a good start. Later on, pick a well-supported asyncio client library.
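Whichever asyncio client is eventually picked (aiokafka is one option), a common pattern is to keep broker IO out of the event handler: the handler only enqueues into a bounded asyncio.Queue, and one background task drains it. In this standard-library sketch, EventForwarder and send are hypothetical names, send stands in for the real client's send coroutine, and submit is assumed to run on the event loop's thread (from a worker thread it would go through loop.call_soon_threadsafe).

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable


class EventForwarder:
    """Hypothetical bridge: handlers enqueue events, one background task does the IO."""

    def __init__(self, send: Callable[[dict], Awaitable[None]], maxsize: int = 10_000) -> None:
        self._send = send
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0  # events rejected because the queue was full
        self.failed = 0   # events whose send raised

    def submit(self, name: str, content: Any) -> None:
        """Non-blocking enqueue, meant to be called on the event loop's thread."""
        try:
            self._queue.put_nowait({"event": name, "message": content})
        except asyncio.QueueFull:
            self.dropped += 1

    async def run(self) -> None:
        """Drain the queue forever, sending one event at a time."""
        while True:
            item = await self._queue.get()
            try:
                await self._send(item)
            except Exception:
                self.failed += 1  # a real bridge would log and/or retry here
            finally:
                self._queue.task_done()

    async def flush(self) -> None:
        """Wait until every queued event has been processed."""
        await self._queue.join()


async def demo() -> list:
    delivered = []

    async def fake_send(item: dict) -> None:
        await asyncio.sleep(0)  # stands in for network IO
        delivered.append(item)

    forwarder = EventForwarder(fake_send)
    worker = asyncio.create_task(forwarder.run())
    for seq in range(3):
        forwarder.submit("kytos/example.event", {"seq": seq})
    await forwarder.flush()
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return delivered


sent = asyncio.run(demo())
print([item["message"]["seq"] for item in sent])  # [0, 1, 2]
```

The bounded queue makes the overload policy explicit (here: count and drop), which is one of the guarantees the NApp would need to document.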

> the algorithm for encoding data does not seem to affect the performance of the NApp
> This could be replicated with gzip included, which I hypothesize would help MessagePack more (since it's encoded directly to bytes).

Yes, serialization-wise, MessagePack is expected to produce smaller payloads than JSON. But for small payloads the serialization difference is on the order of microseconds, while the bottleneck will tend to be IO, on the order of milliseconds. We can certainly adopt MessagePack as the only default option initially, but only if it contributes positively in other ways, such as 1) higher throughput on the wire, and 2) less storage on the Kafka infrastructure thanks to smaller payloads. Both points are important for us; if we have concrete data justifying 1) and 2), then we can choose it and maintain it.

Yes, it'd also be interesting to have more benchmarks comparing combinations with the broker's compression algorithms.

> However, a point you brought up that I agree with is the operational complexity of Kafka. If we go forward with this project, I believe training the operations team on Kafka's shell commands, as well as on the Docker configurations, is critically important. This may or may not be a major blow to the project, as it depends on the individuals' willingness to learn.

Yes, development-wise we know that Kafka will be able to scale, and you're on the right path. But whether the Operations (Ops) team will support and maintain it, and be able to afford the required cluster size and configuration, needs to be an early conversation. It's also important to understand the retention policies: if they only use short ones, it may not require much extra storage. Either way, it's good to work out some cost estimates of how beefy the servers need to be to handle the throughput we'll be aiming for, and of how long messages will be retained, which will also depend on the use cases.
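A first storage estimate only needs a few inputs. The helper below is a hypothetical calculator; the 3679 EPM figure comes from the MessagePack benchmark above, while the 1 KiB average payload, 7-day retention and replication factor of 3 are placeholder assumptions to be replaced with measured or agreed values.

```python
def kafka_storage_bytes(events_per_minute: float, avg_payload_bytes: float,
                        retention_days: float, replication_factor: int = 1) -> float:
    """Rough on-disk size of a topic: events/min * minutes retained * bytes/event * replicas.

    Ignores per-record and per-batch overhead, compression and index files.
    """
    minutes_retained = retention_days * 24 * 60
    return events_per_minute * minutes_retained * avg_payload_bytes * replication_factor


GiB = 1024 ** 3
estimate = kafka_storage_bytes(3679, avg_payload_bytes=1024, retention_days=7, replication_factor=3)
print(f"{estimate / GiB:.1f} GiB")  # 106.1 GiB
```

Since the estimate scales linearly with the average payload size, a measured MessagePack versus JSON size ratio translates directly into storage savings, which is the kind of concrete data point 2) asks for.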

> I don't believe making the NApp fault tolerant is possible since that would require making Kytos fault tolerant (which is beyond the scope of this application)

If kytosd is down, there are no events to be forwarded, so the rationale here isn't completely right. The part of fault tolerance that matters most here is a broker connectivity outage. If that is going to be supported, then figure out what else can be used locally. Also, Kytos-ng provides a simplified dead letter queue of events at localhost:8181/api/kytos/core/dead_letter/. It can be partly used too, but be careful about which types of events can be reinjected into the handler: some older events might not make sense if a newer one has already been received, since certain events carry some state.
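For broker connectivity outages, one option in the spirit of the remark about stateful events is a bounded local buffer that keeps only the newest pending event per key and replays the survivors once the broker is reachable again. In this sketch, CoalescingBuffer is a hypothetical name, send stands in for the producer call and is assumed to raise ConnectionError while the broker is down, and the key function (event name plus a switch id) is an assumption about which events supersede each other.

```python
from collections import OrderedDict
from typing import Callable, Hashable


class CoalescingBuffer:
    """Hypothetical outage buffer: keeps only the newest pending event per key."""

    def __init__(self, send: Callable[[dict], None], key: Callable[[dict], Hashable],
                 capacity: int = 10_000) -> None:
        self._send = send            # assumed to raise ConnectionError while the broker is down
        self._key = key              # decides which events supersede each other
        self._capacity = capacity
        self._pending: "OrderedDict[Hashable, dict]" = OrderedDict()
        self.evicted = 0             # events dropped because the buffer was full

    def publish(self, event: dict) -> None:
        k = self._key(event)
        self._pending.pop(k, None)   # a newer event replaces an older one with the same key
        self._pending[k] = event     # (re)inserted at the newest position
        if len(self._pending) > self._capacity:
            self._pending.popitem(last=False)  # drop the oldest pending event
            self.evicted += 1
        self.flush()

    def flush(self) -> None:
        """Send pending events oldest-first; stop at the first connection failure."""
        while self._pending:
            k, event = next(iter(self._pending.items()))
            try:
                self._send(event)
            except ConnectionError:
                return               # broker still unreachable: keep the rest buffered
            del self._pending[k]

    def __len__(self) -> int:
        return len(self._pending)


broker_up = False
delivered = []


def fake_send(event: dict) -> None:
    if not broker_up:
        raise ConnectionError("broker unreachable")
    delivered.append(event)


buf = CoalescingBuffer(fake_send, key=lambda e: (e["event"], e["switch"]))
buf.publish({"event": "kytos/example.switch_status", "switch": "s1", "status": "down"})
buf.publish({"event": "kytos/example.switch_status", "switch": "s2", "status": "up"})
buf.publish({"event": "kytos/example.switch_status", "switch": "s1", "status": "up"})
broker_up = True
buf.flush()
print([(e["switch"], e["status"]) for e in delivered])  # [('s2', 'up'), ('s1', 'up')]
```

Note that coalescing drops the stale "s1 down" event and changes the relative order of s1 and s2; whether that is acceptable depends on what consumers expect.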

> Thus, we would have to set our sights on making Kafka as fault tolerant as possible, which is much easier.

That's reasonable, as long as it's crystal clear; whoever uses it needs to keep this in mind, otherwise it might not suit their use case.

Other questions still open:

  • Who and which projects will be the consumers of this project? We need to know how the data will be used to understand how long it will need to be retained.
  • How flexible will the event filtering configurations be?

Finally, once these questions are addressed, if the project moves forward and this NApp will be maintained in the kytos-ng org, I recommend that you also plan to send a blueprint PR. A blueprint is basically a document that makes official what's being written and proposed here, so that in subsequent implementation PRs whoever is reviewing can refer to it to understand the design.

@Auwate

Auwate commented Dec 5, 2024

How flexible will the event filtering configurations be?

As of right now, we only have a set of event names that are blocked from being pushed into Kafka. This is a very simple approach, though, and it can be expanded for more flexibility. For example, we could use an external file (similar to settings.py) as an updatable list of event names that can be added to or removed from the set as needed, plus a Python function that periodically re-reads the file to pick up changes.
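The file-based idea could look roughly like this standard-library sketch. EventFilter, the file name and the JSON format ({"ignored_events": [...]}) are assumptions for illustration. The filter re-reads the file only when its modification time changes, and at most every min_interval seconds, so the per-event check stays a set lookup.

```python
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


class EventFilter:
    """Hypothetical ignore-list filter that picks up edits to a JSON file at runtime."""

    def __init__(self, path: Path, min_interval: float = 5.0) -> None:
        self._path = path
        self._min_interval = min_interval   # seconds between file checks
        self._last_check = float("-inf")
        self._mtime: Optional[int] = None
        self._ignored: frozenset = frozenset()

    def _maybe_reload(self) -> None:
        now = time.monotonic()
        if now - self._last_check < self._min_interval:
            return
        self._last_check = now
        try:
            mtime = os.stat(self._path).st_mtime_ns
            if mtime == self._mtime:
                return
            # Assumed file format: {"ignored_events": ["event.name", ...]}
            data = json.loads(self._path.read_text())
        except (FileNotFoundError, json.JSONDecodeError):
            return  # keep the last good list; retry on the next check
        self._ignored = frozenset(data.get("ignored_events", []))
        self._mtime = mtime

    def accepts(self, event_name: str) -> bool:
        """Per-event check: a set lookup, with a throttled reload."""
        self._maybe_reload()
        return event_name not in self._ignored


with tempfile.TemporaryDirectory() as tmp:
    config = Path(tmp) / "ignored_events.json"
    config.write_text(json.dumps({"ignored_events": ["kytos/example.noisy_event"]}))
    event_filter = EventFilter(config, min_interval=0)
    results = (event_filter.accepts("kytos/example.noisy_event"),
               event_filter.accepts("kytos/example.useful_event"))

print(results)  # (False, True)
```

In the NApp above, the `event.name in IGNORED_EVENTS` check could then become `not event_filter.accepts(event.name)`.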

Who and which projects will be consumers?

This is still rather unclear: a similar project is being set up at Jeronimo's request and will be used by the network engineering team. I think Luis or Italo would have many ideas about consumers of this application, so they are better qualified to answer this.

Benchmarks

Earlier we discussed benchmarking statistics for the Kafka cluster, which showed very promising results. However, it came to my attention that it would be wise to also measure CPU, memory, and network usage for both Kafka and Kytos (on a single-node cluster acting as both broker and controller). The following are the characteristics I found:

Results

Kafka

The following are benchmarks for Kafka using different serialization strategies. Average CPU usage was roughly the same for all of them, but their peaks (under heavy load) differed.

  • MessagePack with gzip: CPU peak 18.4%; network peak 801 kbit/s incoming, 555 kbit/s outgoing; memory peak 4% (310 MiB)
  • MessagePack without gzip: CPU peak 25.4%; network peak 797 kbit/s incoming, 525 kbit/s outgoing; memory peak 4.01% (318 MiB)
  • JSON with gzip: CPU peak 27.34%; network peak 797.73 kbit/s incoming, 401 kbit/s outgoing; memory peak 3.9% (313 MiB)
  • JSON without gzip: CPU peak 31.98%; network peak 761 kbit/s incoming, 387 kbit/s outgoing; memory peak 3.98% (317 MiB)

Kytos

The following are benchmarks for Kytos with and without the Kafka integration:

  • Kytos without integration: CPU average 10% to 15%; CPU peak ~70.9% (averaged)
  • Kytos with integration: CPU average 15% to 20%; CPU peak ~77.1% (averaged)

Conclusions

Starting with Kafka, the benchmarks showed that gzip improved the applications' network utilization overall, but only by small margins; none of the differences amounted to a substantial improvement over the uncompressed runs.

It should be noted that the tests for Kafka were done using top and docker stats. If you would like to try it, you could use any monitoring tools at your disposal.

Next, for Kytos, the benchmarks showed that integrating Kafka yields a noticeable increase in CPU usage: the average went up by about 5 percentage points and the peak by about 6. This could be due to a number of things, but serialization likely played a large role.

I also wanted to mention that I am currently working on a Kafka cluster that would align more closely with what the team may want in production: https://github.com/Auwate/kafka-service. It's been tested with Kytos itself, so the data flow works, and it offers some resilience by running the brokers in multiple containers. If this cluster looks more appealing, we can always use it, but a single-node cluster also works great.

@viniarck
Member Author

viniarck commented Dec 6, 2024

That's good progress @Auwate.

Suggestions to consider:

  • Present the comparative data in an (HTML) table or spreadsheet in the future.
  • Are you also using different payload sizes and always injecting the same set of events? I'd recommend picking and injecting a set of events like the one mentioned in this comment, case a).
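For repeatable comparisons, the injected workload can be generated deterministically, so every run (JSON or MessagePack, with or without gzip) sees the same events and the same mix of payload sizes. The generator below is a hypothetical sketch: the event names, size buckets, weights and seed are placeholders, not the case a) workload referenced in the comment.

```python
import json
import random
from typing import Iterator

# Placeholder mix of payload sizes (filler bytes) and their relative weights.
SIZE_WEIGHTS = {64: 60, 1024: 30, 16 * 1024: 10}
# Placeholder event names; replace with the set of events chosen for the benchmark.
EVENT_NAMES = ["kytos/example.event_a", "kytos/example.event_b", "kytos/example.event_c"]


def workload(n: int, seed: int = 42) -> Iterator[dict]:
    """Yield n events; the same seed always yields the same names and sizes."""
    rng = random.Random(seed)  # private generator, unaffected by other random use
    sizes = list(SIZE_WEIGHTS)
    weights = list(SIZE_WEIGHTS.values())
    for seq in range(n):
        size = rng.choices(sizes, weights=weights)[0]
        yield {
            "event": rng.choice(EVENT_NAMES),
            "message": {"seq": seq, "filler": "x" * size},
        }


run_a = [json.dumps(e) for e in workload(1000)]
run_b = [json.dumps(e) for e in workload(1000)]
print(len(run_a), sum(len(m) for m in run_a))  # identical for run_a and run_b
```

Feeding the same generated list to each serializer variant also makes the results easy to lay out side by side in a table.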

Regarding the use cases, here are some recommendations that should make things easier for you:

That's fine if only @jab1982 or network engineering know more about the use cases. But the earlier you know how critical it is and the throughput required, the easier it will be to map the requirements and understand the cluster size and overall cluster availability for the event bridging we're discussing. For instance, would it be acceptable for events to get lost if the cluster isn't available? And when that happens, how should consumers eventually be notified so they can try to reconcile? That might be acceptable for many use cases, but are those the use cases you're expecting? If not, then using local application storage might be needed. That won't be a silver bullet either, but it's another mechanism that, for certain kinds of full outages, might be able to hold events and resume well for some time (which is another requirement that we're discussing).
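If local application storage does turn out to be needed, a small on-disk spool is one low-dependency option: events that cannot be delivered are appended to a SQLite table and replayed in insertion order once the broker is back. This is a hypothetical sketch using only the standard library, with send standing in for the producer call. As the comment says, it is not a silver bullet: it gives at-least-once delivery (an event can be sent twice after a crash), it does not bound how long events are held, and it does not survive losing the host's disk.

```python
import json
import sqlite3
from typing import Callable


class DiskSpool:
    """Hypothetical durable FIFO for events that could not be delivered to the broker."""

    def __init__(self, path: str) -> None:
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS spool ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, body TEXT NOT NULL)"
        )
        self._db.commit()

    def append(self, event: dict) -> None:
        """Persist one undelivered event (committed before returning)."""
        with self._db:
            self._db.execute("INSERT INTO spool (body) VALUES (?)", (json.dumps(event),))

    def replay(self, send: Callable[[dict], None], batch: int = 100) -> int:
        """Send spooled events oldest-first; a row is deleted only after send() returns.

        If send() raises, the exception propagates and the remaining rows stay spooled.
        """
        sent = 0
        while True:
            rows = self._db.execute(
                "SELECT id, body FROM spool ORDER BY id LIMIT ?", (batch,)
            ).fetchall()
            if not rows:
                return sent
            for row_id, body in rows:
                send(json.loads(body))
                with self._db:
                    self._db.execute("DELETE FROM spool WHERE id = ?", (row_id,))
                sent += 1

    def __len__(self) -> int:
        return self._db.execute("SELECT COUNT(*) FROM spool").fetchone()[0]


spool = DiskSpool(":memory:")  # a file path in a real deployment
spool.append({"event": "kytos/example.event", "message": {"seq": 1}})
spool.append({"event": "kytos/example.event", "message": {"seq": 2}})
received = []
count = spool.replay(received.append)
print(count, len(spool))  # 2 0
```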

When using a message broker, knowing what it's used for and the expected workload is typically one of the most critical parts. Maybe the main use cases aren't fully specified yet, and they can certainly keep evolving. But from the development point of view, we should be able to make it clear from the start which guarantees this NApp will provide and what load and throughput it's expected to handle. Once that's sorted out, we can make it clear in the NApp blueprint and README. That said, I highly recommend that you discuss with @jab1982 and the rest of the network engineering team whether they know more about the consumers and how critical they are; we can also consider generalized consumer use cases and keep evolving from there.

To give you another example and analogy, the DB that NApps use can also have a full cluster outage. Currently we don't provide other local application guarantees, which can leave the system in a state where new CRUD operations, like creating a new EVC, temporarily won't work. That might or might not be critical depending on the EVC's importance, and in this case, even if the DB isn't up, the data plane and the rest of the network in the real world will still work, so it's not a full catastrophe from the point of view of the AmLight network and its transport use case for its customers. That said, knowing these guarantees, when provisioning and sizing the DB we recommend using 3+ nodes and keeping them as close as possible to the kytosd process to minimize DB operation latency and keep it as available as possible to mitigate the risks.
