Occasionally Receive Messages in Unexpected Sequence

The following sample code reproduces the issue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusTestClient
{
class Program
{
/// <summary>
/// Reproduction loop: repeatedly sends a random number (1-9) of messages to the topic,
/// receives up to five messages from the subscription, and verifies that their sequence
/// numbers continue without a gap from the last one seen.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when the received batch does not start at <c>lastSeq + 1</c> or contains a gap.
/// </exception>
static void Main()
{
    // Create sender and receiver
    var sender = ServiceBusClientFactory.CreateTopicClient();
    var receiver = ServiceBusClientFactory.CreateMessageReceiver();

    // Please change this value according to your service bus settings.
    long lastSeq = 82354;

    var rand = new Random();
    while (true)
    {
        // Sender loop: publish between 1 and 9 messages, one at a time.
        var senderSize = rand.Next(1, 10);
        for (var i = 0; i < senderSize; i++)
        {
            var message = new Message();
            message.Body = Encoding.UTF8.GetBytes($"Test message body {DateTime.UtcNow}");
            sender.SendAsync(message).GetAwaiter().GetResult();
        }

        // Receiver step: ask for up to 5 messages, waiting at most 5 seconds.
        var received = receiver.ReceiveAsync(5, TimeSpan.FromMilliseconds(5000)).GetAwaiter().GetResult();

        // ReceiveAsync yields null when nothing arrives before the timeout; the original
        // code dereferenced that null. Nothing to verify or complete, so try again.
        if (received == null || received.Count == 0)
        {
            continue;
        }

        // Sort locally so the gap check does not depend on in-batch delivery order.
        var result = received.OrderBy(m => m.SystemProperties.SequenceNumber).ToList();
        var seqNumbers = result.Select(item => item.SystemProperties.SequenceNumber).ToList();

        if (CheckMissingSequenceNumberExists(lastSeq + 1, seqNumbers))
        {
            throw new InvalidOperationException($"Missing Sequence Number. Expect: {lastSeq + 1}, Get: {string.Join(",", seqNumbers)}");
        }

        // Remember the highest sequence number seen, then settle the whole batch.
        lastSeq = seqNumbers[seqNumbers.Count - 1];
        var lockTokens = result.Select(item => item.SystemProperties.LockToken).ToList();
        receiver.CompleteAsync(lockTokens).GetAwaiter().GetResult();
    }
}

// Define other methods and classes here
/// <summary>
/// Reports whether a batch of sequence numbers fails to form a contiguous run
/// starting at <paramref name="initialNum"/>.
/// </summary>
/// <param name="initialNum">The sequence number the first element is expected to have.</param>
/// <param name="dequeuedSequenceNumbers">
/// Sequence numbers of the dequeued messages; the check compares neighbours in list order.
/// </param>
/// <returns>
/// <c>true</c> if the first element differs from <paramref name="initialNum"/> or any two
/// neighbouring elements do not differ by exactly 1; <c>false</c> otherwise, including
/// when the list is empty (there is nothing to compare).
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="dequeuedSequenceNumbers"/> is null.
/// </exception>
public static bool CheckMissingSequenceNumberExists(long initialNum, List<long> dequeuedSequenceNumbers)
{
    if (dequeuedSequenceNumbers == null)
    {
        throw new ArgumentNullException(nameof(dequeuedSequenceNumbers));
    }

    // An empty batch has no first element; indexing [0] below would throw.
    if (dequeuedSequenceNumbers.Count == 0)
    {
        return false;
    }

    Console.WriteLine("initialNum:{0}, dequeuedSequenceNumbers:{1}", initialNum, dequeuedSequenceNumbers[0]);

    if (dequeuedSequenceNumbers[0] != initialNum)
    {
        return true;
    }

    // Every element must be exactly one greater than its predecessor.
    for (int i = 1; i < dequeuedSequenceNumbers.Count; i++)
    {
        if (dequeuedSequenceNumbers[i] - dequeuedSequenceNumbers[i - 1] != 1)
        {
            return true;
        }
    }

    return false;
}

/// <summary>
/// Creates the Service Bus sender and receiver used by the sample.
/// Replace the placeholder values below with your own settings before running.
/// </summary>
public static class ServiceBusClientFactory
{
    /// <summary>Connection string passed to both clients (placeholder).</summary>
    public static string ServiceBusConnectionString { get; } = "Your connection String";

    /// <summary>Topic the sender publishes to (placeholder).</summary>
    public static string TopicName { get; } = "Your Topic Name";

    /// <summary>Subscription under <see cref="TopicName"/> that the receiver reads from (placeholder).</summary>
    public static string SubscriptionNameForDbSettlement { get; } = "Your Subscrption Name";

    /// <summary>Creates a topic client for <see cref="TopicName"/>.</summary>
    public static ITopicClient CreateTopicClient() =>
        new TopicClient(ServiceBusConnectionString, TopicName);

    /// <summary>Creates a peek-lock receiver for the configured topic subscription.</summary>
    public static IMessageReceiver CreateMessageReceiver()
    {
        var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionNameForDbSettlement);
        return new MessageReceiver(ServiceBusConnectionString, subscriptionPath, ReceiveMode.PeekLock);
    }
}
}
}

The following sample code prevents the issue by setting `PrefetchCount` on the receiver:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusTestClient
{
class Program
{
/// <summary>
/// Same send/receive/verify loop as the reproduction sample, but with prefetch enabled
/// on the receiver. Repeatedly sends a random number (1-9) of messages, receives up to
/// five, and verifies that their sequence numbers continue without a gap.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when the received batch does not start at <c>lastSeq + 1</c> or contains a gap.
/// </exception>
static void Main()
{
    // Create sender and receiver
    var sender = ServiceBusClientFactory.CreateTopicClient();
    var receiver = ServiceBusClientFactory.CreateMessageReceiver();

    // *******************************************************************************
    // Prefetch is enabled by setting PrefetchCount on a MessageReceiver, QueueClient or
    // SubscriptionClient to a number greater than zero; zero turns it off.
    // While the prefetch buffer holds messages, Receive / ReceiveAsync calls are served
    // immediately from it and the buffer is replenished in the background. If no messages
    // are available, the receive empties the buffer and then waits or blocks, as expected.
    // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch
    //
    // The value never changes, so it is assigned once here rather than on every iteration.
    // *******************************************************************************
    receiver.PrefetchCount = 5;

    // Please change this value according to your service bus settings.
    long lastSeq = 82354;

    var rand = new Random();
    while (true)
    {
        // Sender loop: publish between 1 and 9 messages, one at a time.
        var senderSize = rand.Next(1, 10);
        for (var i = 0; i < senderSize; i++)
        {
            var message = new Message();
            message.Body = Encoding.UTF8.GetBytes($"Test message body {DateTime.UtcNow}");
            sender.SendAsync(message).GetAwaiter().GetResult();
        }

        // Receiver step: ask for up to 5 messages, waiting at most 5 seconds.
        var received = receiver.ReceiveAsync(5, TimeSpan.FromMilliseconds(5000)).GetAwaiter().GetResult();

        // ReceiveAsync yields null when nothing arrives before the timeout; the original
        // code dereferenced that null. Nothing to verify or complete, so try again.
        if (received == null || received.Count == 0)
        {
            continue;
        }

        // Sort locally so the gap check does not depend on in-batch delivery order.
        var result = received.OrderBy(m => m.SystemProperties.SequenceNumber).ToList();
        var seqNumbers = result.Select(item => item.SystemProperties.SequenceNumber).ToList();

        if (CheckMissingSequenceNumberExists(lastSeq + 1, seqNumbers))
        {
            throw new InvalidOperationException($"Missing Sequence Number. Expect: {lastSeq + 1}, Get: {string.Join(",", seqNumbers)}");
        }

        // Remember the highest sequence number seen, then settle the whole batch.
        lastSeq = seqNumbers[seqNumbers.Count - 1];
        var lockTokens = result.Select(item => item.SystemProperties.LockToken).ToList();
        receiver.CompleteAsync(lockTokens).GetAwaiter().GetResult();
    }
}

// Define other methods and classes here
/// <summary>
/// Reports whether a batch of sequence numbers fails to form a contiguous run
/// starting at <paramref name="initialNum"/>.
/// </summary>
/// <param name="initialNum">The sequence number the first element is expected to have.</param>
/// <param name="dequeuedSequenceNumbers">
/// Sequence numbers of the dequeued messages; the check compares neighbours in list order.
/// </param>
/// <returns>
/// <c>true</c> if the first element differs from <paramref name="initialNum"/> or any two
/// neighbouring elements do not differ by exactly 1; <c>false</c> otherwise, including
/// when the list is empty (there is nothing to compare).
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="dequeuedSequenceNumbers"/> is null.
/// </exception>
public static bool CheckMissingSequenceNumberExists(long initialNum, List<long> dequeuedSequenceNumbers)
{
    if (dequeuedSequenceNumbers == null)
    {
        throw new ArgumentNullException(nameof(dequeuedSequenceNumbers));
    }

    // An empty batch has no first element; indexing [0] below would throw.
    if (dequeuedSequenceNumbers.Count == 0)
    {
        return false;
    }

    Console.WriteLine("initialNum:{0}, dequeuedSequenceNumbers:{1}", initialNum, dequeuedSequenceNumbers[0]);

    if (dequeuedSequenceNumbers[0] != initialNum)
    {
        return true;
    }

    // Every element must be exactly one greater than its predecessor.
    for (int i = 1; i < dequeuedSequenceNumbers.Count; i++)
    {
        if (dequeuedSequenceNumbers[i] - dequeuedSequenceNumbers[i - 1] != 1)
        {
            return true;
        }
    }

    return false;
}

/// <summary>
/// Creates the Service Bus sender and receiver used by the sample.
/// Replace the placeholder values below with your own settings before running.
/// </summary>
public static class ServiceBusClientFactory
{
    /// <summary>Connection string passed to both clients (placeholder).</summary>
    public static string ServiceBusConnectionString { get; } = "Your connection String";

    /// <summary>Topic the sender publishes to (placeholder).</summary>
    public static string TopicName { get; } = "Your Topic Name";

    /// <summary>Subscription under <see cref="TopicName"/> that the receiver reads from (placeholder).</summary>
    public static string SubscriptionNameForDbSettlement { get; } = "Your Subscrption Name";

    /// <summary>Creates a topic client for <see cref="TopicName"/>.</summary>
    public static ITopicClient CreateTopicClient() =>
        new TopicClient(ServiceBusConnectionString, TopicName);

    /// <summary>Creates a peek-lock receiver for the configured topic subscription.</summary>
    public static IMessageReceiver CreateMessageReceiver()
    {
        var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionNameForDbSettlement);
        return new MessageReceiver(ServiceBusConnectionString, subscriptionPath, ReceiveMode.PeekLock);
    }
}
}
}