-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
128 lines (113 loc) · 5.6 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
namespace azureiotedge_mdns
{
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Loader;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
class Program
{
static int counter;
/// <summary>
/// Module entry point: optionally installs the Edge CA certificate, connects to the
/// Edge runtime, then blocks until the process is cancelled or the assembly unloads.
/// </summary>
static void Main(string[] args)
{
    // Certificate verification is not fully functional yet in Windows containers,
    // so it is bypassed there; everywhere else the certificate is installed first.
    bool skipCertCheck = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);

    // The Edge runtime injects the connection string as an environment variable.
    string edgeHubConnection = Environment.GetEnvironmentVariable("EdgeHubConnectionString");

    if (skipCertCheck == false)
    {
        InstallCert();
    }

    Task startup = Init(edgeHubConnection, skipCertCheck);
    startup.Wait();

    // Keep the process alive until an unload or a cancel key press is signalled.
    var shutdownSource = new CancellationTokenSource();
    AssemblyLoadContext.Default.Unloading += (context) => shutdownSource.Cancel();
    Console.CancelKeyPress += (sender, eventArgs) => shutdownSource.Cancel();

    Task shutdown = WhenCancelled(shutdownSource.Token);
    shutdown.Wait();
}
/// <summary>
/// Returns a task that completes once <paramref name="cancellationToken"/> is cancelled,
/// letting callers block on (or await) application cancellation or unload.
/// </summary>
/// <param name="cancellationToken">Token whose cancellation completes the returned task.</param>
/// <returns>
/// A task that runs to completion when the token is cancelled. If the token is already
/// cancelled the task is completed immediately; if it is never cancelled the task never completes.
/// </returns>
public static Task WhenCancelled(CancellationToken cancellationToken)
{
    // Run continuations asynchronously so they never execute inline on the thread that
    // calls Cancel() (here: the unload / Ctrl+C event handler).
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // TrySetResult cannot throw from inside the cancellation callback, unlike SetResult.
    cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs);
    return tcs.Task;
}
/// <summary>
/// Add certificate in local cert store for use by client for secure connection to IoT Edge runtime.
/// The certificate file path is read from the EdgeModuleCACertificateFile environment variable
/// and the certificate is added to the current user's Root store.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The environment variable is missing/blank, or the file it points to does not exist.
/// </exception>
static void InstallCert()
{
    string certPath = Environment.GetEnvironmentVariable("EdgeModuleCACertificateFile");
    if (string.IsNullOrWhiteSpace(certPath))
    {
        // We cannot proceed further without a proper cert file
        Console.WriteLine($"Missing path to certificate collection file: {certPath}");
        throw new InvalidOperationException("Missing path to certificate file.");
    }

    if (!File.Exists(certPath))
    {
        // The path is known here; it is the file itself that is missing.
        Console.WriteLine($"Missing certificate collection file: {certPath}");
        throw new InvalidOperationException("Missing certificate file.");
    }

    // using blocks guarantee the store is closed and the certificate handles are released
    // even when Open/Add throws.
    using (var store = new X509Store(StoreName.Root, StoreLocation.CurrentUser))
    using (var loadedCert = X509Certificate2.CreateFromCertFile(certPath))
    using (var cert = new X509Certificate2(loadedCert))
    {
        store.Open(OpenFlags.ReadWrite);
        store.Add(cert);
        Console.WriteLine("Added Cert: " + certPath);
    }
}
/// <summary>
/// Initializes the DeviceClient over MQTT (TCP only), opens the connection to the Edge runtime
/// and registers <see cref="PipeMessage"/> as the handler for messages arriving on "input1".
/// </summary>
/// <param name="connectionString">Connection string used to create the client.</param>
/// <param name="bypassCertVerification">
/// When true, every remote certificate is accepted. Intended for development only.
/// </param>
static async Task Init(string connectionString, bool bypassCertVerification = false)
{
    // Never print the raw connection string: it normally carries the shared access key.
    Console.WriteLine("Connection String {0}", RedactConnectionString(connectionString));
    MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
    // During dev you might want to bypass the cert verification. It is highly recommended to verify certs systematically in production
    if (bypassCertVerification)
    {
        mqttSetting.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true;
    }
    ITransportSettings[] settings = { mqttSetting };
    // Open a connection to the Edge runtime
    DeviceClient ioTHubModuleClient = DeviceClient.CreateFromConnectionString(connectionString, settings);
    await ioTHubModuleClient.OpenAsync();
    Console.WriteLine("IoT Hub module client initialized.");
    // Register callback to be called when a message is received by the module
    await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, ioTHubModuleClient);
}

/// <summary>
/// Returns a copy of the connection string that is safe to log: the values of the
/// SharedAccessKey and SharedAccessSignature segments are replaced with "***".
/// </summary>
private static string RedactConnectionString(string connectionString)
{
    if (string.IsNullOrWhiteSpace(connectionString))
    {
        return "<not set>";
    }

    string[] segments = connectionString.Split(';');
    for (int i = 0; i < segments.Length; i++)
    {
        // Split on the first '=' only: base64 key values may themselves end with '=' padding.
        int separator = segments[i].IndexOf('=');
        if (separator < 0)
        {
            continue;
        }

        // Compare the whole key so that e.g. SharedAccessKeyName is left readable.
        string key = segments[i].Substring(0, separator).Trim();
        if (key.Equals("SharedAccessKey", StringComparison.OrdinalIgnoreCase)
            || key.Equals("SharedAccessSignature", StringComparison.OrdinalIgnoreCase))
        {
            segments[i] = segments[i].Substring(0, separator + 1) + "***";
        }
    }

    return string.Join(";", segments);
}
/// <summary>
/// This method is called whenever the module is sent a message from the EdgeHub.
/// It pipes the message, body and properties unchanged, to "output1" and
/// prints every incoming message. Messages with an empty body are not forwarded.
/// </summary>
/// <param name="message">The incoming message.</param>
/// <param name="userContext">The DeviceClient that was registered together with this handler.</param>
/// <returns><see cref="MessageResponse.Completed"/> once the message has been handled.</returns>
/// <exception cref="InvalidOperationException"><paramref name="userContext"/> is not a DeviceClient.</exception>
static async Task<MessageResponse> PipeMessage(Message message, object userContext)
{
    int counterValue = Interlocked.Increment(ref counter);
    var deviceClient = userContext as DeviceClient;
    if (deviceClient == null)
    {
        throw new InvalidOperationException("UserContext doesn't contain " + "expected values");
    }

    byte[] messageBytes = message.GetBytes();
    string messageString = Encoding.UTF8.GetString(messageBytes);
    Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");

    if (!string.IsNullOrEmpty(messageString))
    {
        // Message is disposable; release the outgoing copy once it has been sent
        // (or if sending fails) instead of leaking one per piped message.
        using (var pipeMessage = new Message(messageBytes))
        {
            foreach (var prop in message.Properties)
            {
                pipeMessage.Properties.Add(prop.Key, prop.Value);
            }
            await deviceClient.SendEventAsync("output1", pipeMessage);
            Console.WriteLine("Received message sent");
        }
    }
    return MessageResponse.Completed;
}
}
}