Comment exécuter des commandes SQL sur une table Amazon Redshift avant ou après l'écriture de données dans une tâche AWS Glue ?

Date de la dernière mise à jour : 15/06/2021

Je dispose d'une tâche AWS Glue qui charge les données dans une table Amazon Redshift. Je veux exécuter des commandes SQL sur Amazon Redshift avant ou après l'exécution de la tâche AWS Glue.

Résolution

Transmettez l'un des paramètres suivants dans la classe DynamicFrameWriter AWS Glue:

  • aws_iam_role: Fournit l'autorisation d'accéder aux données d'une autre ressource AWS. Utilisez ce paramètre avec l'ARN complet spécifié du rôle AWS Identity and Access Management (IAM) qui est associé au cluster Amazon Redshift (par exemple, arn:aws:iam::123456789012:role/redshift_iam_role). Pour plus d'informations, voir Paramètres d'autorisation.
  • preactions : Liste délimitée par des points-virgules des commandes SQL exécutées avant la commande COPY. Si les commandes échouent, Amazon Redshift génère une exception.
  • postactions : Liste délimitée par des points-virgules de commandes SQL exécutées après une commande COPY réussie. Si les commandes échouent, Amazon Redshift génère une exception.
  • extracopyoptions : Liste des options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement des données (par exemple, TRUNCATECOLUMNS ou MAXERROR).

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions, comme illustré dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test_red: connexion au catalogue pour utiliser
  • target_table: la table Amazon Redshift
  • s3://s3path: le chemin d'accès du répertoire temporaire de la table Amazon Redshift annuaire
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Fusionner une table Amazon Redshift dans AWS Glue (upsert)

Créer une requête de fusion après avoir chargé les données dans une table de prédéploiement, comme indiqué dans les exemples Python suivants. Remplacez les valeurs suivantes :

  • target_table: la table Amazon Redshift
  • test_red: la connexion au catalogue à utiliser
  • stage_table: la table de prédéploiement Amazon Redshift
  • s3://s3path: le chemin d'accès au répertoire temporaire de la table Amazon Redshift
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Pour plus d'informations, voir Utilisez une table de prédéploiement pour effectuer une fusion (Upsert).

Ignorez les lignes non valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée, comme illustré dans l'exemple Python suivant. Remplacez les valeurs suivantes :

  • test: la connexion au catalogue à utiliser
  • testalblog2: la table Amazon Redshift pour charger les données dans
  • reddb: la base de données Amazon Redshift
  • emp1: la table Amazon Redshift dans laquelle supprimer les données, après le chargement des données dans testalblog2testalblog2
  • test_red: une connexion au catalogue à utiliser
  • stage_table: la table de prédéploiement Amazon Redshift
  • s3://s3path: le chemin d'accès au répertoire temporaire de la table Amazon Redshift
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Cet article vous a-t-il été utile ?


Besoin de support pour une question technique ou de facturation ?