Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165" #3330

Open
gaurangomar opened this issue Oct 24, 2024 · 8 comments

Comments

@gaurangomar
Copy link

Environment:

  • eKuiper version : 1.13.3
  • Hardware configuration:
    Architecture: x86_64
    CPU op-mode(s): 32-bit, 64-bit
    Address sizes: 39 bits physical, 48 bits virtual
    Byte Order: Little Endian
    CPU(s): 8
    On-line CPU(s) list: 0-7
    Vendor ID: GenuineIntel
    Model name: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
    CPU family: 6
    Model: 94
    Thread(s) per core: 2
    Core(s) per socket: 4
    Socket(s): 1
  • OS:
    NAME="Oracle Linux Server"
    VERSION="9.0"
  • Others:

What happened and what you expected to happen:
We have multiple rules and we are experiencing an intermittent issue that rules are getting into stopped state.
We are using MQTT as a source and same MQTT broker as SINK.
Rules works fine but intermittently we see that some rules go into stopped state and stop processing data publishing to the source topic.
We are not able to see any error logs, below are the logs which we observed.
We are always sending structured data into the source topic, random/ raw entries will never exist in the source topic.

time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="Stream STREAM_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/stream.go:213"
time="2024-10-23T19:03:17Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc003b28750 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Opening source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:03:17Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 768b49fd-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="start removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:17Z" level=info msg="finish removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1XGR011QG_CSB1WY9R11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WY8W11QG/CSB1XGR011QG/CSB1WY9R11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Stop rulestate RULE_CSB1XGR011QG_CSB1WY9R11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="start removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:19Z" level=info msg="finish removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:19Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:19Z" level=info msg="Stop rulestate RULE_CSB1X7KG11QG_CSB1WPKC11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
�[?2004h

How to reproduce it (as minimally and precisely as possible):
We do not have any specific scenario to reproduce this issue, this is an intermittent issue.
We are using the etc/mqtt_source.yaml file for providing the source connection info, and for the SINK we use something like below.

"actions": [
    {
      "mqtt": {
        "server": "tcp://edge-message-bus:1883",
        "topic": "kuiper-sink-channel",
        "qos": 0,
        "username": "kuiper-user",
        "password": "password",
        "retained": false
      }
    }
  ]

Anything else we need to know?:
Let me know in any other details are required.

@ngjaying
Copy link
Collaborator

Hi @gaurangomar, looks like you have issue an update command then the rule stops. Could you please check your rule payload, make sure triggerred=true is set. { "id": "rule_scene", "triggered": true, "sql": "Select * FROM stream", "actions": [ { "nop": { "sendSingle": true } } ] }

@gaurangomar
Copy link
Author

Thanks @ngjaying I am not able to find the documentation for triggerred=true, could you please help me with the documentation link?

@ngjaying
Copy link
Collaborator

It is not documented. It is just a property of the rule like this: { "id": "rule_scene", "triggered": true, "sql": "Select * FROM stream", "actions": [ { "nop": { "sendSingle": true } } ] }

@gaurangomar
Copy link
Author

I am not passing this "triggered": true, property when creating the rule but, when I checked the already created rules using describe command line, I can see this property is already present in the rule.
Here is an example rule.
image

I believe setting property "triggered": true, is not the solution for the issue I am facing.
Could you please take a look?

@ngjaying
Copy link
Collaborator

Do you use eKuiper manager to update the rule? Could you please find out how to reproduce the problem? Thanks.

@gaurangomar
Copy link
Author

gaurangomar commented Oct 28, 2024

I am using the REST API, for creating and updating rules.

curl --location 'http://kuiper-host:59720/rules' \
--header 'Content-Type: application/json' \
--data '{
    "id": "RULE_CTR65AAA23ZG_7MX8KA5NKEE4",
    "sql": "------SQL-----",
    "actions": [
        {
            "mqtt": {
                "server": "ttcp://edge-message-bus:1883",
                "topic": "kuiper-sink",
                "qos": 0,
                "username": "user*****",
                "password": "password******",
                "retained": false
            }
        }
    ]
}'

This issue is intermittent, i tried reproducing this on demand but not able to do it, I tried all my use cases.

@ngjaying
Copy link
Collaborator

I don't think the info is enough to identify the problem. Form this little piece of log, it looks like someone has issued an update command and stop the rule intentionally or accidentally. If you have ever used eKuiper manager, the rule update may stop the rule if the triggered is not set to true by default. Could you find out why the rules are updated? Maybe a full log with description of the symtom (which rules stop unintentionally at what time etc.) is more helpful.

@gaurangomar
Copy link
Author

gaurangomar commented Oct 30, 2024

Hi @ngjaying,
The updation of the rules is part of our workflow, we update the rules as per the user's input.
We have a workflow where as per the users selections we create/ update the SQLs and Rules using REST APIs, this allows us to process the data from the MQTT source.
This is an intermittent issues, sometimes we see that the rules are not processing any data coming into the input channel.
Here are some logs:

time="2024-10-23T19:01:21Z" level=info msg="Stream STREAM_CSB1X7KG11QG_CSB1WPKC11QG is created." file="processor/stream.go:89"
time="2024-10-23T19:01:21Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc0070f1ec0 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:01:21Z" level=info msg="Start rulestate RULE_CSB1X7KG11QG_CSB1WPKC11QG" file="rule/ruleState.go:182"
time="2024-10-23T19:01:21Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:21Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:21Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:21Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is created." file="processor/rule.go:75"
time="2024-10-23T19:01:21Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="Opening source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:21Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:21Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:01:21Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:01:21Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:01:21Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:21Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 31516d69-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:01:22Z" level=info msg="Stream STREAM_CSB1XGR011QG_CSB1WY9R11QG is created." file="processor/stream.go:89"
time="2024-10-23T19:01:22Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc006515440 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:01:22Z" level=info msg="Start rulestate RULE_CSB1XGR011QG_CSB1WY9R11QG" file="rule/ruleState.go:182"
time="2024-10-23T19:01:22Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:22Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:22Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:22Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is created." file="processor/rule.go:75"
time="2024-10-23T19:01:22Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="Opening source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:22Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:22Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:01:22Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:01:22Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:01:22Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 321b4fcf-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:01:22Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:22Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Stream STREAM_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/stream.go:213"
time="2024-10-23T19:01:44Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc0062a1560 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:01:44Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:44Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:01:44Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Opening source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 3f282cb9-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:01:44Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:01:44Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:01:44Z" level=info msg="Stream STREAM_CSB1XGR011QG_CSB1WY9R11QG is replaced." file="processor/stream.go:213"
time="2024-10-23T19:01:44Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc004d46450 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:01:44Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:44Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=error msg="not found subscription id RULE_CSB1XGR011QG_CSB1WY9R11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:01:44Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WY8W11QG/CSB1XGR011QG/CSB1WY9R11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Opening source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 3f328139-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:01:44Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:01:44Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="Stream STREAM_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/stream.go:213"
time="2024-10-23T19:03:17Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc003b28750 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Opening source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:03:17Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 768b49fd-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="start removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:17Z" level=info msg="finish removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1XGR011QG_CSB1WY9R11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WY8W11QG/CSB1XGR011QG/CSB1WY9R11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Stop rulestate RULE_CSB1XGR011QG_CSB1WY9R11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="start removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:19Z" level=info msg="finish removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:19Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:19Z" level=info msg="Stop rulestate RULE_CSB1X7KG11QG_CSB1WPKC11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG

Additionally I tried creating and updating rules manually but the rules are not getting stopped/closed they are working as expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants