What is ServiceBus?
Fully managed companynews broker
decoupleApplications and services from each other
Newstailsand Publish/Subscribesubjects
load balancing overCompeteworkforce
High level ofreliabilityAndthroughput
What will this document do?
distinguish betweenMessagesAndevents
Look at themOrchestratorenor service bus who canTo produceAndConsumeMessages
Dive into Service BusconceptsAndAdvanced features
A detailed look at some of the code samples forproducersAndconsumer
The code examples discussed are available atSource code
What is a message?
raw data Producedto be through a ministryConsumedorStoredelsewhere
Contains the data that triggered theNewsPipeline
editorof theNewshas an expectation about how thatconsumertreats theNews
Contractexists between the two sides
WHOsend the message?
Wasis the message?
Wois the message sent?
Service Bus - Deep Dive
Intended for traditional enterprise applications that requireTransactions, ordering, duplicate detection and instant consistency
Allows cloud-native applications to be deployed reliablyState Transition Managementfor business processes
Relievedhighly secureAndReliableCommunication across hybrid cloud solutions and can connect existing on-premises systems with cloud solutions
Brokered MessagingSystem. It stores messages in aestate agents(for example a queue) until the consuming party is ready to receive the messages
Available as eitherdata streamsorevent stack
Reliably asynchronousmessaging (Enterprise Messaging als Service) which requires a query
Advanced messaging features likeFIFO, sessions, transaction ranges, dead-lettering and filtering, and duplicate detection
At least onceDelivery
Optionalin orderDelivery
tails
OfferFirst in, first out(FIFO) messaging to one or more competing consumers
recipient usuallyReceiveAndProceedingsMessages in the order in which they were added to the queue, and only one message consumer receives and processes each message
producers(Sender) Andconsumer(Recipient) do not have to send and receive messages at the same time, since messages are permanently stored in the queue
Manufacturerdon't have to wait for an answer from theconsumerto further process and send messages
The associated advantage isLast levelingthis allowsproducersAndconsumerto send and receive messages at different rates
- mediated transmission
Maximize uptime- Delays in performance do not directly and immediately affect the application
Manufacturercan continue to send messages to the queue even if the service is unavailable or not currently processing messages
consumercan process at its own pace without becoming overloaded
Maximize scalabilitybecause both the number of queues and the number of services can be varied as required
Competing Consumer
SeveralRecipientcompete for messages on the samequeue
Offers Automaticallyload balancingof working on recipients who volunteerjobs
Improvedreliability
Messagesare not sent to a specific service instance
A failed service instance does not block aManufacturer
Messagescan be processed by any functioning service instance.
Scalable- The system can dynamically increase or decrease the number of instances of consumer service as message traffic fluctuates.
Improveelasticity
consumerservice instancereadsAndprocessesDieNewsas part of a transaction operation
If theconsumerservice instance fails, this pattern can ensure that the message is returned to the queue for another instance of the consumer service to fetch and process
Themes and Subscriptions
Provides a one-to-many form of communication in which aPublish/SubscribeMuster
Useful for scaling to large numbers ofRecipient, everypublishedMessage will be provided to everyoneSubscriptionregistered with theTheme
Messagesare sent to aThemeand delivered to one or more AssociatesSubscriptions
Vorabruf
read aheadThe feature can be enabled for a Service Bus client
TheRecipientacquiresAdditionallyNO. of messages than originally intended by the application
With prefetching enabled, Service Bus behaves differently in the following two scenarios
Receive and deleteModus
In the captured messagesPrefetch-Pufferare no longer available in the queue
Messages exist in in-memoryPrefetch-Pufferup to the application
news areIrrecoverableorLost, if the client application ends before the messages are outReceiveinto the application
Peek-LockModus
Messagesbrought into thePrefetch-Pufferare in oneBlockedCondition
Falls diePrefetch-Pufferis large and processing takes longer than the message lock time - the subscribing client must take appropriate action according to the requirements
If theLock outexpiressilentimPrefetch-Puffer, the message is treated asLeaveand is made available again for retrievalqueue
If the requirement is to be metHigh reliabilitySince message processing and processing requires significant labor and time, it is recommended to use theread aheadFunction conservatively or not at all
If the requirement is to be metHigh throughputand message processing is in generalfaster Faster, the recommendation is to goread aheadoption and benefit from significant throughput advantages
Service bus – reliability
throttling
Throttling from an external system that Service Bus depends on
Throttling is done through interactions withstorageAndcalculateresources
Activatepartitioningon onequeueorThemeto reduce the chance of throttling
Problem for an Azure dependency
Issue for a system that Service Bus depends on. For example, a specific portion of memory may have problems
To work around these types of problems, Service Bus regularly investigates and implements workarounds
Due to the nature of mitigation, it can take up to 15 minutes for a sent message to appear in the affected queue or subscription and be ready to be received
Service Bus errors on a single subsystem
Service Bus failure on a single subsystem. In this situation, a compute node may fall into an inconsistent state and must restart itself, causing all entities it serves to load balance to other nodes. This in turn can result in a short period of slow message processing
The client application generates aSystem.TimeoutExceptionorMessagingExceptionException. Service Bus includes a solution to this problem in the form of automated retry logic for clients. Once the retry period expires and the message is not delivered
Peek-Sperre –At least once
Sendersends the messages.
RecipientBlocks the message from other recipients.
The nextRecipientlocks the next message.
Completeremoves the message from the queue.
Give upreleases the lock and makes it available for the next recipient
Receive and delete –At most once
Sendersends the messages
Recipientreceives the message and deletes it from the queue
The next recipient receives andDeletesthe next message
Give upAndCompleteOperations are not required
news sessions
sessionsallow sets of related sets of related messages to be pinned to a recipient even when using competing consumers
meetingstate can be used to store an intermediate state for a session
Asession receiveris created by a client accepting a session
If the session is accepted and held by a client, the client holds an exclusive lock on all messages using that sessionSession-IDqueued or subscribed. It also holds exclusive locks on all messages with theSession-IDthat comes later
TheLock outis released when you call close methods on the receiver or when the lockExpires. There are methods on the receiver tooRenewalso the locks
If severalSimultaneous Recipientspull out ofqueue, the messages that belong to a particular onemeetingare sent specificallyRecipientcurrently holding the lock for this session
Thesession lockheld by thesession receiveris an umbrella for the news blocks used by thePeek-Lockbilling mode
only oneRecipientcan lock a session
ARecipientmay have many in-flight messages, but messages are received in order
Give upa message causes the same message to be delivered again on the next reception
duplicate detection
Helps to keep track of the application controlledMessage IDall messages sent to a queue or topic during a specific time window
When a new message is sent withMessage IDlogged during the time window, the message is reported as accepted (The send process is successful)
Resent message is instantignoredAndDropped. No other parts of the message than thatMessage IDare considered
application control ofidentifieris essential, because this is the only way to bind the applicationMessage IDinto a business process context from which to predictably reconstruct when an error occurs
duplicate detectionCourse time windows can be configured during whichMessage IDsare retained. By default, this value is 10 minutes for queues and topics, with a minimum of 20 seconds and a maximum of 7 days
Activateduplicate detectionand directly affect the size of the windowqueue(AndTheme) throughput since all recordedMessage IDsmust be compared with the newly transmitted message ID
Keeping the window small means lessMessage IDsneed to be maintained and adjusted and therefore have less impact onthroughput
For high throughput entities that requireduplicate detection, ideally keep thewindow as small as possible
schedule messages
You can send messages to a queue or topic for deferred processing; B. to schedule a job to become available for processing by a system at a specific time. This capability realizes a reliable distributed time-based scheduler.
Scheduled messages are not queued until the defined queue time. Prior to this time, scheduled messages may be cancelled. Cancel deletes the message
The SequenceNumber for a scheduled message is only valid while the message is in this state. When the message transitions to the active state, the message is appended to the queue as if it were queued at the current time, which involves assigning a new SequenceNumber
Because the feature is anchored in individual messages and messages can only be queued once, Service Bus doesn't support recurring schedules for messages
Dead letter messages
ASecondary subqueue, calledDead-letter queue (DLQ)
Thedead letterQueue does not have to be created explicitly
Cannot be deleted or managed independently from the main entity
Keeps news that can't bedeliveredto eachRecipient, or news that couldn't beprocessed
Messages can then be removed from theDLQAndinspected. An application can fix problems and resubmit the message
Severalactivitiesin Service Bus can cause messages to be pushed to theDLQ
ReceptionThe application can also explicitly move messages to theDLQ
Postponed Messages
Deferral is a feature built specifically for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a specific order
You may need to defer processing of some received messages until required prior work, informed by other messages, is completed
Ultimately, the delay helps rearrange messages from the order in which they were received into an order in which they can be processed, while keeping the messages safely in the message store for which processing must be deferred
If a message cannot be processed because a particular resource is temporarily unavailable to process that message, but message processing shouldn't be suspended abruptly, one way to put that message aside for a few minutes is to look at the sequence number to remember in ascheduled messageto be published in a few minutes and retrieve the deferred message again when the scheduled message arrives
If a message handler depends on a database for all operations, and that database is temporarily unavailable, it should not use a delay but should suspend receiving messages altogether until the database becomes available again
Retrieve deferred messages
Deferred messages remain in the main queue (as opposed to undeliverable messages that are in a subqueue), but they can no longer be received using the regular receive operations. Deferred messages can be discovered viaSearch messageswhen an application loses track of them
To retrieve a deferred message, its owner is responsible for remembering the sequence number when deferring it
Any recipient who knows the sequence number of a deferred message can later receive the message by using receive methods that take the sequence number as a parameter. For more information about sequence numbers, seeMessage Sequencing and Timestamps
Automatic forwarding
When the target entity accumulates too many messages and exceeds the quota or the target entity is disabled, the source entity adds the messages to itDead-letter queueuntil there is space at the finish(or the entity will be activated again)
Messages still reside in the dead-letter queue, so you must explicitly receive and process them from the dead-letter queue
A first-level topic with 20 subscriptions, each chained to a second-level topic with 200 subscriptions, allows higher throughput than a first-level topic with 200 subscriptions, each chained to a second-level topic with 20 subscriptions are
To create a subscription that is chained to another queue or topic, the subscription creator must useAdministerPermissions for both the source and target entities. Sending messages to the source topic requires onlySendPermissions for the source topic
Messages exceeding 4 hops are undeliverable
transaction processing
The grouptwo or more operations together into oneexecution area
Make sure everyoneThe operationBelonging to a specific group of operations eitherSuccessfulorFailtogether
transactionsact as a unit, often referred to asatomicity
Sendmultiple messages to oneQueue/Topicfrom inside atransaction area
Messageswill only be committed if thetransactionsuccessfully completes.
The operationavailable within a transaction area -
Send
Complete
Give up
dead letter
Postpone
Renewlock out
Message received from sendertransmission queueorTheme
transmission queueorThemeImmediately moves the message to the desired locationdestination queueorTheme
Newsis neverCommittedfor thetransmission queueorThemeand is therefore not visible to themconsumer
order of transactionswithin thetransaction areais important; e.g.
Receive- Read from thattransmission queueorTheme
from you1- Send toQueue1orThema1
send2- Send totail2orThema2
code samples
How to send messages
SendQueueMessageAsync
private async Task<ResponseModel> SendQueueMessageAsync (string queueNameString, HeaderModel headerModel, List<MessageModel> messagesList) {kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var serviceBusSender = kServiceBusClient.CreateSender(queueNameString); var serviceBusMessagesList = PrepareAllQueueMessages(messagesList); ResponseModel responseModel = null; try { await serviceBusSender.SendMessagesAsync(serviceBusMessagesList); responseModel = new ResponseModel() { Code = 200, Message = $"message batch sent:{serviceBusMessagesList.Count}" }; } catch(ServiceBusException ex) {responseModel = new ResponseModel() { Code = 400, Message = ex.Message }; } finally { waiting for serviceBusSender.DisposeAsync(); } return response model;}
ScheduleQueueMessageAsync
private async Task<List<ResponseModel>> ScheduleQueueMessageAsync (string queueNameString, HeaderModel headerModel, List<MessageModel> messagesList, Dictionary<string, int> queryStringMap){ kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var serviceBusSender = kServiceBusClient.CreateSender(queueNameString); var serviceBusMessagesList = PrepareAllQueueMessages(messagesList); int delayMinutes = (int)(queryStringMap["delayBy"])/60; lange Zeitplansequenz = 0; var responseModelsList = new List<ResponseModel>(); try { var scheduledTasksList = serviceBusMessagesList.Select (async (ServiceBusMessage serviceBusMessage) => { scheduleSequence = await serviceBusSender.ScheduleMessageAsync (serviceBusMessage, DateTimeOffset.Now.AddMinutes(delayMinutes)); var responseModel = new ResponseModel() { Code = 200, Message = $"Nachricht geplant:{scheduleSequence}" };responseModelsList.Add(responseModel); }).ToList(); warte auf Task.WhenAll(scheduledTasksList); } catch (ServiceBusException ex) { var responseModel = new ResponseModel() { Code = 400, Message = ex.Message }; responseModelsList.Add (AntwortModell); } endlich { warte auf serviceBusSender.DisposeAsync(); } return responseModelsList;}
How to receive messages
ReadFromDeadLetterQueue
public async Task<IActionResult> ReadFromDeadLetterQueue (string queueNameString, [FromHeader] HeaderModel headerModel){ kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString); var deadLetterReceiver = kServiceBusClient.CreateReceiver(queueNameString,new ServiceBusReceiverOptions(){SubQueue = SubQueue.DeadLetter, ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); MessageModel ReceivedModel = null; ErrorModel errorModel = null; try { var ReceivedMessage = await deadLetterReceiver.ReceiveMessageAsync(kWaitTimeSpan); if (receivedMessage == null) throw new ArgumentNullException(nameof(receivedMessage)); ReceivedModel = JsonConvert.DeserializeObject<MessageModel> (Encoding.UTF8.GetString(receivedMessage.Body)); if (receivedModel == null) throw new ArgumentNullException(nameof(receivedModel)); } catch (ArgumentNullException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } catch (ServiceBusException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } endlich { await deadLetterReceiver.DisposeAsync(); } return Ok((receivedModel != null) ? ReceivedModel : errorModel);}
Servicebus-Listener
// Service Bus SDK clientprivate ServiceBusClient _serviceBusClient;// Service Bus message processorprivate ServiceBusProcessorOptions _serviceBusProcessorOptions;private ServiceBusProcessor _serviceBusProcessor;// Service Bus session based message processorprivate ServiceBusSessionProcessorOptions _serviceBusSessionProcessorOptions;private ServiceBusSessionProcessor _serviceBusSessionProcessor;....publicProcessor){(string connection MessageString) string connectionString) _connectionString = ConnectionString; _serviceBusClient = new ServiceBusClient(connectionString);}.....
öffentliche asynchrone Aufgabe StartProcessingAsync(MessageProcessorCallback messageProcessorCallback){ _serviceBusProcessor.ProcessMessageAsync += (ProcessMessageEventArgs processMessageEventArgs) => {messageProcessorCallback.Invoke(processMessageEventArgs, null); return Task.CompletedTask; }; _serviceBusProcessor.ProcessErrorAsync += (ProcessErrorEventArgs processErrorEventArgs) => {messageProcessorCallback.Invoke (null, processErrorEventArgs); return Task.CompletedTask; }; warte auf _serviceBusProcessor.StartProcessingAsync();}
Start message processing for a session
öffentliche asynchrone Aufgabe StartSessionProcessingAsync (MessageSessionProcessorCallback messageSessionProcessorCallback){ _serviceBusSessionProcessor.ProcessMessageAsync += (ProcessSessionMessageEventArgs processSessionMessageEventArgs) => {messageSessionProcessorCallback.Invoke(processSessionMessageEventArgs, null); return Task.CompletedTask; }; _serviceBusSessionProcessor.ProcessErrorAsync += (ProcessErrorEventArgs processErrorEventArgs) => {messageSessionProcessorCallback.Invoke (null, processErrorEventArgs); return Task.CompletedTask; }; await _serviceBusSessionProcessor.StartProcessingAsync();}
How to perform transaction processing
ForwardTopicAsync
public async Task<IActionResult> ForwardToTopicAsync (string topicNameString, string subscriptionNameString, [FromHeader] ForwardHeaderModel forwardHeaderModel, [FromQuery] Dictionary<string, string> queryStringMap){ var serviceBusClientOptions = new ServiceBusClientOptions() { EnableCrossEntityTransactions = true, TransportType = ServiceBusTransportType.AmqpTcp } ; kServiceBusClient = new ServiceBusClient(forwardHeaderModel.ConnectionString, serviceBusClientOptions); var serviceBusReceiverOptions = new ServiceBusReceiverOptions() {PrefetchCount = 2, ReceiveMode = ServiceBusReceiveMode.PeekLock}; ServiceBusReceiver serviceBusReceiver = null; ServiceBusSender nextHopSender = null; OCRModel ReceivedModel = null; ErrorModel errorModel = null; try { var sessionNameString = queryStringMap["session"]; var nextHopTopicNameString = forwardHeaderModel.NextHopTopicName; var nextHopSessionNameString = forwardHeaderModel.NextHopSessionName; serviceBusReceiver = kServiceBusClient.CreateReceiver (topicNameString, subscriptionNameString, serviceBusReceiverOptions); nextHopSender = kServiceBusClient.CreateSender(nextHopTopicNameString); var ReceivedMessage = ServiceBusReceiver abwarten?.ReceiveMessageAsync(kWaitTimeSpan); if (receivedMessage == null) throw new ArgumentNullException(nameof(receivedMessage)); ReceivedModel = JsonConvert.DeserializeObject<OCRModel> (Encoding.UTF8.GetString(receivedMessage.Body)); if (receivedModel == null) throw new ArgumentNullException(nameof(receivedModel)); using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { await serviceBusReceiver.CompleteMessageAsync(receivedMessage); var serviceBusMessage = neue ServiceBusMessage (receivedMessage); serviceBusMessage.TransactionPartitionKey = ReceivedMessage.PartitionKey; serviceBusMessage.SessionId = nextHopSessionNameString; warte auf nextHopSender.SendMessageAsync (serviceBusMessage); ts.Complete(); } } catch (ArgumentNullException ex) { errorModel = new ErrorModel () { Code = 400, Message = ex.Message }; } catch (ServiceBusException ex) { errorModel = new ErrorModel() { Code = 500, Message = ex.Message }; } finally { if (serviceBusReceiver != null) await serviceBusReceiver.DisposeAsync(); if (nextHopSender != null) warte auf nextHopSender.DisposeAsync(); } return Ok((receivedModel != null) ? ReceivedModel : errorModel);}
references
bus
Queues, Topics and Subscriptions
Advanced features
security base