I am looking to create an integration between Azure Eventhub and my DataMiner. I have a piece of code ready to run:
using Microsoft.Azure.EventHubs;
// Replace these values with your own Event Hub connection details
string connectionString = "Endpoint=sb://<YOUR_EVENT_HUB_NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<YOUR_SHARED_ACCESS_KEY_NAME>;SharedAccessKey=<YOUR_SHARED_ACCESS_KEY>";
string eventHubName = "<YOUR_EVENT_HUB_NAME>";// Create a connection to the Event Hub
EventHubClient client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);// Create a receiver to listen for events from the Event Hub
EventHubReceiver receiver = client.CreateReceiver("<YOUR_CONSUMER_GROUP>", "<YOUR_PARTITION_ID>", EventPosition.FromEnqueuedTime(DateTime.Now));// Listen for events asynchronously
while (true)
{
// Receive the next batch of events
IEnumerable<EventData> events = await receiver.ReceiveAsync(100, TimeSpan.FromSeconds(1));// Process the events
foreach (EventData eventData in events)
{
// Get the event body as a string
string eventBody = Encoding.UTF8.GetString(eventData.Body.Array);// Do something with the event
Console.WriteLine(eventBody);
}
}// Close the receiver and client when you're done
receiver.CloseAsync();
client.CloseAsync();
As you can see there is a while true loop in this code to keep the channel open to receive events. What would be the best way to implement this? Should I create a driver or an automation script? Or are there other options?
Would create a connector for this use case. In a QAction you could spin up a new thread that keeps collecting new events. New events are stored in a (concurrent) queue, using your code in the question. Some managing code will need to be written to check if that thread is still active, and spin up a new thread when needed, in order to make sure it's robust. Also don't forget to stop the thread, when the element is being stopped. This can be done in the Dispose method.
A timer in the connector could then trigger the same QAction every x seconds. When that happens, the QAction processes all messages that are available in the queue, and updates the needed parameters in the element.