Skip to content

Commit

Permalink
Merge branch 'main' into android-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
externl authored Dec 23, 2024
2 parents 7f0464c + 075234d commit 6fce190
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 71 deletions.
11 changes: 7 additions & 4 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,11 @@ DataElementI::attachFilter(
int priority)
{
// No locking necessary, called by the session with the mutex locked
auto p = _listeners.find({session, facet});
ListenerKey listenerKey{.session = session, .facet = facet};
auto p = _listeners.find(listenerKey);
if (p == _listeners.end())
{
p = _listeners.emplace(ListenerKey{.session = session, .facet = facet}, Listener{std::move(prx), facet}).first;
p = _listeners.emplace(std::move(listenerKey), Listener{std::move(prx), facet}).first;
}

bool added = false;
Expand All @@ -344,12 +345,14 @@ DataElementI::attachFilter(
_executor->queue([self = shared_from_this(), name]
{ self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); });
}

if (addConnectedKey(key, subscriber))
{
if (key)
{
subscriber->keys.insert(key);
}

if (_traceLevels->data > 1)
{
Trace out(_traceLevels->logger, _traceLevels->dataCat);
Expand Down Expand Up @@ -749,7 +752,7 @@ DataReaderI::initSamples(
if (_traceLevels->data > 1)
{
Trace out(_traceLevels->logger, _traceLevels->dataCat);
out << this << ": initialized " << samples.size() << " samples from `" << element << '@' << topic << "'";
out << this << ": initialized " << samples.size() << " samples from '" << element << '@' << topic << "'";
}

vector<shared_ptr<Sample>> valid;
Expand Down Expand Up @@ -790,7 +793,7 @@ DataReaderI::initSamples(
if (_traceLevels->data > 2 && valid.size() < samples.size())
{
Trace out(_traceLevels->logger, _traceLevels->dataCat);
out << this << ": discarded " << samples.size() - valid.size() << " samples from `" << element << '@' << topic
out << this << ": discarded " << samples.size() - valid.size() << " samples from '" << element << '@' << topic
<< "'";
}

Expand Down
12 changes: 6 additions & 6 deletions cpp/src/DataStorm/NodeSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ using namespace Ice;

namespace
{
// The `NodeForwarder` class forwards calls to a `Node` that lacks a public endpoint.
// The NodeForwarder class forwards calls to a Node that lacks a public endpoint.
//
// This class implements the Slice `DataContract::Node` interface by forwarding calls to the target `Node` object
// using the connection established during the creation of the `NodeSession` object.
// This class implements the Slice DataContract::Node interface by forwarding calls to the target Node object
// using the connection established during the creation of the NodeSession object.
//
// The `NodeForwarder` wraps the node and session proxy parameters passed to the `DataContract::Node` operations
// The NodeForwarder wraps the node and session proxy parameters passed to the DataContract::Node operations
// in forwarder proxies, which handle forwarding to the corresponding target objects.
class NodeForwarder : public Node, public enable_shared_from_this<NodeForwarder>
{
Expand Down Expand Up @@ -159,7 +159,7 @@ NodeSessionI::init()
if (_instance->getTraceLevels()->session > 0)
{
Trace out(_instance->getTraceLevels()->logger, _instance->getTraceLevels()->sessionCat);
out << "created node session (peer = `" << _publicNode << "'):\n" << _connection->toString();
out << "created node session (peer = '" << _publicNode << "'):\n" << _connection->toString();
}
}

Expand Down Expand Up @@ -191,7 +191,7 @@ NodeSessionI::destroy()
if (_instance->getTraceLevels()->session > 0)
{
Trace out(_instance->getTraceLevels()->logger, _instance->getTraceLevels()->sessionCat);
out << "destroyed node session (peer = `" << _publicNode << "')";
out << "destroyed node session (peer = '" << _publicNode << "')";
}
}

Expand Down
22 changes: 11 additions & 11 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ NodeSessionManager::announceTopicReader(const string& topic, NodePrx node, const
Trace out(_traceLevels->logger, _traceLevels->sessionCat);
if (connection)
{
out << "topic reader `" << topic << "' announced (peer = `" << node << "')";
out << "topic reader '" << topic << "' announced (peer = '" << node << "')";
}
else
{
out << "announcing topic reader `" << topic << "' (peer = `" << node << "')";
out << "announcing topic reader '" << topic << "' (peer = '" << node << "')";
}
}

Expand Down Expand Up @@ -187,11 +187,11 @@ NodeSessionManager::announceTopicWriter(const string& topic, NodePrx node, const
Trace out(_traceLevels->logger, _traceLevels->sessionCat);
if (connection)
{
out << "topic writer `" << topic << "' announced (peer = `" << node << "')";
out << "topic writer '" << topic << "' announced (peer = '" << node << "')";
}
else
{
out << "announcing topic writer `" << topic << "' (peer = `" << node << "')";
out << "announcing topic writer '" << topic << "' (peer = '" << node << "')";
}
}

Expand Down Expand Up @@ -240,22 +240,22 @@ NodeSessionManager::announceTopics(
{
if (!readers.empty())
{
out << "topic reader(s) `" << readers << "' announced (peer = `" << node << "')";
out << "topic reader(s) '" << readers << "' announced (peer = '" << node << "')";
}
if (!writers.empty())
{
out << "topic writer(s) `" << writers << "' announced (peer = `" << node << "')";
out << "topic writer(s) '" << writers << "' announced (peer = '" << node << "')";
}
}
else
{
if (!readers.empty())
{
out << "announcing topic reader(s) `" << readers << "' (peer = `" << node << "')";
out << "announcing topic reader(s) '" << readers << "' (peer = '" << node << "')";
}
if (!writers.empty())
{
out << "announcing topic writer(s) `" << writers << "' (peer = `" << node << "')";
out << "announcing topic writer(s) '" << writers << "' (peer = '" << node << "')";
}
}
}
Expand Down Expand Up @@ -296,7 +296,7 @@ NodeSessionManager::getSession(const Identity& node) const
void
NodeSessionManager::forward(const ByteSeq& inParams, const Current& current) const
{
// Called while holding the mutex lock to ensure `_exclude` is not updated concurrently.
// Called while holding the mutex lock to ensure _exclude is not updated concurrently.

// Forward the call to all nodes that have an active session, don't need to wait for the result.
for (const auto& [_, session] : _sessions)
Expand Down Expand Up @@ -385,7 +385,7 @@ NodeSessionManager::connected(const NodePrx& node, const LookupPrx& lookup)
if (_traceLevels->session > 0)
{
Trace out(_traceLevels->logger, _traceLevels->sessionCat);
out << "established node session (peer = `" << node << "'):\n" << connection->toString();
out << "established node session (peer = '" << node << "'):\n" << connection->toString();
}

instance->getConnectionManager()->add(
Expand Down Expand Up @@ -422,7 +422,7 @@ NodeSessionManager::disconnected(const NodePrx& node, const LookupPrx& lookup)
if (_traceLevels->session > 0)
{
Trace out(_traceLevels->logger, _traceLevels->sessionCat);
out << "disconnected node session (peer = `" << node << "')";
out << "disconnected node session (peer = '" << node << "')";
}
_connectedTo = nullopt;
lock.unlock();
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Current&)
{
// For each local topic matching the given name:
// - Attach the topic to all instances of the remote topic, with each instance represented by an entry in
// the ids sequence.
// the TopicInfo::ids sequence.
// - Send an attachTopic request to the peer session to attach to the matching topic.
runWithTopics(
info.name,
Expand Down Expand Up @@ -176,7 +176,7 @@ SessionI::attachTopic(TopicSpec spec, const Current&)
Trace out(_traceLevels->logger, _traceLevels->sessionCat);
out << _id << ": matched elements '" << spec << "' on '" << topic << "'";
}
// Don't wait for the response here, the remote session will send an ack.
// Don't wait for the response here, the peer session will send an ack.
_session->attachElementsAsync(topic->getId(), specs, true, nullptr);
}
});
Expand Down
89 changes: 45 additions & 44 deletions cpp/src/DataStorm/TopicI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,23 @@ TopicI::getTopicSpec() const
spec.id = _id;
spec.name = _name;
spec.elements.reserve(_keyElements.size() + _filteredElements.size());
for (const auto& k : _keyElements)

// Add a key element to the spec for each topic key. Key elements have positive IDs.
for (const auto& [key, _] : _keyElements)
{
spec.elements.push_back({k.first->getId(), "", k.first->encode(_instance->getCommunicator())});
spec.elements.push_back(
ElementInfo{.id = key->getId(), .name = "", .value = key->encode(_instance->getCommunicator())});
}
for (const auto& f : _filteredElements)

// Add a filtered element to the spec for each topic filter. Filtered elements have negative IDs.
for (const auto& [filter, _] : _filteredElements)
{
spec.elements.push_back({-f.first->getId(), f.first->getName(), f.first->encode(_instance->getCommunicator())});
spec.elements.push_back(ElementInfo{
.id = -filter->getId(),
.name = filter->getName(),
.value = filter->encode(_instance->getCommunicator())});
}

spec.tags = getTags();
return spec;
}
Expand All @@ -182,7 +191,7 @@ TopicI::getTags() const
tags.reserve(_updaters.size());
for (const auto& [tag, _] : _updaters)
{
tags.push_back({tag->getId(), "", tag->encode(_instance->getCommunicator())});
tags.push_back(ElementInfo{.id = tag->getId(), .name = "", .value = tag->encode(_instance->getCommunicator())});
}
return tags;
}
Expand All @@ -191,7 +200,7 @@ ElementSpecSeq
TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shared_ptr<SessionI>& session)
{
ElementSpecSeq specs;
// Iterate over the element infos representing the remote keys, and filter and compute the element spec for local
// Iterate over the element infos representing the remote keys, and filters and compute the element spec for local
// keys and filters that match. Positive IDs represent keys and negative IDs represent filters.
for (const auto& info : infos)
{
Expand Down Expand Up @@ -375,8 +384,8 @@ TopicI::attachElements(
ElementSpecAckSeq specs;
for (const auto& spec : elements)
{
// The peer ID is computed by the remote caller and represents our local ID. Positive IDs represent keys and
// negative IDs represent filters.
// The peer ID is computed by the remote caller and represents our local ID.
// Positive IDs represent keys and negative IDs represent filters.
if (spec.peerId > 0)
{
auto key = _keyFactory->get(spec.peerId);
Expand All @@ -397,30 +406,26 @@ TopicI::attachElements(
}

// Iterate over the data elements for the matching key, attaching them to the data elements of the spec.
for (const auto& dataElement : p->second)
if (spec.id > 0 || filter->match(key))
{
ElementDataAckSeq acks;
for (const auto& data : spec.elements)
for (const auto& dataElement : p->second)
{
if (spec.id > 0) // Key
{
dataElement->attach(topicId, spec.id, key, nullptr, session, prx, data, now, acks);
}
else if (filter->match(key)) // TODO: can we move the match outside the nested loop?
ElementDataAckSeq acks;
for (const auto& data : spec.elements)
{
dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks);
}
}

if (!acks.empty())
{
specs.push_back(ElementSpecAck{
.elements = std::move(acks),
.id = key->getId(),
.name = "",
.value = spec.id < 0 ? key->encode(_instance->getCommunicator()) : ByteSeq{},
.peerId = spec.id,
.peerName = spec.name});
if (!acks.empty())
{
specs.push_back(ElementSpecAck{
.elements = std::move(acks),
.id = key->getId(),
.name = "",
.value = spec.id < 0 ? key->encode(_instance->getCommunicator()) : ByteSeq{},
.peerId = spec.id,
.peerName = spec.name});
}
}
}
}
Expand All @@ -446,30 +451,26 @@ TopicI::attachElements(
key = _keyFactory->decode(_instance->getCommunicator(), spec.value);
}

for (const auto& dataElement : p->second)
if (spec.id < 0 || filter->match(key))
{
ElementDataAckSeq acks;
for (const auto& data : spec.elements)
for (const auto& dataElement : p->second)
{
if (spec.id < 0) // Filter
{
dataElement->attach(topicId, spec.id, nullptr, filter, session, prx, data, now, acks);
}
else if (filter->match(key))
ElementDataAckSeq acks;
for (const auto& data : spec.elements)
{
dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks);
}
}

if (!acks.empty())
{
specs.push_back(ElementSpecAck{
.elements = std::move(acks),
.id = -filter->getId(),
.name = filter->getName(),
.value = spec.id > 0 ? filter->encode(_instance->getCommunicator()) : ByteSeq{},
.peerId = spec.id,
.peerName = spec.name});
if (!acks.empty())
{
specs.push_back(ElementSpecAck{
.elements = std::move(acks),
.id = -filter->getId(),
.name = filter->getName(),
.value = spec.id > 0 ? filter->encode(_instance->getCommunicator()) : ByteSeq{},
.peerId = spec.id,
.peerName = spec.name});
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions java/test/android/controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ android {

defaultConfig {
applicationId "com.zeroc.testcontroller"
minSdkVersion 24
minSdkVersion 34 // SDK 34 required for Java 17 compatibility
targetSdkVersion 34
multiDexEnabled true // Necessary otherwise we'd exceed the 64K DEX limit.
compileOptions {
sourceCompatibility "1.17"
targetCompatibility "1.17"
// Sets Java compatibility to Java 17
sourceCompatibility JavaVersion.VERSION_17
targetCompatibility JavaVersion.VERSION_17
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import android.widget.Toast;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ControllerActivity extends ListActivity
{
Expand Down Expand Up @@ -126,7 +128,18 @@ public void onNothingSelected(AdapterView<?> arg0)
}
});
s.setSelection(0);
app.startController(this, bluetooth);

// Start the controller in a background thread. Starting the controller creates the ObjectAdapter which makes
// IO calls. Android doesn't allow making IO calls from the main thread.
Executor executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
app.startController(this, bluetooth);
} catch (Exception e) {
e.printStackTrace();
}
});
executor.shutdown();
}

public synchronized void println(String data)
Expand Down

0 comments on commit 6fce190

Please sign in to comment.