Connecter ActiveMQ à Talend Studio : Publier et Consommer des Messages en Toute Simplicité
Dans ce tutoriel, vous allez apprendre à configurer Talend Studio pour publier et consommer des messages avec ActiveMQ. Ce guide vous montrera comment utiliser les composants génériques « MQServeur » de Talend pour envoyer des messages à une file d’attente et les lire facilement dans un flux ETL.
Prérequis :
- Avoir installé le Studio Talend.
- Avoir installé Active et avoir accès à l’interface de gestion pour créer des files d’attente et des messages. (Utilisation de la console de commande, notion en SHELL).
- Connaissances de base sur le Studio et en intégration de données.
Composants utilisés dans le traitement :
- tMomConnection : pour établir la connexion entre Talend et le serveur ActiveMQ.
- tMomOutput : pour envoyer des messages vers une file d’attente ActiveMQ.
- tMomQInput : pour consommer et lire des messages depuis une file d’attente ActiveMQ.
- tWriteJSONField : pour générer un fichier JSON.
- tMap : pour mapper les colonnes du fichier source.
- tLogRow : pour afficher le contenu d’un flux dans la console d’exécution.
Scénario :
Supposons que vous devez intégrer ActiveMQ dans votre flux de données pour envoyer des notifications ou des données de traitement dans une file d’attente. Dans ce tutoriel, nous allons :
- Établir une connexion avec ActiveMQ depuis Talend.
- Publier un message dans une file d’attente ActiveMQ à l’aide de tMomOutput.
- Consommer le message depuis la file d’attente avec tMomInput et l’afficher dans la console avec tLogRow.
1. Authentification & connexion au serveur ActiveMQ :
Procédure :
- Dans votre designer ajoutez le composant tMomConnection et configurez-le en saisissant les paramètres suivants :
- MQ server : Sélectionnez « ActiveMQ »
- Hostname : Nom/Adresse IP du serveur ou ici en l’occurrence « localhost » ou « 127.0.0.1 » pour le test.
- Port : Pour les opérations de messagerie via AMQP, le port 61616 est utilisé.
- Username & Password : Votre nom d’utilisateur & mot de passe créés dans l’interface de gestion en utilisant le port 8161. Pour cet exemple nous utilisons ici « admin / admin » c’est le username et password par default utilisés en local souvent pour les tests. Il possède les droits admin, mais ne peux être utilisé pour des connections à distance.
2. Création d’un message et envoie dans la Queue.
⚠️ Il existe plusieurs façons d’écrire un message dans ActiveMQ : ici, nous allons utilisés les données d’un fichier CSV, mais la source pourrait aussi être une base de données, un service web, une API REST, un fichier XML ou même une entrée manuelle.
Le bloc de votre job devra ressemblé a celui dans la figure ci-dessous :
- Nous allons lire un fichier délimité (ici CSV) avec le tFileInputDelimited et envoyer la donnée sous forme de flux.
- Ensuite est utilisé le tMap pour mapper les données provenant de tFileInputDelimited. Nous ne faisons dans ce job aucune transformation ou filtre de données.
- Vient ensuite la conversion des données mapper en format JSON via un tWriteJSONField. Il prend les colonnes du flux d’entrée et les structure en un objet JSON.
- Pour finir nous utilisons le tMomOutput pour envoyer les données JSON produites par tWriteJSONField vers une file d’attente ActiveMQ . Utilisez les mêmes paramètres de connexion que sur le composant tMomConnection.
- Ensuite paramètre : Message Type sur « Queue« , il existe deux type de message , le plus simple à comprendre est « Queue », pour que le message soit délivré à la queue pas besoin de clé il doit seulement avoir le même nom que la Queue, ici « QueueTest1 » .
- Le Message Type : « Queue« , les messages sont distribués de manière unique aux consommateurs, le message est consommé une seule fois.
- Le Message Type : « Topic » chaque consommateur reçoit une copie de chaque message, ce qui est idéal pour la distribution multi-cible.
- Message Body Type : « Text Message« .
tWarn : Utilisez le pour afficher un message de log une fois que le sous-job de lecture et d’envoi des données est terminé.
3. Extraction et consommation du message.
Procédure :
- Configurez un composant tMomInput comme le montre la figure ci-dessous :
- Les paramètres de connexion sont identiques.
- Le schéma utilisé est le même que celui du tWriteJSONField en sortie.
- Paramètre le nom de la Queue dans Message From, ici » QueueTest1 » , c’est dans celle là que nous avons envoyé notre message précédemment.
- Message Type doit être le même que celui du message envoyé dans la queue, « Queue ».
- Ensuite reliez votre tMomInput à un tLogRow afin d’afficher le contenu de votre message.
- Lorsque vous exécutez un tMomInput vous allez lire en permanence la queue demandée, pour consommer les messages des qu’ils sont créés. « Keep Listening » permet de lire en permanence la queue. Pour SHUTDOWN le cycle de lecture dans « Sleeping time (in sec) » mettez un timer, ici « 20 » secondes.
Dans Advanced settings du tMomInput mettez le mode « Auto Acknowledge » cela permet de gérer les accusés de réception en automatique, et ne pas avoir à le faire manuellement.
Pas besoin d’un composant de clôture de connexion, les composants sont conçu pour se fermer automatiquement lorsque le job se termine, un tPostJob est conseillé pour effectuer les tâches finales.
Votre sous- job devra ressembler à celui dans la figure ci-dessous :
Vous savez désormais comment s’authentifier, créer et extraire des messages via ActiveMQ et le protocole AMQP sur Talend Studio.
Laisser un commentaire
Il n'y a pas de commentaires pour le moment. Soyez le premier à participer !