Skip to content

Commit

Permalink
Create AgentNativeConnectionProvider and DistributedNativeConnectionP…
Browse files Browse the repository at this point in the history
…rovider (#688)

* Introduce datacenterAware Config for CQLSession

* Introduce rackAware and hostAware

* Fix ecc.yml Comments and Default Values for Datacenters and Racks

* Initial Implementation of AgentNativeConnectionProvider and DatacenterNativeConnectionProvider

* Introduce rackAware and hostAware

* Rebase feature/provider With feature/config

* Introduce Distributed and Agent Provider

* Add AgentNative Task on CHANGES.MD
  • Loading branch information
VictorCavichioli authored Jul 11, 2024
1 parent a78b72a commit 4efede0
Show file tree
Hide file tree
Showing 13 changed files with 909 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changes
## Version 6.0.0

* Expose AgentNativeConnectionProvider on Connection and Application Module - Issue #673
* Create DatacenterAwareConfig to add Hosts in CQL Session Through ecc.yml - Issue #671

## Version 5.0.5
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;

import io.micrometer.core.instrument.MeterRegistry;
import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security;
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.AgentConnectionConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.NativeConnection;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.core.repair.DefaultRepairConfigurationProvider;

public class AgentNativeConnectionProvider implements NativeConnectionProvider
{
private static final Logger LOG = LoggerFactory.getLogger(AgentNativeConnectionProvider.class);

private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider;

public AgentNativeConnectionProvider(
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider,
final MeterRegistry meterRegistry)
{
NativeConnection nativeConfig = config.getConnectionConfig().getCqlConnection();
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection()
.getAgentConnectionConfig();
boolean remoteRouting = nativeConfig.getRemoteRouting();
Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get();
boolean authEnabled = cqlSecurity.getCqlCredentials().isEnabled();
boolean tlsEnabled = cqlSecurity.getCqlTlsConfig().isEnabled();

AuthProvider authProvider = null;
if (authEnabled)
{
authProvider = new ReloadingAuthProvider(() -> cqlSecuritySupplier.get().getCqlCredentials());
}

SslEngineFactory sslEngineFactory = null;
if (tlsEnabled)
{
sslEngineFactory = certificateHandler;
}

DistributedNativeConnectionProvider.Builder nativeConnectionBuilder =
DistributedNativeConnectionProvider.builder()
.withInitialContactPoints(resolveInitialContactPoints(agentConnectionConfig.getContactPoints()))
.withAgentType(agentConnectionConfig.getType().toString())
.withLocalDatacenter(agentConnectionConfig.getLocalDatacenter())
.withRemoteRouting(remoteRouting)
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory)
.withMetricsEnabled(config.getStatisticsConfig().isEnabled())
.withMeterRegistry(meterRegistry)
.withSchemaChangeListener(defaultRepairConfigurationProvider)
.withNodeStateListener(defaultRepairConfigurationProvider);

LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
LOG.info("Establishing Connection With Nodes");
myDistributedNativeConnectionProvider = tryEstablishConnection(nativeConnectionBuilder);
}

public final DistributedNativeConnectionProvider.Builder resolveAgentProviderBuilder(
final DistributedNativeConnectionProvider.Builder builder,
final AgentConnectionConfig agentConnectionConfig
)
{
switch (agentConnectionConfig.getType())
{
case datacenterAware:
LOG.info("Using DatacenterAware as Agent Config");
return builder.withDatacenterAware(resolveDatacenterAware(
agentConnectionConfig.getDatacenterAware()));
case rackAware:
LOG.info("Using RackAware as Agent Config");
return builder.withRackAware(resolveRackAware(
agentConnectionConfig.getRackAware()));
case hostAware:
LOG.info("Using HostAware as Agent Config");
return builder.withHostAware(resolveHostAware(
agentConnectionConfig.getHostAware()));
default:
}
return builder;
}

public final List<InetSocketAddress> resolveInitialContactPoints(
final Map<String, AgentConnectionConfig.Host> contactPoints
)
{
List<InetSocketAddress> resolvedContactPoints = new ArrayList<>();
for (AgentConnectionConfig.Host host : contactPoints.values())
{
InetSocketAddress tmpAddress = new InetSocketAddress(host.getHost(), host.getPort());
resolvedContactPoints.add(tmpAddress);
}
return resolvedContactPoints;
}

public final List<String> resolveDatacenterAware(final AgentConnectionConfig.DatacenterAware datacenterAware)
{
List<String> datacenterNames = new ArrayList<>();
for
(
AgentConnectionConfig.DatacenterAware.Datacenter datacenter
:
datacenterAware.getDatacenters().values())
{
datacenterNames.add(datacenter.getName());
}
return datacenterNames;
}

public final List<Map<String, String>> resolveRackAware(final AgentConnectionConfig.RackAware rackAware)
{
List<Map<String, String>> rackList = new ArrayList<>();
for
(
AgentConnectionConfig.RackAware.Rack rack
:
rackAware.getRacks().values()
)
{
Map<String, String> rackInfo = new HashMap<>();
rackInfo.put("datacenterName", rack.getDatacenterName());
rackInfo.put("rackName", rack.getRackName());
rackList.add(rackInfo);
}
return rackList;
}

public final List<InetSocketAddress> resolveHostAware(final AgentConnectionConfig.HostAware hostAware)
{
List<InetSocketAddress> resolvedHosts = new ArrayList<>();
for
(
AgentConnectionConfig.Host host
:
hostAware.getHosts().values()
)
{
InetSocketAddress tmpAddress = new InetSocketAddress(host.getHost(), host.getPort());
resolvedHosts.add(tmpAddress);
}
return resolvedHosts;
}

public final DistributedNativeConnectionProvider tryEstablishConnection(
final DistributedNativeConnectionProvider.Builder builder)
{
try
{
return builder.build();
}
catch (AllNodesFailedException | IllegalStateException e)
{
LOG.error("Unexpected interrupt while trying to connect to Cassandra. Reason: ", e);
throw e;
}
}

@Override
public final CqlSession getSession()
{
return myDistributedNativeConnectionProvider.getSession();
}

@Override
public final Node getLocalNode()
{
return myDistributedNativeConnectionProvider.getLocalNode();
}

@Override
public final boolean getRemoteRouting()
{
return myDistributedNativeConnectionProvider.getRemoteRouting();
}

@Override
public final void close()
{
myDistributedNativeConnectionProvider.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.ericsson.bss.cassandra.ecchronos.application.ConfigurationException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public final class AgentConnectionConfig
{
private boolean enabled = false;
private ConnectionType myType = ConnectionType.datacenterAware;
private String myLocalDatacenter;
private Map<String, Host> myContactPoints = new HashMap<>();
private DatacenterAware myDatacenterAware = new DatacenterAware();
private RackAware myRackAware = new RackAware();
Expand All @@ -44,15 +46,15 @@ public AgentConnectionConfig(
}

@JsonProperty("enabled")
public boolean isEnabled()
public void setEnabled(final boolean enableAgent)
{
return enabled;
enabled = enableAgent;
}

@JsonProperty("enabled")
public void setEnabled(final boolean enableAgent)
public boolean isEnabled()
{
enabled = enableAgent;
return enabled;
}

@JsonProperty("type")
Expand All @@ -62,9 +64,33 @@ public ConnectionType getType()
}

@JsonProperty("type")
public void setType(final String type)
public void setType(final String type) throws ConfigurationException
{
myType = ConnectionType.valueOf(type);
try
{
myType = ConnectionType.valueOf(type);
}
catch (IllegalArgumentException e)
{
throw new ConfigurationException(
"Invalid connection type: "
+
type
+
"\nAccepted configurations are: datacenterAware, rackAware, hostAware", e);
}
}

@JsonProperty("localDatacenter")
public void setLocalDatacenter(final String localDatacenter)
{
myLocalDatacenter = localDatacenter;
}

@JsonProperty("localDatacenter")
public String getLocalDatacenter()
{
return myLocalDatacenter;
}

@JsonProperty("contactPoints")
Expand All @@ -74,7 +100,7 @@ public Map<String, Host> getContactPoints()
}

@JsonProperty("contactPoints")
public void setMyContactPoints(final List<Host> contactPoints)
public void setContactPoints(final List<Host> contactPoints)
{
if (contactPoints != null)
{
Expand Down Expand Up @@ -134,7 +160,7 @@ public DatacenterAware()
}

@JsonProperty("datacenters")
public Map<String, Datacenter> getDatacenterAware()
public Map<String, Datacenter> getDatacenters()
{
return myDatacenters;
}
Expand Down Expand Up @@ -187,13 +213,13 @@ public RackAware()
}

@JsonProperty("racks")
public Map<String, Rack> getRackAware()
public Map<String, Rack> getRacks()
{
return myRackAware;
}

@JsonProperty("racks")
public void setMyRackAware(final List<Rack> rackAware)
public void setRacks(final List<Rack> rackAware)
{
if (rackAware != null)
{
Expand Down Expand Up @@ -273,7 +299,6 @@ public void setHosts(final List<Host> hosts)
public static final class Host
{
private static final int DEFAULT_PORT = 9042;

private String myHost = "localhost";

private int myPort = DEFAULT_PORT;
Expand Down
Loading

0 comments on commit 4efede0

Please sign in to comment.