HTTP および HTTPS エンドポイントに送信される Amazon SNS メッセージの信頼性を検証するにはどうすればよいですか?

最終更新日: 2021 年 10 月 14 日

Amazon Simple Notification Service (Amazon SNS) を使用して HTTPS または HTTP エンドポイントに通知を送信しようとしています。なりすまし攻撃を防ぎたいのですが、エンドポイントが受信する Amazon SNS メッセージの信頼性を検証するにはどうすればよいですか?

解決方法

Amazon SNS 通知の信頼性を検証する際には、証明書ベースの署名検証を利用するのがベストプラクティスです。手順については、Amazon SNS デベロッパーガイドの Verifying the signatures of Amazon SNS messages を参照してください。

なりすまし攻撃を防ぐのに役立てるため、Amazon SNS メッセージの署名を検証する際には、次の事項を必ず実行してください。

  • Amazon SNS から証明書を取得するには、必ず HTTPS を使用します。
  • 証明書の信頼性を検証します。
  • 証明書が Amazon SNS から送信されたことを確認します。
  • (可能な場合) Amazon SNS 向けにサポートされている AWS SDK のいずれかを使用して、メッセージを検証および確認します。

証明書ベースの署名検証用の Python スクリプトの例

重要: 必ず processMessage (messagePayload) 関数を呼び出してください。messagePayload パラメータには、この記事で提供されている messagePayload JSON 文字列の例と同じ形式の JSON 文字列が必要です。

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import base64
from M2Crypto import EVP, RSA, X509
import requests
cache = dict()
 
# Extract the name-value pairs from the JSON document in the body of the HTTP POST request that Amazon SNS sent to your endpoint.
def processMessage(messagePayload):
     print ("Start!")
     
     if (messagePayload["SignatureVersion"] != "1"):
         print("Unexpected signature version. Unable to verify signature.")
         return False
     messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
     signatureFields = fieldsForSignature(messagePayload["Type"])
     print(signatureFields)
     strToSign = getSignatureFields(messagePayload, signatureFields)
     print(strToSign)
     certStr = getCert(messagePayload)
 
     print("Printing the cert")
     print(certStr.text)
     print("Using M2Crypto")
     
     # Get the X509 certificate that Amazon SNS used to sign the message.
     certificateSNS = X509.load_cert_string(certStr.text)
     #Extract the public key from the certificate.
     public_keySNS = certificateSNS.get_pubkey()
     public_keySNS.reset_context(md = "sha1")
     # Generate the derived hash value of the Amazon SNS message.
     # Generate the asserted hash value of the Amazon SNS message.
     public_keySNS.verify_init()
     public_keySNS.verify_update(strToSign.encode())
     # Decode the Signature value 
     decoded_signature = base64.b64decode(messagePayload["Signature"])
     # Verify the authenticity and integrity of the Amazon SNS message
     verification_result = public_keySNS.verify_final(decoded_signature)
     print("verification_result", verification_result)
     if verification_result != 1:
         print("Signature could not be verified")
         return False
     else:   
        return True
 
# Obtain the fields for signature based on message type.
def fieldsForSignature(type):
    if (type == "SubscriptionConfirmation" or type == "UnsubscribeConfirmation"):
        return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif (type == "Notification"):
        return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        return []
 
# Create the string to sign.  
def getSignatureFields(messagePayload, signatureFields):
    signatureStr = ""
    for key in signatureFields:
        if key in messagePayload:
            signatureStr += (key + "\n" + messagePayload[key] + "\n")
    return signatureStr
 
#**** Certificate Fetching ****
#Certificate caching
def get_cert_from_server(url):
    print("Fetching cert from server...")
    response = requests.get(url)
    return response
 
def get_cert(url):
    print("Getting cert...")
    if url not in cache:
        cache[url] = get_cert_from_server(url)
    return cache[url]
 
def getCert(messagePayload):
    certLoc = messagePayload["SigningCertURL"].replace(" ", "")
    print("Cert location", certLoc)
    responseCert = get_cert(certLoc)
    return responseCert

messagePayload JSON 文字列の例

{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}

この記事はお役に立ちましたか?


請求に関するサポートまたは技術サポートが必要ですか?