Comment exécuter des commandes SQL sur une table Amazon Redshift avant ou après l'écriture de données dans une tâche AWS Glue ?

Lecture de 6 minute(s)
0

Je dispose d'une tâche AWS Glue qui charge des données dans une table Amazon Redshift. Je veux exécuter des commandes SQL sur Amazon Redshift avant ou après l'exécution de la tâche AWS Glue.

Résolution

Transmettez l'un des paramètres suivants dans la classe DynamicFrameWriter AWS Glue :

  • aws_iam_role : donne l'autorisation nécessaire pour accéder aux données dans une autre ressource AWS. Utilisez ce paramètre avec l'ARN complet du rôle AWS Identity and Access Management (IAM) qui est attaché au cluster Amazon Redshift. Par exemple, utilisez arn:aws:iam::123456789012:role/redshift_iam_role. Pour plus d'informations, consultez Paramètres d'autorisation.
  • preactions – liste de commandes SQL séparées par un point-virgule, qui sont exécutées avant la commande COPY. Si les commandes échouent, Amazon Redshift envoie alors une exception.
    Remarque : assurez-vous que le paramètre de préaction ne contient pas de caractère de saut de ligne.
  • postactions : liste de commandes SQL séparées par un point-virgule, qui sont exécutées lorsque la commande COPY aboutit. Si les commandes échouent, Amazon Redshift envoie alors une exception.
    Remarque : assurez-vous que le paramètre postaction ne contient pas de caractère de saut de ligne.
  • extracopyoptions : liste d'options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement de données. Par exemple, vous pouvez utiliser TRUNCATECOLUMNS ou MAXERROR.

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions.

Reportez-vous à l'exemple Python suivant :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Reportez-vous à l'exemple Scala suivant :

val options = JsonOptions(Map(
   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

Dans ces exemples, veillez à remplacer les valeurs suivantes :

  • test_red : la connexion de catalogue à utiliser
  • schema.target_table : le schéma de la base de données Amazon Redshift et la table Amazon Redshift
  • s3://s3path : le chemin du répertoire temporaire de la table Amazon Redshift

Utiliser un rôle IAM dans les options de connexion

Étant donné que les informations d'identification expirent au bout d'une heure, utilisez un rôle IAM dans les options de connexion pour empêcher l'échec de vos connexions de longue durée.

Reportez-vous à l'exemple Python suivant :

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

Reportez-vous à l'exemple Scala suivant :

val connectionOptions = JsonOptions(Map(
      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

Fusionner une table Amazon Redshift dans AWS Glue (upsert) Créez une requête de fusion après avoir chargé les données dans une table intermédiaire.

Remarque : pour que votre requête de fusion fonctionne, target_table doit déjà exister dans votre base de données Amazon Redshift.

Reportez-vous à l'exemple Python suivant :

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Reportez-vous à l'exemple Scala suivant :

val options = JsonOptions(Map(
   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

Dans ces exemples, veillez à remplacer les valeurs suivantes :

  • schema.target_table : le schéma de la base de données Amazon Redshift et la table Amazon Redshift
  • test_red : la connexion de catalogue à utiliser
  • schema.stage_table : le schéma de la base de données Amazon Redshift et la table intermédiaire Amazon Redshift
  • s3://s3path : le chemin du répertoire temporaire de la table Amazon Redshift

Pour plus d'informations, veuillez consulter la section Utiliser une table intermédiaire pour exécuter une fusion (upsert).

Ignorer les lignes qui ne sont pas valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée.

Reportez-vous à l'exemple Python suivant :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Reportez-vous à l'exemple Scala suivant :

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

Dans ces exemples, veillez à remplacer les valeurs suivantes :

  • schema.target_table : le schéma de la base de données Amazon Redshift et la table Amazon Redshift
  • schema.stage_table : le schéma de la base de données Amazon Redshift et la table intermédiaire Amazon Redshift
  • test : la connexion de catalogue à utiliser
  • testalblog2 – la table Amazon Redshift dans laquelle les données sont chargées
  • reddb – la base de données Amazon Redshift
  • emp1 : la table Amazon Redshift dans laquelle les données doivent être supprimées après le chargement des données dans testalblog2
  • s3://s3path : le chemin du répertoire temporaire de la table Amazon Redshift

Informations supplémentaires

Vous pouvez utiliser le connecteur Spark Amazon Redshift (redshift-jdbc42-2.1.0.9) lorsque vous utilisez des tâches ETL AWS Glue 4.0. Ce connecteur possède les propriétés suivantes :

  • Prend en charge les URL JDBC basées sur IAM.
  • Inclut des options d'amélioration des performances telles que autopushdown, autopushdown.s3_result_cache et unload_s3_format.
  • Inclut l'option de chiffrement SSE_KMS qui peut être utilisée pour les données du dossier temporaire. AWS Glue utilise ces données lors de la lecture de tables Amazon Redshift.

Informations connexes

COPY

TRUNCATE

Opérations de chargement des données

AWS OFFICIEL
AWS OFFICIELA mis à jour il y a un an