Skip to content

Commit

Permalink
Add YamlComputeNodeData (#31308)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored May 19, 2024
1 parent 6cf5e7e commit 72671d8
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@
package org.apache.shardingsphere.infra.instance;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

@NoArgsConstructor
/**
* Compute node data.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class ComputeNodeData {

private String attribute;

private String version;
private final String attribute;

public ComputeNodeData(final String attribute, final String version) {
this.attribute = attribute;
this.version = version;
}
private final String version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.instance.yaml;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

/**
* YAML compute node data.
*/
@Getter
@Setter
public final class YamlComputeNodeData implements YamlConfiguration {

private String attribute;

private String version;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.instance.yaml;

import org.apache.shardingsphere.infra.instance.ComputeNodeData;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;

/**
* YAML compute node data swapper.
*/
public final class YamlComputeNodeDataSwapper implements YamlConfigurationSwapper<YamlComputeNodeData, ComputeNodeData> {

@Override
public YamlComputeNodeData swapToYamlConfiguration(final ComputeNodeData data) {
YamlComputeNodeData result = new YamlComputeNodeData();
result.setAttribute(data.getAttribute());
result.setVersion(data.getVersion());
return result;
}

@Override
public ComputeNodeData swapToObject(final YamlComputeNodeData yamlConfig) {
return new ComputeNodeData(yamlConfig.getAttribute(), yamlConfig.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataFactory;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeData;
import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeDataSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -50,8 +52,8 @@ public final class ComputeNodeStatusService {
*/
public void registerOnline(final ComputeNodeInstance computeNodeInstance) {
String instanceId = computeNodeInstance.getMetaData().getId();
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceId, computeNodeInstance.getMetaData().getType()),
YamlEngine.marshal(new ComputeNodeData(computeNodeInstance.getMetaData().getAttributes(), computeNodeInstance.getMetaData().getVersion())));
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceId, computeNodeInstance.getMetaData().getType()), YamlEngine.marshal(
new YamlComputeNodeDataSwapper().swapToYamlConfiguration(new ComputeNodeData(computeNodeInstance.getMetaData().getAttributes(), computeNodeInstance.getMetaData().getVersion()))));
repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId), computeNodeInstance.getState().getCurrentState().name());
persistInstanceLabels(instanceId, computeNodeInstance.getLabels());
}
Expand Down Expand Up @@ -136,7 +138,7 @@ private Collection<ComputeNodeInstance> loadComputeNodeInstances(final InstanceT
if (Strings.isNullOrEmpty(value)) {
continue;
}
ComputeNodeData computeNodeData = YamlEngine.unmarshal(value, ComputeNodeData.class);
ComputeNodeData computeNodeData = new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(value, YamlComputeNodeData.class));
result.add(loadComputeNodeInstance(InstanceMetaDataFactory.create(each, instanceType, computeNodeData.getAttribute(), computeNodeData.getVersion())));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataFactory;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeData;
import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeDataSwapper;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -99,7 +101,7 @@ private Optional<GovernanceEvent> createInstanceGovernanceEvent(final DataChange
private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
Matcher matcher = getInstanceOnlinePathMatcher(event.getKey());
if (matcher.find()) {
ComputeNodeData computeNodeData = YamlEngine.unmarshal(event.getValue(), ComputeNodeData.class);
ComputeNodeData computeNodeData = new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlComputeNodeData.class));
InstanceMetaData instanceMetaData = InstanceMetaDataFactory.create(matcher.group(2),
InstanceType.valueOf(matcher.group(1).toUpperCase()), computeNodeData.getAttribute(), computeNodeData.getVersion());
if (Type.ADDED == event.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;

import org.apache.shardingsphere.infra.instance.ComputeNodeData;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.instance.util.IpUtils;
import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeData;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
Expand Down Expand Up @@ -109,8 +109,14 @@ void assertLoadInstanceWorkerId() {
void assertLoadAllComputeNodeInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
when(repository.getDirectly("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(new ComputeNodeData("127.0.0.1@3307", "foo_version")));
when(repository.getDirectly("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn(YamlEngine.marshal(new ComputeNodeData("127.0.0.1@3308", "foo_version")));
YamlComputeNodeData yamlComputeNodeData0 = new YamlComputeNodeData();
yamlComputeNodeData0.setAttribute("127.0.0.1@3307");
yamlComputeNodeData0.setVersion("foo_version");
when(repository.getDirectly("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(yamlComputeNodeData0));
YamlComputeNodeData yamlComputeNodeData1 = new YamlComputeNodeData();
yamlComputeNodeData1.setAttribute("127.0.0.1@3308");
yamlComputeNodeData1.setVersion("foo_version");
when(repository.getDirectly("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn(YamlEngine.marshal(yamlComputeNodeData1));
List<ComputeNodeInstance> actual = new ArrayList<>(new ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
assertThat(actual.size(), is(2));
assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307"));
Expand Down

0 comments on commit 72671d8

Please sign in to comment.