Skip to content

Commit

Permalink
MulticastService with multiple NICs (#40)
Browse files Browse the repository at this point in the history
* - Fixed AnswersContainsAdditionalRecords functionality

* - Fixed using MulticastService with multiple NICs
- Removed exploring networks by times, instead used NetworkChange.NetworkAddressChanged
- Added accepting filtering function in MulticastService constructor

* Added a bit different way of async handing of massages

* Added filtering multicast loopback messages to avoid receiving duplicate questions and answers from all Nics

* Fixed PR comments

* Fixed multicast loopback duplicates receive filter

* removed unwanted local variables
  • Loading branch information
eshvatskyi authored and richardschneider committed Dec 13, 2018
1 parent 68ef311 commit 30feb36
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 281 deletions.
14 changes: 10 additions & 4 deletions src/MessageEventArgs.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Makaretu.Dns;
using System;
using System.Collections.Generic;
using System.Text;
using System;
using System.Net;

namespace Makaretu.Dns
{
Expand All @@ -18,6 +16,14 @@ public class MessageEventArgs : EventArgs
/// The received message.
/// </value>
public Message Message { get; set; }

/// <summary>
/// The DNS message sender endpoint.
/// </summary>
/// <value>
/// The endpoint from the message was received.
/// </value>
public IPEndPoint RemoteEndPoint { get; set; }
}
}

160 changes: 160 additions & 0 deletions src/MulticastClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace Makaretu.Dns
{
class MulticastClient : IDisposable
{
public static readonly bool IP6;

readonly IPEndPoint multicastEndpoint;
readonly IPAddress multicastLoopbackAddress;
readonly UdpClient receiver;
readonly ConcurrentDictionary<IPAddress, UdpClient> senders = new ConcurrentDictionary<IPAddress, UdpClient>();

static MulticastClient()
{
if (Socket.OSSupportsIPv4)
IP6 = false;
else if (Socket.OSSupportsIPv6)
IP6 = true;
else
throw new InvalidOperationException("No OS support for IPv4 nor IPv6");
}

public MulticastClient(IPEndPoint multicastEndpoint, IEnumerable<NetworkInterface> nics)
{
this.multicastEndpoint = multicastEndpoint;

receiver = new UdpClient(multicastEndpoint.AddressFamily);

receiver.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiver.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, false);

receiver.Client.Bind(new IPEndPoint(IP6 ? IPAddress.IPv6Any : IPAddress.Any, multicastEndpoint.Port));

foreach (var address in nics.SelectMany(GetNetworkInterfaceLocalAddresses))
{
try
{
receiver.Client.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(multicastEndpoint.Address, address));

var sender = new UdpClient(multicastEndpoint.AddressFamily);

sender.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
sender.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, false);

sender.Client.Bind(new IPEndPoint(address, multicastEndpoint.Port));

sender.Client.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(multicastEndpoint.Address));
sender.Client.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastLoopback, true);

// Assigning multicastLoopbackAddress to first avalable address that we use for sending messages
if (senders.TryAdd(address, sender) && multicastLoopbackAddress == null)
{
multicastLoopbackAddress = address;
}
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.AddressNotAvailable)
{
// VPN NetworkInterfaces
}
}
}

public async Task SendAsync(byte[] message)
{
await Task.WhenAny(senders.Select(x => x.Value.SendAsync(message, message.Length, multicastEndpoint))).ConfigureAwait(false);
}

public void Receive(Action<UdpReceiveResult> callback)
{
Task.Run(async () =>
{
try
{
var task = receiver.ReceiveAsync();

_ = task.ContinueWith(x => Receive(callback), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.RunContinuationsAsynchronously);

_ = task.ContinueWith(x => FilterMulticastLoopbackMessages(x.Result, callback), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.RunContinuationsAsynchronously);

await task.ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
return;
}
});
}

/// <summary>
/// For multi NICs we accepting MulticastLoopback message only from one of available addresses, used for sending messages
/// </summary>
/// <param name="result">Received message <see cref="UdpReceiveResult"/></param>
/// <param name="next">Action to execute</param>
void FilterMulticastLoopbackMessages(UdpReceiveResult result, Action<UdpReceiveResult> next)
{
var remoteIP = result.RemoteEndPoint.Address;

if (senders.ContainsKey(remoteIP) && !remoteIP.Equals(multicastLoopbackAddress))
{
return;
}

next?.Invoke(result);
}

IEnumerable<IPAddress> GetNetworkInterfaceLocalAddresses(NetworkInterface nic)
{
return nic.GetIPProperties().UnicastAddresses
.Select(x => x.Address)
.Where(x => x.AddressFamily == (IP6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork));
}

#region IDisposable Support

private bool disposedValue = false; // To detect redundant calls

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
receiver?.Dispose();

foreach (var address in senders.Keys)
{
if (senders.TryRemove(address, out var sender))
{
sender.Dispose();
}
}
}

disposedValue = true;
}
}

~MulticastClient()
{
Dispose(false);
}

// This code added to correctly implement the disposable pattern.
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

#endregion
}
}
Loading

0 comments on commit 30feb36

Please sign in to comment.