Skip to content

Commit

Permalink
Refactoring for better Server liveness handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mikera committed May 2, 2024
1 parent 791c7e3 commit 95e8b2d
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 64 deletions.
4 changes: 4 additions & 0 deletions convex-core/src/test/java/convex/core/lang/CoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,10 @@ public void testMap() {

// Map with sets works like conj'ing in each result in turn
assertEquals(Sets.of(4,5,6),eval("(map (fn [a b] b) #{1 2 3} [4 5 6])"));

// Map over an index
assertEquals(Vectors.of(4,5,6),eval("(map second (into (index) {0x 4 0x01 5 0x0001 6}))"));


// CAST error if any following arguments are not a data structure
assertCastError(step("(map inc 1)"));
Expand Down
14 changes: 11 additions & 3 deletions convex-gui/src/main/java/convex/gui/components/ScrollyList.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ScrollyList<E> extends JScrollPane {
private final ListModel<E> model;
private final ScrollablePanel listPanel = new ScrollablePanel();

public void refreshList() {
private void refreshList() {
EventQueue.invokeLater(()->{;
listPanel.removeAll();
int n = model.getSize();
Expand Down Expand Up @@ -81,12 +81,20 @@ public ScrollyList(ListModel<E> model, Function<E, Component> builder) {
model.addListDataListener(new ListDataListener() {
@Override
public void intervalAdded(ListDataEvent e) {
refreshList();
int start=e.getIndex0();
int last=e.getIndex1();
for (int i=start; i<=last; i++) {
listPanel.add(builder.apply(model.getElementAt(i)),"span");
}
}

@Override
public void intervalRemoved(ListDataEvent e) {
refreshList();
int start=e.getIndex0();
int last=e.getIndex1();
for (int i=start; i<=last; i++) {
listPanel.remove(start);;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void paintComponent(Graphics g) {

Server server=peerView.getLocalServer();
if (server==null) return; // not a local server
if (!server.isLive()) return;

AStore tempStore=Stores.current(); // just in case, since we are reading from a specific peer
try {
Expand Down
9 changes: 6 additions & 3 deletions convex-gui/src/main/java/convex/gui/peer/PeerComponent.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,19 @@ public String getPeerDescription() {
State state=server.getPeer().getConsensusState();
AccountKey paddr=server.getPeerKey();
// sb.append("0x"+paddr.toChecksumHex()+"\n");
sb.append("Local peer on: " + server.getHostAddress() + " with store "+server.getStore().shortName()+"\n");

if (server.isLive()) {
sb.append("Local peer on: " + server.getHostAddress() + " with store "+server.getStore().shortName()+"\n");
} else {
sb.append("Inactive Peer\n");
}
PeerStatus ps=state.getPeer(paddr);
if (ps!=null) {
sb.append("Peer Stake: "+Text.toFriendlyBalance(ps.getPeerStake()));
sb.append(" ");
sb.append("Delegated Stake: "+Text.toFriendlyBalance(ps.getDelegatedStake()));
sb.append(" ");
} else {
sb.append("Not currently a rgistered peer ");
sb.append("Not currently a registered peer ");
}
ConnectionManager cm=server.getConnectionManager();
sb.append("Connections: "+cm.getConnectionCount());
Expand Down
13 changes: 9 additions & 4 deletions convex-gui/src/main/java/convex/gui/peer/PeerGUI.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,12 @@ public StateModel<State> getStateModel() {
}

public ConvexLocal getDefaultConvex() {
return peerList.getElementAt(0);
int n=peerList.size();
for (int i=0; i<n; i++) {
ConvexLocal c= peerList.get(i);
if (c.getLocalServer().isLive()) return c;
}
throw new IllegalStateException("No live peers!");
}

public Convex getClientConvex(Address contract) {
Expand Down Expand Up @@ -347,7 +352,7 @@ public Server getRandomServer() {
for (int i=0; i<n; i++) {
ConvexLocal c=peerList.elementAt(i);
Server s=c.getLocalServer();
if (s!=null) {
if ((s!=null)&&(s.isLive())) {
found+=1;
if (Math.random()*found<=1.0) {
result=s;
Expand All @@ -362,7 +367,7 @@ public Server getPrimaryServer() {
for (int i=0; i<n; i++) {
ConvexLocal c=peerList.elementAt(i);
Server s=c.getLocalServer();
if (s!=null) {
if ((s!=null)&&s.isLive()) {
return s;
}
}
Expand Down Expand Up @@ -394,7 +399,7 @@ public void launchExtraPeer() {

try {
Server base=getPrimaryServer();
ConvexLocal convex=Convex.connect(base, base.getPeerController(), base.getKeyPair());
ConvexLocal convex=getDefaultConvex();
Address a= convex.createAccountSync(kp.getAccountKey());
long amt=convex.getBalance()/10;
convex.transferSync(a, amt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void run() {
Stores.setCurrent(server.getStore());

// Run main component loop
while (server.isLive()) {
while (server.isRunning()) {
try {
loop();
} catch (InterruptedException e) {
Expand Down
75 changes: 39 additions & 36 deletions convex-peer/src/main/java/convex/peer/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,47 +192,50 @@ protected void maintainConnections() {

// refresh peers list
currentPeerCount=connections.size();
peers = connections.keySet().toArray(new AccountKey[currentPeerCount]);
if (peers.length<targetPeerCount) {
// Connect to a random peer with host address by stake
// SECURITY: stake weighted connection is important to avoid bad peers
// influencing the connection pool

Set<AccountKey> potentialPeers=s.getPeers().keySet();
InetSocketAddress target=null;
double accStake=0.0;
for (ACell c:potentialPeers) {
AccountKey peerKey=RT.ensureAccountKey(c);
if (connections.containsKey(peerKey)) continue; // skip if already connected

if (server.getPeerKey().equals(peerKey)) continue; // don't connect to self!!

PeerStatus ps=s.getPeers().get(peerKey);
if (ps==null) continue; // skip
AString hostName=ps.getHostname();
if (hostName==null) continue;
InetSocketAddress maybeAddress=Utils.toInetSocketAddress(hostName.toString());
if (maybeAddress==null) continue;
long peerStake=ps.getTotalStake();
if (peerStake>0) {
double t=random.nextDouble()*(accStake+peerStake);
if (t>=accStake) {
target=maybeAddress;
}
accStake+=peerStake;
if (currentPeerCount<targetPeerCount) {
tryRandomConnect(s);
}

lastConnectionUpdate=Utils.getCurrentTimestamp();
}

private void tryRandomConnect(State s) {
// Connect to a random peer with host address by stake
// SECURITY: stake weighted connection is important to avoid bad peers
// influencing the connection pool

Set<AccountKey> potentialPeers=s.getPeers().keySet();
InetSocketAddress target=null;
double accStake=0.0;
for (ACell c:potentialPeers) {
AccountKey peerKey=RT.ensureAccountKey(c);
if (connections.containsKey(peerKey)) continue; // skip if already connected

if (server.getPeerKey().equals(peerKey)) continue; // don't connect to self!!

PeerStatus ps=s.getPeers().get(peerKey);
if (ps==null) continue; // skip
AString hostName=ps.getHostname();
if (hostName==null) continue;
InetSocketAddress maybeAddress=Utils.toInetSocketAddress(hostName.toString());
if (maybeAddress==null) continue;
long peerStake=ps.getPeerStake();
if (peerStake>Constants.MINIMUM_EFFECTIVE_STAKE) {
double t=random.nextDouble()*(accStake+peerStake);
if (t>=accStake) {
target=maybeAddress;
}
accStake+=peerStake;
}
}

if (target!=null) {
// Try to connect to Peer. If it fails, no worry, will retry another peer next time
boolean success=connectToPeer(target) != null;
if (!success) {
log.warn("Failed to connect to Peer at "+target);
}
if (target!=null) {
// Try to connect to Peer. If it fails, no worry, will retry another peer next time
boolean success=connectToPeer(target) != null;
if (!success) {
log.warn("Failed to connect to Peer at "+target);
}
}

lastConnectionUpdate=Utils.getCurrentTimestamp();
}

/**
Expand Down
36 changes: 19 additions & 17 deletions convex-peer/src/main/java/convex/peer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ public class Server implements Closeable {
* Flag for a running server. Setting to false will terminate server threads.
*/
private volatile boolean isRunning = false;

/**
* NIO Server instance
* Flag for a live server. Live Server has synced with at least one other peer
*/
private NIOServer nio = NIOServer.create(this);
private volatile boolean isLive = false;

/**
* The Peer Controller Address
* NIO Server instance
*/
private Address controller;
private NIOServer nio = NIOServer.create(this);

private Server(HashMap<Keyword, Object> config) throws TimeoutException, IOException {
this.config = config;
Expand Down Expand Up @@ -190,7 +190,6 @@ private void establishController() {
// TODO: not a problem?
log.warn("Server keypair does not match keypair for control account: "+controlAddress);
}
this.setPeerController(controlAddress);
}

private Peer establishPeer() throws TimeoutException, IOException {
Expand Down Expand Up @@ -376,6 +375,7 @@ public void launch() {
}
}

goLive();
log.info( "Peer Server started at "+nio.getHostAddress()+" with Peer Address: {}",getPeerKey());
} catch (Exception e) {
close();
Expand All @@ -385,6 +385,10 @@ public void launch() {
}
}

private void goLive() {
isLive=true;
}

/**
* Process a message received from a peer or client. We know at this point that the
* message decoded successfully, not much else.....
Expand Down Expand Up @@ -515,15 +519,7 @@ public long getBeliefReceivedCount() {
* @return Peer controller Address
*/
public Address getPeerController() {
return controller;
}

/**
* Sets the Peer controller Address
* @param a Peer Controller Address to set
*/
public void setPeerController(Address a) {
controller=a;
return getPeer().getController();
}

/**
Expand Down Expand Up @@ -679,7 +675,8 @@ public Peer persistPeerData() throws IOException {

@Override
public void close() {
if (!isRunning) return;
if (!isRunning) return; // already shut down
isLive=false;
isRunning = false;

// Shut down propagator first, no point sending any more Beliefs
Expand Down Expand Up @@ -770,6 +767,10 @@ public void setHostname(String string) {
}

public boolean isLive() {
return isLive;
}

public boolean isRunning() {
return isRunning;
}

Expand Down Expand Up @@ -806,12 +807,13 @@ public void shutdown() throws IOException, TimeoutException {
try {
AKeyPair kp= getKeyPair();
AccountKey key=kp.getAccountKey();
Convex convex=Convex.connect(this, controller,kp);
Convex convex=Convex.connect(this, getPeerController(),kp);
Result r=convex.transactSync("(set-peer-stake "+key+" 0)");
if (r.isError()) {
log.warn("Unable to remove Peer stake: "+r);
}
} finally {
isLive=false;
close();
}
}
Expand Down

0 comments on commit 95e8b2d

Please sign in to comment.