NullReferenceException when trying to sub with streaming subscriptions #1412

Open
Aimless321 opened this issue Nov 22, 2024 · 6 comments · May be fixed by #1415

@Aimless321

Expected Behavior

Receive messages in the registered handler when using streaming subscriptions.

Actual Behavior

Running version 1.14 of the Dapr runtime and the master branch of the .NET SDK, I receive a NullReferenceException when trying to subscribe to a topic using the new messaging client.

The call to messagingClient.SubscribeAsync never returns and produces an error, which I traced back to a NullReferenceException on response.EventMessage in PublishSubscribeReceiver.cs, line 252.

== APP ==       System.NullReferenceException: Object reference not set to an instance of an object.
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.FetchDataFromSidecarAsync(AsyncDuplexStreamingCall`2 stream, ChannelWriter`1 channelWriter, CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.FetchDataFromSidecarAsync(AsyncDuplexStreamingCall`2 stream, ChannelWriter`1 channelWriter, CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.SubscribeAsync(CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.DaprPublishSubscribeGrpcClient.SubscribeAsync(String pubSubName, String topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken)
== APP ==          at Worker.Services.WorkerService.ExecuteAsync(CancellationToken stoppingToken) in C:\Users\WesleyB\Projects\StrykerOrchestrator\Worker\Services\WorkerService.cs:line 37
== APP ==          at Microsoft.Extensions.Hosting.Internal.Host.TryExecuteBackgroundServiceAsync(BackgroundService backgroundService)

In Wireshark I can see the gRPC call to the sidecar, and I also get log output from the sidecar:

time="2024-11-22T13:54:48.3234217+01:00" level=info msg="Subscribing to pubsub 'task-queue' topic 'myTopic'" app_id=worker scope=dapr.runtime.pubsub.streamer type=log ver=1.14.4
time="2024-11-22T13:54:55.3082658+01:00" level=info msg="Unsubscribed from pubsub 'task-queue' topic 'myTopic'" app_id=worker scope=dapr.runtime.pubsub.streamer type=log ver=1.14.4

In Redis I can even see a consumer group, which reports the messages as delivered.

Steps to Reproduce the Problem

pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: task-queue
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: consumerID
    value: "{appId}"
  - name: concurrency
    value: "1"
  - name: maxInFlightMessages
    value: "1"
  - name: logReceiveMessageErrors
    value: "true"

1. Run the streaming subscription example.

Release Note

RELEASE NOTE:

@Aimless321 Aimless321 added the kind/bug Something isn't working label Nov 22, 2024
@WhitWaldo WhitWaldo self-assigned this Nov 22, 2024
@Aimless321
Author

Digging into this some more, it seems that this code receives some kind of initial response in which response.EventMessage is null.

Adding a simple:

if (response.EventMessage is null)
{
    continue;
}

With that check in place, I receive events correctly in the delegate.
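A minimal, self-contained sketch of that guard, for illustration only. `StubEventMessage`, `StubResponse`, and `CollectEvents` are hypothetical stand-ins and not the SDK's real types; the assumption is simply that some responses on the stream carry no event message and should be skipped rather than dereferenced.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the gRPC response types; not the SDK's real classes.
public sealed record StubEventMessage(string Id, string Topic);
public sealed record StubResponse(StubEventMessage? EventMessage);

public static class NullGuardSketch
{
    // Returns only the events that are actually present, skipping any
    // response that arrives without an event message attached.
    public static List<StubEventMessage> CollectEvents(IEnumerable<StubResponse> responses)
    {
        var events = new List<StubEventMessage>();
        foreach (var response in responses)
        {
            if (response.EventMessage is null)
            {
                continue; // nothing to deliver for this response
            }

            events.Add(response.EventMessage);
        }

        return events;
    }
}
```

In the real receiver the loop reads from the gRPC response stream rather than from an in-memory sequence, but the shape of the check is the same.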

Also, the DaprPublishSubscribeClient.SubscribeAsync method is blocking; I'm not sure whether that is intended. It only returns after the subscription has been closed.

@tommorvolloriddle

tommorvolloriddle commented Nov 26, 2024

@Aimless321 @WhitWaldo I have made a similar observation. I would be surprised if SubscribeAsync is meant to be a blocking call. I think of SubscribeAsync as an observer that gets triggered whenever events occur on the broker. Is that what it is designed for?

For now, I have wrapped it in a separate task (Task.Run) to get the intended behavior, something like this:

// Run the subscription in the background so the caller is not blocked.
_ = Task.Run(async () =>
{
    await myDaprPublishSubscribeClient.SubscribeAsync("rabbits", topic,
        new DaprSubscriptionOptions(new MessageHandlingPolicy(Timeout.InfiniteTimeSpan, TopicResponseAction.Drop)),
        messageHandler: Handler);
});
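One caveat with a fire-and-forget Task.Run is that exceptions thrown inside it go unobserved. A rough sketch of a variant that keeps the task and shuts it down through a cancellation token follows. `FakeSubscribeAsync` is a hypothetical stand-in for a subscribe call that only completes once the subscription is closed; it is not the SDK method.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundSubscriptionSketch
{
    // Hypothetical stand-in for a subscribe call that only completes
    // once the subscription is closed (here: when the token is cancelled).
    public static async Task FakeSubscribeAsync(CancellationToken cancellationToken)
    {
        try
        {
            await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Treat cancellation as a normal shutdown of the subscription.
        }
    }

    // Returns true if the subscription was still running while the
    // caller was free to do other work.
    public static async Task<bool> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        // Start the long-running call without awaiting it, but keep the Task
        // so that failures are observed instead of being silently dropped.
        Task subscription = Task.Run(() => FakeSubscribeAsync(cts.Token));

        // ... the caller can do other work here ...
        bool stillRunning = !subscription.IsCompleted;

        cts.Cancel();        // request shutdown
        await subscription;  // surfaces any exception from the subscription
        return stillRunning;
    }
}
```

In a hosted service, the stoppingToken passed to ExecuteAsync could play the role of the token used here.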

The next problem is with the subscription itself: in my handler, the Data property is an empty byte array. Am I missing something here?

I am using RabbitMQ as the broker, and on the bus I can see the data I publish:

{
    "data": {
        "message": "Test",
        "messageNo": 1,
        "timeValue": 1732613845921
    },
    "datacontenttype": "application/json",
    "id": "fa93e217-3d07-4593-87db-cd6f6c8393da",
    "pubsubname": "rabbits",
    "source": "boashade-tiger",
    "specversion": "1.0",
    "time": "2024-11-26T15:07:25+05:30",
    "topic": "topic2",
    "traceid": "00-00000000000000000000000000000000-0000000000000000-00",
    "traceparent": "00-00000000000000000000000000000000-0000000000000000-00",
    "tracestate": "",
    "type": "com.dapr.event.sent"
}

However, I do not get the data in the handler.

@WhitWaldo
Contributor

It's not intended to be blocking. I'm hoping to get a PR completed later this afternoon that remedies the issues listed above.

@tommorvolloriddle

> It's not intended to be blocking. I'm hoping to get a PR completed later this afternoon that remedies the issues listed above.

Can you also check the Data property initialization? We need it to pass the message on to the next subsystem after transforming it. If you can tell us whether this can be done, it will help us make some decisions.

Thanks in advance, Whit.

@WhitWaldo WhitWaldo added the P1 label Nov 26, 2024
@WhitWaldo WhitWaldo added this to the v1.15 milestone Nov 26, 2024
@WhitWaldo WhitWaldo added P0 and removed P1 labels Nov 26, 2024
@WhitWaldo WhitWaldo linked a pull request Nov 28, 2024 that will close this issue
2 tasks
@WhitWaldo
Contributor

> > It's not intended to be blocking. I'm hoping to get a PR completed later this afternoon that remedies the issues listed above.
>
> Can you also check the Data property initialization? We need it to pass the message on to the next subsystem after transforming it. If you can tell us whether this can be done, it will help us make some decisions.
>
> Thanks in advance, Whit.

I've got a PR pending review that'll fix the reported issues above.

@tommorvolloriddle Could you either get into more detail about what you're referring to here or consider opening a separate issue for it?

@Aimless321
Author

Thanks for looking into it, @WhitWaldo! ❤️

> @tommorvolloriddle Could you either get into more detail about what you're referring to here or consider opening a separate issue for it?

I think what he meant is that the Data property on the TopicMessage never gets initialized, so the delegate receives a TopicMessage whose data is empty.

https://github.com/dapr/dotnet-sdk/pull/1415/files#diff-af797fa530e2e099d0d82534043e1bcbce120190a792181bf69c708cdd72f581R283-R290

I tested locally with:

var message =
    new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
        response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
        response.EventMessage.Topic, response.EventMessage.PubsubName)
    {
        // Added this line
        Data = new ReadOnlyMemory<byte>(response.EventMessage.Data.ToByteArray()),
        Path = response.EventMessage.Path,
        Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, f => f.Value)
    };
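
Once Data is populated, a handler could decode it along these lines. This is only a sketch: `SampleData` and `Decode` are hypothetical names, and it assumes the bytes hold the UTF-8 JSON of the event's "data" object (as in the sample event earlier in this thread) rather than the full CloudEvent envelope, which I have not verified.

```csharp
#nullable enable
using System;
using System.Text.Json;

public static class PayloadDecodingSketch
{
    // Hypothetical shape of the "data" object from the sample event in this thread.
    public sealed record SampleData(string Message, int MessageNo, long TimeValue);

    // Deserializes UTF-8 JSON bytes into SampleData.
    // JsonSerializerDefaults.Web enables camelCase, case-insensitive property matching.
    public static SampleData? Decode(ReadOnlyMemory<byte> data)
    {
        var options = new JsonSerializerOptions(JsonSerializerDefaults.Web);
        return JsonSerializer.Deserialize<SampleData>(data.Span, options);
    }
}
```

A handler would call something like `PayloadDecodingSketch.Decode(message.Data)` and should probably check DataContentType first, since not every publisher sends JSON.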
