How can I verify the authenticity of Amazon SNS messages sent to HTTP and HTTPS endpoints?

Last updated: October 14, 2021

I send notifications to an HTTPS or HTTP endpoint using Amazon Simple Notification Service (Amazon SNS). I want to prevent spoofing attacks: how can I verify the authenticity of the Amazon SNS messages that my endpoint receives?

Resolution

It's a best practice to use certificate-based signature validation when verifying the authenticity of an Amazon SNS notification. For instructions, see Verifying the signatures of Amazon SNS messages in the Amazon SNS Developer Guide.

To help prevent spoofing attacks, make sure that you do the following when verifying Amazon SNS message signatures:

  • Always use HTTPS to get the certificate from Amazon SNS.
  • Validate the authenticity of the certificate.
  • Verify that the certificate was sent from Amazon SNS.
  • (When possible) Use one of the supported AWS SDKs for Amazon SNS to validate and verify messages.
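
As an illustration of the first and third points, the following minimal sketch checks a SigningCertURL value before anything is downloaded from it. The helper name is_sns_cert_url and the host pattern (sns.<region>.amazonaws.com, optionally with a .cn suffix) are assumptions made for this example, not an official rule, and the check does not replace validating the certificate itself.

```python
import re
from urllib.parse import urlparse

# Assumed host pattern: sns.<region>.amazonaws.com, optionally followed by .cn.
# Adjust it if your endpoint receives messages from other AWS partitions.
SNS_HOST_PATTERN = re.compile(r"sns\.[a-z0-9-]{3,}\.amazonaws\.com(\.cn)?")


def is_sns_cert_url(url: str) -> bool:
    """Return True only for HTTPS .pem URLs on a host that matches the pattern."""
    parsed = urlparse(url)
    host = parsed.hostname or ""  # hostname is lowercased and excludes any user info
    return (
        parsed.scheme == "https"
        and SNS_HOST_PATTERN.fullmatch(host) is not None
        and parsed.path.endswith(".pem")
    )


# An HTTPS URL on an SNS-style host is accepted.
print(is_sns_cert_url("https://sns.us-east-1.amazonaws.com/SimpleNotificationService-example.pem"))  # True
# Plain HTTP is rejected.
print(is_sns_cert_url("http://sns.us-east-1.amazonaws.com/SimpleNotificationService-example.pem"))  # False
# A lookalike host that only starts with the SNS domain is rejected.
print(is_sns_cert_url("https://sns.us-east-1.amazonaws.com.evil.example/cert.pem"))  # False
```

Matching the whole host name, rather than searching for a substring, is what keeps lookalike domains and user-info tricks such as https://sns.us-east-1.amazonaws.com@evil.example/ from passing.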

Example Python script for certificate-based signature validation

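Important: Make sure that you call the processMessage(messagePayload) function. The messagePayload parameter takes the message in the same format as the example messagePayload JSON provided in this article. Because the script reads fields by key, parse the raw JSON string into a dictionary (for example, with json.loads) before you pass it in.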

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import base64
from urllib.parse import urlparse

from M2Crypto import X509
import requests

# Cache of certificate responses, keyed by SigningCertURL.
cache = dict()


# Verify the signature of the Amazon SNS message that was sent to your endpoint.
# messagePayload is the JSON body of the HTTP POST request, parsed into a dictionary.
def processMessage(messagePayload):
    print("Start!")

    if messagePayload["SignatureVersion"] != "1":
        print("Unexpected signature version. Unable to verify signature.")
        return False
    # Remove any stray spaces from the ARN (valid topic ARNs contain none).
    messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
    signatureFields = fieldsForSignature(messagePayload["Type"])
    if not signatureFields:
        print("Unexpected message type. Unable to verify signature.")
        return False
    print(signatureFields)
    strToSign = getSignatureFields(messagePayload, signatureFields)
    print(strToSign)
    try:
        certStr = getCert(messagePayload)
    except (ValueError, requests.RequestException) as error:
        print("Unable to get the signing certificate:", error)
        return False

    print("Printing the cert")
    print(certStr.text)
    print("Using M2Crypto")

    # Get the X509 certificate that Amazon SNS used to sign the message.
    certificateSNS = X509.load_cert_string(certStr.text)
    # Extract the public key from the certificate.
    public_keySNS = certificateSNS.get_pubkey()
    public_keySNS.reset_context(md="sha1")
    # Generate the derived hash value by hashing the string to sign.
    public_keySNS.verify_init()
    public_keySNS.verify_update(strToSign.encode())
    # Decode the Base64-encoded Signature value.
    decoded_signature = base64.b64decode(messagePayload["Signature"])
    # Verify the authenticity and integrity of the Amazon SNS message by
    # checking the derived hash against the hash asserted by the signature.
    verification_result = public_keySNS.verify_final(decoded_signature)
    print("verification_result", verification_result)
    if verification_result != 1:
        print("Signature could not be verified")
        return False
    return True


# Obtain the fields for signature based on message type.
def fieldsForSignature(messageType):
    if messageType in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
        return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif messageType == "Notification":
        return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        return []


# Create the string to sign: each field name and its value, one per line.
def getSignatureFields(messagePayload, signatureFields):
    signatureStr = ""
    for key in signatureFields:
        if key in messagePayload:
            signatureStr += (key + "\n" + messagePayload[key] + "\n")
    return signatureStr


# **** Certificate fetching ****
# Accept only HTTPS URLs whose host looks like an Amazon SNS endpoint.
def isTrustedCertUrl(url):
    parsed = urlparse(url)
    host = parsed.hostname or ""
    return (parsed.scheme == "https" and host.startswith("sns.")
            and host.endswith((".amazonaws.com", ".amazonaws.com.cn")))


def get_cert_from_server(url):
    print("Fetching cert from server...")
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response


# Certificate caching
def get_cert(url):
    print("Getting cert...")
    if url not in cache:
        cache[url] = get_cert_from_server(url)
    return cache[url]


def getCert(messagePayload):
    certLoc = messagePayload["SigningCertURL"].replace(" ", "")
    print("Cert location", certLoc)
    if not isTrustedCertUrl(certLoc):
        raise ValueError("SigningCertURL is not an HTTPS Amazon SNS URL")
    return get_cert(certLoc)
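
The script above indexes messagePayload by key, so the raw POST body has to be parsed before it is passed in. The following minimal sketch shows one way an endpoint might do that. The name handle_sns_post and the list of required keys are assumptions made for this example, and the verifier is passed in as a parameter (standing in for processMessage) so that the sketch runs on its own.

```python
import json
from typing import Callable

# Keys that the verification script reads unconditionally (assumed list for this sketch).
REQUIRED_KEYS = ("Type", "Signature", "SignatureVersion", "SigningCertURL", "TopicArn")


def handle_sns_post(body: bytes, verify: Callable[[dict], bool]) -> bool:
    """Parse a raw POST body and hand the resulting dictionary to `verify`.

    `verify` stands in for processMessage. Returns False when the body is
    not a JSON object or lacks a key that the verifier needs.
    """
    try:
        payload = json.loads(body)
    except ValueError:
        return False  # The body is not valid JSON (or not valid UTF-8).
    if not isinstance(payload, dict):
        return False
    if any(key not in payload for key in REQUIRED_KEYS):
        return False
    return bool(verify(payload))
```

In a real handler, the call would look like handle_sns_post(request_body, processMessage), and a False result would normally mean that the message is discarded without further processing.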

Example messagePayload JSON string

{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}
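
To make the signing step concrete, the following sketch parses an abridged copy of the example payload and builds the string to sign in the same way as getSignatureFields in the script. The name build_string_to_sign is hypothetical, and the payload omits the Signature and URL fields for brevity. Because the example redacts several values, its Signature is unlikely to verify as printed.

```python
import json

# Fields signed for a "Notification" message, in the order that the script uses.
NOTIFICATION_FIELDS = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

# Abridged copy of the example payload (Signature and URL fields omitted).
raw_body = """
{
  "Type": "Notification",
  "MessageId": "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
  "TopicArn": "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
  "Subject": "Test",
  "Message": "TestHTTPS",
  "Timestamp": "2021-10-07T18:55:19.793Z",
  "SignatureVersion": "1"
}
"""


def build_string_to_sign(payload: dict, fields: list) -> str:
    """Emit each present field as its name, a newline, its value, and a newline."""
    return "".join(f"{key}\n{payload[key]}\n" for key in fields if key in payload)


payload = json.loads(raw_body)
string_to_sign = build_string_to_sign(payload, NOTIFICATION_FIELDS)
print(string_to_sign)
```

The field order is fixed by the list, not by the order of keys in the JSON document, and a field that is absent from the payload (for example, Subject on a notification published without one) is skipped.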
