Wie überprüfe ich die Authentizität von Amazon-SNS-Nachrichten, die an HTTP- und HTTPS-Endpunkte gesendet werden?
Letzte Aktualisierung: 14.10.2021
Ich sende Benachrichtigungen über den Amazon Simple Notification Service (Amazon SNS) an einen HTTPS- oder HTTP-Endpunkt. Ich möchte Spoofing-Angriffe verhindern. Wie überprüfe ich also die Authentizität der Amazon-SNS-Nachrichten, die mein Endpunkt erhält?
Auflösung
Es ist eine bewährte Methode, bei der Überprüfung der Authentizität einer Amazon-SNS-Benachrichtigung eine zertifikatbasierte Signaturvalidierung zu verwenden. Anweisungen finden Sie unter Verifizierung der Signaturen von Amazon-SNS-Nachrichten im Amazon-SNS-Entwicklerhandbuch.
Um Spoofing-Angriffe zu verhindern, sollten Sie bei der Überprüfung von Amazon-SNS-Nachrichtensignaturen Folgendes tun:
- Verwenden Sie immer HTTPS, um das Zertifikat von Amazon SNS zu erhalten.
- Validieren Sie die Authentizität des Zertifikats.
- Stellen Sie sicher, dass das Zertifikat von Amazon SNS gesendet wurde.
- (Wenn möglich) Verwenden Sie eines der unterstützten AWS-SDKs für Amazon SNS, um Nachrichten zu validieren und zu überprüfen.
Beispiel-Python-Skript für zertifikatbasierte Signaturvalidierung
Wichtig: Stellen Sie sicher, dass Sie die Funktion processMessage (messagePayload) aufrufen. Der messagePayload-Parameter erfordert eine JSON-Zeichenfolge im gleichen Format wie die Beispiel-JSON-Zeichenfolge messagePayload in diesem Artikel.
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import base64
from M2Crypto import EVP, RSA, X509
import requests
cache = dict()
# Extract the name-value pairs from the JSON document in the body of the HTTP POST request that Amazon SNS sent to your endpoint.
def processMessage(messagePayload):
print ("Start!")
if (messagePayload["SignatureVersion"] != "1"):
print("Unexpected signature version. Unable to verify signature.")
return False
messagePayload["TopicArn"] = messagePayload["TopicArn"].replace(" ", "")
signatureFields = fieldsForSignature(messagePayload["Type"])
print(signatureFields)
strToSign = getSignatureFields(messagePayload, signatureFields)
print(strToSign)
certStr = getCert(messagePayload)
print("Printing the cert")
print(certStr.text)
print("Using M2Crypto")
# Get the X509 certificate that Amazon SNS used to sign the message.
certificateSNS = X509.load_cert_string(certStr.text)
#Extract the public key from the certificate.
public_keySNS = certificateSNS.get_pubkey()
public_keySNS.reset_context(md = "sha1")
# Generate the derived hash value of the Amazon SNS message.
# Generate the asserted hash value of the Amazon SNS message.
public_keySNS.verify_init()
public_keySNS.verify_update(strToSign.encode())
# Decode the Signature value
decoded_signature = base64.b64decode(messagePayload["Signature"])
# Verify the authenticity and integrity of the Amazon SNS message
verification_result = public_keySNS.verify_final(decoded_signature)
print("verification_result", verification_result)
if verification_result != 1:
print("Signature could not be verified")
return False
else:
return True
# Obtain the fields for signature based on message type.
def fieldsForSignature(type):
if (type == "SubscriptionConfirmation" or type == "UnsubscribeConfirmation"):
return ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
elif (type == "Notification"):
return ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
else:
return []
# Create the string to sign.
def getSignatureFields(messagePayload, signatureFields):
signatureStr = ""
for key in signatureFields:
if key in messagePayload:
signatureStr += (key + "\n" + messagePayload[key] + "\n")
return signatureStr
#**** Certificate Fetching ****
#Certificate caching
def get_cert_from_server(url):
print("Fetching cert from server...")
response = requests.get(url)
return response
def get_cert(url):
print("Getting cert...")
if url not in cache:
cache[url] = get_cert_from_server(url)
return cache[url]
def getCert(messagePayload):
certLoc = messagePayload["SigningCertURL"].replace(" ", "")
print("Cert location", certLoc)
responseCert = get_cert(certLoc)
return responseCert
Beispiel-JSON-Zeichenfolge messagePayload
{
"Type" : "Notification",
"MessageId" : "e1f2a232-e8ce-5f0a-b5d3-fbebXXXXXXXX",
"TopicArn" : "arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST",
"Subject" : "Test",
"Message" : "TestHTTPS",
"Timestamp" : "2021-10-07T18:55:19.793Z",
"SignatureVersion" : "1",
"Signature" : "VetoDxbYMh0Ii/87swLEGZt6FB0ZzGRjlW5BiVmKK1OLiV8B8NaVlADa6ThbWd1s89A4WX1WQwJMayucR8oYzEcWEH6//VxXCMQxWD80rG/NrxLeoyas4IHXhneiqBglLXh/R9nDZcMAmjPETOW61N8AnLh7nQ27O8Z+HCwY1wjxiShwElH5/+2cZvwCoD+oka3Gweu2tQyZAA9ergdJmXA9ukVnfieEEinhb8wuaemihvKLwGOTVoW/9IRMnixrDsOYOzFt+PXYuKQ6KGXpzV8U/fuJDsWiFa/lPHWw9pqfeA8lqUJwrgdbBS9vjOJIL+u2c49kzlei8zCelK3n7w==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2aXXXXXXXX.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:XXXXXXXX:SNSHTTPSTEST:b5ab2db8-7775-4852-bd1a-2520XXXXXXXX",
"MessageAttributes" : {
"surname" : {"Type":"String","Value":"SNSHTTPSTest"}
}
}
Ähnliche Informationen
War dieser Artikel hilfreich?
Benötigen Sie Hilfe zur Fakturierung oder technischen Support?