diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index 126b632fd23..182453310ce 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -331,10 +331,11 @@ DataElementI::attachFilter( int priority) { // No locking necessary, called by the session with the mutex locked - auto p = _listeners.find({session, facet}); + ListenerKey listenerKey{.session = session, .facet = facet}; + auto p = _listeners.find(listenerKey); if (p == _listeners.end()) { - p = _listeners.emplace(ListenerKey{.session = session, .facet = facet}, Listener{std::move(prx), facet}).first; + p = _listeners.emplace(std::move(listenerKey), Listener{std::move(prx), facet}).first; } bool added = false; @@ -344,12 +345,14 @@ DataElementI::attachFilter( _executor->queue([self = shared_from_this(), name] { self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); }); } + if (addConnectedKey(key, subscriber)) { if (key) { subscriber->keys.insert(key); } + if (_traceLevels->data > 1) { Trace out(_traceLevels->logger, _traceLevels->dataCat); @@ -749,7 +752,7 @@ DataReaderI::initSamples( if (_traceLevels->data > 1) { Trace out(_traceLevels->logger, _traceLevels->dataCat); - out << this << ": initialized " << samples.size() << " samples from `" << element << '@' << topic << "'"; + out << this << ": initialized " << samples.size() << " samples from '" << element << '@' << topic << "'"; } vector> valid; @@ -790,7 +793,7 @@ DataReaderI::initSamples( if (_traceLevels->data > 2 && valid.size() < samples.size()) { Trace out(_traceLevels->logger, _traceLevels->dataCat); - out << this << ": discarded " << samples.size() - valid.size() << " samples from `" << element << '@' << topic + out << this << ": discarded " << samples.size() - valid.size() << " samples from '" << element << '@' << topic << "'"; } diff --git a/cpp/src/DataStorm/NodeSessionI.cpp b/cpp/src/DataStorm/NodeSessionI.cpp index 38265dd08ff..d171af81b3e 100644 --- a/cpp/src/DataStorm/NodeSessionI.cpp +++ b/cpp/src/DataStorm/NodeSessionI.cpp @@ -15,12 +15,12 @@ using namespace Ice; namespace { - // The `NodeForwarder` class forwards calls to a `Node` that lacks a public endpoint. + // The NodeForwarder class forwards calls to a Node that lacks a public endpoint. // - // This class implements the Slice `DataContract::Node` interface by forwarding calls to the target `Node` object - // using the connection established during the creation of the `NodeSession` object. + // This class implements the Slice DataContract::Node interface by forwarding calls to the target Node object + // using the connection established during the creation of the NodeSession object. // - // The `NodeForwarder` wraps the node and session proxy parameters passed to the `DataContract::Node` operations + // The NodeForwarder wraps the node and session proxy parameters passed to the DataContract::Node operations // in forwarder proxies, which handle forwarding to the corresponding target objects. class NodeForwarder : public Node, public enable_shared_from_this { @@ -159,7 +159,7 @@ NodeSessionI::init() if (_instance->getTraceLevels()->session > 0) { Trace out(_instance->getTraceLevels()->logger, _instance->getTraceLevels()->sessionCat); - out << "created node session (peer = `" << _publicNode << "'):\n" << _connection->toString(); + out << "created node session (peer = '" << _publicNode << "'):\n" << _connection->toString(); } } @@ -191,7 +191,7 @@ NodeSessionI::destroy() if (_instance->getTraceLevels()->session > 0) { Trace out(_instance->getTraceLevels()->logger, _instance->getTraceLevels()->sessionCat); - out << "destroyed node session (peer = `" << _publicNode << "')"; + out << "destroyed node session (peer = '" << _publicNode << "')"; } } diff --git a/cpp/src/DataStorm/NodeSessionManager.cpp b/cpp/src/DataStorm/NodeSessionManager.cpp index 7acb2b166cf..f4c4973bf26 100644 --- a/cpp/src/DataStorm/NodeSessionManager.cpp +++ b/cpp/src/DataStorm/NodeSessionManager.cpp @@ -140,11 +140,11 @@ NodeSessionManager::announceTopicReader(const string& topic, NodePrx node, const Trace out(_traceLevels->logger, _traceLevels->sessionCat); if (connection) { - out << "topic reader `" << topic << "' announced (peer = `" << node << "')"; + out << "topic reader '" << topic << "' announced (peer = '" << node << "')"; } else { - out << "announcing topic reader `" << topic << "' (peer = `" << node << "')"; + out << "announcing topic reader '" << topic << "' (peer = '" << node << "')"; } } @@ -187,11 +187,11 @@ NodeSessionManager::announceTopicWriter(const string& topic, NodePrx node, const Trace out(_traceLevels->logger, _traceLevels->sessionCat); if (connection) { - out << "topic writer `" << topic << "' announced (peer = `" << node << "')"; + out << "topic writer '" << topic << "' announced (peer = '" << node << "')"; } else { - out << "announcing topic writer `" << topic << "' (peer = `" << node << "')"; + out << "announcing topic writer '" << topic << "' (peer = '" << node << "')"; } } @@ -240,22 +240,22 @@ NodeSessionManager::announceTopics( { if (!readers.empty()) { - out << "topic reader(s) `" << readers << "' announced (peer = `" << node << "')"; + out << "topic reader(s) '" << readers << "' announced (peer = '" << node << "')"; } if (!writers.empty()) { - out << "topic writer(s) `" << writers << "' announced (peer = `" << node << "')"; + out << "topic writer(s) '" << writers << "' announced (peer = '" << node << "')"; } } else { if (!readers.empty()) { - out << "announcing topic reader(s) `" << readers << "' (peer = `" << node << "')"; + out << "announcing topic reader(s) '" << readers << "' (peer = '" << node << "')"; } if (!writers.empty()) { - out << "announcing topic writer(s) `" << writers << "' (peer = `" << node << "')"; + out << "announcing topic writer(s) '" << writers << "' (peer = '" << node << "')"; } } } @@ -296,7 +296,7 @@ NodeSessionManager::getSession(const Identity& node) const void NodeSessionManager::forward(const ByteSeq& inParams, const Current& current) const { - // Called while holding the mutex lock to ensure `_exclude` is not updated concurrently. + // Called while holding the mutex lock to ensure _exclude is not updated concurrently. // Forward the call to all nodes that have an active session, don't need to wait for the result. for (const auto& [_, session] : _sessions) @@ -385,7 +385,7 @@ NodeSessionManager::connected(const NodePrx& node, const LookupPrx& lookup) if (_traceLevels->session > 0) { Trace out(_traceLevels->logger, _traceLevels->sessionCat); - out << "established node session (peer = `" << node << "'):\n" << connection->toString(); + out << "established node session (peer = '" << node << "'):\n" << connection->toString(); } instance->getConnectionManager()->add( @@ -422,7 +422,7 @@ NodeSessionManager::disconnected(const NodePrx& node, const LookupPrx& lookup) if (_traceLevels->session > 0) { Trace out(_traceLevels->logger, _traceLevels->sessionCat); - out << "disconnected node session (peer = `" << node << "')"; + out << "disconnected node session (peer = '" << node << "')"; } _connectedTo = nullopt; lock.unlock(); diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 166e555d49f..80b333369fb 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -89,7 +89,7 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Current&) { // For each local topic matching the given name: // - Attach the topic to all instances of the remote topic, with each instance represented by an entry in - // the ids sequence. + // the TopicInfo::ids sequence. // - Send an attachTopic request to the peer session to attach to the matching topic. runWithTopics( info.name, @@ -176,7 +176,7 @@ SessionI::attachTopic(TopicSpec spec, const Current&) Trace out(_traceLevels->logger, _traceLevels->sessionCat); out << _id << ": matched elements '" << spec << "' on '" << topic << "'"; } - // Don't wait for the response here, the remote session will send an ack. + // Don't wait for the response here, the peer session will send an ack. _session->attachElementsAsync(topic->getId(), specs, true, nullptr); } }); diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index ee6bc77d44c..345a481cf7d 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -163,14 +163,23 @@ TopicI::getTopicSpec() const spec.id = _id; spec.name = _name; spec.elements.reserve(_keyElements.size() + _filteredElements.size()); - for (const auto& k : _keyElements) + + // Add a key element to the spec for each topic key. Key elements have positive IDs. + for (const auto& [key, _] : _keyElements) { - spec.elements.push_back({k.first->getId(), "", k.first->encode(_instance->getCommunicator())}); + spec.elements.push_back( + ElementInfo{.id = key->getId(), .name = "", .value = key->encode(_instance->getCommunicator())}); } - for (const auto& f : _filteredElements) + + // Add a filtered element to the spec for each topic filter. Filtered elements have negative IDs. + for (const auto& [filter, _] : _filteredElements) { - spec.elements.push_back({-f.first->getId(), f.first->getName(), f.first->encode(_instance->getCommunicator())}); + spec.elements.push_back(ElementInfo{ + .id = -filter->getId(), + .name = filter->getName(), + .value = filter->encode(_instance->getCommunicator())}); } + spec.tags = getTags(); return spec; } @@ -182,7 +191,7 @@ TopicI::getTags() const tags.reserve(_updaters.size()); for (const auto& [tag, _] : _updaters) { - tags.push_back({tag->getId(), "", tag->encode(_instance->getCommunicator())}); + tags.push_back(ElementInfo{.id = tag->getId(), .name = "", .value = tag->encode(_instance->getCommunicator())}); } return tags; } @@ -191,7 +200,7 @@ ElementSpecSeq TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shared_ptr& session) { ElementSpecSeq specs; - // Iterate over the element infos representing the remote keys, and filter and compute the element spec for local + // Iterate over the element infos representing the remote keys, and filters and compute the element spec for local // keys and filters that match. Positive IDs represent keys and negative IDs represent filters. for (const auto& info : infos) { @@ -375,8 +384,8 @@ TopicI::attachElements( ElementSpecAckSeq specs; for (const auto& spec : elements) { - // The peer ID is computed by the remote caller and represents our local ID. Positive IDs represent keys and - // negative IDs represent filters. + // The peer ID is computed by the remote caller and represents our local ID. + // Positive IDs represent keys and negative IDs represent filters. if (spec.peerId > 0) { auto key = _keyFactory->get(spec.peerId); @@ -397,30 +406,26 @@ TopicI::attachElements( } // Iterate over the data elements for the matching key, attaching them to the data elements of the spec. - for (const auto& dataElement : p->second) + if (spec.id > 0 || filter->match(key)) { - ElementDataAckSeq acks; - for (const auto& data : spec.elements) + for (const auto& dataElement : p->second) { - if (spec.id > 0) // Key - { - dataElement->attach(topicId, spec.id, key, nullptr, session, prx, data, now, acks); - } - else if (filter->match(key)) // TODO: can we move the match outside the nested loop? + ElementDataAckSeq acks; + for (const auto& data : spec.elements) { dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); } - } - if (!acks.empty()) - { - specs.push_back(ElementSpecAck{ - .elements = std::move(acks), - .id = key->getId(), - .name = "", - .value = spec.id < 0 ? key->encode(_instance->getCommunicator()) : ByteSeq{}, - .peerId = spec.id, - .peerName = spec.name}); + if (!acks.empty()) + { + specs.push_back(ElementSpecAck{ + .elements = std::move(acks), + .id = key->getId(), + .name = "", + .value = spec.id < 0 ? key->encode(_instance->getCommunicator()) : ByteSeq{}, + .peerId = spec.id, + .peerName = spec.name}); + } } } } @@ -446,30 +451,26 @@ TopicI::attachElements( key = _keyFactory->decode(_instance->getCommunicator(), spec.value); } - for (const auto& dataElement : p->second) + if (spec.id < 0 || filter->match(key)) { - ElementDataAckSeq acks; - for (const auto& data : spec.elements) + for (const auto& dataElement : p->second) { - if (spec.id < 0) // Filter - { - dataElement->attach(topicId, spec.id, nullptr, filter, session, prx, data, now, acks); - } - else if (filter->match(key)) + ElementDataAckSeq acks; + for (const auto& data : spec.elements) { dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); } - } - if (!acks.empty()) - { - specs.push_back(ElementSpecAck{ - .elements = std::move(acks), - .id = -filter->getId(), - .name = filter->getName(), - .value = spec.id > 0 ? filter->encode(_instance->getCommunicator()) : ByteSeq{}, - .peerId = spec.id, - .peerName = spec.name}); + if (!acks.empty()) + { + specs.push_back(ElementSpecAck{ + .elements = std::move(acks), + .id = -filter->getId(), + .name = filter->getName(), + .value = spec.id > 0 ? filter->encode(_instance->getCommunicator()) : ByteSeq{}, + .peerId = spec.id, + .peerName = spec.name}); + } } } } diff --git a/java/test/android/controller/build.gradle b/java/test/android/controller/build.gradle index 79542d275e4..1f29d0bfeb8 100644 --- a/java/test/android/controller/build.gradle +++ b/java/test/android/controller/build.gradle @@ -55,12 +55,13 @@ android { defaultConfig { applicationId "com.zeroc.testcontroller" - minSdkVersion 24 + minSdkVersion 34 // SDK 34 required for Java 17 compatibility targetSdkVersion 34 multiDexEnabled true // Necessary otherwise we'd exceed the 64K DEX limit. compileOptions { - sourceCompatibility "1.17" - targetCompatibility "1.17" + // Sets Java compatibility to Java 17 + sourceCompatibility JavaVersion.VERSION_17 + targetCompatibility JavaVersion.VERSION_17 } } diff --git a/java/test/android/controller/src/main/java/com/zeroc/testcontroller/ControllerActivity.java b/java/test/android/controller/src/main/java/com/zeroc/testcontroller/ControllerActivity.java index 71c787f7a84..825b8dae4ec 100644 --- a/java/test/android/controller/src/main/java/com/zeroc/testcontroller/ControllerActivity.java +++ b/java/test/android/controller/src/main/java/com/zeroc/testcontroller/ControllerActivity.java @@ -17,6 +17,8 @@ import android.widget.Toast; import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class ControllerActivity extends ListActivity { @@ -126,7 +128,18 @@ public void onNothingSelected(AdapterView arg0) } }); s.setSelection(0); - app.startController(this, bluetooth); + + // Start the controller in a background thread. Starting the controller creates the ObjectAdapter which makes + // IO calls. Android doesn't allow making IO calls from the main thread. + Executor executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + try { + app.startController(this, bluetooth); + } catch (Exception e) { + e.printStackTrace(); + } + }); + executor.shutdown(); } public synchronized void println(String data)