Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incomplete implementation for PubSub using Mutiny / Kotlin #20

Open
wfrank2509 opened this issue Mar 4, 2021 · 1 comment
Open

Incomplete implementation for PubSub using Mutiny / Kotlin #20

wfrank2509 opened this issue Mar 4, 2021 · 1 comment

Comments

@wfrank2509
Copy link

Hello there,

I was trying to implement a PgSubscriber mechanism in a quarkus project leveraging the Mutiny framework.

Assuming the following "reduced" code should work ...

package com.wftest

import javax.enterprise.context.ApplicationScoped
import io.quarkus.runtime.ShutdownEvent

import javax.enterprise.event.Observes

import io.quarkus.runtime.StartupEvent
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber
import io.vertx.pgclient.PgConnectOptions
import java.time.Duration


@ApplicationScoped
class PgChannelObserver {

    fun onStart(@Observes ev: StartupEvent?, vertx: Vertx) {

        val subscriber = PgSubscriber.subscriber(vertx, connectOptions())
        subscriber.connect().await().atMost(Duration.ofSeconds(60))
        
        val channel = subscriber.channel("testchannel")        // I trigger the pg_notify directly in an pgsql session for that channel
        
        channel.toMulti().subscribe().with { n -> print(n) }
    }

    private fun connectOptions() = PgConnectOptions()
            .setPort(5432)
            .setHost("localhost")
            .setDatabase("rainbow_database")
            .setUser("unicorn_user")
            .setPassword("magical_password")
}

I get an UnsupportedOperationException on startup and digging further into the stack trace stumbling over the reason:

PgSubscriberImpl:

    // Since Vert.x 3.6.0 : todo
    public ReadStream<String> fetch(long amount) {
      throw new UnsupportedOperationException();
    }

When I don't use Mutiny and instead use:

        channel.handler { notif -> print("Notification received: $notif") }

everything works as expected.

I was wondering why the implementation for fetch is left out and if it is a Mutiny issue (needs to be addresses in smallrye) or related to an incomplete implementation of the PgSubscriber interface?

@andreas-eberle
Copy link

Unfortunately, this issue is still up-to-date :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants