Le Blog Amazon Web Services

Transformation de données d’Amazon Kinesis Firehose avec AWS Lambda

8 Septembre 2021 : Amazon Elasticsearch Service a été renommé en Amazon OpenSearch Service. Voir les détails.


Amazon Kinesis Firehose est un service entièrement géré pour la livraison de données de streaming en temps réel vers des destinations telles que Amazon S3, Amazon Redshift, ou Amazon ElasticSearch Service (Amazon ES). Vous configurez vos producteurs de données pour envoyer des données à Amazon Kinesis Firehose, qui les transmet automatiquement à la destination spécifiée. Vous pouvez envoyer des données à votre flux de “streaming” à l’aide d’Amazon Kinesis Agent ou de l’API d’Amazon Kinesis Firehose, en utilisant les kits de développement logiciel (SDK).

Nos clients nous ont partagés qu’ils souhaitaient effectuer des pré-traitements légers ou des mutations du flux de données entrant avant de l’écrire sur la destination. D’autres cas d’utilisation peuvent inclure la normalisation des données produites par des différents producteurs, l’ajout de métadonnées à l’enregistrement ou la conversion des données entrantes dans un format adapté à la destination. Dans cet article, nous vous présentons les capacités de transformation des données sur vos flux de livraison, afin de transformer de manière transparente les données sources entrantes et de livrer les données transformées à vos destinations.

Les transformations de données dans Amazon Kinesis Firehose

Avec la fonction de transformation des données de Amazon Kinesis Firehose, vous pouvez spécifier une fonction AWS Lambda qui peut effectuer des transformations directement sur le flux, lorsque vous créez un flux/stream.

Lorsque vous activez la transformation de données Amazon Kinesis Firehose, Amazon Kinesis Firehose met en mémoire tampon les données de streaming entrantes et invoque la fonction AWS Lambda sur chaque lot mis en mémoire tampon de manière asynchrone. Les données transformées sont envoyées d’AWS Lambda à Amazon Kinesis Firehose pour être mises en mémoire, le temps d’être livrées à la destination. Vous pouvez également choisir d’activer la sauvegarde des enregistrements source, qui sauvegarde alors tous les enregistrements non-transformés vers un bucket S3 séparé tout en diffusant les enregistrements transformés à la destination.

Pour vous aider à démarrer, nous vous fournissons les modèles de fonction AWS Lambda suivants, que vous pouvez adapter à vos besoins :

  • Apache Log vers JSON,
  • Apache Log vers CSV,
  • Syslog vers JSON,
  • Syslog vers CSV,
  • Traitement Kinesis Data Firehose général,
  • Kinesis Data Firehose traite les flux d’enregistrement en tant que sources,
  • Processeur Kinesis Data Firehose Cloudwatch Logs.`

Mise en place de la transformation des données Amazon Kinesis Firehose

Nous allons maintenant vous guider dans la mise en place d’un flux Firehose avec la transformation des données.

Dans la console Amazon Kinesis Firehose, à l’étape 1, créez un nouveau flux de diffusion avec un nom et choisissez la source comme « Instruction PUT directe ou autres sources« .

Amazon Kinesis Firehose - Console Screen set-up 1

Amazon Kinesis Firehose - Aws Console setup screen 22

À l’étape 2, activez la transformation des données et cliquez sur le bouton « Créer » pour créer une nouvelle fonction AWS Lambda. Dans la fenêtre pop-up, sélectionnez « Traitement Kinesis Data Firehose général » qui vous redirigera vers la page AWS Lambda.

Amazon Kinesis Firehose - Modele Traitement Lambda

Une fois la fonction AWS Lambda créée, modifiez le code en ligne et collez le code suivant, que nous allons utiliser pour démontrer la fonction de transformation des données d’Amazon Kinesis Data Firehose. Choisissez un délai d’attente de 5 minutes. Cette fonction fait correspondre les enregistrements du flux entrant à une expression régulière. En cas de correspondance, elle analyse l’enregistrement JSON. La fonction fait ensuite ce qui suit :

  • Ne prend que le secteur RETAIL et filtre le reste (filtrage),
  • Ajoute un TIMESTAMP à l’enregistrement (mutation),
  • Convertit de JSON en CSV (transformation),
  • L’enregistrement traité est renvoyé dans le flux pour être livré.
'use strict';
console.log('Loading function');

/* Stock Ticker format parser */
const parser = /^\{\"TICKER_SYMBOL\"\:\"[A-Z]+\"\,\"SECTOR\"\:"[A-Z]+\"\,\"CHANGE\"\:[-.0-9]+\,\"PRICE\"\:[-.0-9]+\}/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries 

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');
        let match = parser.exec(entry);
        if (match) {
            let parsed_match = JSON.parse(match); 
            var milliseconds = new Date().getTime();
            /* Add timestamp and convert to CSV */
            const result = `${milliseconds},${parsed_match.TICKER_SYMBOL},${parsed_match.SECTOR},${parsed_match.CHANGE},${parsed_match.PRICE}`+"\n";
            const payload = (new Buffer(result, 'utf8')).toString('base64');
            if (parsed_match.SECTOR != 'RETAIL') {
                /* Dropped event, notify and leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            else {
                /* Transformed event */
                success++;  
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        }
        else {
            /* Failed event, notify the error and leave the record intact */
            console.log("Failed event : "+ record.data);
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
        /* This transformation is the "identity" transformation, the data is left intact 
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: record.data,
        } */
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};

Dans la console Amazon Kinesis Data Firehose, choisissez la fonction AWS Lambda que vous venez de créer.

Amazon Kinesis Firehose - Choix Function Lambda

À l’étape 3, choisissez un bucket/compartiment Amazon S3 existant comme la destination.

Activez la sauvegarde de l’enregistrement source, et choisissez le même bucket/compartiment Amazon S3 et un préfixe approprié. Amazon Kinesis Data Firehose fournit le flux de données brutes à ce bucket sous ce préfixe.

Amazon Kinesis Firehose - Choix Destination

Choisissez une taille de mémoire tampon de 1 Mo, et un fréquence de mise en mémoire tampon de 60 secondes, vous pouvez laisser les autres champs avec des valeurs par défaut. Créez un rôle AWS IAM de livraison de Amazon Kinesis Data Firehose.

Amazon Kinesis Firehose - Creation Flux

Vérifiez la configuration et créez le flux de livraison Amazon Kinesis Firehose.

Tester la transformation des données Amazon Kinesis Firehose

Vous pouvez utiliser la console AWS pour ingérer des données simulées, par exemple, de données boursières. La console exécute un script dans votre navigateur pour mettre des exemples d’enregistrements dans votre flux de livraison Amazon Kinesis Firehose. Cela vous permet de tester la configuration de votre flux de livraison sans avoir à générer vos propres données de test. Voici un exemple des données simulées :

{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51} {"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}

Pour tester la transformation des données Amazon Kinesis Firehose, la fonction AWS Lambda créée dans la section précédente ajoute un horodatage aux enregistrements, et ne livre que les informations liées aux actions du secteur « RETAIL ». Ce test démontre la capacité d’ajouter des métadonnées aux enregistrements du flux entrant, et de filtrer le flux de livraison.

Choisissez le flux de livraison Amazon Kinesis Firehose nouvellement créé, et choisissez Tester avec des données de démonstration, commencez à envoyer les données de démonstration.

Amazon Kinesis Firehose - Test Flux

Amazon Kinesis Firehose fournit des mesures Amazon CloudWatch sur le flux de diffusion. Des mesures supplémentaires pour surveiller la fonction de traitement des données sont également disponibles.

Amazon Kinesis Firehose - Amazon CloudWatch metriques

Le bucket Amazon S3 de destination ne contient pas les préfixes avec la sauvegarde des données sources, et le flux traité. Téléchargez un fichier des données traitées, et vérifiez que les enregistrements contiennent l’horodatage et les données du secteur « RETAIL« , comme suit :

1483504691599,ABC,RETAIL,0.92,21.28
1483504691600,TGT,RETAIL,-1.2,61.89
1483504691600,BFH,RETAIL,-0.79,15.86
1483504691600,MJN,RETAIL,-0.27,129.37
1483504691600,WMT,RETAIL,-2.4,76.39

Conclusion

Grâce à la fonction de transformation des données d’Amazon Kinesis Firehose, vous disposez d’un moyen simple et évolutif pour effectuer des transformations de données sur des flux de données en continu (streaming). Vous pouvez créer un datalake avec les données brutes, et simultanément transformer les données pour qu’elles soient consommées dans un format approprié par une destination Amazon Kinesis Firehose.

Pour plus d’informations sur Firehose, consultez le Manuel du développeur de Firehose Amazon Kinesis.

Article original contribué par Shiva Narayanaswamy, Architecte de solutions et adapté en français par Weibo Gu, Solution Architects dans l’équipe France.