Comment exécuter des commandes SQL sur une table Amazon Redshift avant ou après l'écriture de données dans une tâche AWS Glue ?

Date de la dernière mise à jour : 16/08/2021

Je dispose d'une tâche AWS Glue qui charge les données dans une table Amazon Redshift. Je veux exécuter des commandes SQL sur Amazon Redshift avant ou après l'exécution de la tâche AWS Glue.

Résolution

Transmettez l'un des paramètres suivants dans la classe DynamicFrameWriter AWS Glue :

  • aws_iam_role – donne l'autorisation d'accéder aux données dans une autre ressource AWS. Utilisez ce paramètre avec l'ARN complet du rôle AWS Identity and Access Management (IAM) qui est attaché au cluster Amazon Redshift (par exemple arn:aws:iam::123456789012:role/redshift_iam_role). Pour plus d'informations, consultez Paramètres d'autorisation.
  • preactions – liste de commandes SQL séparées par un point-virgule, qui sont exécutées avant la commande COPY. Si les commandes échouent, alors Amazon Redshift envoie une exception.
    Remarque : assurez-vous que le paramètre de préaction ne contient aucun caractère de saut de ligne.
  • postactions – liste de commandes SQL séparées par un point-virgule, qui sont exécutées lorsque la commande COPY aboutit. Si les commandes échouent, alors Amazon Redshift envoie une exception.
    Remarque : assurez-vous que le paramètre postaction ne contient aucun caractère de saut de ligne.
  • extracopyoptions – liste d'options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement des données (par exemple TRUNCATECOLUMNS ou MAXERROR).

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions, comme indiqué dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test_red – la connexion de catalogue à utiliser
  • target_table – la table Amazon Redshift
  • s3://s3path – le chemin du répertoire temporaire de la table Amazon Redshift
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Fusionner une table Amazon Redshift dans AWS Glue (upsert)

Créez une requête de fusion après le chargement des données dans une table intermédiaire, comme indiqué dans les exemples Python suivants. Remplacez les valeurs suivantes :

  • target_table – la table Amazon Redshift
  • test_red – la connexion de catalogue à utiliser
  • stage_table – table intermédiaire Amazon Redshift
  • s3://s3path – le chemin du répertoire temporaire de la table Amazon Redshift
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Pour plus d'informations, consultez Utiliser une table intermédiaire pour exécuter une fusion (Upsert).

Ignorer les lignes qui ne sont pas valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée, comme indiqué dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test – la connexion de catalogue à utiliser
  • testalblog2 – la table Amazon Redshift dans laquelle les données sont chargées
  • reddb – la base de données Amazon Redshift
  • emp1 – la table Amazon Redshift dans laquelle les données doivent être supprimées après le chargement des données dans testalblog2
  • test_red – une connexion de catalogue à utiliser
  • stage_table – table intermédiaire Amazon Redshift
  • s3://s3path – le chemin du répertoire temporaire de la table Amazon Redshift
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Cet article vous a-t-il été utile ?


Besoin d'aide pour une question technique ou de facturation ?