How do I verify the authenticity of Amazon SNS messages that are sent to HTTP and HTTPS endpoints?

Last updated: 2021-10-14

I'm sending notifications to an HTTPS (or HTTP) endpoint using Amazon Simple Notification Service (Amazon SNS). I want to prevent spoofing attacks. How do I verify the authenticity of the Amazon SNS messages that my endpoint receives?

Resolution

It's a best practice to use certificate-based signature validation when you verify the authenticity of an Amazon SNS notification. For instructions, see Verifying the signatures of Amazon SNS messages in the Amazon SNS Developer Guide.

To help prevent spoofing attacks, do the following when you verify Amazon SNS message signatures:

  • Always use HTTPS to get the certificate from Amazon SNS.
  • Validate the authenticity of the certificate.
  • Verify that the certificate was sent from Amazon SNS.
  • (When possible) Use one of the AWS SDKs that support Amazon SNS to validate and verify messages.
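
The first and third checks in the list above can be sketched as a small guard that runs before the certificate is downloaded. This is a minimal sketch and not part of the original script: the function name is_trusted_signing_cert_url and the host pattern are assumptions for illustration, and the pattern covers only endpoints of the form sns.<region>.amazonaws.com (other partitions, such as the China Regions, use different domains and would need their own pattern).

```python
import re
from urllib.parse import urlparse

# Assumption: SNS signing certificates are served from hosts such as
# sns.us-east-1.amazonaws.com. Adjust the pattern for other AWS partitions.
SNS_HOST_PATTERN = re.compile(r"sns\.[a-z0-9-]+\.amazonaws\.com")


def is_trusted_signing_cert_url(url):
    """Return True only for HTTPS URLs on an SNS host that point to a .pem file."""
    parsed = urlparse(url)
    # Reject plain HTTP and any other scheme.
    if parsed.scheme != "https":
        return False
    # hostname excludes userinfo and port, so "trusted@evil" tricks do not pass.
    host = parsed.hostname or ""
    if not SNS_HOST_PATTERN.fullmatch(host):
        return False
    return parsed.path.endswith(".pem")
```

A caller could reject the message whenever is_trusted_signing_cert_url(messagePayload["SigningCertURL"]) returns False, before any network request is made. This check does not replace validating the certificate itself.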

Example Python script for certificate-based signature validation

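Important: Don't forget to call the processMessage(messagePayload) function. The messagePayload parameter must contain the JSON document in the same format as the example messagePayload JSON string provided in this article. Because the script reads fields by key, parse the JSON string into a dictionary (for example, with json.loads) before you pass it in.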

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import base64
import json

import requests
from M2Crypto import X509

# Cache of downloaded signing certificates, keyed by certificate URL.
cache = dict()

# Verify the signature of the JSON document in the body of the HTTP POST request that Amazon SNS sent to your endpoint.
def processMessage(messagePayload):
    print("Start!")

    # Accept either the raw JSON string or an already parsed dictionary.
    if isinstance(messagePayload, str):
        messagePayload = json.loads(messagePayload)

    if messagePayload["SignatureVersion"] != "1":
        print("Unexpected signature version. Unable to verify signature.")
        return False
    messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
    # Extract the name-value pairs that are part of the signature.
    signatureFields = fieldsForSignature(messagePayload["Type"])
    print(signatureFields)
    strToSign = getSignatureFields(messagePayload, signatureFields)
    print(strToSign)
    certStr = getCert(messagePayload)

    print("Printing the cert")
    print(certStr.text)
    print("Using M2Crypto")

    # Get the X509 certificate that Amazon SNS used to sign the message.
    certificateSNS = X509.load_cert_string(certStr.text)
    # Extract the public key from the certificate.
    public_keySNS = certificateSNS.get_pubkey()
    # SignatureVersion 1 signatures use SHA1 with RSA.
    public_keySNS.reset_context(md="sha1")
    # Generate the derived hash value of the Amazon SNS message.
    public_keySNS.verify_init()
    public_keySNS.verify_update(strToSign.encode())
    # Decode the Signature value from Base64.
    decoded_signature = base64.b64decode(messagePayload["Signature"])
    # Verify the authenticity and integrity of the Amazon SNS message.
    verification_result = public_keySNS.verify_final(decoded_signature)
    print("verification_result", verification_result)
    if verification_result != 1:
        print("Signature could not be verified")
        return False
    return True
 
# Obtain the fields for signature based on message type.
def fieldsForSignature(messageType):
    if messageType in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
        return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif messageType == "Notification":
        return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        return []
 
# Create the string to sign.  
def getSignatureFields(messagePayload, signatureFields):
    signatureStr = ""
    for key in signatureFields:
        if key in messagePayload:
            signatureStr += (key + "\n" + messagePayload[key] + "\n")
    return signatureStr
 
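#**** Certificate Fetching ****
# Download the signing certificate from the given URL.
def get_cert_from_server(url):
    print("Fetching cert from server...")
    # Use a timeout and raise on HTTP error responses instead of parsing an error page as a certificate.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response
 
# Certificate caching: download each certificate URL only once.
def get_cert(url):
    print("Getting cert...")
    if url not in cache:
        cache[url] = get_cert_from_server(url)
    return cache[url]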
 
def getCert(messagePayload):
    certLoc = messagePayload["SigningCertURL"].replace(" ", "")
    print("Cert location", certLoc)
    responseCert = get_cert(certLoc)
    return responseCert

Example messagePayload JSON string

{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}
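
To make the signing input concrete, the following sketch builds the string to sign for the example payload above, using the same field order that the script uses for Notification messages. It is illustrative only: build_string_to_sign and NOTIFICATION_FIELDS are hypothetical names that mirror getSignatureFields and fieldsForSignature, and because the example payload contains redacted values (XXXXXXXX), its Signature is not expected to verify against this string.

```python
# Fields that are signed for a Notification message, in the required order.
NOTIFICATION_FIELDS = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]


def build_string_to_sign(payload, fields):
    """Join each present field as 'name\\nvalue\\n', skipping absent fields."""
    parts = []
    for key in fields:
        if key in payload:
            parts.append(key + "\n" + payload[key] + "\n")
    return "".join(parts)


# Subset of the example payload; unsigned fields are ignored anyway.
sample = {
    "Type": "Notification",
    "MessageId": "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
    "TopicArn": "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
    "Subject": "Test",
    "Message": "TestHTTPS",
    "Timestamp": "2021-10-07T18:55:19.793Z",
    "SignatureVersion": "1",
}

string_to_sign = build_string_to_sign(sample, NOTIFICATION_FIELDS)
print(string_to_sign)
```

Each signed field contributes its name and its value on separate lines, and fields such as SignatureVersion, Signature, SigningCertURL, UnsubscribeURL, and MessageAttributes are not part of the signed string.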
