Comment exécuter des commandes SQL sur une table Amazon Redshift avant ou après l'écriture de données dans une tâche AWS Glue ?

Dernière mise à jour : 27/10/2022

Je dispose d'une tâche AWS Glue qui charge des données dans une table Amazon Redshift. Je veux exécuter des commandes SQL sur Amazon Redshift avant ou après l'exécution de la tâche AWS Glue.

Résolution

Transmettez l'un des paramètres suivants dans la classe DynamicFrameWriter AWS Glue :

  • aws_iam_role – donne l'autorisation d'accéder aux données dans une autre ressource AWS. Utilisez ce paramètre avec l'ARN complet du rôle AWS Identity and Access Management (IAM) qui est attaché au cluster Amazon Redshift (par exemple arn:aws:iam::123456789012:role/redshift_iam_role). Pour plus d'informations, consultez Paramètres d'autorisation.
  • preactions – liste de commandes SQL séparées par un point-virgule, qui sont exécutées avant la commande COPY. Si les commandes échouent, alors Amazon Redshift envoie une exception.
    Remarque : assurez-vous que le paramètre de préaction ne contient aucun caractère de saut de ligne.
  • postactions – liste de commandes SQL séparées par un point-virgule, qui sont exécutées lorsque la commande COPY aboutit. Si les commandes échouent, alors Amazon Redshift envoie une exception.
    Remarque : assurez-vous que le paramètre postaction ne contient aucun caractère de saut de ligne.
  • extracopyoptions – liste d'options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement des données (par exemple TRUNCATECOLUMNS ou MAXERROR).

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions.

Exemple Python :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(
   "dbtable" -> "target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

Remplacez les valeurs suivantes :

  • test_red – la connexion de catalogue à utiliser
  • target_table – la table Amazon Redshift
  • s3://s3path – le chemin du répertoire temporaire de la table Amazon Redshift

Fusionner une table Amazon Redshift dans AWS Glue (upsert)

Créez une requête de fusion après avoir chargé les données dans une table intermédiaire.

Exemple Python :

post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(
   "dbtable" -> "stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
   "postactions" -> "begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

Remplacez les valeurs suivantes :

  • target_table – la table Amazon Redshift
  • test_red – la connexion de catalogue à utiliser
  • stage_table – table intermédiaire Amazon Redshift
  • s3://s3path : le chemin du répertoire temporaire de la table Amazon Redshift

Pour plus d'informations, veuillez consulter la section Utiliser une table intermédiaire pour exécuter une fusion (upsert).

Ignorer les lignes qui ne sont pas valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée.

Exemple Python :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

Remplacez les valeurs suivantes :

  • test – la connexion de catalogue à utiliser
  • testalblog2 – la table Amazon Redshift dans laquelle les données sont chargées
  • reddb – la base de données Amazon Redshift
  • emp1 : la table Amazon Redshift dans laquelle les données doivent être supprimées après le chargement des données dans testalblog2
  • s3://s3path : le chemin du répertoire temporaire de la table Amazon Redshift

Cet article vous a-t-il été utile ?


Besoin d'aide pour une question technique ou de facturation ?