HD-76 | NIFI-7856 | NIFI-8196 : Applied required patches
harshith-212 committed Oct 17, 2023
1 parent 2e6408e commit 678c253
Showing 36 changed files with 566 additions and 178 deletions.
14 changes: 7 additions & 7 deletions nifi-docs/src/main/asciidoc/walkthroughs.adoc
@@ -28,7 +28,7 @@ NOTE: This document is provided with no warranty. All steps have been evaluated

== Installing Apache NiFi

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. In addition to NiFi, there is the NiFi Toolkit, a collection of command-line tools which help perform administrative tasks such as interacting with remote services, generating TLS certificates, managing nodes in a cluster, and encrypting sensitive configuration values.

|=======================================================================================================================
|Description | Instructions on downloading the Apache NiFi application and Toolkit
@@ -1146,9 +1146,9 @@ NOTE: The `nifi.cluster.load.balance.host=` entry must be manually populated her
** `<property name="Connect String"></property>` -> `<property name="Connect String">node1.nifi:2181,node2.nifi:2182,node3.nifi:2183</property>`
* `cp node1.nifi/conf/state-management.xml node2.nifi/conf/state-management.xml` -- Copies the modified `state-management.xml` file from `node1` to `node2`
* `cp node1.nifi/conf/state-management.xml node3.nifi/conf/state-management.xml` -- Copies the modified `state-management.xml` file from `node1` to `node3`
. Update the `authorizers.xml` file. This file contains the *Initial Node Identities* and *Initial User Identities*. The *users* consist of all human users _and_ all nodes in the cluster. The `authorizers.xml` file can be identical on each node (assuming no other changes were made), so the modified file will be copied to the other nodes.
+
NOTE: Each `Initial User Identity` must have a **unique** name (`Initial User Identity 1`, `Initial User Identity 2`, etc.).

* `$EDITOR node1.nifi/conf/authorizers.xml` -- Opens the `authorizers.xml` file in a text editor
* Update the following lines:
@@ -1179,11 +1179,11 @@ image::authorizers-xml-initial-node-identities.png["authorizers.xml with Initial
* Update the following lines:
** `nifi.cluster.flow.election.max.wait.time=5 mins` -> `nifi.cluster.flow.election.max.wait.time=1 min` -- Changes the flow election wait time to 1 min, speeding up cluster availability
** `nifi.cluster.flow.election.max.candidates=` -> `nifi.cluster.flow.election.max.candidates=3` -- Changes the flow election to occur when 3 nodes are present, speeding up cluster availability
. Start the NiFi cluster. Once all three nodes have started and joined the cluster, the flow has been agreed upon, and a cluster coordinator has been elected, the UI is available on any of the three nodes.
* `./node1.nifi/bin/nifi.sh start` -- Starts `node1`
* `./node2.nifi/bin/nifi.sh start` -- Starts `node2`
* `./node3.nifi/bin/nifi.sh start` -- Starts `node3`
. Wait approximately 30 seconds and open the web browser to link:https://node3.nifi:9443/nifi[`https://node3.nifi:9443/nifi`^]. The cluster should be available. _Note that the Initial Admin Identity has no permissions on the root process group. This is an artifact of legacy design decisions where the root process group ID is not known at initial start time._
+
--
_The running cluster_
@@ -1194,14 +1194,14 @@ _The cluster status from the global menu at the top right_

image::nifi-secure-cluster-status.png["NiFi secure cluster status pane"]
--
. To update the Initial Admin Identity's permissions for the root process group, stop each node in the cluster and remove the `authorizations.xml` and `users.xml` files from each node. These files will be regenerated when the node starts again and will be populated with the correct permissions.
* `./node1.nifi/bin/nifi.sh stop` -- Stops `node1`
* `rm node1.nifi/conf/authorizations.xml node1.nifi/conf/users.xml` -- Removes the `authorizations.xml` and `users.xml` for `node1`
* `./node2.nifi/bin/nifi.sh stop` -- Stops `node2`
* `rm node2.nifi/conf/authorizations.xml node2.nifi/conf/users.xml` -- Removes the `authorizations.xml` and `users.xml` for `node2`
* `./node3.nifi/bin/nifi.sh stop` -- Stops `node3`
* `rm node3.nifi/conf/authorizations.xml node3.nifi/conf/users.xml` -- Removes the `authorizations.xml` and `users.xml` for `node3`
. Start the nodes again.
* `./node1.nifi/bin/nifi.sh start` -- Starts `node1`
* `./node2.nifi/bin/nifi.sh start` -- Starts `node2`
* `./node3.nifi/bin/nifi.sh start` -- Starts `node3`
@@ -17,6 +17,7 @@

package org.apache.nifi.cluster.coordination;

import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@@ -276,6 +277,12 @@ public interface ClusterCoordinator {
*/
void registerEventListener(ClusterTopologyEventListener eventListener);

/**
* Validates the given heartbeat and, if it is not valid, takes appropriate action to rectify the problem
*/
default void validateHeartbeat(NodeHeartbeat nodeHeartbeat) {
}

/**
* Stops notifying the given listener when cluster topology events occur
* @param eventListener the event listener to stop notifying
@@ -56,4 +56,9 @@ public interface NodeHeartbeat {
* @return the time that the node reports having started NiFi
*/
long getSystemStartTime();

/**
* @return the number of updates that have occurred to the Revision Manager
*/
long getRevisionUpdateCount();
}
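
The patch adds `getRevisionUpdateCount()` to the heartbeat and an empty default `validateHeartbeat`, but this excerpt does not show what validation does. The sketch below illustrates one plausible use and is an assumption, not the patch's implementation: the `Action` enum, the `validate` method, and the policy of asking a node to reconnect when it reports more revision updates than the coordinator has seen are all hypothetical.

```java
class HeartbeatValidationSketch {

    /** Hypothetical outcomes of validation; the patch does not define these. */
    enum Action { ACCEPT, REQUEST_RECONNECT }

    /**
     * Assumed policy: a node that reports more revision updates than the
     * coordinator has seen is treated as out of sync and asked to reconnect.
     */
    static Action validate(final long nodeUpdateCount, final long coordinatorUpdateCount) {
        if (nodeUpdateCount > coordinatorUpdateCount) {
            return Action.REQUEST_RECONNECT;
        }
        return Action.ACCEPT;
    }

    public static void main(final String[] args) {
        System.out.println(validate(5L, 7L)); // prints ACCEPT
        System.out.println(validate(9L, 7L)); // prints REQUEST_RECONNECT
    }
}
```

Returning a decision rather than acting inside `validate` keeps the comparison easy to test. A real coordinator would presumably act on the result itself, which is consistent with the `void` return type of `validateHeartbeat`.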
@@ -17,10 +17,6 @@

package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
@@ -31,8 +27,16 @@
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;

public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
private final SocketConfiguration socketConfiguration;
private final ProtocolContext<ProtocolMessage> protocolContext;

@@ -42,10 +46,24 @@ public AbstractNodeProtocolSender(final SocketConfiguration socketConfiguration,
}

@Override
public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg, final boolean allowConnectToSelf) throws ProtocolException, UnknownServiceAddressException {
Socket socket = null;
try {
socket = createSocket();
final InetSocketAddress socketAddress;
try {
socketAddress = getServiceAddress();
} catch (final IOException e) {
throw new ProtocolException("Could not determine address of Cluster Coordinator", e);
}

// If node is not allowed to connect to itself, then we need to check the address of the Cluster Coordinator.
// If the Cluster Coordinator is currently set to this node, then we will throw an UnknownServiceAddressException
if (!allowConnectToSelf) {
validateNotConnectingToSelf(msg, socketAddress);
}

logger.info("Cluster Coordinator is located at {}. Will send Cluster Connection Request to this address", socketAddress);
socket = createSocket(socketAddress);

try {
// marshal message to output stream
@@ -76,6 +94,21 @@ public ConnectionResponseMessage requestConnection(final ConnectionRequestMessag
}
}

private void validateNotConnectingToSelf(final ConnectionRequestMessage msg, final InetSocketAddress socketAddress) {
final NodeIdentifier localNodeIdentifier = msg.getConnectionRequest().getProposedNodeIdentifier();
if (localNodeIdentifier == null) {
return;
}

final String localAddress = localNodeIdentifier.getSocketAddress();
final int localPort = localNodeIdentifier.getSocketPort();

if (Objects.equals(localAddress, socketAddress.getHostString()) && localPort == socketAddress.getPort()) {
throw new UnknownServiceAddressException("Cluster Coordinator is currently " + socketAddress.getHostString() + ":" + socketAddress.getPort() + ", which is this node, but " +
"connecting to self is not allowed at this phase of the lifecycle. This node must wait for a new Cluster Coordinator to be elected before connecting to the cluster.");
}
}
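
The self-connection check in `validateNotConnectingToSelf` is a host-and-port comparison against the coordinator's address. Here is a minimal standalone sketch of that comparison using only the JDK; the class name, the `isSelf` helper, and the host names and port are hypothetical and are not taken from the NiFi code base.

```java
import java.net.InetSocketAddress;
import java.util.Objects;

class SelfConnectCheckSketch {

    /** Returns true when the coordinator address has the same host string and port as the local node. */
    static boolean isSelf(final String localHost, final int localPort, final InetSocketAddress coordinator) {
        return Objects.equals(localHost, coordinator.getHostString()) && localPort == coordinator.getPort();
    }

    public static void main(final String[] args) {
        // createUnresolved avoids a DNS lookup, so the example runs without network access
        final InetSocketAddress coordinator = InetSocketAddress.createUnresolved("node1.nifi", 11443);

        System.out.println(isSelf("node1.nifi", 11443, coordinator)); // prints true
        System.out.println(isSelf("node2.nifi", 11443, coordinator)); // prints false
    }
}
```

The comparison is an exact string match on the host. Two different spellings of the same machine, such as a hostname and its IP address, would therefore not be treated as the same node.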

@Override
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
final String hostname;
@@ -112,11 +145,9 @@ public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadReque
throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'");
}

private Socket createSocket() {
InetSocketAddress socketAddress = null;
private Socket createSocket(final InetSocketAddress socketAddress) {
try {
// create a socket
socketAddress = getServiceAddress();
return SocketUtils.createSocket(socketAddress, socketConfiguration);
} catch (final IOException ioe) {
if (socketAddress == null) {
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.protocol;

import org.apache.nifi.web.Revision;
import org.apache.nifi.web.revision.RevisionSnapshot;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class ComponentRevisionSnapshot {
private List<ComponentRevision> componentRevisions;
private Long revisionUpdateCount;

public List<ComponentRevision> getComponentRevisions() {
return componentRevisions;
}

public void setComponentRevisions(final List<ComponentRevision> componentRevisions) {
this.componentRevisions = componentRevisions;
}

public Long getRevisionUpdateCount() {
return revisionUpdateCount;
}

public void setRevisionUpdateCount(final Long revisionUpdateCount) {
this.revisionUpdateCount = revisionUpdateCount;
}

public static ComponentRevisionSnapshot fromRevisionSnapshot(final RevisionSnapshot revisionSnapshot) {
final List<ComponentRevision> componentRevisions = revisionSnapshot.getRevisions().stream()
.map(ComponentRevision::fromRevision)
.collect(Collectors.toList());

final ComponentRevisionSnapshot componentRevisionSnapshot = new ComponentRevisionSnapshot();
componentRevisionSnapshot.setComponentRevisions(componentRevisions);
componentRevisionSnapshot.setRevisionUpdateCount(revisionSnapshot.getRevisionUpdateCount());
return componentRevisionSnapshot;
}

public RevisionSnapshot toRevisionSnapshot() {
final List<Revision> revisions = componentRevisions == null ? Collections.emptyList() : componentRevisions.stream()
.map(ComponentRevision::toRevision)
.collect(Collectors.toList());
final long updateCount = revisionUpdateCount == null ? 0L : revisionUpdateCount;

return new RevisionSnapshot(revisions, updateCount);
}
}
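
`toRevisionSnapshot()` substitutes defaults when the serialized fields are absent: an empty list for missing revisions and `0` for a missing update count. The sketch below isolates that pattern with simplified stand-in types. `WireSnapshot`, `Snapshot`, and the `String` revision entries are hypothetical and replace the real `ComponentRevision` and `RevisionSnapshot` classes only for illustration.

```java
import java.util.Collections;
import java.util.List;

class SnapshotDefaultsSketch {

    /** Hypothetical wire form: both fields may be null if the sender omitted them. */
    static class WireSnapshot {
        List<String> revisions;
        Long updateCount; // boxed so that "absent" can be represented as null
    }

    /** Hypothetical in-memory form that never holds null. */
    static class Snapshot {
        final List<String> revisions;
        final long updateCount;

        Snapshot(final List<String> revisions, final long updateCount) {
            this.revisions = revisions;
            this.updateCount = updateCount;
        }
    }

    /** Converts the wire form, using an empty list and 0 when a field is missing. */
    static Snapshot toSnapshot(final WireSnapshot wire) {
        final List<String> revisions = wire.revisions == null ? Collections.<String>emptyList() : wire.revisions;
        final long updateCount = wire.updateCount == null ? 0L : wire.updateCount;
        return new Snapshot(revisions, updateCount);
    }

    public static void main(final String[] args) {
        final Snapshot snapshot = toSnapshot(new WireSnapshot()); // nothing populated
        System.out.println(snapshot.revisions.size() + " revisions, update count " + snapshot.updateCount);
    }
}
```

Declaring the count as `Long` on the wire side and `long` in memory lets a message from a sender that omits the field be converted without a `NullPointerException`.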
@@ -41,11 +41,11 @@ public class ConnectionResponse {
private final DataFlow dataFlow;
private final String instanceId;
private final List<NodeConnectionStatus> nodeStatuses;
private final List<ComponentRevision> componentRevisions;
private final ComponentRevisionSnapshot revisionSnapshot;


public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final ComponentRevisionSnapshot revisionSnapshot) {

if (nodeIdentifier == null) {
throw new IllegalArgumentException("Node identifier may not be empty or null.");
@@ -57,7 +57,7 @@ public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow da
this.rejectionReason = null;
this.instanceId = instanceId;
this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses));
this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
this.revisionSnapshot = revisionSnapshot;
}

public ConnectionResponse(final int tryLaterSeconds, final String explanation) {
@@ -70,7 +70,7 @@ public ConnectionResponse(final int tryLaterSeconds, final String explanation) {
this.rejectionReason = explanation;
this.instanceId = null;
this.nodeStatuses = null;
this.componentRevisions = null;
this.revisionSnapshot = null;
}

private ConnectionResponse(final String rejectionReason) {
@@ -80,7 +80,7 @@ private ConnectionResponse(final String rejectionReason) {
this.rejectionReason = rejectionReason;
this.instanceId = null;
this.nodeStatuses = null;
this.componentRevisions = null;
this.revisionSnapshot = null;
}

public static ConnectionResponse createBlockedByFirewallResponse() {
@@ -123,7 +123,7 @@ public List<NodeConnectionStatus> getNodeConnectionStatuses() {
return nodeStatuses;
}

public List<ComponentRevision> getComponentRevisions() {
return componentRevisions;
public ComponentRevisionSnapshot getComponentRevisions() {
return revisionSnapshot;
}
}
@@ -16,20 +16,21 @@
*/
package org.apache.nifi.cluster.protocol;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.security.xml.XmlUtils;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.security.xml.XmlUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

/**
* The payload of the heartbeat. The payload contains status information that tells the cluster manager the current workload of this node.
@@ -53,6 +54,7 @@ public class HeartbeatPayload {
private long totalFlowFileBytes;
private long systemStartTime;
private List<NodeConnectionStatus> clusterStatus;
private long revisionUpdateCount;

public int getActiveThreadCount() {
return activeThreadCount;
@@ -94,6 +96,14 @@ public void setClusterStatus(final List<NodeConnectionStatus> clusterStatus) {
this.clusterStatus = clusterStatus;
}

public long getRevisionUpdateCount() {
return revisionUpdateCount;
}

public void setRevisionUpdateCount(final long revisionUpdateCount) {
this.revisionUpdateCount = revisionUpdateCount;
}

public byte[] marshal() throws ProtocolException {
final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
marshal(this, payloadBytes);
@@ -34,12 +34,13 @@ public interface NodeProtocolSender {
* Sends a "connection request" message to the cluster manager.
*
* @param msg a message
* @param allowConnectionToSelf whether or not the node should be allowed to connect to the cluster if the Cluster Coordinator is this node
* @return the response
* @throws UnknownServiceAddressException if the cluster manager's address
* is not known
* @throws ProtocolException if communication failed
*/
ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg, boolean allowConnectionToSelf) throws ProtocolException, UnknownServiceAddressException;

/**
* Sends a heartbeat to the address given