如何驗證傳送至 HTTP 和 HTTPS 端點的 Amazon SNS 訊息的真偽?

上次更新日期:2021-10-14

我正在使用 Amazon Simple Notification Service (Amazon SNS) 向 HTTPS 或 HTTP 端點發送通知。我想要預防詐騙攻擊,我該如何確認端點收到的 Amazon SNS 訊息的真實性?

解決方案

若要確認 Amazon SNS 通知的真實性,最好使用憑證型簽章驗證。如需說明,請參閱《Amazon SNS 開發人員指南》中的驗證 Amazon SNS 訊息的簽章

為了防止詐騙攻擊,請務必在確認 Amazon SNS 訊息簽章時執行下列操作:

  • 一律使用 HTTPS 向 Amazon SNS 索取憑證。
  • 驗證憑證的真實性。
  • 確認憑證是從 Amazon SNS 發出。
  • (盡可能) 使用 Amazon SNS 支援的 AWS SDK 之一來驗證和確認訊息。

用於憑證型簽章驗證的範例 Python 指令碼

重要事項:務必呼叫 processMessage (messagePayload) 函數。messagePayload 參數需要與本文所載之範例 messagePayload JSON 字串格式相同的 JSON 字串。

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import base64
from M2Crypto import EVP, RSA, X509
import requests
cache = dict()
 
# Extract the name-value pairs from the JSON document in the body of the HTTP POST request that Amazon SNS sent to your endpoint.
def processMessage(messagePayload):
     print ("Start!")
     
     if (messagePayload["SignatureVersion"] != "1"):
         print("Unexpected signature version. Unable to verify signature.")
         return False
     messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
     signatureFields = fieldsForSignature(messagePayload["Type"])
     print(signatureFields)
     strToSign = getSignatureFields(messagePayload, signatureFields)
     print(strToSign)
     certStr = getCert(messagePayload)
 
     print("Printing the cert")
     print(certStr.text)
     print("Using M2Crypto")
     
     # Get the X509 certificate that Amazon SNS used to sign the message.
     certificateSNS = X509.load_cert_string(certStr.text)
     #Extract the public key from the certificate.
     public_keySNS = certificateSNS.get_pubkey()
     public_keySNS.reset_context(md = "sha1")
     # Generate the derived hash value of the Amazon SNS message.
     # Generate the asserted hash value of the Amazon SNS message.
     public_keySNS.verify_init()
     public_keySNS.verify_update(strToSign.encode())
     # Decode the Signature value 
     decoded_signature = base64.b64decode(messagePayload["Signature"])
     # Verify the authenticity and integrity of the Amazon SNS message
     verification_result = public_keySNS.verify_final(decoded_signature)
     print("verification_result", verification_result)
     if verification_result != 1:
         print("Signature could not be verified")
         return False
     else:   
        return True
 
# Obtain the fields for signature based on message type.
def fieldsForSignature(type):
    if (type == "SubscriptionConfirmation" or type == "UnsubscribeConfirmation"):
        return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif (type == "Notification"):
        return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        return []
 
# Create the string to sign.  
def getSignatureFields(messagePayload, signatureFields):
    signatureStr = ""
    for key in signatureFields:
        if key in messagePayload:
            signatureStr += (key + "\n" + messagePayload[key] + "\n")
    return signatureStr
 
#**** Certificate Fetching ****
#Certificate caching
def get_cert_from_server(url):
    print("Fetching cert from server...")
    response = requests.get(url)
    return response
 
def get_cert(url):
    print("Getting cert...")
    if url not in cache:
        cache[url] = get_cert_from_server(url)
    return cache[url]
 
def getCert(messagePayload):
    certLoc = messagePayload["SigningCertURL"].replace(" ", "")
    print("Cert location", certLoc)
    responseCert = get_cert(certLoc)
    return responseCert

範例 messagePayload JSON 字串

{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}

此文章是否有幫助?


您是否需要帳單或技術支援?