亚马逊AWS官方博客

事不过三——如何实现恶意登陆的自动防护

背景介绍

近些年,随着网络技术的发展,恶意的网络入侵事件时有发生,如何保障云上网络安全引起了越来越多客户的关注。同时,国内外相关的法律细则也在愈发的完善,其中都对恶意登陆控制台的相关防护提出了指导性的要求。具体来看,大致分为两点:

  • 是否配置并启用了登录失败处理功能。
  • 是否配置并启用了限制非法登录功能,非法登录达到一定次数后采取特定动作(如:登录失败处理次数限制,账户锁定等)。

本文便是针对上述的诉求,探讨如何在AWS侧实现相关的功能。

分析

对于绝大多数的客户来说,进行AWS控制台的登陆无外乎两条路,1)直接使用AWS的IAM用户进行登陆2) 自建SSO平台,通过登陆第三方认证系统,然后登陆进AWS平台。那么相对应我们的方案思路也有两种:

  • 方案1: 使用第三方SSO服务,例如ADFS,Authing等,在其侧进行相关用户锁定控制
  • 方案2: 利用Cloudtrail中出现的用户登陆失败事件触发用户锁定功能

本文由于篇幅的限制,接下来会主要探讨基于方案2的实现和演进。

实践分析

项目架构参考:

实验说明:

  1. 使用IAM 用户A进行登陆,故意输入3次错误密码
  2. 对应的用户登陆错误事件会发送到CloudTrail中,配置trail监听发送日志到CloudWatch
  3. CloudWatch对于2中收到用户登陆的日志
  4. CLoudWatch中配置登陆失败事件订阅,目标为lambda函数
  5. Lambda触发后会在DDB中插入登陆失败事件
  6. Lambda会检查是否N分钟登陆失败超过三次
  7. 如果发现N分钟内超过三次失败,则禁用对应IAM 控制台登陆的权限

具体流程

(1)CloudTrail配置
登陆CloudTrail界面,点击Trail进行创建,因为本实验并不会进行基于S3的查询,所以S3可以选择任意一个bucket或者新建一个bucket使用。配置CloudWatch是否监听,选择启动,并创建新的日志组如名称:ct-cw-log;点击下一步进入事件监听,events项目只选择Management events,类别选择Write,然后点击下一步最终生成Trail监听。效果如下图所示:

(2)DynamoDB配置

配置如下的DynamoDB table:

其中:

  • Partition key: username,string类型
  • Sort key: randomUUID,string类型

(3)Lambda函数配置

进入Lambda界面,点击新建函数,选择运行环境为python3.7,选择一个具有DynamoDB读写,IAM login profile 删除权限的role(需要自建);创建完毕后把下面的代码copy到code处,点击部署。

import gzip
import json
import base64¬
import boto3
import uuid
import time

DDBclient = boto3.client('DynamoDB')
IAMclient = boto3.client('iam')

## insert a iterm with username(pk) and random sort-k, and set ttl to target like 3 min
def diable_iam_function(userName):  
    ticks = time.time()
    print('current time is ', time.asctime( time.localtime(ticks) ))
    expiryDateTime = ticks + 180
    print('expire time is ', time.asctime( time.localtime(expiryDateTime) ))
    try:
        data = DDBclient.put_item(
            TableName='login_counter',
            Item={
                'username': {'S': userName},
                'randomUUID': { 'S': str(uuid.uuid4()) },
                'ttl': { 'N': str(expiryDateTime) } 
            }
        )
    except Exception as e:
        print('Exception: ', e) 
        
``` query username and get count ,if >=3 then disable the iam user
    record will expire after 3mins, and will modify this parm to be custmized
```

    data = DDBclient.query(
      TableName='login_counter',
      KeyConditionExpression='#name = :value',
      FilterExpression = '#t > :ttl', 
      ExpressionAttributeValues={
        ':value': {
          'S': userName
        },
        ':ttl': { 
          'N': str(time.time()), 
        }
        
      },
      ExpressionAttributeNames={
         '#name': 'username',
         '#t': 'ttl', 
      }
    )

## disable iam user if failed count >= 3   
    if data['Count'] >= 3:
        print('user', userName, 'is being deleted due to more than 3 times retry')
        try:
            response = IAMclient.delete_login_profile(
                UserName=userName,
            )
            
            result = "delete user", response
            return result
        except Exception as e:
            print('Exception: ', e)
    else:
        
        result = 'user', userName, 'current failed count is ', data['Count']
        print(result)
        return result
        
def lambda_handler(event, context):
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)
    print('how many events:', len(payload['logEvents']))
    log_events = payload['logEvents']


#   iterate all log events    
    for event in log_events:
        output = event['message']
        data = json.loads(output)
        userName = data['userIdentity']['userName']
        diable_iam_function(userName)

代码逻辑解释:

  • 每一条登陆失败的信息都会被记录在DynamoDB中,其主键为用户名,sort key为随机值,以当前时间+N(N取决于客户对于登陆失败限定的时间)设定其TTL。
  • 当有新的失败触发时都会进行DynamoDB事件的插入,同时基于TTL进行查询是否已经有三次登陆失败发生,如果有那么通过delete IAM login profile进行console登陆的禁止。
  • 查询时还需要基于TTL进行查询,主要因为在DynamoDB中的TTL时软删除,其实效性并没有保障,参考:https://docs.aws.amazon.com/amazonDynamoDB/latest/developerguide/TTL.html

(4) CloudWatch配置

进入到CloudWatch界面,搜寻在步骤1中创建的log group, 点击其下方的subscription filter,选择创建Lambda subscription。选择destination为步骤三中创建的lambda函数,在配置Configure log format and filters处按如下填写:

  • Log format:从下拉框选择Amazon CloudTrail
  • Subscription filter pattern:
    • { ($.eventName = ConsoleLogin) && ($.errorMessage = “Failed authentication”) }
  • Subscription filter name:failed auth filter

然后点击Start Streaming 按钮。

(5) 测试过程:

  • 使用测试用户A,登陆失败三次后,连续一分钟内继续登陆登出操作,发现仍旧能正常操作;
  • 15分钟过后发现不可登陆,用其他IAM用户登陆,发现测试用户A的console权限已经被禁止;
  • 恢复用户A的console权限,连续错误登陆两次,三分钟后再错误登陆一次,15分钟后再进行登陆,登陆顺利。

(6)待需要解决问题:

(7)其他注意

最初测试时发现短时间内错误登陆多次,但是Lambda只触发了一次。查看日志后发现,从CloudWatch传过来的日志可能一条日志中包含了多个登陆错误事件,所以改成如上的循环读取logevent模式。

改进方案

项目架构参考:

与原始方案相比,主要的区别为用EventBridge监听CloudTrail新事件替换掉了CloudWatch监听CloudTrail日志的触发流程。主要解决的问题:

  • 错误登陆事件可以立即在CloudTrail中观察到,Eventbridge也能立即得到监听,从而解决了不能立即禁用多次错误登陆IAM用户的问题
  • 不需要新创建trail,节省了对应s3的费用

具体流程

(1)DynamoDB配置

配置如下的DynamoDB table,与上述实验相同

(2)Lambda函数配置

进入Lambda界面,点击新建函数,选择运行环境为python3.7,选择一个具有DDB读写,IAM login profile删除权限的role;创建完毕后把下面的代码copy到code处,点击部署

import gzip
import json
import base64
import boto3
import uuid
import time


DDBclient = boto3.client('DynamoDB')
IAMclient = boto3.client('iam')


## insert a iterm with username(pk) and random sort-k, and set ttl to target like 3 min
def diable_iam_function(userName):  
    ticks = time.time()
    print('current time is ', time.asctime( time.localtime(ticks) ))
    expiryDateTime = ticks + 180
    print('expire time is ', time.asctime( time.localtime(expiryDateTime) ))
    try:
        data = DDBclient.put_item(
            TableName='login_counter',
            Item={
                'username': {'S': userName},
                'randomUUID': { 'S': str(uuid.uuid4()) },
                'ttl': { 'N': str(expiryDateTime) } 
            }
        )
    except Exception as e:
        print('Exception: ', e) 
        
## query user from pk and get count ,if >=3 then disable the iam user
    data = DDBclient.query(
      TableName='login_counter',
      KeyConditionExpression='#name = :value',
      FilterExpression = '#t > :ttl', 
      ExpressionAttributeValues={
        ':value': {
          'S': userName
        },
        ':ttl': { 
          'N': str(time.time()), 
        }
        
      },
      ExpressionAttributeNames={
         '#name': 'username',
         '#t': 'ttl', 
      }
    )

# disable iam user if failed count >= 3   
    if data['Count'] >= 3:
        print('user', userName, 'is being deleted due to more than 3 times retry')
        try:
            response = IAMclient.delete_login_profile(
                UserName=userName,
            )¬¬
            
            result = "delete user", response
            return result
        except Exception as e:
            print('Exception: ', e)
    else:
        
        result = 'user', userName, 'current failed count is ', data['Count']
        print(result)
        return result
        
def lambda_handler(event, context):
    userName = event
    diable_iam_function(userName)

与原始方案相比,代码逻辑基本没有变化,唯一的变化是通过EvenBridge传过来的event是单条的,并且直接是错误登陆的用户名,从而代码中做了相应的调整。这个变化主要是由于EventBridge和CloudWatch传递事件的机制不同。

(3)EventBridge配置

进入EventBridge界面,点击新创建Rule。在Define rule detail界面,填写Name为: Falied auth trigger,然后点击下一步;拉到页面最下方Event Pattern处,选择Custom Pattern,复制如下内容

{
  "source": ["aws.signin"],
  "detail-type": ["AWS Console Sign In via CloudTrail"],
  "detail":{
    "responseElements": {
        "ConsoleLogin": [ { "anything-but": [ "Success" ] } ]
      }
   } 
}

点击下一步,select a target处选择Lambda,然后选择我们在步骤2中创建的Lambda函数,然后展开additional setting,配置如下:

  • Configure target input: 从下拉框选择Part of matched event
  • Specify the part of the matched event: 填写如下
    • $.detail.userIdentity.userName

一路点击下一步,直到创建完成。

(4)测试过程:

  • 使用测试用户A,登陆失败三次后,间隔5秒进行正确密码登陆,发现无法登陆
  • 使用其他账号进行登陆,发现用户A已经被禁止console login的权限
  • 恢复用户A的console权限,连续错误登陆两次,三分钟后再错误登陆一次,间隔5,10,180秒后登陆发现可以继续登陆

总结

通过改进后的方案,目前可以实现实时和事后两种方式对于IAM用户的锁定,其中实时方案更为贴近等保和客户的要求,在整体流程也更为简单,成本也较低。此外,关于为什么要使用DynamoDB作为登陆数据的存储,作者也进行了一系列调研,其及简要的优劣势分析如下所示:

  • 基于事件流直接进行分析
    • KDS + KDA,基于滑动窗口进行分析
      • 优点:基于SQL进行查询相对简单
      • 缺点:需要长时间运行,成本较高, 滑动窗口范围可能导致漏判
  • 基于No-SQL进行存储
    • Redis
      • 优点:其TTL为实时生效,减小了范围查询导致的延迟
      • 缺点:需要维护额外的半托管Elasticache集群,其成本也较高
    • Dynamodb
      • 优点:按需使用,价格较低
      • 缺点:“软”TTL,查询效率相对低
  • 基于时序数据库进行存储
    • TimeStream
      • 优点:相对更为贴合本场景
      • 缺点:中国区暂时没有

本篇作者

尹振宇

AWS解决方案架构师,负责基于AWS云平台的解决方案咨询和设计,尤其在无服务器领域和微服务领域有着丰富的实践经验。