Le Blog Amazon Web Services
Transformation de données d’Amazon Kinesis Firehose avec AWS Lambda
8 Septembre 2021 : Amazon Elasticsearch Service a été renommé en Amazon OpenSearch Service. Voir les détails.
Amazon Kinesis Firehose est un service entièrement géré pour la livraison de données de streaming en temps réel vers des destinations telles que Amazon S3, Amazon Redshift, ou Amazon ElasticSearch Service (Amazon ES). Vous configurez vos producteurs de données pour envoyer des données à Amazon Kinesis Firehose, qui les transmet automatiquement à la destination spécifiée. Vous pouvez envoyer des données à votre flux de “streaming” à l’aide d’Amazon Kinesis Agent ou de l’API d’Amazon Kinesis Firehose, en utilisant les kits de développement logiciel (SDK).
Nos clients nous ont partagés qu’ils souhaitaient effectuer des pré-traitements légers ou des mutations du flux de données entrant avant de l’écrire sur la destination. D’autres cas d’utilisation peuvent inclure la normalisation des données produites par des différents producteurs, l’ajout de métadonnées à l’enregistrement ou la conversion des données entrantes dans un format adapté à la destination. Dans cet article, nous vous présentons les capacités de transformation des données sur vos flux de livraison, afin de transformer de manière transparente les données sources entrantes et de livrer les données transformées à vos destinations.
Les transformations de données dans Amazon Kinesis Firehose
Avec la fonction de transformation des données de Amazon Kinesis Firehose, vous pouvez spécifier une fonction AWS Lambda qui peut effectuer des transformations directement sur le flux, lorsque vous créez un flux/stream.
Lorsque vous activez la transformation de données Amazon Kinesis Firehose, Amazon Kinesis Firehose met en mémoire tampon les données de streaming entrantes et invoque la fonction AWS Lambda sur chaque lot mis en mémoire tampon de manière asynchrone. Les données transformées sont envoyées d’AWS Lambda à Amazon Kinesis Firehose pour être mises en mémoire, le temps d’être livrées à la destination. Vous pouvez également choisir d’activer la sauvegarde des enregistrements source, qui sauvegarde alors tous les enregistrements non-transformés vers un bucket S3 séparé tout en diffusant les enregistrements transformés à la destination.
Pour vous aider à démarrer, nous vous fournissons les modèles de fonction AWS Lambda suivants, que vous pouvez adapter à vos besoins :
- Apache Log vers JSON,
- Apache Log vers CSV,
- Syslog vers JSON,
- Syslog vers CSV,
- Traitement Kinesis Data Firehose général,
- Kinesis Data Firehose traite les flux d’enregistrement en tant que sources,
- Processeur Kinesis Data Firehose Cloudwatch Logs.`
Mise en place de la transformation des données Amazon Kinesis Firehose
Nous allons maintenant vous guider dans la mise en place d’un flux Firehose avec la transformation des données.
Dans la console Amazon Kinesis Firehose, à l’étape 1, créez un nouveau flux de diffusion avec un nom et choisissez la source comme « Instruction PUT directe ou autres sources« .
À l’étape 2, activez la transformation des données et cliquez sur le bouton « Créer » pour créer une nouvelle fonction AWS Lambda. Dans la fenêtre pop-up, sélectionnez « Traitement Kinesis Data Firehose général » qui vous redirigera vers la page AWS Lambda.
Une fois la fonction AWS Lambda créée, modifiez le code en ligne et collez le code suivant, que nous allons utiliser pour démontrer la fonction de transformation des données d’Amazon Kinesis Data Firehose. Choisissez un délai d’attente de 5 minutes. Cette fonction fait correspondre les enregistrements du flux entrant à une expression régulière. En cas de correspondance, elle analyse l’enregistrement JSON. La fonction fait ensuite ce qui suit :
- Ne prend que le secteur RETAIL et filtre le reste (filtrage),
- Ajoute un TIMESTAMP à l’enregistrement (mutation),
- Convertit de JSON en CSV (transformation),
- L’enregistrement traité est renvoyé dans le flux pour être livré.
Dans la console Amazon Kinesis Data Firehose, choisissez la fonction AWS Lambda que vous venez de créer.
À l’étape 3, choisissez un bucket/compartiment Amazon S3 existant comme la destination.
Activez la sauvegarde de l’enregistrement source, et choisissez le même bucket/compartiment Amazon S3 et un préfixe approprié. Amazon Kinesis Data Firehose fournit le flux de données brutes à ce bucket sous ce préfixe.
Choisissez une taille de mémoire tampon de 1 Mo, et un fréquence de mise en mémoire tampon de 60 secondes, vous pouvez laisser les autres champs avec des valeurs par défaut. Créez un rôle AWS IAM de livraison de Amazon Kinesis Data Firehose.
Vérifiez la configuration et créez le flux de livraison Amazon Kinesis Firehose.
Tester la transformation des données Amazon Kinesis Firehose
Vous pouvez utiliser la console AWS pour ingérer des données simulées, par exemple, de données boursières. La console exécute un script dans votre navigateur pour mettre des exemples d’enregistrements dans votre flux de livraison Amazon Kinesis Firehose. Cela vous permet de tester la configuration de votre flux de livraison sans avoir à générer vos propres données de test. Voici un exemple des données simulées :
Pour tester la transformation des données Amazon Kinesis Firehose, la fonction AWS Lambda créée dans la section précédente ajoute un horodatage aux enregistrements, et ne livre que les informations liées aux actions du secteur « RETAIL ». Ce test démontre la capacité d’ajouter des métadonnées aux enregistrements du flux entrant, et de filtrer le flux de livraison.
Choisissez le flux de livraison Amazon Kinesis Firehose nouvellement créé, et choisissez Tester avec des données de démonstration, commencez à envoyer les données de démonstration.
Amazon Kinesis Firehose fournit des mesures Amazon CloudWatch sur le flux de diffusion. Des mesures supplémentaires pour surveiller la fonction de traitement des données sont également disponibles.
Le bucket Amazon S3 de destination ne contient pas les préfixes avec la sauvegarde des données sources, et le flux traité. Téléchargez un fichier des données traitées, et vérifiez que les enregistrements contiennent l’horodatage et les données du secteur « RETAIL
« , comme suit :