Comment exécuter des commandes SQL sur une table Amazon Redshift avant ou après l'écriture de données dans une même tâche AWS Glue ?

Date de la dernière mise à jour : 17/04/2019

Je dispose d’une tâche AWS Glue qui charge les données dans une table Amazon Redshift. Je veux exécuter des commandes SQL sur Amazon Redshift avant ou après l’exécution de la tâche AWS Glue.

Résolution

Transmettez l'un des paramètres suivants dans la classe DynamicFrameWriter AWS Glue :

  • aws_iam_role : donne l'autorisation d’accéder aux données dans une autre ressource AWS. Utilisez ce paramètre avec le nom ARN complet du rôle AWS Identity and Access Management (IAM) qui est associé au cluster Amazon Redshift (par exemple, arn:aws:iam::123456789012:role/redshift_iam_role). Pour plus d'informations, consultez la section Paramètres d'autorisation.
  • preactions : liste de commandes SQL séparées par un point-virgule, qui sont exécutées avant la commande COPY. Si les commandes échouent, Amazon Redshift envoie une exception.
  • postactions : liste de commandes SQL séparées par un point-virgule, qui sont exécutées lorsque la commande COPY aboutit. Si les commandes échouent, Amazon Redshift envoie une exception.
  • extracopyoptions : liste d’options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement des données (par exemple, TRUNCATECOLUMNS ou MAXERROR).

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions, comme indiqué dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test_red : connexion de catalogue à utiliser
  • target_table : table Amazon Redshift
  • s3://s3path : chemin du répertoire temporaire de la table Amazon Redshift
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Fusionner une table Amazon Redshift dans AWS Glue (upsert)

Créez une requête de fusion après le chargement des données dans une table intermédiaire, comme indiqué dans les exemples Python suivants. Remplacez les valeurs suivantes :

  • target_table : table Amazon Redshift
  • test_red : connexion de catalogue à utiliser
  • stage_table : table intermédiaire Amazon Redshift
  • s3://s3path : chemin du répertoire temporaire de la table Amazon Redshift
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
        
        datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Pour plus d'informations, consultez Utilisation d'une table intermédiaire pour exécuter une fusion (Upsert).

Ignorer les lignes non valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée, comme indiqué dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test: connexion de catalogue à utiliser
  • testalblog2 : table Amazon Redshift dans laquelle les données sont chargées
  • reddb : base de données Amazon Redshift
  • emp1 : table Amazon Redshift dans laquelle les données doivent être supprimées après le chargement des données dans testalblog2
  • test_red : connexion de catalogue à utiliser
  • stage_table : table intermédiaire Amazon Redshift
  • s3://s3path : chemin du répertoire temporaire de la table Amazon Redshift
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Cette page vous a-t-elle été utile ?

Cette page peut-elle être améliorée ?


Vous avez besoin d'aide ?