Source code for misfit.notification

import arrow
import json
import requests

from base64 import standard_b64decode
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15

from .misfit import MisfitObject


[docs]def string_to_sign(data): """ Build a signed SNS string from a dict """ strings = [] for key in ('Message', 'MessageId', 'Subject', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'): if key in data: strings += [key, data[key]] strings.append("") return '\n'.join(strings).encode('utf8')
[docs]class MisfitMessage(MisfitObject): """ DELETED, CREATED and UPDATED are the three known actions you will find in the ``action`` property """ DELETED = 'deleted' CREATED = 'created' UPDATED = 'updated'
[docs]class MisfitNotification(MisfitObject): def __init__(self, data): """ Load the JSON to a dict and verify the signature if applicable """ data_dict = json.loads(data.decode('utf8')) super(MisfitNotification, self).__init__(data_dict) if hasattr(self, 'Signature'): self.verify_signature() if self.Type == 'Notification': # Objectify the message list self.Message = [MisfitMessage(m) for m in json.loads(self.Message)] elif self.Type == 'SubscriptionConfirmation': # If the notification is a subscription confirmation, fetch the # subscribe URL requests.get(self.SubscribeURL)
[docs] def verify_signature(self): """ Verify the signature of the SNS message. raises cryptography.exceptions.InvalidSignature """ # Get the signing certificate and public key from the specified URL cert_str = requests.get(self.data['SigningCertURL']).content cert = default_backend().load_pem_x509_certificate(cert_str) pubkey = cert.public_key() # Verify the signature signature = standard_b64decode(self.data['Signature'].encode('utf8')) verifier = pubkey.verifier(signature, PKCS1v15(), hashes.SHA1()) verifier.update(string_to_sign(self.data)) # verify returns None on success, raises InvalidSignature on failure assert verifier.verify() is None