Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactored logging for inclusion on gossip channels. #8733

Merged
merged 8 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,26 @@ public void onNewAttestation(final ValidatableAttestation validatableAttestation
attestationPublishSuccessCounter.inc();
},
error -> {
LOG.trace(
"Failed to publish attestation {}{} at slot {}",
attestation.hashTreeRoot(),
attestation
.getCommitteeIndices()
.map(z -> String.format(" for validator %s", z))
.orElse(""),
attestation.getData().getSlot());
gossipFailureLogger.logWithSuppression(error, attestation.getData().getSlot());
attestationPublishFailureCounter.inc();
});
}

/**
 * Emits trace-level logging for the requested subnet and then delegates the subscription to
 * {@code subnetSubscriptions}.
 *
 * <p>NOTE(review): this span comes from a rendered diff, so the two trace statements below are
 * presumably the removed and the added form of one log line rather than two consecutive calls
 * - confirm against the merged file.
 *
 * @param subnetId the subnet ID that is logged and passed through to {@code subnetSubscriptions}
 */
public void subscribeToSubnetId(final int subnetId) {
LOG.trace("Subscribing to subnet ID {}", subnetId);
LOG.trace("Subscribing to attestation subnet {}", subnetId);
subnetSubscriptions.subscribeToSubnetId(subnetId);
}

/**
 * Emits trace-level logging for the given subnet and then delegates the unsubscription to
 * {@code subnetSubscriptions}.
 *
 * <p>NOTE(review): this span comes from a rendered diff, so the two trace statements below are
 * presumably the removed and the added form of one log line rather than two consecutive calls
 * - confirm against the merged file.
 *
 * @param subnetId the subnet ID that is logged and passed through to {@code subnetSubscriptions}
 */
public void unsubscribeFromSubnetId(final int subnetId) {
// NOTE(review): both messages read "Unsubscribing to"; "Unsubscribing from" would match the
// method name - consider rewording.
LOG.trace("Unsubscribing to subnet ID {}", subnetId);
LOG.trace("Unsubscribing to attestation subnet {}", subnetId);
subnetSubscriptions.unsubscribeFromSubnetId(subnetId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,28 @@
lastErroredSlot = slot;
lastRootCause = rootCause;

if (lastRootCause instanceof MessageAlreadySeenException) {
LOG.debug(
"Failed to publish {}(s) for slot {} because the message has already been seen",
messageType,
lastErroredSlot);
} else if (lastRootCause instanceof NoPeersForOutboundMessageException
|| lastRootCause instanceof SemiDuplexNoOutboundStreamException) {
LOG.log(
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish {}(s) for slot {} because no peers were available on the required gossip topic",
messageType,
lastErroredSlot);
} else {
LOG.log(
suppress ? Level.DEBUG : Level.ERROR,
"Failed to publish {}(s) for slot {}",
messageType,
lastErroredSlot,
error);
}
switch (lastRootCause) {
case MessageAlreadySeenException messageAlreadySeenException -> LOG.debug(
Fixed Show fixed Hide fixed
"Failed to publish {}(s) for slot {} because the message has already been seen",
messageType,
lastErroredSlot);
case NoPeersForOutboundMessageException noPeersForOutboundMessageException -> LOG.log(
Fixed Show fixed Hide fixed
Level.DEBUG,
"Failed to publish {}(s) for slot {}; {}",
messageType,
lastErroredSlot,
rootCause.getMessage());
case SemiDuplexNoOutboundStreamException semiDuplexNoOutboundStreamException -> LOG.log(
Fixed Show fixed Hide fixed
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish {}(s) for slot {} because no peers were available on the required gossip topic",
messageType,
lastErroredSlot);
default -> LOG.log(
suppress ? Level.DEBUG : Level.ERROR,
"Failed to publish {}(s) for slot {}",
messageType,
lastErroredSlot,
error);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ private void publish(final SyncCommitteeMessage message, final int subnetId) {
}

/**
 * Emits trace-level logging for the requested subnet and then delegates the subscription to
 * {@code subnetSubscriptions}.
 *
 * <p>NOTE(review): this span comes from a rendered diff, so the two trace statements below are
 * presumably the removed and the added form of one log line rather than two consecutive calls
 * - confirm against the merged file.
 *
 * @param subnetId the subnet ID that is logged and passed through to {@code subnetSubscriptions}
 */
public void subscribeToSubnetId(final int subnetId) {
LOG.trace("Subscribing to subnet ID {}", subnetId);
// NOTE(review): this message says "subnet ID" while the sync committee unsubscribe message
// says "subnet" - consider aligning the wording.
LOG.trace("Subscribing to sync committee subnet ID {}", subnetId);
subnetSubscriptions.subscribeToSubnetId(subnetId);
}

/**
 * Emits trace-level logging for the given subnet and then delegates the unsubscription to
 * {@code subnetSubscriptions}.
 *
 * <p>NOTE(review): this span comes from a rendered diff, so the two trace statements below are
 * presumably the removed and the added form of one log line rather than two consecutive calls
 * - confirm against the merged file.
 *
 * @param subnetId the subnet ID that is logged and passed through to {@code subnetSubscriptions}
 */
public void unsubscribeFromSubnetId(final int subnetId) {
// NOTE(review): both messages read "Unsubscribing to"; "Unsubscribing from" would match the
// method name - consider rewording.
LOG.trace("Unsubscribing to subnet ID {}", subnetId);
LOG.trace("Unsubscribing to sync committee subnet {}", subnetId);
subnetSubscriptions.unsubscribeFromSubnetId(subnetId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
Expand All @@ -34,7 +36,7 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public class AttestationSubnetSubscriptions extends CommitteeSubnetSubscriptions {

private static final Logger LOG = LogManager.getLogger();
private final Spec spec;
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
Expand Down Expand Up @@ -73,6 +75,11 @@ public SafeFuture<?> gossip(final Attestation attestation) {
+ attestation.getData().getSlot()
+ " because the state was not available");
}
LOG.trace(
"send attestation {} slot {} on subnet {}",
attestation.hashTreeRoot(),
attestation.getData().getSlot(),
subnetId.get());
final String topic =
GossipTopics.getAttestationSubnetTopic(
forkInfo.getForkDigest(spec), subnetId.get(), gossipEncoding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package tech.pegasys.teku.networking.eth2.gossip.subnets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
Expand All @@ -31,7 +33,7 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncCommitteeSubnetSubscriptions extends CommitteeSubnetSubscriptions {

private static final Logger LOG = LogManager.getLogger();
private final Spec spec;
private final RecentChainData recentChainData;
private final SchemaDefinitionsAltair schemaDefinitions;
Expand Down Expand Up @@ -61,6 +63,12 @@ public SyncCommitteeSubnetSubscriptions(
}

public SafeFuture<?> gossip(final SyncCommitteeMessage message, final int subnetId) {
LOG.trace(
"send sync committee message {} validator {}, subnet {} on slot slot {} ",
message.hashTreeRoot(),
message.getValidatorIndex(),
subnetId,
message.getSlot());
return gossipNetwork.gossip(
GossipTopics.getSyncCommitteeSubnetTopic(
forkInfo.getForkDigest(spec), subnetId, gossipEncoding),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationMilestoneValidator;
Expand All @@ -26,6 +28,7 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public class SingleAttestationTopicHandler {
private static final Logger LOG = LogManager.getLogger();

public static Eth2TopicHandler<?> createHandler(
final RecentChainData recentChainData,
Expand All @@ -41,8 +44,16 @@ public static Eth2TopicHandler<?> createHandler(
final Spec spec = recentChainData.getSpec();
OperationProcessor<Attestation> convertingProcessor =
(attMessage, arrivalTimestamp) ->
operationProcessor.process(
ValidatableAttestation.fromNetwork(spec, attMessage, subnetId), arrivalTimestamp);
operationProcessor
.process(
ValidatableAttestation.fromNetwork(spec, attMessage, subnetId),
arrivalTimestamp)
.thenPeek(
result ->
LOG.trace(
"validate attestation {}; result ({})",
attMessage.hashTreeRoot(),
result.code()));

return new Eth2TopicHandler<>(
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package tech.pegasys.teku.networking.eth2.gossip;

import io.libp2p.core.SemiDuplexNoOutboundStreamException;
import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
import org.junit.jupiter.api.Test;
import tech.pegasys.infrastructure.logging.LogCaptor;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -43,7 +43,7 @@ void shouldLogAlreadySeenErrorsAtDebugLevel() {
void shouldLogFirstNoPeersErrorsAtWarningLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
new RuntimeException("Foo", new SemiDuplexNoOutboundStreamException("So Lonely")), SLOT);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SemiDuplexNoOutboundStreamException used to share its error-handling implementation with NoPeersForOutboundMessageException, so this test still exercises the same thing and all of the logic continues to work.

logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
}
}
Expand All @@ -52,12 +52,12 @@ void shouldLogFirstNoPeersErrorsAtWarningLevel() {
void shouldLogRepeatedNoPeersErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
new RuntimeException("Foo", new SemiDuplexNoOutboundStreamException("So Lonely")), SLOT);
logCaptor.clearLogs();

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
"Foo", new SemiDuplexNoOutboundStreamException("Not a friend in the world")),
SLOT);
logCaptor.assertDebugLog(NO_PEERS_MESSAGE);
}
Expand All @@ -67,12 +67,12 @@ void shouldLogRepeatedNoPeersErrorsAtDebugLevel() {
void shouldLogNoPeersErrorsWithDifferentSlotsAtWarnLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
new RuntimeException("Foo", new SemiDuplexNoOutboundStreamException("So Lonely")), SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
"Foo", new SemiDuplexNoOutboundStreamException("Not a friend in the world")),
UInt64.valueOf(2));
logCaptor.assertWarnLog(noPeersMessage(2));
}
Expand All @@ -82,15 +82,15 @@ void shouldLogNoPeersErrorsWithDifferentSlotsAtWarnLevel() {
void shouldLogNoPeersErrorsAtWarnLevelWhenSeparatedByADifferentException() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
new RuntimeException("Foo", new SemiDuplexNoOutboundStreamException("So Lonely")), SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
logCaptor.clearLogs();

logger.logWithSuppression(new MessageAlreadySeenException("Dupe"), SLOT);

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
"Foo", new SemiDuplexNoOutboundStreamException("Not a friend in the world")),
SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ public void gossip(final Bytes bytes) {
SafeFuture.of(publisher.publish(Unpooled.wrappedBuffer(bytes.toArrayUnsafe()), topic))
.finish(
() -> LOG.trace("Successfully gossiped message on {}", topic),
err -> LOG.debug("Failed to gossip message on " + topic, err));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full stack trace here became pretty unusable pretty quickly.

err -> LOG.debug("Failed to gossip message on {}, {}", topic, err.getMessage()));
}
}