Sprytne rozwiązania - RabbitMQ - Blog - Silky Coders

Sprytne rozwiązania – RabbitMQ

W każdym z systemów, obsługujących duży ruch sieciowy, w pewnym momencie spotykamy się z ograniczeniami komunikacji synchronicznej. W ciągu dnia zdarzają się duże, przejściowe wzrosty ruchu, które możemy rozłożyć na większą liczbę instancji systemu, co może okazać się zbędne, są bowiem sprytniejsze rozwiązania.

Nie wszystkie żądania muszą być przetworzone dokładnie w tym samym momencie. Większość z nich może poczekać na swoją kolej i będzie to jak najbardziej akceptowalne z biznesowego punktu widzenia – dzięki temu zmniejszą się chociażby problemy z wydajnością oraz infrastrukturą obsługiwanego przez nas systemu. Jak to osiągnąć? Warto skorzystać z systemu kolejkowego, a o jego funkcjonowaniu na konkretnym przykładzie opowiemy w tym artykule.

Gdy komunikacja synchroniczna nie wystarcza – kolejki

Kiedy już nadejdzie ten moment, w którym przetwarzanie synchroniczne przestanie być wydajne, trzeba znaleźć rozwiązanie rodzącego się problemu. Jedną z najpopularniejszych opcji jest wprowadzenie komunikacji asynchronicznej  i zastosowanie kolejek. A czym właściwie są kolejki?

http://www.quickmeme.com/meme/3qdzk7

Kolejka to abstrakcyjny typ danych, który przechowuje uporządkowaną, liniową sekwencję elementów. Można to opisać jako strukturę FIFO (first in, first out) – pierwszy element, który zostanie dodany do kolejki, będzie pierwszym, który ją opuści. Taką kolejkę można przyrównać do zwykłej kolejki w sklepie lub na koncercie, gdzie pierwsza osoba, która w niej staje, będzie jako pierwsza obsłużona po czym z niej wychodzi. Proste i… sprawdza się.

Obsługa kolejek, czyli jakie wybrać narzędzia?

Uzbrojeni w kluczową wiedzę o kolejkach, w następnym kroku poszukujemy narzędzia do ich obsługi. Możemy je oczywiście napisać sami, ale o wiele lepiej jest skorzystać z istniejących i sprawdzonych rozwiązań. Dwa najpopularniejsze, dedykowane pod obsługę kolejek to Apache Kafka oraz RabbitMQ. W jednym z projektów zdecydowaliśmy się na wybór RabbitMQ, między innymi ze względu na jeden z driverów architektonicznych – w tym przypadku są to ograniczenia projektowe. Ponadto Deweloperzy z Zespołu znali najlepiej właśnie to rozwiązanie i biorąc pod uwagę optymalizację czasu pracy, miało dla nas najniższy próg wejścia. Czym zatem jest wspomniane RabbitMQ?

RabbitMQ – broker wiadomości

RabbitMQ to broker wiadomości – pośrednik w komunikacji. Opiera się na protokole AMQP. Posiada oficjalne biblioteki klienckie napisane w wielu językach, takich jak: Java, Python czy też PHP. RabbitMQ to lekkie i niezwykle wydajne narzędzie, które wspomaga komunikację w systemach rozproszonych.

Poszczególne podsystemy w naszej aplikacji mogą komunikować się ze sobą za pomocą RabbitMQ z użyciem mechanizmu wiadomości (np. Symfony Messenger),  ale nie tylko. System ten  pozwala na ich publikowanie, późniejszą subskrypcję i przetwarzanie dzięki konsumerom. Ilość konsumerów możemy zwiększać elastycznie w zależności od potrzeby, co ułatwia nam wydajne skalowanie aplikacji. Dzięki temu jest ona w stanie obsłużyć ruch sieciowy, na który nie byliśmy wcześniej przygotowani.

Wiele mocnych stron RabbitMQ, w tym jego elastyczność, pochodzi ze specyfikacji AMQP. W przeciwieństwie do protokołów, takich jak HTTP i SMTP, specyfikacja AMQP definiuje nie tylko protokół sieciowy, ale także usługi oraz zachowania po stronie serwera.

Dodam, że rozszerzenie skrótu AMQP to Advanced Message Queuing Protocol, a pod linkiem https://www.amqp.org/about/what znajdziecie więcej przydatnych informacji na ten temat.

RabbitMQ  ma pięć podstawowych składników, które należy zrozumieć, aby w świadomy sposób posługiwać się tym narzędziem. Niżej opisane zostaną one w skrócie, a jeżeli jesteście zainteresowani szczegółami, możecie znaleźć je w dokumentacji: https://www.rabbitmq.com/tutorials/amqp-concepts.html.  

https://www.gokhan-gokalp.com/en/rabbitmq-nedir-ve-windowsa-kurulumu/hello-world-example-routing/

Zatem podstawowe elementy RabbitMQ  to:

  1. Publisher – tworzy i wysyła wiadomości do exchange. Zazwyczaj będzie to fragment naszej aplikacji.
  2. Exchange – podejmuje decyzję, w których queues umieścić wiadomość.
  3. Routes lub Binding – różne reguły, które łączą exchange z queue.
  4. Queue – przechowuje wiadomości.
  5. Consumer – może być to oddzielna aplikacja lub fragment naszej aplikacji, która pobiera wiadomości z queue i następnie je przetwarza.

Chcąc bardziej zobrazować temat queue, można go przyrównać do sklepu, w którym chcesz zrobić zakupy, exchange będzie magazynem a binding to trasa prowadząca z magazynu do sklepu. Co ważne, zawsze istnieje wiele dróg łączących opisane miejsca.

Wybierając narzędzie do obsługi kolejek trzeba zastanowić się nad biblioteką kliencką, która pozwoli na komunikację z RabbitMQ. W naszym przypadku wybór padł na Symfony Messenger. Pisanie message i message handlerów z użyciem tego narzędzia jest bardzo intuicyjne. Oprócz tego konfiguracja exchange i queue odbywa się w jednym miejscu. Warto podkreślić, że Symfony Messenger jest agnostyczny co oznacza, że system kolejkowania można oprzeć o RabbitMQ, Kafkę, Redis, relacyjną bazę danych, jak i inne narzędzia. Teraz określmy dokładnie, czym jest Symfony Messenger. 

Kilka słów o Symfony Messenger

Messenger jest komponentem frameworku Symfony. Zapewnia możliwość wysyłania wiadomości, a następnie ich natychmiastowej obsługi w aplikacji (przetwarzanie synchroniczne) lub wysyłania ich przez transporty do późniejszej obsługi (przetwarzanie asynchroniczne).  Symfony Messenger pozwala w prosty sposób zaimplementować CQRS pattern (https://martinfowler.com/bliki/CQRS.html) w naszej aplikacji.

Instrukcję instalacji Symfony Messenger i dodatku Symfony Messenger do AMQP można znaleźć  w dokumentacji: https://symfony.com/doc/current/messenger.html. Warto zwrócić uwagę na to, że w sytuacji, w której wyspecyfikujemy, że Symfony Messenger ma stosować RabbitMQ, komponent ten korzysta „pod spodem” z następującej biblioteki https://github.com/pdezwart/php-amqp.

Zabierzmy się teraz za implementację. Postaramy się jasno przedstawić, jakie możliwości konfiguracji zachowań RabbitMQ daje nam Symfony Messenger oraz w jaki sposób wysyłać i odbierać z jego pomocą wiadomości.  

Konfiguracja kolejek, exchange i bindingów w Symfony Messenger

Konfiguracja exchange, kolejek i bindingów w RabbitMQ z użyciem Symfony Messenger jest intuicyjna, wystarczy zmodyfikować plik messenger.yaml. Poniżej można znaleźć przykładową konfigurację wraz z opisem.

Podczas konfiguracji korzystamy z tego, że Symfony Messenger jest agnostyczny. Failure transport (failure_async_priority_high) został skonfigurowany tak, aby czerpał z relacyjnej bazy danych, nie RabbitMQ. Skorzystanie z takiego transportu pod błędnie przetworzone wiadomości zapewnia nam szereg korzyści, m.in:

  • łatwiejsze przeniesienie wiadomości z środowiska produkcyjnego na lokalne,
  • łatwiejsze podpięcie raportów,

Przykładowa implementacja producenta w Symfony Messenger

Tak, jak zostało to opisane wcześniej, producentem może być aplikacja lub fragment naszej aplikacji. W poniższym przykładzie producentem jest Kontroler, który przyjmuje żądania od klientów.

Tworzymy message, nie będzie on zawierał logiki biznesowej, jest on jedynie nośnikiem danych.


namespace App\Messenger;

use JetBrains\PhpStorm\Immutable;

#[Immutable]
class ExampleCommand
{    
    public function __construct(public readonly int $id)
        {
    }
}

Aby skorzystać z Symfony Messenger, „wstrzykujemy” zależność do MessageBusInterface. Wywołujemy na tym obiekcie metodę dispatch.


class ExampleController extends AbstractController
{
    public function __construct(
        private readonly MessageBusInterface $messageBus,
        private readonly SerializerInterface $serializer,
        ){
    }

    #[Route('/index', name: 'app_index', methods: ['POST'])]
    public function index(Request $request): Response
        {
        $message = $this->serializer->deserialize($request->getContent(), ExampleCommand::class, 'json');
        $this->messageBus->dispatch(
            $message,
            [
                new AmqpStamp(null, AMQP_MANDATORY, ['delivery_mode' => 2])
            ]
        );

        return $this->json([
            'message' => 'Index created successfully',
        ]);
    }
}


Pierwszym parametrem wywołania dispatch jest message, który ma zostać obsłużony. Kolejnym parametrem są różnego rodzaju stamp’y . W tym przypadku jest to AmqpStamp, który będzie sterował zachowaniem RabbitMQ w stosunku do wysłanej wiadomości

Przykładowa implementacja konsumera  w Symfony

Konsumer deserializuje command i wykonuje odpowiednią logikę, wykorzystując dane zawarte w command’zie. Logika znajduje się w specjalnych klasach zwanych handler’ami, które implementują interfejs MessageHandlerInterface.


namespace App\Messenger;

use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class ExampleCommandHandler implements MessageHandlerInterface
{
    public function __invoke(ExampleCommand $message)
        {
        return $message->id;
    }
}


Dzięki autokonfiguracji Symfony , żeby dany message handler mógł być powiązany z określonym message’m, typ argumentu w __invoke musi wskazywać właśnie na klasę message’a. To wystarczy, by po dispatch ExampleCommand był obsłużony przez ExampleCommandHandler. Aby zobaczyć pełną listę powiązanych ze sobą messages  i message handlerów należy wpisać w konsoli:  


php bin/console debug:messenger


Gdy chcesz uruchomić konsumera, wpisz w command line:


php bin/console messenger:consume async_priority_high


Jak wygląda wiadomość wysyłana do RabbitMQ

Podczas publikowania wiadomości, do RabbitMQ klient wysyła tak naprawdę trzy tzw. frame’y. Można je przyrównać do wagonów w pociągu, każdy ma podobną zewnętrzną strukturę, ale przewożą inny ładunek.

Spośród 3 typów wiadomości to dzięki content header, a będąc bardziej konkretnym, dzięki strukturze danych nazwanej Basic.Properties, możemy kontrolować zachowanie RabbitMQ  w stosunku do przesyłanej wiadomości. Content header zawiera metadane o wysyłanej do RabbitMQ  wiadomości, czyli metadane o body. Wykorzystywany jest przez producentów do sterowania zachowaniem RabbitMQ .

Konfigurowanie wysyłania wiadomości do RabbitMQ – Symfony Messenger

Na początek wymieńmy przykładowe basic.properties, które modyfikują zachowanie RabbitMQ w stosunku do wysłanej przed publishera wiadomości. Znajdziecie je w poniższej tabeli.

W dokumentacji Symfony nie jest w jasny sposób przedstawione, jak dodać basic.properties do wiadomości z poziomu Symfony Messenger.  Aby to zrobić, dodajemy do wysyłanej wiadomości odpowiedni AmqpStamp. Implementacja wygląda w następujący sposób:

Podsumowanie 

Z tego tekstu dowiesz się:

  • czym są systemy kolejkowe i kiedy może zajść potrzeba ich wykorzystania,
  • jakie są podstawowe składniki RabbitMQ i Symfony Messenger,
  • jak wysyłać i dobierać wiadomości z użyciem Symfony Messenger,
  • jak tworzyć podstawową konfigurację w pliku messenger.yaml,
  • czym są Basic Properties i jak wysyłać je do RabbitMQ w wiadomościach z użyciem Symfony Messenger.

Połączenie RabbitMQ z Symfony Messenger jest szerokim tematem, a dokumentacja Symfony Messenger często nastręcza problemów. Opisywane narzędzia jednak z powodzeniem podnoszą wydajność aplikacji i rozładowują przejściowe wzrosty ruchu. Żeby móc z nich sprawnie i skutecznie korzystać, warto oprócz samej konfiguracji poznać wszystkie ograniczenia oraz sposoby na ich obchodzenie.

Jest to pierwszy z artykułów, w którym opisujemy funkcjonowanie RabbitMQ, a będzie ich więcej! Zapraszamy do lektury i śledzenia aktualności na naszym blogu.

Źródła: