Take the red pill…
Nous allons commencer à parler de l'intégration dans votre code, dans l'application. Il faut choisir les messages, les définir et aussi définir comment ils seront traités. Ensuite, grâce à la configuration, il sera relativement facile de les envoyer de part et d'autre du système.
Pour illustrer les différents concepts je vais prendre un exemple extrait d'une de nos applications en cours de développement. Nous stockons une liste d'examen et l'utilisateur a la possibilité de flagguer un examen comme inapproprié. À partir de la plusieurs actions doivent s'exécuter.
Ok, c'est quoi un Message ?
class DeleteExamFromSearch
{
private $exam_uid;
public function __construct(Exam $exam)
{
$this->exam_uid = $exam->id();
}
public function getExamUid(): ExamUid
{
return $this->exam_uid;
}
}
Considérez un message comme un `DTO` ou `Data Transfer Object`. Il contient toutes les informations utiles à son traitement et permet de les lire de façon structuré. Cependant, il ne contient aucune logique métier. Le composant n'impose pas d'interface pour les messages, tout les objets peuvent être utilisés.
Il est très important de prendre en compte que cet objet sera sérialisé et surement envoyé dans un système externe comme une fil de message.
Cet objet doit contenir les informations précises nécessaires à son traitement. S'il doit référencer une valeur issue d'un autre objet, il n'est peut être pas pertinent de seulement référencer l'identifiant de cet objet. Je préfère utiliser la valeur elle-même.
Je considère les messages comme des entités autonomes qui font transiter de l'information. Ces entités sont coupées de toute source de données et embarque donc tout le nécessaire.
Comment le diffuser ?
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\Messenger\MessageBusInterface;
class FlagExam extends Controller
{
public function __invoke(Exam $exam, MessageBusInterface $bus): Response
{
// Mise à jour de l'examen et sauvegarde du flag.
$bus->dispatch(new DeleteExamFromSearch($exam));
}
}
Grâce aux interfaces intégrées dans le composant et à l'auto-wiring de Symfony, il suffit d'injecter l'instance de MessageBusInterface et de l'utiliser.
Bien sur le comportement de ces différents objets peut être contrôlé par la configuration.
Message Bus ?
Tout composant, matériel ou logiciel qui permet d'envoyer/recevoir des messages entre des systèmes distribués.
Car oui l'objectif de tout ça est bien de distribuer le traitement des messages et de mieux gérer la scalabilité.
Bien sur les traitements ne sont pas forcément asynchrone ou traités sur d'autres machines mais c'est facilement possible d'implémenter ce genre d'architecture.
Configuration messenger.yaml
framework:
messenger:
transports:
default: '%env(MESSENGER_ADAPTER_DSN)%'
routing:
# Route your messages to the transports
Comme tout composant Symfony, la configuration est gérée dans un fichier YAML.
A travers ce fichier, vous pouvez définir les différents bus de messages.
On retrouve aussi les deux parties transports
et routing
. Ces deux composantes nous aident à contrôler comment les messages transitent dans l'application.
Un Message voyage…
Protocole AMQP, broker
Les transports vont être utilisés pour stocker et faire voyager notre message.
▶ Messenger intègre un transport par défaut, qui implémente le protocole AMQP pour Advanced Message Queuing Protocol. Ce protocole est intégré dans PHP sous la forme d'une extension PECL. Il "suffit" d'installer cette extension AMQP
pour interagir avec les systèmes compatibles.
▶ RabbitMQ ou OpenAMQ sont par exemple compatibles avec ce protocole.
…avec un Adapter…
queue-interop/queue-interop
enqueue/messenger-adapter
enqueue/enqueue-bundle
enqueue/*
En PHP il n'existe pas encore un moyen normalisé officiel de traiter les messages, comprenez qu'il n'existe pas de PSR pour le moment. Plusieurs groups d'utilisateurs se sont posés des questions et des initiatives comme queue-interop ont vu le jour.
▶ L'objectif est bien sur de faciliter l'intégration de système complexes dans une application. Ensuite, enqueue est un autre projet qui implémente les recommandations de Queue-Interop.
▶ Enqueue est Open Source qui propose des packages de qualité autour des files de messages.
Le plus intéressant pour commencer est la liste des différents transports implémentés. Il y a des connecteurs pours les systèmes les plus courants sur le marché : GearMan, Redis, SQS, Kafka, …
Mais aussi des choses un peu plus "simples" comme un connecteur dbal qui permet de stocker la file de message dans une table ou encore un connecteur filesystem qui n'a rien besoin de plus qu'un dossier sur le disque pour fonctionner. Quand je dis "simple" ici c'est plus une question d'architecture de l'application finale que du connecteur lui même.
Un adapter a été créé pour faciliter l'utilisation avec le composant Messenger, il suffit de l'installer avec Composer pour avoir accès à tous ces connecteurs.
L'avantage de cette approche est que tout est géré en configuration, cet adapter aide vraiment à ouvrir le champ des possibles avec Messenger.
L'installation de l'Adapter va ajouter le "enqueue-bundle" avec ses propres outils et configuration. C'est dans la configuration de ce bundle qu'il faut définir les différents transports pour ensuite pouvoir s'en servir dans Messenger.
… sur la bonne route !
framework:
messenger:
transports:
default: enqueue://redis
log: enqueue://file
routing:
'App\Domain\Exam\Message\DeleteExamFromSearch': [default, log]
'*': default
Chaque message peut ensuite être envoyé vers un ou plusieurs transports. Oui, j'ai bien dit plusieurs.
Il suffit dans le fichier de configuration de définir le mapping pour facilement gérer le routing.
Comme on peut le voir ici, je peux définir plusieurs transports. Ici ils dépendent tout deux d'enqueue. Ensuite je fais mon mapping entre les messages et les différents transports dans la partie "routing".
Traitement des messages
Nous allons commencer à parler de l'intégration dans votre code, dans l'application. Il faut choisir les messages, les définir et aussi définir comment ils seront traités. Ensuite, grâce à la configuration, il sera relativement facile de les envoyer de part et d'autre du système.
Définition d'un Handler
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class DeleteFlaggedExam implements MessageHandlerInterface
{
/** @var ElasticsearchConnection */
private $connection;
public function __invoke(DeleteExamFromSearch $message)
{
$this->connection->removeExam($message->getExamUid());
}
}
Un Handler est un objet spécifique qui implémente MessageHandlerInterface. Cette interface ne définit pas de méthode, elle structure "juste" le code.
La version la plus courte est donc de créer un objet avec la méthode `__invoke`.
On voit que le Handler prend en paramètre le message qui va bien, ensuite libre à l'utilisateur d'en faire ce qu'il a besoin.
Grâce à l'auto-wiring, il est possible d'injecter dans le constructeur du Handler les objets utiles pour le traitement. On peut même dispatcher un nouveau message depuis le Handler.
Il y a plusieurs points très importants à noter. Contrairement à un Event, un message doit forcément être traité donc envoyé dans une queue ou passer directement dans un Handler.
Donc je ne peux pas "dispatch" un message sans rien faire.
Configuration
services:
App\Domain\Exam\Message\Handler\DeleteFlaggedExam:
tags: [messenger.message_handler]
Le mapping entre le type de message et le handler est détecté automatiquement grâce au typage de paramètre dans la fonction `__invoke` que nous venons de voir.
Cependant pour que Symfony sache que cet objet est bien un Handler, il faut le déclarer dans les services avec un tag particulier.
Exécution
Synchrone, ou asynchrone
bin/console messenger:consume-messages [amqp]
Une fois que les messages sont envoyés, il ne reste plus qu'à les consommer
Consommer un message revient à le traiter donc lire les informations qu'il contient et agir en conséquence.
▶ Deux possibilités, le traitement est synchrone, donc le handler est invoqué et exécuté directement pendant la requête.
Ou le traitement est asynchrone donc le message est envoyé dans un transport. Il est sérializé et mis en attente.
▶ La commande `consume-messages` peut ensuite être lancée pour déclencher le traitement. A ce moment la, les messages sont extraits de la fil d'attente, désérialisés et le handler exécuté.
Handler, Sender, Receiver
Finalement, pour pouvoir gérer l'envoi du message, le composant utilise un objet Sender. À chaque type de broker redis, RabbitMQ, ou autre est associé un sender qui sérialise le message et gère le stockage.
De même, pour lire les messages venant d'un broker, il y a des objets Receiver. Ces objets vont être responsables de récupérer les messages et de les passer au Handler qui va traiter les messages.
Pour chacun de ces éléments, il est possible de définir votre propre implémentation. Par exemple un Sender qui enverrait le message par e-mail. Ou un receiver qui construirait des messages à partir d'une source externe comme un fichier CSV. On est pas obligé de respecter le chaîne complète, tout peut être adapté.
La combinaison Sender / Receiver représente un Transport.
Sender personnalisé
class SmtpSender implements SenderInterface
{
/** @var SmtpConnection */
private $connection;
public function send($message): void
{
$this->connection->send(
// Transformer le message en e-mail
);
}
}
Pour illustrer ce qu'il est possible d'intégrer, voici un exemple de Sender personnalisé. Cet objet sera responsable de deux choses :
Transformer le message en E-Mail
Déclencher l'envoi dans la connection SMTP active
Bien sur, comme tout est configuré comme service Symfony, il est possible d'injecter tout les élément nécessaire. Ici j'injecte une SmtpConnection.
Configuration
# config/services.yaml
services:
App\Transport\SmtpSender:
tags:
- { name: 'messenger.sender', alias: 'custom_email' }
# config/packages/messenger.yaml
framework:
messenger:
routing:
App\Domain\Exam\Message\Handler\DeleteFlaggedExam:
senders: ['custom_email']
send_and_handle: true
Pour finaliser cet exemple, je termine avec deux petits morceaux de configuration pour montrer ce qu'il est possible de faire.
On définit le Sender en tant que service en utilisant le tag approprié. Je choisit en plus de lui donner un alias pour pouvoir le référencer plus facilement.
Ensuite dans messenger.yaml, je définit que mon message passera par le sender 'custom_email'.
Petite subtilité ici, mon sender ne me permet pas de traiter le message en asynchrone, je choisis donc d'avoir un handler synchrone en plus du sender grâce à l'option `send_and_handle`.
Si on reprend les différents exemples présentés jusqu'à maintenant, on peut imaginer que le message sera traité par le handler vu tout à l'heure pour supprimer l'info d'Elasticsearch, mais qu'en plus un mail sera envoyé à l'utilisateur pour l'informer que l'examen a été supprimé de la recherche parce qu'il a été considéré inapproprié.