Le Blog Amazon Web Services

Comment utiliser Apache Flink et Amazon Kinesis Data Analytics pour faire de l’ETL sur vos flux de données en temps réel

La plupart des entreprises génèrent des volumes de données toujours croissants, en continu et en temps réel. Par exemple, des données sont générées lorsque des utilisateurs jouent à des jeux vidéo, des load balancers journalisent les requêtes, des clients font des achats sur vos applications web ou que vos capteurs IoT détectent des changements de température. Analyser ces données vous permet de capitaliser sur les événements urgents, d’améliorer l’expérience utilisateur, l’efficacité et d’amener plus d’innovations. La vitesse à laquelle vous pouvez tirer parti de ces données est souvent dépendante de la vitesse à laquelle vous pouvez charger ces données dans vos lacs de données, data stores, et autres outils d’analyses. Avec l’augmentation des volumes et de la vitesse des données, il devient de plus en plus important de ne pas uniquement les charger, mais aussi de les transformer et de les analyser en temps réel.

Cet article explore comment utiliser Apache Flink comme fondation pour construire des pipelines sophistiqués Extract Transform Load (ETL). Apache Flink est un framework et un moteur de calcul distribué de traitement de flux. AWS fournit un service géré pour Apache Flink avec Amazon Kinesis Data Analytics, qui vous permet de construire et d’exécuter des applications de traitement de flux sophistiquées rapidement et facilement avec un coût ou charge opérationnelle faible.

Cet article traite les concepts qui sont requis lors de l’implémentation de pipelines ETL avec Apache Flink et Kinesis Data Analytics. Il fournit aussi des exemples de code pour différentes sources et destinations de données. Pour plus d’informations, consultez ce dépôt github. Ce dépôt contient un modèle AWS CloudFormation qui vous permet de commencer en quelques minutes et d’explorer un exemple de traitement de flux avec un pipeline ETL.

Architecture d’un ETL en streaming avec Apache Flink

Apache Flink est un framework et un moteur de calcul distribué de traitement avec état sur des flux fixes ou arrivant en temps réel. Il supporte une vaste palette de connecteurs personnalisables, comme des connecteurs pour Apache Kafka, Amazon Kinesis Data Stream, Elasticsearch et Amazon Simple Storage Service (Amazon S3). En complément, Apache Flink fournit une API de transformation, d’agrégation, d’enrichissement des événements et respecte le principe « exactly-once semantics » (le fait de ne délivrer un message qu’une seule fois). Apache Flink est donc une bonne fondation pour le cœur de vos applications de traitement de flux (streaming).

Pour déployer et exécuter vos pipelines ETL, l’architecture repose sur Kinesis Data Analytics. Kinesis Data Analytics vous permet d’exécuter vos applications Flink dans un environnement géré pour vous. Le service met en place et gère l’infrastructure, met à l’échelle votre application en fonction des variations de trafic et se rétablit d’une panne des infrastructures ou des applications. Vous pouvez combiner les avantages de l’API d’Apache Flink avec ceux d’un service géré en utilisant Kinesis Data Analytics pour déployer et exécuter vos applications. Cela vous permet de construire des pipelines de traitement de flux robustes et de réduire les frais opérationnels de mise en place et d’opération des infrastructures requises.

L’architecture illustrée dans cet article tire parti de plusieurs fonctionnalités que vous pouvez utiliser avec Kinesis Data Analytics pour Apache Flink. Voici les fonctionnalités utilisées :

  • Connectivité à vos réseaux privés : Connectez-vous aux ressources dans vos Amazon Virtual Private Cloud (Amazon VPC), dans vos datacenters avec une connexion VPN, ou bien dans une autre région avec une connexion via VPC peering ou AWS Transit Gateway.
  • Plusieurs sources et destinations :  Lisez et écrivez des données dans vos flux Kinesis Data Streams, cluster Apache Kafka et Amazon Managed Streaming for Apache Kafka.
  • Partitionnement de vos données : Déterminez le partitionnement de vos données qui sont poussées vers Amazon S3 en fonction des informations extraites du contenu de vos messages.
  • De multiples index Elasticsearch et identifiants personnalisés pour vos documents : Fan out d’un flux de données vers plusieurs index Elasticsearch et contrôler explicitement l’identifiant du document.
  • Exactly-one semantics : Évitez les doublons lorsque vous consommez ou bien dans les messages que vous produisez entre Apache Kafka, Amazon Managed Streaming for Apache Kafka (MSK), Amazon S3 et Amazon OpenSearch Service (Amazon ES).

Le diagramme suivant illustre l’architecture :

streaming ETL Flink

Dans cet article nous ne couvrons pas le déploiement et la conception de l’application Flink, pour en savoir plus, vous pouvez consulter cet article (en anglais) Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications ainsi que le guide pour les développeurs d’Amazon Kinesis Data Analytics.

Explorez un pipeline ETL de traitement de flux de données dans votre compte AWS

Avant de considérer les détails de l’implémentation et les aspects opérationnels, vous devez vous faire vos premières impressions du pipeline de traitement de flux de données en action. Pour créer les ressources nécessaires, déployez le modèle d’AWS CloudFormation suivant :

Launch CloudFormation stack

Ce modèle crée un flux Kinesis Data stream et une instance Amazon Elastic Compute Cloud (Amazon EC2) pour rejouer un historique de données dans le flux Kinesis Data Stream. Cet article utilise des données basées sur le dataset public de la New York City Taxi and Limousine Commission. Chaque événement décrit un voyage en taxi dans la ville de New York et inclut les horodatages et les quartiers de début et de fin ainsi que d’autres informations sur les prix. Une application Kinesis Data Analytics va lire les événements et les enregistrer dans Amazon S3 au format Parquet et les partitionner par date d’événement.

Connectez-vous à l’instance en allant dans la page de votre déploiement AWS CloudFormation, puis dans la section Sortie, suivez le lien à côté de ConnectToInstance. Vous pouvez ensuite commencer à rejouer une partie des voyages des taxis dans le flux de données Kinesis Data Stream avec le code suivant :

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

Vous pouvez obtenir ces commandes avec les bons paramètres depuis la section Sortie de votre déploiement CloudFormation. La section Sortie vous indique aussi le bucket S3 vers lequel les événements sont enregistrés et le tableau de bord Amazon CloudWatch qui vous permet d’observer votre pipeline.

Pour plus d’informations pour savoir comment activer les différentes combinaisons de sources et de destinations, par exemple Apache Kafka et Elasticsearch, consultez ce dépôt github.

Construire un pipeline ETL de flux de données avec Apache Flink

Maintenant, que vous avez vu le pipeline en action, nous pouvons étudier plus précisément comment implémenter votre application avec Apache Flink et Kinesis Data Analytics.

Lire et écrire vers vos ressources privées

Les applications Kinesis Data Analytics peuvent accéder à des ressources via Internet et les ressources dans un subnet privé qui est dans votre VPC. Par défaut, une application Kinesis Data Analytics permet uniquement d’accéder à des ressources que vous pouvez accéder à travers l’internet public. Cela fonctionne bien pour des ressources qui peuvent fournir un point de terminaison public, par exemple, Kinesis Data Stream or Amazon OpenSearch Service.

Si vos ressources sont privées dans votre VPC, pour des raisons techniques ou de sécurités, vous pouvez paramétrer la connectivité VPC pour votre application Kinesis Data Analytics. Par exemple, les clusters Amazon Managed Streaming for Apache Kafka (MSK) sont privés, vous ne pouvez pas y accéder depuis Internet. Vous pouvez faire tourner votre propre cluster Apache Kafka on-premises qui n’est pas exposé sur Internet et uniquement accessible depuis votre VPC via une connexion VPN. De même pour toutes les ressources qui sont privées dans votre VPC, comme des bases de données relationnelles ou des points de terminaisons supportés par AWS PrivateLink.

Pour activer la Connectivité VPC, paramétrer votre application Kinesis Data Analytics pour se connecter à vos subnets privés dans vos VPC. Kinesis Data Analytics crée des elastic network interfaces dans un ou plusieurs des subnets fournis dans le VPC renseigné lors de la configuration de votre application en fonction de la parallélisation de celle-ci. Pour plus d’informations, allez voir comment configurer Kinesis Data Analytics pour vos applications Java pour accéder à vos ressources dans un VPC.

La capture d’écran suivante montre un exemple de configuration d’une application Kinesis Data Analytics avec une connectivité VPC.

Kinesis Data Analytics VPC

L’application peut alors accéder aux ressources qui ont une connectivité réseau depuis les subnets configurés. Cela inclut les ressources qui ne sont pas directement présentes dans les subnets mais que vous pouvez accéder via une connexion VPN ou via VPC peering. Ce paramétrage supporte aussi les points de terminaison qui sont disponibles via Internet si vous avez une NAT gateway configurée pour les subnets respectifs. Pour plus d’informations, consultez comment accéder à différents services via Internet depuis une application Java connectée à votre VPC qui utilise Kinesis Data Analytics.

Paramétrez les sources Kinesis et Kafka

Apache Flink supporte différentes sources de données comme Kinesis Data Streams et Apache Kafka. Pour plus d’informations, vous pouvez consulter cette page pour voir les différents connecteurs sur le site d’Apache Flink.

Pour vous connecter à Kinesis Data Stream, dans un premier temps, configurez la région et le Credentials Provider. C’est une bonne pratique de choisir AUTO pour le credentials provider.  L’application va alors utiliser les identifiants temporaires du rôle IAM de l’application Kinesis Data Analytics pour lire les événements du flux de données spécifié. Cela permet d’éviter l’utilisation d’identifiants statiques dans votre application. Dans ce contexte, vous pouvez aussi raisonnablement augmenter la durée entre deux opérations de lectures du flux de données. Lorsque vous augmentez la valeur par défaut de 200 millisecondes à 1 seconde, la latence augmente légèrement, mais cela facilite la lecture de multiples consommateurs du même flux de données. Prenez par exemple le code suivant :

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

Cette configuration est passée au FlinkKinesisConsumer avec le nom du flux et un DeserializationSchema. Cet article utilise le TripEventSchema pour la désérialisation, ce qui permet de spécifier comment désérialiser le tableau d’octets d’un événement Kinesis vers un TripEvent. Voici le code suivant :

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

Pour plus d’informations, vous pouvez consulter TripEventSchema.java sur Github. Apache Flink fournit d’autres mécanismes plus généraux de sérialisation qui peuvent désérialiser la donnée en chaîne de caractères ou des objets JSON.

Apache Flink n’est pas limité à lire depuis un flux Kinesis Data Stream. Si vous paramétrez la connectivité VPC de vos applications Kinesis Data Analytics correctement, Apache Flink peut aussi lire des événements depuis des clusters Apache Kafka ou MSK. Spécifiez une liste de brokers et de ports séparés par des virgules à utiliser lors de la première connexion à votre cluster. Cette configuration est passée au FlinkKafkaConsumer avec un nom de topic et un DeserializationSchema pour créer une source qui lit le topic de votre cluster Apache Kakfa. Voici le code suivant :

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");
DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

Le DataStream résultant contient des objets TripEvent qui ont été désérialisés à partir des données de votre flux de données et de votre topic Kafka. Vous pouvez utiliser ce flux de données en combinaison d’une destination pour sauvegarder les événements dans leurs destinations respectives.

Sauvegardez les données dans Amazon S3 avec le partitionnement de vos données

Lorsque vous persistez les données vers Amazon S3, vous pouvez partitionner vos données. Vous améliorez la performance de vos requêtes de vos outils d’analyses en partitionnant vos données, les partitions qui ne sont pas utilisés pour les résultats ne seront pas lues. Par exemple, une bonne stratégie de partitionnement peut améliorer la performance et réduire le coût de vos requêtes Amazon Athena en réduisant la quantité de données lues par requête. Vous pouvez aussi choisir de partitionner vos données par les mêmes attributs qui sont utilisés par votre application. En outre, il est commun lorsque vous traitez les flux de données d’inclure la date des événements dans votre stratégie de partitionnement. Cela contraste avec l’utilisation de l’horodatage (timestamp) correspondant au moment d’arrivé côté serveur qui ne reflète pas correctement la date d’emission.

Pour plus d’informations sur comment prendre vos données partitionnées par date d’ingestion et les repartitionner par date événements avec Athena, consultez et article (en anglais) Analysez les journaux d’activités d’Amazon Cloudfront à l’échelle. Cependant, vous pouvez directement partitionner la donnée qui arrive en se basant sur les dates d’événements avec Apache Flink en utilisant le contenu des événements pour déterminer le partitionnement, ce qui évite une étape supplémentaire de post-traitement. Cette fonctionnalité est appelée data partitionning et n’est pas limitée au partitionnement par date.

Vous pouvez effectuer le partitionnement de la donnée avec StreamingFileSink et BucketAssigner d’Apache Flink. Pour plus d’informations, consultez Streaming File Sink sur le site d’Apache Flink.

Pour un événement donné, le BucketAssigner détermine le préfixe de partitionnement correspondant sous la forme d’une chaîne de caractères. Voici le code suivant :

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }
  ...
} 

La destination prend un argument pour le bucket S3 indiquant le chemin de destination et une fonction qui convertit l’objet Java TripEvent en une chaîne de caractères. Voici le code correspondant :

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();
events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

Pour plus d’informations, consultez Bulk-encoded Formats sur le site d’Apache Flink.

Sauvegarder des événements fonctionne un peu différemment lorsque vous utilisez une conversion au format Parquet. Lorsque vous activez la conversion Parquet, vous pouvez uniquement configurer le StreamingFileSink avec la politique OnCheckpointRollingPolicy, celui-ci va écrire les fichiers complets dans Amazon S3 uniquement lorsqu’un point de contrôle est détecté. Vous avez besoin d’activer les points de contrôle Apache Flink dans votre application Kinesis Data Analytics pour persister les données dans Amazon S3. Cela devient visible pour les consommateurs lorsqu’un point de contrôle est détecté, votre latence d’écriture dépend donc de la fréquence des points de contrôle de votre application.

De plus, précédemment, vous aviez uniquement besoin de générer une chaîne de caractères représentant les données pour sauvegarder les données dans Amazon S3. En comparaison, le ParquetAvroWriters attend un schéma Apache Avro pour les événements. Pour plus d’informations, consultez ce dépôt github. Vous pouvez utiliser et étendre le schéma du dépôt si vous voulez un exemple.

En général, il est fortement recommandé de convertir les données au format Parquet si vous voulez travailler dessus et requêter les données sauvegardées efficacement. Bien que cela requiert des efforts supplémentaires, les bénéfices de la conversion sont supérieurs à la complexité additionnelle en comparaison d’un stockage sans modification.

Fanning out vers plusieurs index Elasticsearch et des identifiants uniques pour vos documents

Amazon OpenSearch Service est un service complètement géré qui rend facile le déploiement, l’exécution de clusters Elasticsearch sécurisés. Un cas d’usage populaire est de livrer via un flux en temps réel les journaux d’activité de vos applications et de votre réseau vers un bucket Amazon S3. Ces journaux d’activités sont des documents dans Elasticsearch et vous pouvez en créer un pour chaque événement et le stocker dans un index Elasticsearch.

Le connecteur pour Elasticsearch qu’Apache Flink fournit est flexible et extensible. Vous pouvez spécifier un index basé sur le contenu de chaque événement. C’est utile lorsque le flux contient différents types d’événements et que vous voulez les stocker dans différents index Elasticsearch. Avec cette fonctionnalité, vous pouvez utiliser une seule destination et application, pour écrire dans plusieurs index. Avec les nouvelles versions d’Elasticsearch, un seul index ne peut pas contenir différents types. Voici le code correspondant :

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
   public IndexRequest createIndexRequest(TripEvent element) {
    String type = element.getType().toString();
    String tripId = Long.toString(element.getTripId());
    return Requests.indexRequest()
      .index(type)
      .type(type)
      .id(tripId)
      .source(TripEventSchema.toJson(element), XContentType.JSON);
   }
);
events.addSink(sink);

Vous pouvez aussi spécifier explicitement l’identifiant du document lorsque vous envoyez les documents vers Elasticsearch. Si un événement avec le même identifiant est poussé plusieurs fois, il est écrasé au lieu de créer des doublons. Cela permet à vos écritures sur Elasticsearch d’être idempotentes. De cette façon, vous pouvez obtenir l’exactly once semantics pour toute votre architecture même si vos sources de données fournissent uniquement une at-least-once semantics.

Le connecteur AmazonElasticsearchSink ci-dessus est une extension du connecteur avec destination Elasticsearch qui vient avec Apache Flink. Ce connecteur ajoute le support des requêtes signées avec les identifiants IAM pour que vous puissiez utiliser une authentification et autorisation forte basée sur IAM qui est disponible pour ce service. À cette fin, ce connecteur prend les identifiants temporaires de l’environnement Kinesis Data Analytics dans lequel votre application Apache Flink tourne. Il utilise la méthode Signature Version 4 pour ajouter les informations de l’authentification à la requête qui est envoyée au point de terminaison Elasticsearch.

Tirez parti de l’exactly-once semantics

Vous pouvez obtenir l’exactly-once semantics en combinant un connecteur de destination idempotent avec de l’at-least-once semantics, mais ce n’est pas toujours faisable. Par exemple, si vous voulez répliquer vos données depuis un cluster Apache Kafka vers un autre ou persister des CDCs (change data captures) transactionnels depuis Apache Kafka vers Amazon S3, vous ne pouvez pas forcément tolérer des doublons dans la destination, mais les connecteurs sont idempotents.

Apache Flink supporte nativement l’exactly-once semantics. Kinesis Data Analytics active implicitement le mode exactly-once pour les points de contrôle. Pour obtenir l’exactly-once semantics de bout en bout, vous avez besoin d’activer les points de contrôles pour l’application Kinesis Data Analytics et de choisir un connecteur qui supporte l’exactly-once semantics, comme le StreamingFileSink. Pour plus d’informations, consultez les connecteurs de source et de destination garantie tolérants aux pannes sur le site d’Apache Flink.

Il y a quelques effets de bords lors de l’utilisation de l’exactly-once semantics. Par exemple, la latence de bout en bout augmente pour plusieurs raisons. Par exemple, vous pouvez uniquement déclencher l’écriture lorsqu’un point de contrôle est détecté. C’est la même chose que lorsque la latence augmente lorsque vous activez la conversion au format Parquet. L’intervalle des points de contrôle est par défaut à 1 minute, vous pouvez aussi la diminuer. Cependant, obtenir une latence de livraison inférieure à 1 seconde est difficile avec cette approche.

Les détails d’une exactly-once semantics de bout en bout sont subtiles. Bien que l’application Flink peut lire en exactly once depuis a un flux de données, les duplicatas peuvent déjà faire partie du flux, vous obtenez donc une at-least-once semantics pour l’application entière. Pour Apache Kafka à la fois comme source et destination, différentes réserves peuvent s’appliquer. Pour plus d’informations sur ces réserves consultez le site d’Apache Flink.

Assurez-vous de comprendre tous les détails de la stack applicative avant de vous engager dans la exactly-once semantics. En général, si votre application peut tolérer une at-least-once semantics, c’est une bonne idée de l’utiliser au lieu de se fier à une qualité de service dont vous n’avez pas besoin.

Utilisez de multiples sources et destinations

Une application Flink peut lire les données de plusieurs sources et les sauvegarder dans plusieurs destinations. Ce qui est intéressant pour plusieurs raisons. Premièrement, vous pouvez sauvegarder les données de sous-ensemble des données dans différentes destinations. Par exemple, vous pouvez utiliser la même application pour répliquer tous les événements d’un cluster Apache Kafka sur site vers un cluster MSK et en même temps, vous pouvez envoyer certains événements vers un cluster Elasticsearch.

Dans un second temps, vous pouvez utiliser plusieurs connecteurs de destinations pour augmenter la robustesse de votre application. Par exemple, votre application peut filtrer et enrichir les données des différents flux et peut aussi archiver les données brutes. Si quelque chose tourne mal avec la logique de votre application, Amazon S3 aura toujours les données brutes, que vous pouvez utiliser pour ré-injecter le connecteur de destination.

Cependant, il y a certains parti pris, lorsque vous combinez plusieurs fonctionnalités dans une seule application, vous augmentez la surface d’impact des erreurs. Si un seul composant de l’application échoue, l’application entière échoue et vous devez la rétablir depuis le dernier point de contrôle. Cela cause des temps d’arrêt et augmente la latence de livraison de toutes les destinations dans l’application. Aussi, une seule large application est souvent difficile à maintenir et à changer. Vous devez trouver un équilibre entre ajouter des fonctionnalités ou créer des applications Kinesis Data Analytics supplémentaires.

Aspects opérationnels

Lorsque vous opérez ce type d’architecture en production, vous faites tourner une seule application Flink en continu et indéfiniment. Il est crucial d’implémenter des solutions de surveillance et des alertes pour être sûr que le pipeline fonctionne comme convenu et que le traitement peut continuer sur les données qui arrivent. Idéalement, le pipeline doit s’adapter aux changements des conditions de débit et enclencher des notifications s’il échoue à écrire les données d’une source vers une destination.

Certains aspects nécessitent des attentions particulières d’un point de vue opérationnel. La section suivante fournit quelques idées et de plus amples références sur comment augmenter la robustesse de vos pipelines ETL sur des flux.

Surveillance et mise à l’échelle des sources

Les flux de données et les clusters MSK, sont des points d’entrée pour toute l’architecture. Ils découplent les producteurs de données du reste de l’architecture. Pour éviter des impacts sur les producteurs, ce qui souvent ne peut pas être contrôlé directement, vous avez besoin de passer à l’échelle le flux d’entrée de l’architecture de façon appropriée et être sûr qu’il peut prendre en compte le débit de messages à n’importe quel moment.

Kinesis Data Stream utilise un modèle de provisionnement basé sur les shards pour gérer le débit. Chaque shard fournit une certaine capacité de lecture et d’écriture. En fonction du nombre de shard provisionnés, vous pouvez dériver le débit maximum du flux en terme d’ingestion, d’émission d’événements et de volume de données par seconde. Pour plus d’informations, consultez les quotas de Kinesis Data Stream.

Kinesis Data Stream expose ces métriques à travers CloudWatch qui peuvent indiquer si le flux est sous ou sur provisionné. Vous pouvez utiliser les métriques IncomingBytes et IncomingRecords pour passer à l’échelle le flux de façon proactive, vous pouvez aussi utiliser la métrique WriteProvisionedThroughputExceeded pour passer à l’échelle le flux de façon réactive. Des métriques similaires existent pour les données rentrantes, que vous pouvez aussi surveiller. Pour plus d’information, consultez comment surveiller un flux Kinesis Data Stream avec Amazon CloudWatch.

Le graphique suivant montre certaines de ces métriques pour les flux de données de l’architecture. En moyenne, le flux Kinesis Data Streams reçoit 2.8 millions d’événements et 1.1 Go de données chaque minute.

graphe streaming ETL

Vous pouvez même automatiser le passage à l’échelle de vos Kinesis flux Data Streams. Pour plus d’informations, consultez : passez à l’échelle la capacité de vos flux Kinesis Data Stream avec UpdateShardCount.

Apache Kafka et Amazon MSK utilisent un modèle de provisionnement basé sur les nœuds. Amazon MSK expose aussi des métriques à travers CloudWatch, comme des métriques indiquant quelle quantité de données et combien d’événements sont ingérés dans votre cluster. Pour plus d’information, consultez les métriques d’Amazon MSK à surveiller.

En complément, vous pouvez aussi activer la surveillance Open Prometheus pour les clusters MSK. C’est un peu plus dur de connaître la capacité totale de votre cluster et vous devez souvent mesurer les performances pour savoir quand vous devez passer à l’échelle. Pour plus d’informations sur les métriques qui sont importantes à surveiller, consultez Surveillez Kafka sur le site de Confluent.

Surveillez et passez à l’échelle l’application Kinesis Data Analytics

L’application Flink est le cœur de votre architecture. Kinesis Data Analytics s’exécute dans un environnement géré et vous devez être sûr qu’il lit bien en continue les données des sources et les sauvegarder dans les différentes destinations sans échecs ou sans rencontrer des blocages.

Lorsque votre application prend du retard, c’est souvent un indicateur que vous ne passez pas à l’échelle correctement. Il est important de surveiller l’évolution de deux métriques de l’application qui sont millisBehindLastest (lorsque l’application est prête à lire le flux Kinesis Data Stream) et records-lag-max (lorsque vous lisez un cluster Apache Kafka ou MSK). Ces métriques ne sont pas uniquement un indicateur que les données sont bien lues depuis les sources, mais aussi elles indiquent si la donnée est lue assez rapidement. Si les valeurs de ces métriques sont en augmentation constante, l’application prend du retard en continu ce qui indique que vous avez besoin de passer à l’échelle votre application Kinesis Data Analytics. Pour plus d’information, consultez le connecteur Kinesis Data Stream et les métriques des applications.

Le graphique suivant montre les métriques pour l’exemple d’application de cet article. Pendant les points de contrôle, la métrique millisBehindLatest atteint occasionnellement un pic de 7 secondes. Cependant, parce que la moyenne reportée de la métrique est inférieure à 1 seconde et que l’application rattrape immédiatement son retard sur le flux, ça n’est donc pas un problème pour l’architecture.

latence Streaming ETL Flink

Bien que le retard de l’application est l’une des métriques les plus importantes à surveiller, il y a d’autres métriques qui sont importantes qu’Apache Flink et Kinesis Data Analytics expose. Pour plus d’informations, consultez surveillez les applications Apache Flink 101 sur le site d’Apache Flink.

Surveillez les destinations

Pour vérifier que les destinations reçoivent bien les données et qu’en fonction du type de destinations, elles ne remplissent pas tout leur espace de stockage, vous devez surveiller la destination de près.

Vous pouvez activer les métriques détaillées pour vos buckets S3 pour surveiller le nombre de requêtes et les données chargées dans le bucket avec une granularité d’une minute. Pour plus d’informations, consultez Surveillez les métriques dans Amazon Cloudwatch. Le graphique suivant montre ces métriques pour le bucket S3 de l’exemple d’application.

requêtes Streaming ETL Flink

Lorsque l’architecture sauvegarde les données dans un flux Kinesis Data Stream ou un topic Kafka, il agit en tant que producteur, alors les mêmes recommandations pour la surveillance et la mise à l’échelle des sources s’appliquent.

Gestion des erreurs

Les échecs sont inéluctables et des pannes finiront toujours par arriver“. Vous devez vous attendre à ce que votre application tombe en panne au bout d’un certain temps. Par exemple, un nœud de l’infrastructure sous-jacente  qu’Amazon Kinesis Data Analytics gère peut échouer ou des interruptions intermittentes sur le réseau peuvent empêcher l’application de lire les sources ou d’écrire sur les destinations. Lorsque cela arrive, Kinesis Data Analytics redémarre l’application et recommence le traitement en revenant au dernier point de contrôle. Parce que les événements bruts ont été sauvegardés dans le flux de données ou sur un topic Kafka, l’application peut relire les événements qui ont été sauvegardés dans le flux entre le dernier point de contrôle et le moment lorsqu’il se rétablit et continue le traitement des données.

Ces exemples de pannes sont rares et l’application peut gracieusement se rétablir sans sacrifier l’exactly-once semantics inclue. Cependant, d’autres types de pannes ont besoin d’attention supplémentaire et d’avoir une gestion en cas d’erreur.

Lorsqu’une exception est lancée, n’importe où dans votre code, par exemple, dans le composant qui contient la logique de parsing des événements, si vous ne gérez pas cette exception, alors votre application entière échoue. Comme avant, l’application finit par se rétablir, mais si l’exception est causée par un bug dans le code qu’un événement spécifique déclenche à chaque fois, cela résulte en une boucle infinie. Après le rétablissement d’une panne, l’application relit l’événement, et parce qu’il n’a pas été traité avec succès avant, l’application tombe encore. Le processus recommence et se répète de manière indéfinie, ce qui effectivement bloque l’application dans sa progression.

Il est donc important de gérer correctement les exceptions dans le code de l’application pour éviter qu’elle tombe en panne. S’il y a un problème qui persiste et que vous ne pouvez pas le résoudre de façon programmatique, vous pouvez utiliser d’autres destinations pour rediriger l’événement problématique vers un flux secondaire, avec lequel vous pouvez persister les données vers une dead letter queue ou un bucket S3 pour de futures inspections. Pour plus d’informations, consultez Side output sur le site d’Apache Flink.

Lorsque l’application est coincée et ne peut traiter de données, c’est visible dans la métrique indiquant le retard de l’application. Si votre pipeline ETL sur le flux filtre et enrichi des événements, les pannes peuvent être bien plus subtiles, et vous pouvez le remarquer uniquement après un long moment que les données ont été ingérées. Par exemple, dû à un bug dans l’application, vous pouvez accidentellement rejeter des événements importants ou corrompre le contenu des messages involontairement. Kinesis Data stream enregistre les événements jusqu’à 365 jours et bien que possible techniquement, Apache Kafka n’est souvent pas configuré pour sauvegarder les événements indéfiniment. Si vous n’identifiez pas une corruption assez rapidement, vous risquez de perdre de l’information lorsque la rétention des messages bruts expirent.

Pour vous protéger contre ce scénario, vous pouvez enregistrer les événements bruts dans Amazon S3 avant d’appliquer quelques transformations ou traitements additionnels. Vous pouvez garder les événements bruts et les retraiter ou les rejouer dans le flux si vous en avez besoin. Pour intégrer la fonctionnalité dans votre application, ajouter une deuxième destination qui écrit juste dans Amazon S3. Alternativement, vous pouvez utiliser une application séparée qui lit uniquement et sauvegarde les données brutes du flux; cela n’induit que le coût de faire tourner une application supplémentaire.

Quel service choisir et dans quel cas ?

AWS fournit plusieurs services qui fonctionnent avec des flux de données et peuvent effectuer des ETL sur les flux. Amazon Kinesis Data Firehose peut ingérer, traiter et sauvegarder les flux de données dans un ensemble de destinations. Il y a un chevauchement significatif entre les fonctionnalités d’Amazon Kinesis Data Firehose et la solution de cet article, mais il y a plusieurs raisons en faveur de chacune.

Pour vous aider dans votre choix, choisissez Kinesis Data Firehose dès que cela remplit vos besoins. Ce service est construit pour être facile d’utilisation. Pour utiliser Kinesis Data Firehose, vous avez juste besoin de configurer le service. Vous pouvez utiliser Kinesis Data Firehose pour les ETL sur les flux sans code, sans serveur et sans administration. En complément, Kinesis Data Firehose vient avec plusieurs de fonctionnalités natives et son modèle de facturation permet de payer uniquement pour la quantité de données traitées et livrées. Si vous ne recevez pas de données dans votre Kinesis Data Firehose, vous ne serez pas facturé.

De l’autre côté, la solution de cet article requiert que vous créiez, construisiez et déployez une application Flink. En supplément, vous avez besoin de penser à la surveillance et comment obtenir une architecture robuste qui n’est pas uniquement tolérante aux pannes de l’infrastructure, mais aussi résiliente aux bugs de l’application. Cependant, cette complexité apporte beaucoup de fonctionnalités avancées, dont votre cas d’usage peut avoir besoin. Pour plus d’informations, consultez cet article (en anglais) Contruisez et faite tourner vos applications streaming avec Apache Flink et Amazon Kinesis Data Analytics pour les applications Java et le guide pour les développeurs pour Amazon Kinesis Data Analytics.

Quelles sont les prochaines étapes ?

Cet article montre comment construire un pipeline ETL sur des flux de données avec Apache Flink et Kinesis Data Analytics. Il se concentre sur comment construire une solution modifiable qui adresse des cas d’usages avancés pour l’ingestion de données tout en réduisant la charge opérationnelle. La solution permet de rapidement enrichir, transformer, et charger vos flux de données dans votre data lake, data store, et autres outils d’analyse des données sans avoir besoin de rajouter des étapes d’ETL. Cet article explore aussi les différentes façons d’étendre l’application avec de la surveillance et de la gestion d’erreurs.

Vous avez maintenant une bonne compréhension de comment construire un pipeline ETL sur un flux de données sur AWS. Vous pouvez donc commencer à capitaliser sur les événements sensibles en temps réel en utilisant un pipeline ETL sur vos flux de données qui rend les informations importantes, rapidement accessibles pour les consommateurs. Vous pouvez paramétrer le format et la forme des informations pour votre cas d’usage sans ajouter de latence substantielle comme les traitements ETL basés sur les lots.

Article original écrit par Steffen Hausmann. Adapté en français par Kévin Polossat, Solution Architect dans les équipes AWS France.