- Preview
- Messaging in Azure (Storage queue, Event Hub, Event Grid, Service Bus)
- IBM MQ
- Gateway – Ocelot
Preview
- Applicazioni Cloud-native
- Sono pensate per approfittare dei benefici del ‘cloud computing’ come flessibilità, scalabilità e deploy rapido.
- Microservice =>
- Queste applicazioni sono costitutite di componenti piccoli e riusabili chiamati ‘Microservice’.
- Spesso ‘impacchettati’ in ‘container’, I ms. lavorano insieme per comporre un’intera applicazione
- Backbone di comunicazione condivisa
- I ms. devono avere un mezzo per comunicare (sincrono o asincrono) e tra loro e operare in concerto.
- Meccanismi per creare la backbone
- Message Brokers
- Azure Service Bus, IBM MQ, RabbitMQ
- Rest API
- Message Brokers
- Microservice vs Monolith =>
- Esempio di un’applicazione tradizionale
-
- Esempio della stessa applicazione sviluppata tramite ms.
-
-
- Ogni componente ha la sua propria applicazione e il suo proprio DB
- L’applicazione originale non implementa piu’ le componenti ma solo le chiamate ad esse.
- Vantaggi
- Permette il codaggio in parallelo.
- Deployabile indipendentemente => permette il rilascio in parallelo, non é necessario fare il deploy di tutta l’applicazione ma solo del ms. interessato.
- Scalabile indipendetemente => é sufficiente fare lo scaling a di quei ms. che lo necessitano e non a tutta l’applcazione.
- No bulk code.
- Recude downtime =>
- Se un ms. ha dei problemi hardware, gli altri potenzialmente possono continuare a funzione.
- Quando il ms. ritornerà attivo, potrà tornare a processare le richieste pendenti (inviate dagli altri ms. rimasti attivi (nel caso di comunicazione asincrona tra ms.)
- Multiplatform => permette di sviluppare una parte con uno stack tecnologico (es: ASP.NET) e altre parti con altri stack tecnologici (es: JAVA)
-
- Miscroservice Architecture Style =>
-
- Un progetto si costituisce in realtà di diversi sotto-entità fisicamente divise e con un accoppiamento debole tra loro.
- Ogni ‘componente’ sarà un ms. con una sua propria applicazione
- Ogni componente comunica (sincrono o asincrono) tramite gli altri tramite un ‘servizio bus’
- Tutti i ms. sono impacchettati in un gateway
- L’applicazione client comunica con il gateway
- Esempio di flusso =>
- Invece di creare un unico progetto monolitico, struttura la mia app in piccole unità autonome che possono essere sviluppate, testate e deployate autonomamente
- Nel caso di deploy tramite cotainer
- Nelle cartelle di ognuno di tali progetti aggiungo il dockerfile e .dockerignore .
- Eseguo il comando docker build per creare le corrispondenti immagini/template per ognuno dei sotto progetti indicando l’immagine opportuna.
- Eseguo il comando docker run per eseguire le immagini create con il build nei container.
- I progetti potranno comunicare tra loro tramite HTTP.
- Comunication Pattern
- Esistono diversi modi in cui 2 o piu’ applicazioni possono comunicare tra loro
- Request-Response (sincrono)
- E” la maniera tradizionale con cui diversi componenti comunicano tra loro
- Un sender invia una ‘request ‘ a una risorsa receiver e attende che risponda
- Publisher-Subscriber (asincrono)
- In questo caso un’applicazione ‘publisher’ fare il push di un messaggio su di un message broker
- Una o più applicazioni ‘subscriber’ sono all’ascolto e reagiscono alla presenza di un nuovo messaggio sul canale/topic che hanno sottoscritto
- Sender-Receriver (asincrono)
- In questo caso un’applicazione ‘sender’ invia un messaggio ad una ‘queue’
- Una e una sola applicazione ‘receiver’ legge tale coda e elabora i lmessaggi che arrivano secondo la propria capacità di eleaborazione indipendentemente dalla quantità di messaggi in arrivo
- Request-Response (sincrono)
- Esistono diversi modi in cui 2 o piu’ applicazioni possono comunicare tra loro
- Event vs Message
- Event => E’ un’informazione che viene pubblicata o emessa senza nessun interesse verso chi la leggerà (es: un cambiqamento di stato).
- Message => E’ un’informazione che viene pubblicata o emessa legata e l’applicazione ‘publisher’ si aspetta che qualcuno elabori tale informazione.
Messaging in Azure (Storage queue, Event Hub, Event Grid, Service Bus)
- Messaging é il cuore di ogni possibile architettura microservices
- Azure ha 4 servizi di messaging completamente gestiti
- Storage Queue
- Event Grid
- Service Bus
- Event Hub
- 1 – Storage Queue =>
-
- Fa parte di ‘Azure Storage Account’ e non comporta costi aggiuntivi
- Performance per 1 messaggio da 1KByte
- 20000 messaggi/s per l’intero account
- 2000 messaggi/s per singola coda
- Max message size => 64KByte
- (Nel portale Azure)
- Creare una ‘Queue’
- Storage account -> Queues -> Create new queue
- Il contenuto del messaggio puo’ essere ad esempio Json
- Inviare un ‘Message’
- Selezionare la queue -> Add message
- E’ possibile indicare una data di espirazione
- E’ possibile indicare se il messaggio é criptato si/no
- Creare una ‘Queue’
- (Tramite API)
- Azure offre delle library per molti linguaggi
- E’ l’implementazione più semplice offerta da Azure per il messaging
- Connection string
- Selezionare lo ‘Storage account’ desiderato -> aprire il menù ‘Access Key’ -> copiare la ‘connection string’
-
//Questa routine fa il polling continuo della coda, ne legge i messaggi e poi li elimina private const string storageConnectionString = "<STORAGE ACCOUNT CONNECTION STRING>"; private const string queueName = "store-messages"; static void Main(string[] args) { // Instantiate a QueueClient which will be used to create and manipulate the queue QueueClient queueClient = new QueueClient(storageConnectionString, queueName); if (queueClient.Exists()) { while (true) { // Get the next message QueueMessage[] retrievedMessages = queueClient.ReceiveMessages(1); if (retrievedMessages.Length > 0) { // Process (i.e. print) the message in less than 30 seconds Console.WriteLine($"Dequeued message: '{retrievedMessages[0].MessageText}'"); // Delete the message queueClient.DeleteMessage(retrievedMessages[0].MessageId, retrievedMessages[0].PopReceipt); } } } }
- 2 – Azure Event Grid =>
-
- Cosa é
- Event-based architecture
- Permette di costruire un’architettura event-based
- Fa il publish di eventi verso destinatari interessati
- No queue
- No order garantito
- L’ordine di publicazione degli eventi nell”Event grid’ non é necessariamente quello con cui i ‘subscriber’ li riceveranno.
- E’ un modello push
- Event-based architecture
- Non é cara
- Fortemente integrato con motli altri servizi offerti da Azure.
- Max event size => 1 MByte
- Performance
- 10.000.000 eventi / s
- 5000 eventi/s per ogni topic definito nel’Event Grid’
- Latenza quasi ridotta al minimo.
- Terminologia
- Event
- Definisce cosa accade (es: blob file aggiunto allo ‘Storage account’, nuova telemetria IOT )
- Publisher
- Definisce chi crea l’evento (es: Microsoft, la mia organizzazione o le altre)
- Event source
- Definisce dove l’evento accade (Storage account o IOT hub)
- Inviano eventi al ‘Event Grid’ (più precisamente ad un ‘topic’)
- Topic
- Dove l’event é inviato.
- Usato per raggruppare eventi correlati
- E’ dove il ‘subscriber’ é all’ascolto
- System topic o custom topic
- Subscription
- Definisce a quale evento il ‘subscriber’ é interessato tramite uno specifico ‘Event handler’
- Event Handler
- Definisce dove il ‘subscriber’ riceve l’event (es: Azure Function o Event Hubs)
- Event
- Use case
- Ho uno storage account che riceve gli ordini da un portale
- Creo un ‘Event Grid’ e pubblico un topic che in ascolto su tale ‘storage account’
- Creo una ‘Function App’ che sottoscrive il topic e tratterà l’ordine che arriva dal portale
- La ‘Function App’ potrebbe ascoltare direttamente il portale tramite un ‘http trigger’ ma si creerebbe un legame sincrono
- Invece con l’utilizzo di un ‘Event Grid’ si crea una comunicazione asincrona tra gl ordini validati nel portale e la funzione che li deve trattare
- Creare un nuovo ‘Event Grid’ topic =>
- Event Grid System Topics
- Nella search bar editare ‘Event Grid System Topics’ (si crea un event grid basato su un topic built-in in Azure)
- Topic type
- Scegliere il tipo di topic (es: storage account)
- Resource
- Indicare il topic (es: indicare quale storage account tra quelli creati nella stessa subscrition e resource group)
- Event Grid System Topics
- Aggiunge una ‘subscription’ al ‘topic’ =>
- Event Type
- Scegli quali tipi di eventi vengono inviati alla tua destinazione.
- Tale lista dipende dal ‘Topic type’ del ‘Topic’ di cui stiamo aggiungendo una ‘subscription’
- Nel caso ad esempio di ‘Topic Type’=Storage Account, l’Event Type’ potrebbe essere ‘Blob created’ o ‘Blob deleted’ ecc.
- Endpoint Type
- Selezionare il tipo di Event Handler che sarà all’ascolto del nostro ‘Topic’
- Endpoint
- Selezionare la risorsa che effettivamente gestirà l’evento publicato del ‘Event Grid’ topic.
- Nel caso ‘EndPoint Type’ = ‘Azure Function’, come ”EndPoint’ sarà necessario indicare il nome di tale funzione.
- Event Type
- (Tramite API)
-
//Installare Microsoft.Azure.WebJobs.Extensions.EventGrid dotnet add package Microsoft.Azure.WebJobs.Extensions.EventGrid dotnet add package Azure.Messaging.EventGrid [FunctionName("ProcessOrderCosmos")] public static void Run( [EventGridTrigger] EventGridEvent eventGridEvent, [Blob("neworders", FileAccess.Write, Connection = "StorageConnectionString")] CloudBlobContainer container, [CosmosDB(databaseName: "readit-orders", collectionName: "order", ConnectionStringSetting = "CosmosDBConnection")] out Order order, ILogger log) { order = null; try { log.LogInformation($"Event details: Topic: {eventGridEvent.Topic}"); string eventBody = eventGridEvent.Data.ToString(); //Deserializing to StorageBlobCreatedEventData var storageData = JsonSerializer.Deserialize(eventBody, new JsonSerializerOptions { PropertyNameCaseInsensitive = true }); //Get the name of the new blob var blobName = Path.GetFileName(storageData.Url); //Get blob from storage var blockBlob = container.GetBlockBlobReference(blobName); var orderText = blockBlob.DownloadText(); //Deserializing to order order = JsonSerializer.Deserialize(orderText, new JsonSerializerOptions { PropertyNameCaseInsensitive = true }); } catch (Exception ex) { log.LogError(ex, "Error in function"); } }
-
- Cosa é
- 3 – Azure Service Bus =>
- Cosa é
- Platform-as-a-Service Message broker
- Full managed (non bisogna preoccuparsi di dischi, cpu, availability..)
- Fornisce un modello per la programmazione asincrona.
- I messaggi sono consegnati in modalità pull cioé i messaggi sono consegnati su richiesta (del receiver)
- Supporta un doppio scenario
- Pub/Sub tramite i ‘Topic’ (come Event Grid)
- Point-to-point tramite le ‘Queue’ (come Storage Queue)
- Supporta diversi ambienti (.NET, Java, Javascript, Phyton)
- Scaricare l’SDK adatto
- Protocolli di rete supportati (compliant) per lo scambio dei messaggi
- AMQP – Advanced Messaging Queuing Protocol
- JMS – Java Message Service (per esemio é possibile migrare un’applicazione JMS java da IBM MQ ad Azure Service Bus senza problemi)
- SBMP – Service Bus Messaging Protocol
- HTTP – HyperText Transfer Protocol
- Competing consumers pattern
- Per una ‘queue’ o un ‘topic’ e la sua ‘subscription’ potrebbe esiste
- più di un potenziale ‘publisher/sender’ che invia messaggi nella ‘queue’ o nel ‘topic’.
- più di potenziale ‘subscriber/receiver’ che riceve messaggi dalla ‘queue’ e da una ‘subscription’ individuale.
- Ogni volta che un potenziale ‘subscriber/receiver’ é pronto per leggere un messaggio, riceve il prossimo messaggio disponibile dalla testa della ‘queue’ o della ‘subcription’.
- Per una ‘queue’ o un ‘topic’ e la sua ‘subscription’ potrebbe esiste
- Ricezione dei messaggi
- Service Bus Receiver
- It is used to simply receive the messages and perform operations on them.
- It is a vanilla implementation that does core operations.
- Service Bus Processor
- It is a wrapper around service bus receiver and works on an event based model to receive messages and perform operations on them.
- It contains a callback method to perform operations on received messages.
- Service Bus Receiver
- Caratteristiche avanzate
- Partitions (aumenta availability) =>
- Cosa é
-
-
-
- Cosa é
- Sono legate a come i messaggi sono memorizzati nel ‘service bus’.
- Permetto di aumentare l’availability del ‘service bus’ stesso.
- Partitioning enable =>
- Normalmente quando viene creata una ‘queue’ o ‘subscription’ questa viene allocata ad una ‘Message unit‘ e un ‘Messaging Store‘.
- Quando la ‘partition’ é attivata in effetti i messaggi di una ‘queue’ o ‘subscription’ vengono trasmessi fino a 16 ‘message unit’.
- Availability over consistency
- Attiivare il ‘partitiong’ significa che in caso di problemi di una ‘message unit’ si vuole continuare a eleborare i messaggi
- Nonostante si sappia che il ‘receiver’ otterrà il maggior numero possibile di messaggi ma non necessariamente tutti o non nell’ordine di invio
- I messaggi inviati nella ‘message unit’ che ha avuto problemi saranno processati dopo nonostante potenzialmente inviati prima
- PartitionKey
- Quando il ‘partitioning’ é attivato, il ‘sender’ dovrà specificare in modo opportuno la ‘PartitionKey’ da legare al messaggio.
- Se non indicata, Azure Service Bus sceglierà la partitionKey autonomamente.
- Permetta di legare tra loro i messaggi che si vuole siano ricevuti insieme (es: CustomerId o OrderId’) dando cosi la priorità alla ‘consistency‘ dei dati.
- Quando il ‘partitioning’ é attivato, il ‘sender’ dovrà specificare in modo opportuno la ‘PartitionKey’ da legare al messaggio.
- Cosa é
-
-
-
-
- Sessions (garantisce FIFO) =>
-
-
-
-
- Cosa é
- Fanno riferimento a come i messaggi sono ricevuti dal ‘service bus’.
- Permettono di creare delle ‘sub-queue‘ =>
- Una ‘sub-queue’ si comporta una ‘queue’ indipendente
- No “head of line blocking”
- Ordine garantito
- Permettono di raggruppare dei messaggi in modo di consegnarli seguendo l’ordine di push nella ‘queue’ o ‘subscription’ (nonostrante la ‘competing consumers’).
- Multiple consuming application
- Sono utili nel caso di molte ‘consuming application’ in ascolto nel nostro bus.
- Se la ‘queue’ é come una strada, le sessioni sono le corsie.
- Ogni ‘consuming application’ avrà la sua ‘corsia’ nella ‘queue’ o ‘topic’ dove recuperare il messaggio successivo.
- Single consuming application
- Con una sola ‘consuming application’ la sessione non é necessaria
- L’app. sarà semplicemente all’ascolto di tutti i messaggi, ricevendo sempre il prossimo disponibile.
- SessionKey
- Per poter raggruppare i messaggi in una sessione bisogna specificare un ‘sessionKey’.
- Alla creazione e alla ricezione di un nuovo messaggio é necessario indicare la ‘sessionKey’
- Il ‘Consumer’ acquisisce il lock esclusivo di tutte i messsaggi di tale sessione
- Da quel punto in poi, ogni interazione con la ‘queue’ o il ‘topic’ dovrà essere nel contesto della ‘sessionKey’ utilizzata.
- La ricezione di un messaggio in una sessione non sarà più un’operazone di ReceiveMessage ma diventa un’operazione di Accept Message Session
- Cosa é
-
-
-
-
-
- Session State
- All’interno di una sessione é possibile memorizzare dei dati che sono mantenuti tra una ricezione e l’altra creando une processo stateful
- Un nuovo ‘Consumer’ che acquisisca la sessione avrà quindi accesso a tutte le informazioni lasciate dal ‘Consumer’ precedente relative all’elaborazione della sessione.
- Session State
- Dead-letter queue’ per ospitare i messaggi che non possono essere recapitati
- Scheduled delivery
- Possiamo programmare un messaggio da accodare in un momento differito.
- Il messaggio non potrà essere ricevuto fino a quel momento (puo essere fatto il peek)
- Otteniamo il numero di sequenza del messaggio che può essere utilizzato per annullare il messaggio pianificato.
-
-
-
-
-
- L’annullamento del messaggio pianificato lo eliminerà dal servizio.
-
-
-
-
- Gestione delle transazioni =>
- Nel basic-tier non la gestione delle transazioni non é supportata.
- Una transazione permette di ragruppare 2 o più operazioni in un solo ‘ambito di esecuzione‘ che sarà validato o annullato in modo atomico.
- Se ‘Service bus’ accetta un messaggio in transazione significa che ogni successiva operazione su tale messaggio é gestita in modo coordinato
- In modo tale da non perdere dati (il ‘sender’ completa l’operazione ma il ‘receiver’ no)
- In modo tale da non duplicarli (il ‘receiver’ completa la sua operazione ma il ‘sender’ no)
- Una sola entità
- Service Bus supporta il ragruppamento di operazoni in una transazione su una sola entità alla volta (es: queue, subscription).
- ReceiveMessageAsync
- L’operazione di ricezione non puo’ far parte di una transizione.
- Il ‘receiver’ utilizza già il concetto di ‘peeklock’
- Solo dopo apre una transaction scope per trattare il messaggio.
- Operazioni che fanno qualcosa sul messaggio nel broker possono far parte di una transizione:
- Send (SendAsync)
- Complete (CompliteMessageAsync)
- Abandon
- Deadletter
- Defer
- Renew lock
- Gestione delle transazioni =>
-
-
-
- Duplicate detection
- Vengono memorizzati i messaggi inviati nella ‘queue’ o nel ‘topic’ per un certo periodo in modo che possano essere rilevati e non consegnati eventuali duplicati
- Duplicate detection
- Prezzo
- Disponibili 3 tiers
- Dipende dal tier scelto e dal numero di operazioni
- Standard
- 10 dollari al mese => 13 milioni / mese
- Superati 13 milioni si paga per tranche di 1M di operazioni
- 1000 namespace
- Usa risorse condivise => Non é garantito l’uso esclusivo dei CPU Core e della memoria.
-
-
- Common workflow =>
-
-
- ServiceBus tiene traccia del ciclo di vita di ogni messaggio (cambio di stato)
- 1- Dal ‘Producer’ -> al ‘Server‘ =>
- Il ‘producer’ scrive in una ‘queue’
- Invia cioé dati tramite un protocollo di rete al server
- 2 – Nel ‘Server’ =>
- Quando il messaggio arriva al ‘server’, il ServiceBus aggiunge i seguenti dati al messaggio:
- Sequence number (numero che progressa indistintamente ad ogni messaggio ricevuto)
- Server timestamp
- Scrive ‘queue log’ (l messaggio si trova replicato 3 volte)
- Segnala il mesaggio come ‘Accepted’
- Il messaggio diventa => ‘Available‘
- Il messaggio diventa temporaneamente ‘Scheduled‘ nel caso di scheduled delivery
- Il messaggio viene quindi memorizzato temporanealente nel ‘queue log‘ e non é eligibile alla spedizione.
- Tornerà ‘Available‘ passato il tempo indicato per l’invio differito
- Il processo riparte dal punto 2 cioé il messaggio viene risequenzato e ri-timestampted
- Puo’ essere cancellato tramite il numero di seq ritornato dall’API di scheduling
- Quando il messaggio arriva al ‘server’, il ServiceBus aggiunge i seguenti dati al messaggio:
- 3 – Dal ‘Server’ al ‘Consumer’ =>
- Il ‘consumer’ fa il push di un nuovo messaggio
- Il più vecchio messaggio in stato ‘Available’ viene selezionato e assegnato esclusivamente al ‘consumer’
- Il messaggio diventa ‘Acquired‘ e il ‘Consumer’ ne comincia il trasferimento.
- 4 – Nel ‘Consumer‘ =>
- Il ‘Consumer’ finisce il trattamento del messaggio con successo
- Il messaggio diventa ‘Accepted‘
- Il ‘Consumer’ puo’ decidere di tenere da parte un messaggio (se si vuole rispettare una sequenza)
- Il messaggio diventa ‘Deferred‘
- Il messaggio rimane temporaneamente nel ‘queue log‘ e non é eligibile alla spedizione.
- Tornerà ‘Available‘ quando ripristinato (restored).
- Il ‘Consumer’ ha problemi con il trattamento del messaggio o il é scattato il timeout per il lock del messaggio
- Il messaggio puo’ diventare ‘Rejected‘. Il server non deve reinviarlo
- 4A – Nel ‘Server’ =>
- il messaggio é trasferito nella ‘Dead letter queue‘
- 4A – Nel ‘Server’ =>
- oppure il messaggio diventa ‘Released‘. Il server deve reinviarlo
- 4B – Nel ‘Server’ =>
-
- Il messaggio viene rispedito
- Redelivery count aumentato
- Se viene superato il ‘Max Deliveri Count‘ il messaggio é trasferito nella ‘Dead letter queue‘
-
- 4B – Nel ‘Server’ =>
- Il messaggio puo’ diventare ‘Rejected‘. Il server non deve reinviarlo
- Il ‘Consumer’ finisce il trattamento del messaggio con successo
- Creare un nuovo ‘Service Bus’
- Namespace =>
-
-
-
-
- E’ il nome dato al ‘bus service’ e corrisponde all’ URL per raggiungerlo
- E’ come un virtual broker
- Grazie alla politica dei nomi é possibile creare un namespace gerarchico
- DNS => <namespace>.servicebus.windows.net
- Stringa di connessione
- Apro il menu ‘Shared access policies’
- Clicco sulla ‘RootManageSharedAccessKey’
- Recupero la ‘Primary Connection String’
- Endpoint=sb://mangoservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=h3kTEDsVrW6q8zLGpztb3sfH4cdGvXUpKESk4xFp3l0=
-
-
-
-
- Package in Visual Studio
- Installare ‘Azure.Messaging.ServiceBus’ (il vecchio ora deprecato erq ‘Microsoft.Azure.ServiceBus’)
- Package in Visual Studio
- Azure.Messaging.ServiceBus
- API per interagire con il ‘service bus’ (Azure Service SDK)
-
//Azure.Messaging.ServiceBus dotnet add package Azure.Messaging.ServiceBus --version 7.12.0
-
-
- Topics =>
-
-
- Cosa é
- Usato nel caso lo stesso messaggio deve essere letto e processato da più di un receiver
- Un publisher puo’ aprire un topic che puo’ essere sottoscritto da più subscriber
- Un subscriber sottoscrive ad un topic e riceve una copia di tutti i messaggi inviati dal publisher al topic
- Topic => é dove il subriber invia i messaggi
- Subscriber => é il receiver che sottoscrive il topic
- Cosa é
-
-
-
- Creare un topic all’interno del nuovo ‘Service bus‘:
- Indicare nome
- Esempio ‘checkoutmessagetopic’
- Creare una ‘subscription’ a tale ‘topic’:
- Indicare nome
- Numero massimo di messaggi inviati
- esempio ‘mangoOrderSubcription’
- Leggere i messaggi inviati al ‘topic’ nel portalze Azure:
- Service Bus Explorer =>
- In Azure aprire il ‘topic’ di cui si vuole visualizzare i messaggi e aprire il ‘Service Bus Explorer’
- Selezionare la ‘subcription’ desiderata
- Fare il peek dei messaggi
- Service Bus Explorer =>
- Push del messaggio verso il ‘topic’ :
-
IMessageBus _messageBus; CheckoutHeaderDto checkoutHeader; await _messageBus.PublishMessage(checkoutHeader, checkoutTopic); //modello del messsaggio inviato al bus public class CheckoutHeaderDto : BaseMessage { public int Id { get; set; } public DateTime MessageCreated { get; set; } //aggiungo qui i campi aggiuntivi che voglio passare al 'topic' o alla 'queue' } public interface IMessageBus { Task PublishMessage(BaseMessage message, string topicName); } public class AzureServiceBusSending : IMessageBus { public AzureServiceBusSending { myServiceBusConnectionString = "<La connessione primaria al nostro service bus>" myMessageTopic = "<il nome del topic creato su Azure>" } //servizio che invia il topic al bus public async Task PublishMessage(BaseMessage message, string myMessageTopic) { //Instanzio il Service Bus Client e indico al connectionstring al mio Azure Bus Service await using var client = new ServiceBusClient(myServiceBusConnectionString); //Creo un sender verso il topic a cui voglio inviare il messaggio ServiceBusSender sender = client.CreateSender(myMessageTopic); //Serializzo il messaggio var jsonMessage = JsonConvert.SerializeObject(message); ServiceBusMessage finalMessage = new ServiceBusMessage(Encoding.UTF8.GetBytes(jsonMessage)) { CorrelationId = Guid.NewGuid().ToString() }; //invio il messaggio await sender.SendMessageAsync(finalMessage); await client.DisposeAsync(); } }
-
- Creare un topic all’interno del nuovo ‘Service bus‘:
-
-
-
- Fetch del messaggio dal ‘topic’ (tramite il processor e la sua funzione di callback):
-
-
-
-
-
public interface IAzureServiceBusConsumer { Task Start(); Task Stop(); } public class AzureServiceBusConsumer : IAzureServiceBusConsumer { private ServiceBusProcessor checkOutProcessor; public AzureServiceBusConsumer(IMessageBus messageBus) { myServiceBusConnectionString = "<La connessione primaria al nostro service bus>" messageCheckoutTopic = "<il nome del topic creato su Azure>" checkoutSubscription = "<il nome della subscription in Azure>" paymentTopic = "<il nome del topic creato su Azure>" paymentSubscription = "<il nome della subscription in Azure>" var client = new ServiceBusClient(myServiceBusConnectionString); //creo il processo responsabile di ascoltare quando arriva un nuovo messaggio nel topic //indicato per la subscription indicata //il processo farà lo start e lancera la routine ad esso associato checkOutProcessor = client.CreateProcessor(messageCheckoutTopic, checkoutSubscription); paymentProcessor = client.CreateProcessor(paymentTopic, paymentSubscription) } public async Task Start() { //imposto la routine da eseguire quando c'è un nuovo messaggio nel bus //definisco un handler (es: OnCheckOutMessageReceived) per il metodo processMessageAsync //tale handler scatta quando c'é un nuovo messaggio nel topic legato alla mia subscription //tale messaggio é inviato come argomento all'handler checkOutProcessor.ProcessMessageAsync += OnCheckOutMessageReceived; checkOutProcessor.ProcessErrorAsync += ErrorHandler; await checkOutProcessor.StartProcessingAsync(); } Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; } private async Task OnCheckOutMessageReceived(ProcessMessageEventArgs args) { var message = args.Message; var body = Encoding.UTF8.GetString(message.Body); //deserializzo il contenuto del corpo del messaggio //uso la stessq classe che il 'publisher' aveva usato per inviare il messaggio CheckoutHeaderDto checkoutHeaderDto = JsonConvert.DeserializeObject<CheckoutHeaderDto>(body); //utilizzo l'oggetto recuperato dal 'topic' .... await args.CompleteMessageAsync(args.Message); } public async Task Stop() { await checkOutProcessor.StopProcessingAsync(); await checkOutProcessor.DisposeAsync(); } }
-
- Filters and actions =>
- Un ‘subscriber’ puo’ definire quali messaggi vuole ricevere da un ‘topic’.
- Subscrition rules
- Definiscono un filtro che seleziona particolari messsaggi dal ‘topic’.
- Action =>
- Opzionalmente puo’ contenere un ‘action’ che annota i messaggi selezionati.
- L’azione può aggiungere / aggiornare /elimnare proprietà dal messaggio originale per produrre il messaggio che arriverà al ‘subscriber’
- Rules senza action
- Tutte le regole sono raggruppate in una (tramite OR) e produrranno un solo messaggio
- Rules con action
- Ogni regola creerà una copia del messaggio
- Esempio =>
- La ‘subscription’ ha 5 regole
- 2 con action
- 3 senza
- Se viene inviato un messaggio al topic; il ‘subscriber’ riceverà 3 messaggi.
- La ‘subscription’ ha 5 regole
-
- Queues =>
-
-
-
- Cosa é
- Usato per comunicazioni end-to-end
- Il sender invia messaggi al ‘message queue’ che li memorizza.
- Il receiver (quando é pronto) estrae i messaggi dalla queue. Solo a quel punto i messaggi sono eliminati.
- I messaggi sono ordinati per data di arrivo (FIFO)
- Non é possibile inviare lo stesso messaggio a più destinatari
- Forward messages
- E’ invece possibile inoltrare i messaggi ricevuti in una ‘queue’ ad un topic dello stesso ‘namespace’ (service bus)
- Cosa é
-
-
-
- Push del messaggio verso la ‘queue’:
- L’implementazione via le API ‘Azure.Messaging.ServiceBus’ é la stesso che nel caso ‘topic’
- Fetch del messaggio dalla ‘queue’ (tramite il receiver):
-
-
-
- Fetch del messaggio dalla ‘queue’ (tramire il processor e la sua funzione di callback):
-
- 4 – Azure Event Hub =>
-
- Cosa é
- E’ un broker per lo streaming di dati in continuo.
- Il broker ed é chiamato ‘namespace’
- E’ pensato per gestire milioni di eventi al secondo senza perdere nessun dato.
- E’ un’implementazione gestita di Kafka
- Un Event Hub puo’ avere molti ‘Event Hub instance’
- Event Hub Instance
- E’ divisa in ‘partition’ (da 1 a 32)
- Partition
- Rappresenta le stream di un singolo evento (puo’ essere vista come una singola coda)
- Garantisce l’ordine degli eventi
- Se invece il messaggio é splittato su più partitions l’ordine non puo’ essere più garantito.
- Corrisponde ad un ‘topic’
- Event Producer
- Sono i servizi che producono ‘Event’
- Cosa é
-
- Event Consumer
- Sono applicazioni che processano gli eventi pubblicati negli ‘Event Hub Istance’
- Segue un modello pull (il consumer fa il pull degli eventi pubblicati nell’hub)
- La ricezione degli eventi é tramite il protocollo AMQP
- Consumer Group
- E’ una vista unica su i dati presenti in una ‘Event Hub Instance’
- Ogni ‘consumer group’ ha i propri ‘offeset’ e ‘checkpoint’
- In termini semplici é un’applicazione che elabora i dati di una ‘Event Hub Instance’
- Se ho più di un ‘consumer’ che fa il pull degli eventi publicati nell’hub posso gestirli tramite i ‘consumer group’. Ogni applicazione si collegerà al proprio ‘consumer group’
- In modo che possa leggere gli eventi in autonomia, con il proprio passo.
- Offset
- E’ la posizione di un ‘event’ all’interno di una ‘partition’
- Checkpoint
- E’ un offset che il consumer memorizza in uno ‘storage account’
- Serve a gestire il restart dell’applicazione ‘consumer’ in modo che non riparta processando l’intera istanza ma solo le ‘partition’ non ancora elaborate.
- Shared Access policy
- E” una ‘connection string’ usata per avere il diritto di pubblicare un evento e leggere un evento in un ‘Event Hub Istance’
- Manage, Send, Listen
- Event Hub SDK
- Event Consumer
IBM MQ
- Nodi verso l’esterno => Gli unici nodi della rete a comunicare con l’sterno sono i nodi HUB e TEST.
- Message Broker
- E’ un message broker ‘queue ordiented’
- E’ un componente che si posiziona a metà tra 2 sistemi che vogliono comunicare (message oriented middleware)
- Message oriented middleware
- Fornisce agli sviluppatori un mezzo standardizzato per gestire il flusso di dati tra i componenti di un’applicazione.
- In modo che possano concentrarsi sulla sua logica di base.
- Le comunicazioni verso i nodi interni alla rete vengono instradate sui Queue Manager (es: HUB e TEST)
- Vantaggi
- Consente comunicazioni asincrone tra i servizi in modo che il servizio di invio non debba attendere la risposta del servizio di ricezione.
- Migliorando la tolleranza ai guasti e la resilienza nei sistemi in cui sono impiegati.
- Implementazione Sigo
- Queue Manager (es: HUB e TEST)
- Instradamento delle informazioni, su due tabelle (ISPOOL, OSPOOL) che mappano il formato dei messaggi richiesto.
- Message queue
- Comonente usato dal broker per archiviare e ordinare i messaggi fino a quando le applicazioni che li consumano non possono elaborarli.
- I messaggi vengono archiviati nell’ordine esatto in cui sono stati trasmessi e rimangono nella coda fino alla conferma della ricezione.
- Queue manager (canali MQSeries)
- Un nodo della rete di cmunicazione gestita dal broker.
- Compnente usato per gestire le interazioni tra più ‘message queue’.
- Fornisce funzionalità di instradamento dei dati, traduzione dei messaggi, persistenza e gestione dello stato del client.
- Formato messaggi
- L’header di ogni singolo record ha 2 parti
- Header1 (19 byte)
- Dimensione record
- Codice mittente
- Codice 1 destinatario
- Codice 2 destinatario
- Header2 (29byte)
- Data di spedizione
- Nome del file
- Numero File
- Modalità aggiornamento
- Filler
- Header1 (19 byte)
- L’header di ogni singolo record ha 2 parti
- Tabella di abbinamento tra i codici degli header MQ (Queue name) e i destinatari
- Sistemi (lista dei sistemi connessi tramite MqSeries)
- Gruppi
- Tabella singolo flusso
- Nomeflusso
- RecordType
- Sistema di monitoraggio delle code (in SQL)
Gateway – Ocelot
- Permette di creare un unico punto di accesso per tutte le API (microservices) utilizzati dalla nostra applicazione client
- File di configurazione json =>
- Url del gateway
- E’ necessario settare un indirizzo al quale il nostro gateway sarà raggiungibile (es: https://localhost:5050)
- Mapping degli end point
- Upstream => L’indirizzo che il gataway riceve dal client per accedere al dato end point (es: https://localhost:5050/api/products/{id} )
- Downstream => L’indirizzo al quale il gateway inoltra la richiesta arrivata dal client (es: https://localhost:7181/api/products/{id} )
- Eventualmente é possibile indicare una chiave per il provider di autenticazione scelto e lo scope
-
"Routes": [ { "DownstreamPathTemplate": "/api/products/{id}", "DownstreamScheme": "https", "DownstreamHostAndPorts": [ { "Host": "localhost", "Port": 7181 } ], "UpstreamPathTemplate": "/api/products/{id}", "UpstreamHttpMethod": [ "GET" ] }, { "DownstreamPathTemplate": "/api/products", "DownstreamScheme": "https", "DownstreamHostAndPorts": [ { "Host": "localhost", "Port": 7181 } ], "UpstreamPathTemplate": "/api/products", "UpstreamHttpMethod": [ "POST", "PUT" ], "AuthenticationOptions": { "AuthenticationProviderKey": "Bearer", "AllowedScopes": [ "mango" ] } } ], "GlobalConfiguration" : { "BaseUrl": "https://localhost:5050" }
- Url del gateway
- File program.cs
-
//é necessario settare un indirizzo per il gateway //click destro sul progettp -> proprietà -> 'Debug' -> 'App Url' using Microsoft.IdentityModel.Tokens; using Ocelot.DependencyInjection; using Ocelot.Middleware; using Ocelot.Values; var builder = WebApplication.CreateBuilder(args); builder.Services.AddAuthentication("Bearer") .AddJwtBearer("Bearer", options => { options.Authority = builder.Configuration["IdentityAPI"]; options.TokenValidationParameters = new TokenValidationParameters { ValidateAudience = false }; }); builder.Services.AddOcelot(); var app = builder.Build(); app.UseOcelot().Wait(); app.Run();
-