using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;
namespace ServiceBusTestClient { class Program { static void Main() { // Create sender and receiver var sender = ServiceBusClientFactory.CreateTopicClient(); var receiver = ServiceBusClientFactory.CreateMessageReceiver();
// Please change this value according to your service bus settings. long lastSeq = 82354;
var rand = new Random(); while (true) { // Sender Loop var senderSize = rand.Next(1, 10); for (var i = 0; i < senderSize; i++) { // Send one message var message = new Message(); message.Body = Encoding.UTF8.GetBytes($"Test message body {DateTime.UtcNow}"); sender.SendAsync(message).GetAwaiter().GetResult(); }
//Receiver Loop var result = receiver.ReceiveAsync(5, TimeSpan.FromMilliseconds(5000)).GetAwaiter(). GetResult()?.OrderBy(message => message.SystemProperties.SequenceNumber).ToList();
var seqNumbers = result.Select(item => item.SystemProperties.SequenceNumber).ToList();
if (CheckMissingSequenceNumberExists(lastSeq + 1, seqNumbers)) { throw new Exception($"Missing Sequence Number. Expect: {lastSeq + 1}, Get: {string.Join(",", seqNumbers)}"); }
// Define other methods and classes here public static bool CheckMissingSequenceNumberExists(long initialNum, List<long> dequeuedSequenceNumbers) { Console.WriteLine("initialNum:{0}, dequeuedSequenceNumbers:{1}", initialNum, dequeuedSequenceNumbers[0]);
if (dequeuedSequenceNumbers[0] != initialNum) { return true; }
//Check sequence number is increased by 1, and if not, set the flag as true. for (int i = 1; i < dequeuedSequenceNumbers.Count; i++) { if (dequeuedSequenceNumbers[i] - dequeuedSequenceNumbers[i - 1] != 1) { return true; } }
return false; }
public static class ServiceBusClientFactory { public static string ServiceBusConnectionString { get; } = "Your connection String"; public static string TopicName { get; } = "Your Topic Name"; public static string SubscriptionNameForDbSettlement { get; } = "Your Subscrption Name";
public static ITopicClient CreateTopicClient() { return new TopicClient(ServiceBusConnectionString, TopicName); }
public static IMessageReceiver CreateMessageReceiver() { return new MessageReceiver(ServiceBusConnectionString, EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionNameForDbSettlement), ReceiveMode.PeekLock); } } } }
using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;
namespace ServiceBusTestClient { class Program { static void Main() { // Create sender and receiver var sender = ServiceBusClientFactory.CreateTopicClient(); var receiver = ServiceBusClientFactory.CreateMessageReceiver();
// Please change this value according to your service bus settings. long lastSeq = 82354;
var rand = new Random(); while (true) { // Sender Loop var senderSize = rand.Next(1, 10); for (var i = 0; i < senderSize; i++) { // Send one message var message = new Message(); message.Body = Encoding.UTF8.GetBytes($"Test message body {DateTime.UtcNow}"); sender.SendAsync(message).GetAwaiter().GetResult(); }
// ******************************************************************************* // With .NET, you enable the Prefetch feature by setting the PrefetchCount property of a MessageReceiver, QueueClient, or SubscriptionClient to a number greater than zero. Setting the value to zero turns off prefetch.
//You can easily add this setting to the receive-side of the QueuesGettingStarted or ReceiveLoop samples' settings to see the effect in those contexts.
//While messages are available in the prefetch buffer, any subsequent Receive / ReceiveAsync calls are immediately fulfilled from the buffer, and the buffer is replenished in the background as space becomes available. If there are no messages available for delivery, the receive operation empties the buffer and then waits or blocks, as expected. receiver.PrefetchCount = 5; // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch // ******************************************************************************** //Receiver Loop var result = receiver.ReceiveAsync(5, TimeSpan.FromMilliseconds(5000)).GetAwaiter(). GetResult()?.OrderBy(message => message.SystemProperties.SequenceNumber).ToList();
var seqNumbers = result.Select(item => item.SystemProperties.SequenceNumber).ToList();
if (CheckMissingSequenceNumberExists(lastSeq + 1, seqNumbers)) { throw new Exception($"Missing Sequence Number. Expect: {lastSeq + 1}, Get: {string.Join(",", seqNumbers)}"); }
// Define other methods and classes here public static bool CheckMissingSequenceNumberExists(long initialNum, List<long> dequeuedSequenceNumbers) { Console.WriteLine("initialNum:{0}, dequeuedSequenceNumbers:{1}", initialNum, dequeuedSequenceNumbers[0]);
if (dequeuedSequenceNumbers[0] != initialNum) { return true; }
//Check sequence number is increased by 1, and if not, set the flag as true. for (int i = 1; i < dequeuedSequenceNumbers.Count; i++) { if (dequeuedSequenceNumbers[i] - dequeuedSequenceNumbers[i - 1] != 1) { return true; } }
return false; }
public static class ServiceBusClientFactory { public static string ServiceBusConnectionString { get; } = "Your connection String"; public static string TopicName { get; } = "Your Topic Name"; public static string SubscriptionNameForDbSettlement { get; } = "Your Subscrption Name";
public static ITopicClient CreateTopicClient() { return new TopicClient(ServiceBusConnectionString, TopicName); }
public static IMessageReceiver CreateMessageReceiver() { return new MessageReceiver(ServiceBusConnectionString, EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionNameForDbSettlement), ReceiveMode.PeekLock); } } } }