Skip to content

Commit

Permalink
1.0.0
Browse files Browse the repository at this point in the history
- Optimizations
- Update sdk dependency
- Upgrade to Java 8
  • Loading branch information
a-hansen authored Aug 6, 2021
1 parent 17d138b commit c03ddd3
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 140 deletions.
11 changes: 6 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ apply plugin: 'application'
apply plugin: 'java-library'

mainClassName = 'org.dsa.iot.haystack.Main'
sourceCompatibility = 1.7
targetCompatibility = 1.7
version = '0.8.1'
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
version = '1.0.0'

repositories {
mavenLocal()
Expand All @@ -13,7 +13,8 @@ repositories {
}

dependencies {
api 'org.iot-dsa:historian:0.24.2'
//api 'org.iot-dsa:historian:0.24.2' //maven local install
api 'com.github.iot-dsa:sdk-dslink-java:1.0.0'
implementation 'com.github.skyfoundry:haystack-java:3.0.7'
}

Expand All @@ -23,7 +24,7 @@ run {
}

wrapper {
gradleVersion = '6.8'
gradleVersion = '6.8.3'
}

applicationDistribution.from new File(project.projectDir, "/dslink.json")
2 changes: 1 addition & 1 deletion dslink.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dslink-java-haystack",
"version": "0.8.1",
"version": "0.8.8",
"description": "An implementation dslink of a haystack protocol consumer",
"license": "Apache",
"author": {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
157 changes: 71 additions & 86 deletions src/main/java/org/dsa/iot/haystack/Haystack.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class Haystack {
private final Node node;
private Set<HRef> pendingSubscribe;
private Set<HRef> pendingUnsubscribe;
private boolean polling = false;
private ScheduledFuture<?> pollFuture;
private final ScheduledThreadPoolExecutor stpe;
private final Map<String, Node> subs;
Expand Down Expand Up @@ -123,6 +122,10 @@ public void handle(Void event) {
watchEnabled = false;
pollFuture.cancel(false);
pollFuture = null;
synchronized (this) {
pendingSubscribe = null;
pendingUnsubscribe = null;
}
}
});
if (enabled.getBool()) {
Expand Down Expand Up @@ -325,9 +328,8 @@ public boolean isEnabled() {
return false;
}

public void nav(HVal navId, Handler<HGrid> onComplete) {
public void nav(final HVal navId, final Handler<HGrid> onComplete) {
if (!isEnabled()) {
onComplete.handle(null);
return;
}
HGrid grid = HGrid.EMPTY;
Expand Down Expand Up @@ -365,49 +367,45 @@ public void stop() {
conn.close();
}

public void subscribe(final HRef id, Node node) {
public synchronized void subscribe(final HRef id, Node node) {
subs.put(id.toString(), node);
if (!isEnabled() || !watchEnabled) {
return;
}
synchronized (this) {
subs.put(id.toString(), node);
if (pendingSubscribe == null) {
pendingSubscribe = new HashSet<>();
}
if (pendingUnsubscribe != null) {
pendingUnsubscribe.remove(id);
}
if (pendingSubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 2, TimeUnit.SECONDS);
}
if (pendingSubscribe == null) {
pendingSubscribe = new HashSet<>();
}
if (pendingUnsubscribe != null) {
pendingUnsubscribe.remove(id);
}
if (pendingSubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 1, TimeUnit.SECONDS);
}
}

public void unsubscribe(final HRef id) {
public synchronized void unsubscribe(final HRef id) {
subs.remove(id.toString());
if (!isEnabled() || !watchEnabled) {
return;
}
synchronized (this) {
subs.remove(id.toString());
if (pendingUnsubscribe == null) {
pendingUnsubscribe = new HashSet<>();
}
if (pendingSubscribe != null) {
pendingSubscribe.remove(id);
}
if (pendingUnsubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 2, TimeUnit.SECONDS);
}
if (pendingUnsubscribe == null) {
pendingUnsubscribe = new HashSet<>();
}
if (pendingSubscribe != null) {
pendingSubscribe.remove(id);
}
if (pendingUnsubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 1, TimeUnit.SECONDS);
}
}

Expand All @@ -422,61 +420,48 @@ private void poll() {
return;
}

synchronized (this) {
if (polling) {
return;
}
polling = true;
}

try {
conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
HGrid grid = event.pollChanges();
if (grid == null) {
return;
}
conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
HGrid grid = event.pollChanges();
if (grid == null) {
return;
}

Iterator<?> it = grid.iterator();
while (it.hasNext()) {
HRow row = (HRow) it.next();
Node node = subs.get(row.id().toString());
if (node != null) {
Map<String, Node> children = node.getChildren();

Iterator<?> rowIt = row.iterator();
while (rowIt.hasNext()) {
Map.Entry entry = (Map.Entry) rowIt.next();
String name = (String) entry.getKey();
HVal val = (HVal) entry.getValue();
Value value = Utils.hvalToVal(val);

String encoded = StringUtils.encodeName(name);
Node child = null;
if (children != null) {
child = children.get(encoded);
}
if (child != null) {
child.setValueType(value.getType());
child.setValue(value);
} else {
NodeBuilder b = Utils.getBuilder(node, encoded);
b.setValueType(value.getType());
b.setValue(value);
Node n = b.build();
n.setSerializable(false);
}
Iterator<?> it = grid.iterator();
while (it.hasNext()) {
HRow row = (HRow) it.next();
Node node = subs.get(row.id().toString());
if (node != null) {
Map<String, Node> children = node.getChildren();

Iterator<?> rowIt = row.iterator();
while (rowIt.hasNext()) {
Map.Entry entry = (Map.Entry) rowIt.next();
String name = (String) entry.getKey();
HVal val = (HVal) entry.getValue();
Value value = Utils.hvalToVal(val);

String encoded = StringUtils.encodeName(name);
Node child = null;
if (children != null) {
child = children.get(encoded);
}
if (child != null) {
child.setValueType(value.getType());
child.setValue(value);
} else {
NodeBuilder b = Utils.getBuilder(node, encoded);
b.setValueType(value.getType());
b.setValue(value);
Node n = b.build();
n.setSerializable(false);
}
}
}
}
});
} finally {
synchronized (this) {
polling = false;
}
}
});
}

private void setupPoll(int time) {
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/dsa/iot/haystack/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,18 @@ public static ValueType getHaystackTypes() {
};
return ValueType.makeEnum(enums);
}

public static boolean shouldUpdateList(Node node) {
if (node == null) {
return true;
}
Value val = node.getRoConfig("lu");
if (val == null) {
return true;
}
long curr = System.currentTimeMillis();
long lastUpdate = val.getNumber().longValue();
long diff = curr - lastUpdate;
return (diff > ListHandler.REFRESH_TIME);
}
}
17 changes: 6 additions & 11 deletions src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.dsa.iot.dslink.node.value.Value;
import org.dsa.iot.dslink.util.handler.Handler;
import org.dsa.iot.haystack.Haystack;
import org.dsa.iot.haystack.Utils;
import org.dsa.iot.haystack.helpers.NavHelper;
import org.projecthaystack.HGrid;
import org.projecthaystack.HUri;
Expand All @@ -22,8 +23,8 @@
public class ListHandler implements Handler<Node> {

private static final Logger LOGGER = LoggerFactory.getLogger(ListHandler.class);
private static final long REFRESH_TIME = TimeUnit.SECONDS.toMillis(60);
private static final ListHandler HANDLER = new ListHandler();
public static final long REFRESH_TIME = TimeUnit.SECONDS.toMillis(60);

private ListHandler() {
}
Expand All @@ -37,6 +38,9 @@ public void handle(final Node event) {
if (haystack == null) {
return;
}
if (!Utils.shouldUpdateList(event)) {
return;
}
final NavHelper helper = haystack.getNavHelper();
final Value vNav = event.getRoConfig("navId");
final HVal navId;
Expand All @@ -53,16 +57,7 @@ public void handle(final Node event) {
navId = null;
}

Value val = event.getRoConfig("lu");
long curr = System.currentTimeMillis();
if (val != null) {
long lastUpdate = val.getNumber().longValue();
long diff = curr - lastUpdate;
if (diff < REFRESH_TIME) {
return;
}
}
val = new Value(curr);
Value val = new Value(System.currentTimeMillis());
val.setSerializable(false);
event.setRoConfig("lu", val);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void editConnection(String url, String user, String pass, int connTimeout
}

public void close() {
LOGGER.info("Connection closed: " + haystack.getNode().getName());
synchronized (lock) {
if (connectFuture != null) {
connectFuture.cancel(false);
Expand Down Expand Up @@ -130,6 +131,7 @@ public void handle(HClient event) {
}
} catch (Exception e) {
if (e instanceof CallNetworkException) {
LOGGER.warn("Connection closed", e);
close();
} else {
throw new RuntimeException(e);
Expand Down Expand Up @@ -270,7 +272,7 @@ public void run() {
statusNode.setValue(new Value(err));
}
close();
LOGGER.warn(err);
LOGGER.warn(err, e);
}
}
}
Expand Down
Loading

0 comments on commit c03ddd3

Please sign in to comment.