Robust messaging with Azure Service Bus (2023)

What is ServiceBus?

  • Fully managed companynews broker

  • decoupleApplications and services from each other

  • Newstailsand Publish/Subscribesubjects

  • load balancing overCompeteworkforce

  • High level ofreliabilityAndthroughput

What will this document do?

  • distinguish betweenMessagesAndevents

  • Look at themOrchestratorenor service bus who canTo produceAndConsumeMessages

  • Dive into Service BusconceptsAndAdvanced features

  • A detailed look at some of the code samples forproducersAndconsumer

  • The code examples discussed are available atSource code

What is a message?

Robust messaging with Azure Service Bus (1)

Robust messaging with Azure Service Bus (2)

  • raw data Producedto be through a ministryConsumedorStoredelsewhere

  • Contains the data that triggered theNewsPipeline

  • editorof theNewshas an expectation about how thatconsumertreats theNews

  • Contractexists between the two sides

    • WHOsend the message?

    • Wasis the message?

    • Wois the message sent?

Service Bus - Deep Dive

  • Intended for traditional enterprise applications that requireTransactions, ordering, duplicate detection and instant consistency

  • Allows cloud-native applications to be deployed reliablyState Transition Managementfor business processes

  • Relievedhighly secureAndReliableCommunication across hybrid cloud solutions and can connect existing on-premises systems with cloud solutions

  • Brokered MessagingSystem. It stores messages in aestate agents(for example a queue) until the consuming party is ready to receive the messages

  • Available as eitherdata streamsorevent stack

  • Reliably asynchronousmessaging (Enterprise Messaging als Service) which requires a query

  • Advanced messaging features likeFIFO, sessions, transaction ranges, dead-lettering and filtering, and duplicate detection

  • At least onceDelivery

  • Optionalin orderDelivery

tails

Robust messaging with Azure Service Bus (3)

  • OfferFirst in, first out(FIFO) messaging to one or more competing consumers

  • recipient usuallyReceiveAndProceedingsMessages in the order in which they were added to the queue, and only one message consumer receives and processes each message

  • producers(Sender) Andconsumer(Recipient) do not have to send and receive messages at the same time, since messages are permanently stored in the queue

  • Manufacturerdon't have to wait for an answer from theconsumerto further process and send messages

  • The associated advantage isLast levelingthis allowsproducersAndconsumerto send and receive messages at different rates

  • mediated transmission

Robust messaging with Azure Service Bus (4)

  • Maximize uptime- Delays in performance do not directly and immediately affect the application

  • Manufacturercan continue to send messages to the queue even if the service is unavailable or not currently processing messages

  • consumercan process at its own pace without becoming overloaded

  • Maximize scalabilitybecause both the number of queues and the number of services can be varied as required

Competing Consumer

Robust messaging with Azure Service Bus (5)

  • SeveralRecipientcompete for messages on the samequeue

  • Offers Automaticallyload balancingof working on recipients who volunteerjobs

  • Improvedreliability

    • Messagesare not sent to a specific service instance

    • A failed service instance does not block aManufacturer

    • Messagescan be processed by any functioning service instance.

  • Scalable- The system can dynamically increase or decrease the number of instances of consumer service as message traffic fluctuates.

  • Improveelasticity

    • consumerservice instancereadsAndprocessesDieNewsas part of a transaction operation

    • If theconsumerservice instance fails, this pattern can ensure that the message is returned to the queue for another instance of the consumer service to fetch and process

Themes and Subscriptions

Robust messaging with Azure Service Bus (6)

  • Provides a one-to-many form of communication in which aPublish/SubscribeMuster

  • Useful for scaling to large numbers ofRecipient, everypublishedMessage will be provided to everyoneSubscriptionregistered with theTheme

  • Messagesare sent to aThemeand delivered to one or more AssociatesSubscriptions

Vorabruf

Robust messaging with Azure Service Bus (7)

  • read aheadThe feature can be enabled for a Service Bus client

  • TheRecipientacquiresAdditionallyNO. of messages than originally intended by the application

  • With prefetching enabled, Service Bus behaves differently in the following two scenarios

    • Receive and deleteModus

      • In the captured messagesPrefetch-Pufferare no longer available in the queue

      • Messages exist in in-memoryPrefetch-Pufferup to the application

      • news areIrrecoverableorLost, if the client application ends before the messages are outReceiveinto the application

    • Peek-LockModus

      • Messagesbrought into thePrefetch-Pufferare in oneBlockedCondition

      • Falls diePrefetch-Pufferis large and processing takes longer than the message lock time - the subscribing client must take appropriate action according to the requirements

      • If theLock outexpiressilentimPrefetch-Puffer, the message is treated asLeaveand is made available again for retrievalqueue

      • If the requirement is to be metHigh reliabilitySince message processing and processing requires significant labor and time, it is recommended to use theread aheadFunction conservatively or not at all

      • If the requirement is to be metHigh throughputand message processing is in generalfaster Faster, the recommendation is to goread aheadoption and benefit from significant throughput advantages

Service bus – reliability

throttling

  • Throttling from an external system that Service Bus depends on

  • Throttling is done through interactions withstorageAndcalculateresources

  • Activatepartitioningon onequeueorThemeto reduce the chance of throttling

Problem for an Azure dependency

  • Issue for a system that Service Bus depends on. For example, a specific portion of memory may have problems

  • To work around these types of problems, Service Bus regularly investigates and implements workarounds

  • Due to the nature of mitigation, it can take up to 15 minutes for a sent message to appear in the affected queue or subscription and be ready to be received

Service Bus errors on a single subsystem

  • Service Bus failure on a single subsystem. In this situation, a compute node may fall into an inconsistent state and must restart itself, causing all entities it serves to load balance to other nodes. This in turn can result in a short period of slow message processing

  • The client application generates aSystem.TimeoutExceptionorMessagingExceptionException. Service Bus includes a solution to this problem in the form of automated retry logic for clients. Once the retry period expires and the message is not delivered

Peek-Sperre –At least once

Robust messaging with Azure Service Bus (8)

  • Sendersends the messages.

  • RecipientBlocks the message from other recipients.

  • The nextRecipientlocks the next message.

  • Completeremoves the message from the queue.

  • Give upreleases the lock and makes it available for the next recipient

Receive and delete –At most once

Robust messaging with Azure Service Bus (9)

  • Sendersends the messages

  • Recipientreceives the message and deletes it from the queue

  • The next recipient receives andDeletesthe next message

  • Give upAndCompleteOperations are not required

news sessions

Robust messaging with Azure Service Bus (10)

  • sessionsallow sets of related sets of related messages to be pinned to a recipient even when using competing consumers

  • meetingstate can be used to store an intermediate state for a session

  • Asession receiveris created by a client accepting a session

  • If the session is accepted and held by a client, the client holds an exclusive lock on all messages using that sessionSession-IDqueued or subscribed. It also holds exclusive locks on all messages with theSession-IDthat comes later

  • TheLock outis released when you call close methods on the receiver or when the lockExpires. There are methods on the receiver tooRenewalso the locks

  • If severalSimultaneous Recipientspull out ofqueue, the messages that belong to a particular onemeetingare sent specificallyRecipientcurrently holding the lock for this session

  • Thesession lockheld by thesession receiveris an umbrella for the news blocks used by thePeek-Lockbilling mode

  • only oneRecipientcan lock a session

  • ARecipientmay have many in-flight messages, but messages are received in order

  • Give upa message causes the same message to be delivered again on the next reception

Robust messaging with Azure Service Bus (11)

duplicate detection

  • Helps to keep track of the application controlledMessage IDall messages sent to a queue or topic during a specific time window

  • When a new message is sent withMessage IDlogged during the time window, the message is reported as accepted (The send process is successful)

  • Resent message is instantignoredAndDropped. No other parts of the message than thatMessage IDare considered

  • application control ofidentifieris essential, because this is the only way to bind the applicationMessage IDinto a business process context from which to predictably reconstruct when an error occurs

  • duplicate detectionCourse time windows can be configured during whichMessage IDsare retained. By default, this value is 10 minutes for queues and topics, with a minimum of 20 seconds and a maximum of 7 days

  • Activateduplicate detectionand directly affect the size of the windowqueue(AndTheme) throughput since all recordedMessage IDsmust be compared with the newly transmitted message ID

  • Keeping the window small means lessMessage IDsneed to be maintained and adjusted and therefore have less impact onthroughput

  • For high throughput entities that requireduplicate detection, ideally keep thewindow as small as possible

schedule messages

  • You can send messages to a queue or topic for deferred processing; B. to schedule a job to become available for processing by a system at a specific time. This capability realizes a reliable distributed time-based scheduler.

  • Scheduled messages are not queued until the defined queue time. Prior to this time, scheduled messages may be cancelled. Cancel deletes the message

  • The SequenceNumber for a scheduled message is only valid while the message is in this state. When the message transitions to the active state, the message is appended to the queue as if it were queued at the current time, which involves assigning a new SequenceNumber

  • Because the feature is anchored in individual messages and messages can only be queued once, Service Bus doesn't support recurring schedules for messages

Dead letter messages

Robust messaging with Azure Service Bus (12)

  • ASecondary subqueue, calledDead-letter queue (DLQ)

  • Thedead letterQueue does not have to be created explicitly

  • Cannot be deleted or managed independently from the main entity

  • Keeps news that can't bedeliveredto eachRecipient, or news that couldn't beprocessed

  • Messages can then be removed from theDLQAndinspected. An application can fix problems and resubmit the message

  • Severalactivitiesin Service Bus can cause messages to be pushed to theDLQ

  • ReceptionThe application can also explicitly move messages to theDLQ

Postponed Messages

Robust messaging with Azure Service Bus (13)

  • Deferral is a feature built specifically for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a specific order

  • You may need to defer processing of some received messages until required prior work, informed by other messages, is completed

  • Ultimately, the delay helps rearrange messages from the order in which they were received into an order in which they can be processed, while keeping the messages safely in the message store for which processing must be deferred

  • If a message cannot be processed because a particular resource is temporarily unavailable to process that message, but message processing shouldn't be suspended abruptly, one way to put that message aside for a few minutes is to look at the sequence number to remember in ascheduled messageto be published in a few minutes and retrieve the deferred message again when the scheduled message arrives

  • If a message handler depends on a database for all operations, and that database is temporarily unavailable, it should not use a delay but should suspend receiving messages altogether until the database becomes available again

Retrieve deferred messages

  • Deferred messages remain in the main queue (as opposed to undeliverable messages that are in a subqueue), but they can no longer be received using the regular receive operations. Deferred messages can be discovered viaSearch messageswhen an application loses track of them

  • To retrieve a deferred message, its owner is responsible for remembering the sequence number when deferring it

  • Any recipient who knows the sequence number of a deferred message can later receive the message by using receive methods that take the sequence number as a parameter. For more information about sequence numbers, seeMessage Sequencing and Timestamps

Automatic forwarding

Robust messaging with Azure Service Bus (14)

  • When the target entity accumulates too many messages and exceeds the quota or the target entity is disabled, the source entity adds the messages to itDead-letter queueuntil there is space at the finish(or the entity will be activated again)

  • Messages still reside in the dead-letter queue, so you must explicitly receive and process them from the dead-letter queue

  • A first-level topic with 20 subscriptions, each chained to a second-level topic with 200 subscriptions, allows higher throughput than a first-level topic with 200 subscriptions, each chained to a second-level topic with 20 subscriptions are

  • To create a subscription that is chained to another queue or topic, the subscription creator must useAdministerPermissions for both the source and target entities. Sending messages to the source topic requires onlySendPermissions for the source topic

  • Messages exceeding 4 hops are undeliverable

Robust messaging with Azure Service Bus (15)Robust messaging with Azure Service Bus (16)

transaction processing

Robust messaging with Azure Service Bus (17)

  • The grouptwo or more operations together into oneexecution area

  • Make sure everyoneThe operationBelonging to a specific group of operations eitherSuccessfulorFailtogether

  • transactionsact as a unit, often referred to asatomicity

  • Sendmultiple messages to oneQueue/Topicfrom inside atransaction area

    • Messageswill only be committed if thetransactionsuccessfully completes.

  • The operationavailable within a transaction area -

    • Send

    • Complete

    • Give up

    • dead letter

    • Postpone

    • Renewlock out

  • Message received from sendertransmission queueorTheme

    • transmission queueorThemeImmediately moves the message to the desired locationdestination queueorTheme

    • Newsis neverCommittedfor thetransmission queueorThemeand is therefore not visible to themconsumer

  • order of transactionswithin thetransaction areais important; e.g.

    • Receive- Read from thattransmission queueorTheme

    • from you1- Send toQueue1orThema1

    • send2- Send totail2orThema2

code samples

How to send messages

SendQueueMessageAsync

private async Task<ResponseModel> SendQueueMessageAsync (string queueNameString, HeaderModel headerModel, List<MessageModel> messagesList) {kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var serviceBusSender = kServiceBusClient.CreateSender(queueNameString); var serviceBusMessagesList = PrepareAllQueueMessages(messagesList); ResponseModel responseModel = null; try { await serviceBusSender.SendMessagesAsync(serviceBusMessagesList); responseModel = new ResponseModel() { Code = 200, Message = $"message batch sent:{serviceBusMessagesList.Count}" }; } catch(ServiceBusException ex) {responseModel = new ResponseModel() { Code = 400, Message = ex.Message }; } finally { waiting for serviceBusSender.DisposeAsync(); } return response model;}

ScheduleQueueMessageAsync

private async Task<List<ResponseModel>> ScheduleQueueMessageAsync (string queueNameString, HeaderModel headerModel, List<MessageModel> messagesList, Dictionary<string, int> queryStringMap){ kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var serviceBusSender = kServiceBusClient.CreateSender(queueNameString); var serviceBusMessagesList = PrepareAllQueueMessages(messagesList); int delayMinutes = (int)(queryStringMap["delayBy"])/60; lange Zeitplansequenz = 0; var responseModelsList = new List<ResponseModel>(); try { var scheduledTasksList = serviceBusMessagesList.Select (async (ServiceBusMessage serviceBusMessage) => { scheduleSequence = await serviceBusSender.ScheduleMessageAsync (serviceBusMessage, DateTimeOffset.Now.AddMinutes(delayMinutes)); var responseModel = new ResponseModel() { Code = 200, Message = $"Nachricht geplant:{scheduleSequence}" };responseModelsList.Add(responseModel); }).ToList(); warte auf Task.WhenAll(scheduledTasksList); } catch (ServiceBusException ex) { var responseModel = new ResponseModel() { Code = 400, Message = ex.Message }; responseModelsList.Add (AntwortModell); } endlich { warte auf serviceBusSender.DisposeAsync(); } return responseModelsList;}

How to receive messages

ReadFromDeadLetterQueue

public async Task<IActionResult> ReadFromDeadLetterQueue (string queueNameString, [FromHeader] HeaderModel headerModel){ kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var deadLetterReceiver = kServiceBusClient.CreateReceiver(queueNameString,new ServiceBusReceiverOptions(){SubQueue = SubQueue.DeadLetter, ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); MessageModel ReceivedModel = null; ErrorModel errorModel = null; try { var ReceivedMessage = await deadLetterReceiver.ReceiveMessageAsync(kWaitTimeSpan); if (receivedMessage == null) throw new ArgumentNullException(nameof(receivedMessage)); ReceivedModel = JsonConvert.DeserializeObject<MessageModel> (Encoding.UTF8.GetString(receivedMessage.Body)); if (receivedModel == null) throw new ArgumentNullException(nameof(receivedModel)); } catch (ArgumentNullException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } catch (ServiceBusException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } endlich { await deadLetterReceiver.DisposeAsync(); } return Ok((receivedModel != null) ? ReceivedModel : errorModel);}

Servicebus-Listener

// Service Bus SDK clientprivate ServiceBusClient _serviceBusClient;// Service Bus message processorprivate ServiceBusProcessorOptions _serviceBusProcessorOptions;private ServiceBusProcessor _serviceBusProcessor;// Service Bus session based message processorprivate ServiceBusSessionProcessorOptions _serviceBusSessionProcessorOptions;private ServiceBusSessionProcessor _serviceBusSessionProcessor;....publicProcessor){(string connection MessageString) string connectionString) _connectionString = ConnectionString; _serviceBusClient = new ServiceBusClient(connectionString);}.....
öffentliche asynchrone Aufgabe StartProcessingAsync(MessageProcessorCallback messageProcessorCallback){ _serviceBusProcessor.ProcessMessageAsync += (ProcessMessageEventArgs processMessageEventArgs) => {messageProcessorCallback.Invoke(processMessageEventArgs, null); return Task.CompletedTask; }; _serviceBusProcessor.ProcessErrorAsync += (ProcessErrorEventArgs processErrorEventArgs) => {messageProcessorCallback.Invoke (null, processErrorEventArgs); return Task.CompletedTask; }; warte auf _serviceBusProcessor.StartProcessingAsync();}

Start message processing for a session

öffentliche asynchrone Aufgabe StartSessionProcessingAsync (MessageSessionProcessorCallback messageSessionProcessorCallback){ _serviceBusSessionProcessor.ProcessMessageAsync += (ProcessSessionMessageEventArgs processSessionMessageEventArgs) => {messageSessionProcessorCallback.Invoke(processSessionMessageEventArgs, null); return Task.CompletedTask; }; _serviceBusSessionProcessor.ProcessErrorAsync += (ProcessErrorEventArgs processErrorEventArgs) => {messageSessionProcessorCallback.Invoke (null, processErrorEventArgs); return Task.CompletedTask; }; await _serviceBusSessionProcessor.StartProcessingAsync();}

How to perform transaction processing

ForwardTopicAsync

public async Task<IActionResult> ForwardToTopicAsync (string topicNameString, string subscriptionNameString, [FromHeader] ForwardHeaderModel forwardHeaderModel, [FromQuery] Dictionary<string, string> queryStringMap){ var serviceBusClientOptions = new ServiceBusClientOptions() { EnableCrossEntityTransactions = true, TransportType = ServiceBusTransportType.AmqpTcp } ; kServiceBusClient = new ServiceBusClient(forwardHeaderModel.ConnectionString, serviceBusClientOptions); var serviceBusReceiverOptions = new ServiceBusReceiverOptions() {PrefetchCount = 2, ReceiveMode = ServiceBusReceiveMode.PeekLock}; ServiceBusReceiver serviceBusReceiver = null; ServiceBusSender nextHopSender = null; OCRModel ReceivedModel = null; ErrorModel errorModel = null; try { var sessionNameString = queryStringMap["session"]; var nextHopTopicNameString = forwardHeaderModel.NextHopTopicName; var nextHopSessionNameString = forwardHeaderModel.NextHopSessionName; serviceBusReceiver = kServiceBusClient.CreateReceiver (topicNameString, subscriptionNameString, serviceBusReceiverOptions); nextHopSender = kServiceBusClient.CreateSender(nextHopTopicNameString); var ReceivedMessage = ServiceBusReceiver abwarten?.ReceiveMessageAsync(kWaitTimeSpan); if (receivedMessage == null) throw new ArgumentNullException(nameof(receivedMessage)); ReceivedModel = JsonConvert.DeserializeObject<OCRModel> (Encoding.UTF8.GetString(receivedMessage.Body)); if (receivedModel == null) throw new ArgumentNullException(nameof(receivedModel)); using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { await serviceBusReceiver.CompleteMessageAsync(receivedMessage); var serviceBusMessage = neue ServiceBusMessage (receivedMessage); serviceBusMessage.TransactionPartitionKey = ReceivedMessage.PartitionKey; serviceBusMessage.SessionId = nextHopSessionNameString; warte auf nextHopSender.SendMessageAsync (serviceBusMessage); ts.Complete(); } } catch (ArgumentNullException ex) { errorModel = new ErrorModel () { Code = 400, Message = ex.Message }; } catch (ServiceBusException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } finally { if (serviceBusReceiver != null) await serviceBusReceiver.DisposeAsync(); if (nextHopSender != null) warte auf nextHopSender.DisposeAsync(); } return Ok((receivedModel != null) ? ReceivedModel : errorModel);}

references

  • bus

  • Queues, Topics and Subscriptions

  • Advanced features

  • security base

Top Articles
Latest Posts
Article information

Author: Carmelo Roob

Last Updated: 03/28/2023

Views: 5974

Rating: 4.4 / 5 (45 voted)

Reviews: 92% of readers found this page helpful

Author information

Name: Carmelo Roob

Birthday: 1995-01-09

Address: Apt. 915 481 Sipes Cliff, New Gonzalobury, CO 80176

Phone: +6773780339780

Job: Sales Executive

Hobby: Gaming, Jogging, Rugby, Video gaming, Handball, Ice skating, Web surfing

Introduction: My name is Carmelo Roob, I am a modern, handsome, delightful, comfortable, attractive, vast, good person who loves writing and wants to share my knowledge and understanding with you.