¿Cómo se verifica la autenticidad de los mensajes de Amazon SNS que se envían a los puntos de enlace HTTP y HTTPS?

Actualización más reciente: 14-10-2021

Envío notificaciones a un punto de enlace HTTPS o HTTP mediante Amazon Simple Notification Service (Amazon SNS). Deseo evitar los ataques de suplantación de identidad, así que ¿cómo puedo verificar la autenticidad de los mensajes de Amazon SNS que recibe mi punto de enlace?

Resolución

Es una práctica recomendada utilizar la validación de la firma basada en certificados cuando se verifica la autenticidad de una notificación de Amazon SNS. Para obtener instrucciones, consulte Verificar las firmas de los mensajes de Amazon SNS en la Guía para desarrolladores de Amazon SNS.

Para ayudar a prevenir los ataques de suplantación, asegúrese de realizar las siguientes acciones al verificar las firmas de los mensajes de Amazon SNS:

  • Utilice siempre HTTPS para obtener el certificado de Amazon SNS.
  • Valide la autenticidad del certificado.
  • Verifique que el certificado se envió desde Amazon SNS.
  • Cuando sea posible, utilice uno de los SDK de AWS compatibles con Amazon SNS para validar y verificar los mensajes.

Ejemplo de script en Python para la validación de firmas basada en certificados

Importante: Asegúrese de llamar a la función processMessage (messagePayload). El parámetro messagePayload requiere una cadena JSON con el mismo formato que el ejemplo de cadena JSON messagePayload proporcionada en este artículo.

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import base64
from M2Crypto import EVP, RSA, X509
import requests
cache = dict()
 
# Extract the name-value pairs from the JSON document in the body of the HTTP POST request that Amazon SNS sent to your endpoint.
def processMessage(messagePayload):
     print ("Start!")
     
     if (messagePayload["SignatureVersion"] != "1"):
         print("Unexpected signature version. Unable to verify signature.")
         return False
     messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
     signatureFields = fieldsForSignature(messagePayload["Type"])
     print(signatureFields)
     strToSign = getSignatureFields(messagePayload, signatureFields)
     print(strToSign)
     certStr = getCert(messagePayload)
 
     print("Printing the cert")
     print(certStr.text)
     print("Using M2Crypto")
     
     # Get the X509 certificate that Amazon SNS used to sign the message.
     certificateSNS = X509.load_cert_string(certStr.text)
     #Extract the public key from the certificate.
     public_keySNS = certificateSNS.get_pubkey()
     public_keySNS.reset_context(md = "sha1")
     # Generate the derived hash value of the Amazon SNS message.
     # Generate the asserted hash value of the Amazon SNS message.
     public_keySNS.verify_init()
     public_keySNS.verify_update(strToSign.encode())
     # Decode the Signature value 
     decoded_signature = base64.b64decode(messagePayload["Signature"])
     # Verify the authenticity and integrity of the Amazon SNS message
     verification_result = public_keySNS.verify_final(decoded_signature)
     print("verification_result", verification_result)
     if verification_result != 1:
         print("Signature could not be verified")
         return False
     else:   
        return True
 
# Obtain the fields for signature based on message type.
def fieldsForSignature(type):
    if (type == "SubscriptionConfirmation" or type == "UnsubscribeConfirmation"):
        return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif (type == "Notification"):
        return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        return []
 
# Create the string to sign.  
def getSignatureFields(messagePayload, signatureFields):
    signatureStr = ""
    for key in signatureFields:
        if key in messagePayload:
            signatureStr += (key + "\n" + messagePayload[key] + "\n")
    return signatureStr
 
#**** Certificate Fetching ****
#Certificate caching
def get_cert_from_server(url):
    print("Fetching cert from server...")
    response = requests.get(url)
    return response
 
def get_cert(url):
    print("Getting cert...")
    if url not in cache:
        cache[url] = get_cert_from_server(url)
    return cache[url]
 
def getCert(messagePayload):
    certLoc = messagePayload["SigningCertURL"].replace(" ", "")
    print("Cert location", certLoc)
    responseCert = get_cert(certLoc)
    return responseCert

Ejemplo de cadena JSON messagePayload

{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}

¿Le resultó útil este artículo?


¿Necesita asistencia técnica o con la facturación?