Prefetch speeds up the message flow by having a message readily available for local retrieval when and before the application asks for one. This throughput gain is the result of a trade-off that the application author must make explicitly:
With the ReceiveAndDelete receive mode, all messages that are acquired into the prefetch buffer are no longer available in the queue, and only reside in the in-memory prefetch buffer until they are received into the application through the Receive/ReceiveAsync or OnMessage/OnMessageAsync APIs. If the application terminates before the messages are received into the application, those messages are irrecoverably lost.
In the PeekLock receive mode, messages fetched into the Prefetch buffer are acquired into the buffer in a locked state, and have the timeout clock for the lock ticking. If the prefetch buffer is large, and processing takes so long that message locks expire while residing in the prefetch buffer or even while the application is processing the message, there might be some confusing events for the application to handle.
The application might acquire a message with an expired or imminently expiring lock. If so, the application might process the message, but then find that it cannot complete it due to a lock expiration. The application can check the LockedUntilUtc property (which is subject to clock skew between the broker and local machine clock). If the message lock has expired, the application must ignore the message; no API call on or with the message should be made. If the message is not expired but expiration is imminent, the lock can be renewed and extended by another default lock period by calling message.RenewLock()
If the lock silently expires in the prefetch buffer, the message is treated as abandoned and is again made available for retrieval from the queue. That might cause it to be fetched into the prefetch buffer; placed at the end. If the prefetch buffer cannot usually be worked through during the message expiration, this causes messages to be repeatedly prefetched but never effectively delivered in a usable (validly locked) state, and are eventually moved to the dead-letter queue once the maximum delivery count is exceeded.
If you need a high degree of reliability for message processing, and processing takes significant work and time, it is recommended that you use the prefetch feature conservatively, or not at all.
If you need high throughput and message processing is commonly cheap, prefetch yields significant throughput benefits.
The maximum prefetch count and the lock duration configured on the queue or subscription need to be balanced such that the lock timeout at least exceeds the cumulative expected message processing time for the maximum size of the prefetch buffer, plus one message. At the same time, the lock timeout ought not to be so long that messages can exceed their maximum TimeToLive when they are accidentally dropped, thus requiring their lock to expire before being redelivered.
using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks;
namespace ServiceBusPrefetchTest { class Program { static void Main(string[] args) { var sender = ServiceBusClientFactory.CreateTopicClient(); var receiver = ServiceBusClientFactory.CreateMessageReceiver(); var bufferSize = 2_000_0; var messageCount = 100;
// ************************************************ // Change the MaxMessageCount and PrefetchCount, and check the Total processing time. // Receive messages with/without prefetch. // receiver.PrefetchCount = 0; // Disable prefetch receiver.PrefetchCount = 20; var maxMsgCount = 15; Console.WriteLine("receiver.PrefetchCount = {0}", receiver.PrefetchCount); Console.WriteLine("maxMsgCount = {0}", maxMsgCount); // *************************************************
// Gerenarte and send random buffers. for (var i = 0; i < messageCount; i++) { var buffer = GenerateRandomBuffer(bufferSize); var message = new Message(buffer); message.Body = buffer; sender.SendAsync(message).GetAwaiter().GetResult(); }
var messageList = new List<Message>(); var stopwatch = new Stopwatch(); stopwatch.Start();
while (messageList.Count < messageCount) { var messageToAdd = receiver.ReceiveAsync(maxMsgCount, TimeSpan.FromMilliseconds(1000)).GetAwaiter() .GetResult()?.OrderBy(message => message.SystemProperties.SequenceNumber).ToList();
// Define other methods and classes here public static class ServiceBusClientFactory { public static string ServiceBusConnectionString { get; } = "Your ConnectionString"; public static string TopicName { get; } = "Your Topic Name"; public static string SubscriptionNameForDbSettlement { get; } = "Your Subscription Name";
public static ITopicClient CreateTopicClient() { return new TopicClient(ServiceBusConnectionString, TopicName); }
public static IMessageReceiver CreateMessageReceiver() { return new MessageReceiver(ServiceBusConnectionString, EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionNameForDbSettlement), ReceiveMode.PeekLock); } }
public static byte[] GenerateRandomBuffer(long size) { Random random = new Random(); var bytes = new byte[size]; random.NextBytes(bytes); return bytes; } } }
PS C:\Jacky\ServiceBusPrefetchTest\bin\Debug> .\ServiceBusPrefetchTest.exe receiver.PrefetchCount = 0 var maxMsgCount = 5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Get message:5 Total processing time: 943 ms Total message count: Expect: 100, Get: 100
PS C:\Jacky\ServiceBusPrefetchTest\bin\Debug> .\ServiceBusPrefetchTest.exe receiver.PrefetchCount = 20 var maxMsgCount = 15 Get message:15 Get message:5 Get message:13 Get message:13 Get message:13 Get message:13 Get message:13 Get message:13 Get message:13 Total processing time: 1134 ms Total message count: Expect: 100, Get: 111