Amazon AWS Official Blog

Scheduling Containerized Game Rooms with the AWS EKS Cluster Autoscaler, AWS GameLift, and Agones

Introduction

In many room-based games, scheduling each room quickly and efficiently is a topic that no developer or operator can avoid. At the same time, containerization has become an unavoidable trend in modern server architecture. Many architectures use Google's Agones to manage rooms as containers: it provides a simple, effective, and robust set of states and Kubernetes APIs for managing and tracking the lifecycle of every pod, and it integrates seamlessly with AWS EKS through the CA (Cluster Autoscaler). The CA can also use a mixed instances policy to run AWS Spot Instances and cut costs, but with only this naive integration, Spot's "reactive" interruption model tends to leave developers and operators with "interruption anxiety".

AWS GameLift, the official managed service, primarily solves the lifecycle problem of game rooms on virtual machines. Its resource scheduling is built on the FleetIQ component, which uses AI-driven Spot instance type selection together with techniques such as instance rebalancing: it proactively seeks out the least "strained" capacity, improving the availability of the overall Spot pool. This "proactive discovery" model greatly reduces the "interruption anxiety" of developers and operators. Of course, it also has a limitation: it does not provide containerization out of the box, so you still need to do that work yourself to improve resource utilization and management efficiency.

Overview

This post describes an approach that combines the respective strengths of Agones and FleetIQ (GameLift), so that developers and operators can enjoy the convenience of containers without being plagued by Spot "interruption anxiety". The core idea is to customize the --nodes parameter of the AWS EKS Cluster Autoscaler so that it scales the ASG (Auto Scaling Group) used by GameLift.

Implementation

Terminology

  • GameServer: a generic game server process that hosts client sessions; it can be containerized or deployed stand-alone
  • K8s Cluster Autoscaler (CA): the Kubernetes autoscaling component; it automatically adjusts the number of nodes according to its settings
  • FleetIQ (GameLift) GameServer: a game server process running on EC2 and managed by FleetIQ
  • Agones GameServer: corresponds to a Pod in Kubernetes
  • GameServerGroup: a group of FleetIQ GameServer resources; it is associated with an ASG (Auto Scaling Group) and defines the configuration and how FleetIQ manages it
  • Fleet: a set of pre-warmed GameServers in Kubernetes, waiting to be allocated
  • ClaimGameServer: claims an available GameServer in a GameServerGroup for an upcoming game session
  • GameServerAllocation: allocates an available GameServer from a Fleet in Kubernetes for an upcoming game session
  • Instance Viability: indicates how likely a given Spot instance type is to be interrupted
  • Fleet Autoscaler: the Kubernetes component that scales the GameServers in a Fleet on demand

Architecture Diagram

Architecture Modules

  • Agones module (gray): prepares and scales game servers, allocates servers to game sessions, and manages their lifecycle. The game servers in this layer are entirely unaware of FleetIQ.
  • FleetIQ (GameLift) module (green): runs viability checks on EC2 instances and writes the results back into the EC2 Auto Scaling group configuration, ensuring that only viable instances are used while non-viable instances are drained and replaced.
  • Middleware module (yellow): manages the communication between the Agones and FleetIQ layers:
    • monitors the instance viability reported by FleetIQ and uses Kubernetes cordons and taints to keep Agones off non-viable instances (sketched below)
    • registers EC2 instances as FleetIQ GameServers and marks them UTILIZED
    • deregisters the FleetIQ GameServer on non-viable instances
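
In Kubernetes terms, the first of these middleware tasks boils down to a cordon plus a taint. A conceptual sketch (the node name is a placeholder; the taint key is the one applied by the monitor-service code later in this post):

# For a viable (ACTIVE) instance: taint it so that only game-server pods
# carrying a matching toleration are scheduled onto it
kubectl taint nodes <node-name> gamelift.status/active=true:NoExecute

# For a non-viable (DRAINING) instance: cordon it so that Agones schedules
# no new game servers onto it
kubectl cordon <node-name>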

Basic Steps

All of the code and design in this post is open source. Start by cloning the project and locating the script/install.sh script, which automates most of the steps in the flow below. For the full details, refer to the project's code.

Environment Preparation

  • Create an account with the AdministratorAccess permission; let's name it "agonesfleetiq-admin".
  • Add this role to your local machine or Cloud9; here I used the aws configure command to set it in the environment variables.

  • Install the required environment and tools, and verify the setup (a quick sanity check is sketched below).
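
A quick sanity check of the environment might look like the following (the exact tool versions are up to your environment):

# Credentials should resolve to the agonesfleetiq-admin identity
aws sts get-caller-identity
# Tools used in the sections below
eksctl version
kubectl version --client
helm version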

EKS Configuration

  • Install eksctl and configure the roles.
  • Launch a cluster with unmanaged (self-managed) nodes, as sketched below.
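
A minimal sketch of such a cluster, assuming it is named agones so that it matches the kubernetes.io/cluster/agones tag used in the FleetIQ section (install.sh may use different parameters):

# Create the control plane only; the worker nodes will come from the
# FleetIQ-managed Auto Scaling group rather than an eksctl nodegroup
eksctl create cluster --name agones --region ${AWS_REGION} --without-nodegroup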

FleetIQ Configuration

  • Based on the EKS cluster above, generate the user data, subnets, security groups, and so on needed by the ASG launch template.
  • Create the GameLift GameServerGroup from the ASG launch template (see the sketch after the tagging command below).
  • Once the GameServerGroup status is correct, add the tags that let the CA recognize the group:
aws autoscaling create-or-update-tags --tags \
    ResourceId=gamelift-gameservergroup-${GSGNAME},ResourceType=auto-scaling-group,Key=kubernetes.io/cluster/agones,Value=owned,PropagateAtLaunch=true \
    ResourceId=gamelift-gameservergroup-${GSGNAME},ResourceType=auto-scaling-group,Key=k8s.io/cluster-autoscaler/agones,Value=enabled,PropagateAtLaunch=true \
    ResourceId=gamelift-gameservergroup-${GSGNAME},ResourceType=auto-scaling-group,Key=Name,Value=FleetIQ,PropagateAtLaunch=true \
    ResourceId=gamelift-gameservergroup-${GSGNAME},ResourceType=auto-scaling-group,Key=k8s.io/cluster-autoscaler/enabled,Value=true,PropagateAtLaunch=true \
    --region ${AWS_REGION}
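
For reference, the GameServerGroup from the second step could be created along these lines; this is only a sketch, and the role ARN, launch template, instance types, and subnets are placeholders for the values generated earlier:

aws gamelift create-game-server-group \
    --game-server-group-name ${GSGNAME} \
    --role-arn arn:aws:iam::<account-id>:role/<fleetiq-role> \
    --min-size 0 --max-size 10 \
    --launch-template LaunchTemplateId=<launch-template-id>,Version='1' \
    --instance-definitions InstanceType=c5.large InstanceType=m5.large \
    --balancing-strategy SPOT_PREFERRED \
    --vpc-subnets <subnet-a> <subnet-b> \
    --region ${AWS_REGION}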
  • Manually configure a new CA deployment and point it at the GameServerGroup:
# Use the --nodes parameter to target the FleetIQ-managed ASG
command:
            - ./cluster-autoscaler
            - --v=4
            - --stderrthreshold=info
            - --cloud-provider=aws
            - --skip-nodes-with-local-storage=false
            - --expander=priority
            - --nodes=0:10:gamelift-gameservergroup-${GSGNAME}
            - --balance-similar-node-groups
            - --skip-nodes-with-system-pods=false
            
# Prefer the gamelift-gameservergroup node group via the priority expander
apiVersion: v1
kind: ConfigMap
metadata:
  name: cluster-autoscaler-priority-expander
  namespace: kube-system
data:
  priorities: |-
    10:
      - .*-non-existing-entry.*
    20:
      - gamelift-gameservergroup-${GSGNAME}

Agones Configuration

  • Install helm and the agones chart (see the sketch below).
  • Deploy the Agones fleet; for the concrete yaml, see the configuration files under config/environment/Minetest.
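
A minimal install following the Agones helm chart documentation:

helm repo add agones https://agones.dev/chart/stable
helm repo update
helm install agones --namespace agones-system --create-namespace agones/agones

One detail worth calling out: the monitor-service described below taints viable nodes with gamelift.status/active=true:NoExecute, so the fleet's pod template must tolerate that taint. A sketch of the shape of such a manifest (the image and port are placeholders; the real values live in config/environment/Minetest):

apiVersion: "agones.dev/v1"
kind: Fleet
metadata:
  name: minetest
spec:
  replicas: 2
  template:
    spec:
      ports:
      - name: default
        containerPort: 30000
      template:
        spec:
          # Tolerate the taint applied by the monitor-service so the pods
          # can run on instances that FleetIQ considers viable
          tolerations:
          - key: "gamelift.status/active"
            operator: "Equal"
            value: "true"
            effect: "NoExecute"
          containers:
          - name: minetest
            image: <your-minetest-image>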

Middleware Configuration

  • Lambda
    • pubsub-service:
      • tracks the status of the EC2 instances in the FleetIQ module through the DescribeGameServerInstances API
      • pushes that status into an SQS queue, with uniqueness and ordering guaranteed
# Proactively discover the status of the nodes in the cluster and push it to SQS
...
"""
    Current status of the game server instance.
        ACTIVE -- The instance is viable for hosting game servers.
        DRAINING -- The instance is not viable for hosting game servers. Existing game servers are in the process of ending, and new game servers are not started on this instance unless no other resources are available. When the instance is put in DRAINING, a new instance is started up to replace it. Once the instance has no UTILIZED game servers, it will be terminated in favor of the new instance.
        SPOT_TERMINATING -- The instance is in the process of shutting down due to a Spot instance interruption. No new game servers are started on this instance.
"""          
def lambda_handler(event, context):
 
    queue = sqs.get_queue_by_name(QueueName=sqsName)
    
    gamelift = boto3.client('gamelift', region_name=os.getenv('AWS_REGION'))
    paginator = gamelift.get_paginator('describe_game_server_instances')
    #TODO get GAME_SERVER_GROUP_NAME from a ConfigMap and loop through the values
    groups = json.loads(os.getenv('CONFIG_TXT'))
    for group in groups['GameServerGroups']:
        pages = paginator.paginate(GameServerGroupName=group)
        for page in pages:
            game_servers = []
            for game_server in page['GameServerInstances']:
                
                # Enrich the FleetIQ record with EC2 instance details
                # (helper defined elsewhere in the project)
                game_server = decribe_instance(game_server)
                if 'PrivateDnsName' in game_server and len(game_server['PrivateDnsName']) > 0:
                    print(f'Publishing status on channel {game_server["InstanceId"]}', flush=True)
                    game_servers.append(game_server)

            if len(game_servers) == 0:
                continue
            # FIFO queue: MessageGroupId preserves per-group ordering, and the
            # random suffix gives each poll a fresh deduplication ID
            randomsuffix = str(random.randint(1,1000))
            response = queue.send_message(
                MessageBody=json.dumps(game_servers), 
                MessageGroupId=group, 
                MessageDeduplicationId=group + randomsuffix)
                
            print(f'response = {response}')        
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
...
  • monitor-service
    • pulls messages from SQS and analyzes node status through the Kubernetes API
# Main flow: whether to initialize the GameServer depends on whether a Redis record already exists
def main_loop(messageId, message):
    set_messageId(messageId, message['InstanceId'])
    print_messageId(message)
    global r
    r = redis_conn()
    rkey = '{}.{}'.format(message['GameServerGroupName'], message['InstanceId'])
    # setnx succeeds only for the first writer, so initialization runs once per instance
    if r.setnx(rkey, '1'):
        print_messageId(f'access {rkey}')
        initialize_game_server(message)
    
    update_health_status(message)
    get_health_status(message)
  • Analyze the status of each EC2 instance:
    • If it is healthy (ACTIVE), register the instance as a game server with GameLift FleetIQ
# Initialize the GameServer: register the EC2 instance with GameLift and move its status to UTILIZED
def initialize_game_server(message):
    """This method registers the instance as a game server with Gamelift FleetIQ using the instance's Id as game server name.
    After registering the instance, it looks at result of DescribeAutoscalingInstances to see whether the instance is HEALTHY. 
    When HEALTHY, the instance is CLAIMED and its status is changed to UTILIZED. Finally, the taint gamelift.status/active=true:NoExecute
    is added to the node. Agones game servers need to have a toleration for this taint before they can run on this instance."""
    status = message['InstanceStatus']
    GameServerGroupName = message['GameServerGroupName']
    GameServerId = message['InstanceId']
    InstanceId = message['InstanceId']
    PrivateDnsName = message['PrivateDnsName']

    try:
        # Register game server instance
        print_messageId(f'Registering game server {GameServerId} PrivateDnsName: {PrivateDnsName}', flush=True)  
        gamelift.register_game_server(
            GameServerGroupName=GameServerGroupName,
            GameServerId=GameServerId,
            InstanceId=InstanceId
        )
    except gamelift.exceptions.ConflictException as error:
        print_messageId(f'The game server {GameServerId} is already registered', flush=True)
        #return None
    except Exception as e: 
        print_messageId(f'{e}', flush=True)
        deregister_game_server(GameServerGroupName, GameServerId)
        return None
    
    # Update the game server status to healthy
    # TODO Change this to use the new FleetIQ API DescribeGameServerInstances
    # TODO Consider using a decorator and backoff library to implement the backoff
    trytime = 0
    backoff = random.randint(1,5)
    while is_healthy(InstanceId) != 'HEALTHY':
        print_messageId(f'Instance is not healthy, re-trying in {backoff}', flush=True)
        sleep(backoff)
        trytime = trytime + 1
        # tried 3 times, let's return here
        if trytime == 3:
            return None
    
    update_health_status(message)
    
    # Claim the game server
    print_messageId(f'Claiming game server {GameServerId}', flush=True)
    try: 
        gamelift.claim_game_server(
            GameServerGroupName=GameServerGroupName,
            GameServerId=GameServerId
        )
    except gamelift.exceptions.ConflictException as error: 
        print_messageId('The instance has already been claimed', flush=True)
        
    # Update game server status 
    print_messageId(f'Changing server {GameServerId} status to utilized', flush=True)
    gamelift.update_game_server(
        GameServerGroupName=GameServerGroupName,
        GameServerId=GameServerId,
        UtilizationStatus='UTILIZED'
    )
    
# update_health_status applies the gamelift.status/active taint
def update_health_status(message):
    status = message['InstanceStatus']
    GameServerGroupName = message['GameServerGroupName']
    GameServerId = message['InstanceId']
    InstanceId = message['InstanceId']
    PrivateDnsName = message['PrivateDnsName']

    try: 
        gamelift.update_game_server(
            GameServerGroupName=GameServerGroupName,
            GameServerId=GameServerId,
            HealthCheck='HEALTHY'
        )
        print_messageId(f'Updated Gamelift game server {GameServerId} health', flush=True)
    except gamelift.exceptions.NotFoundException as e:
        print_messageId(f'Skipping healthcheck, the node {GameServerId} is not registered', flush=True)
    
    # Adding taint ACTIVE to node
    taint = {
        "key": "gamelift.status/active",
        "value": "true",
        "effect": "NoExecute"
    }
    kubernetes_tools.taint_node(GameServerId, PrivateDnsName, json.dumps(taint))
  • If it is non-viable (DRAINING), protect the nodes that already have Allocated game servers and wait for the players to leave (graceful eviction); then deregister the game server from GameLift and enter the termination flow (SPOT_TERMINATING)
# Asynchronously check node status so that DRAINING nodes are handled ahead of time

def get_health_status(message):
    """This is another asynchronous method that checks the health of the viability of the instance according to FleetIQ.
    It is subscribing to a Redis channel for updates because the DescribeGameServerInstances API has a low throttling rate.
    For this to work, a separate application has to be deployed onto the cluster.  This application gets the viability of 
    each instance and publishes it on a separate channel for each instance.  When the viability changes to DRAINING, the node
    is cordoned and tainted, preventing Agones from scheduling new game servers on the instance.  Game servers in the non-Allocated
    state will be rescheduled onto other instances."""

    status = message['InstanceStatus']
    GameServerGroupName = message['GameServerGroupName']
    GameServerId = message['InstanceId']
    InstanceId = message['InstanceId']
    PrivateDnsName = message['PrivateDnsName']

    if status == 'ACTIVE':
        return
    global r
    # Per-instance flags live in a Redis hash keyed by the instance id;
    # values are written as 1/0 by the hset calls below
    is_cordoned = r.hget(name=InstanceId, key="is_cordoned") in (1, '1', b'1')
    is_ready_shutdown = r.hget(name=InstanceId, key="is_ready_shutdown") in (1, '1', b'1')
    is_waiting_for_termination = r.hget(name=InstanceId, key="is_waiting_for_termination") in (1, '1', b'1')

    print_messageId('Starting message loop', flush=True)
    print_messageId(f'is_cordoned: {is_cordoned}, is_ready_shutdown: {is_ready_shutdown}, is_waiting_for_termination: {is_waiting_for_termination}', flush=True)
    print_messageId(f"Instance {GameServerId}/{PrivateDnsName} status is: {status}", flush=True)

    try:
        if status == 'DRAINING':
            print_messageId(f'Instance is no longer viable', flush=True)
            cordon_and_protect(PrivateDnsName, GameServerId)
            is_cordoned = True
            r.hset(name=InstanceId, key="is_cordoned", value=int(is_cordoned))

            print_messageId(f'Instance should have no allocated session', flush=True)
            is_ready_shutdown = is_ready_shutdown_check(PrivateDnsName)
            r.hset(name=InstanceId, key="is_ready_shutdown", value=int(is_ready_shutdown))

            print_messageId(f'Instance should be deregistered', flush=True)
            deregister_game_server(GameServerGroupName, GameServerId)
            is_waiting_for_termination = True
            r.hset(name=InstanceId, key="is_waiting_for_termination", value=int(is_waiting_for_termination))
            
            if is_waiting_for_termination:
                print_messageId(f'Waiting for termination signal', flush=True)
                
        elif status == 'SPOT_TERMINATING':
            if is_waiting_for_termination:
                # This is never invoked because the status never equals SPOT_TERMINATING  
                print_messageId(f'Received termination signal', flush=True)
                kubernetes_tools.drain_pods(InstanceId, PrivateDnsName)  
        else:
            pass     

    except Exception as e:
        print_messageId(f'get_health_status->error: {e}', flush=True)   
  • EventBridge: the scheduled trigger for the PubSub service (see the sketch at the end of this section)
  • SQS: a FIFO queue, which guarantees message uniqueness and ordering.
game_server["PrivateDnsName"] = instance.get("PrivateDnsName")
game_server["PrivateIpAddress"] = instance.get("PrivateIpAddress")
game_server["PublicDnsName"] = instance.get("PublicDnsName")
game_server["PublicIpAddress"] = instance.get("PublicIpAddress")
  • ElastiCache: caches intermediate state
# Redis hash named after the instance ID:
# is_cordoned: the non-viable node has been tainted and protected; it accepts no new pods and waits to be shut down
# is_ready_shutdown: the node has been shut down
# is_waiting_for_termination: the node has been shut down and is waiting to be deregistered

r.hset(name="{InstanceId}", key="is_cordoned", value=(1, 0)[is_cordoned])
r.hset(name="{InstanceId}", key="is_ready_shutdown", value=(1, 0)[is_ready_shutdown])
r.hset(name="{InstanceId}", key="is_waiting_for_termination", value=(1, 0)[is_waiting_for_termination])

Functional Testing

For more detail on these operations, see https://agones.dev/site/docs/

  • Check Agones status:
kubectl describe --namespace agones-system pods
  • Check Agones pod status:
kubectl get pods --namespace agones-system
  • Get the fleets:
kubectl get fleets
  • Get the GameServers:
kubectl get gameservers
  • Watch GameServer details:
watch kubectl describe gameserver
  • Scale the fleet up:
kubectl scale fleet stk-fleet --replicas=100
watch kubectl get fleets
kubectl get nodes
  • Scale the fleet down:
kubectl scale fleet stk-fleet --replicas=1
watch kubectl get fleets
kubectl get nodes
  • Alternatively, you can test through a FleetAutoscaler: let Agones' own autoscaling replace the manual scale commands above, and simply run an Allocation against the target GameServers (manifests and usage below).
# gameserverfleetautoscaler.yaml

apiVersion: "autoscaling.agones.dev/v1"
kind: FleetAutoscaler
metadata:
  name: minetest-autoscaler
spec:
  fleetName: minetest
  policy:
    type: Buffer
    buffer:
      bufferSize: 5
      minReplicas: 0
      maxReplicas: 20
      
# gameserverallocation.yaml      
apiVersion: "allocation.agones.dev/v1"
kind: GameServerAllocation
spec:
  required:
    matchLabels:
      agones.dev/fleet: minetest
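
With the two manifests above saved under the names in their comments, apply the FleetAutoscaler once and then create a GameServerAllocation for each room you want to claim. Note that allocations are submitted with kubectl create, since every create claims one Ready GameServer:

kubectl apply -f gameserverfleetautoscaler.yaml
kubectl create -f gameserverallocation.yaml
watch kubectl get fleets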

Limitations

  • With the CA's default settings it takes roughly 10 to 15 minutes for a scale-down to take effect; adjust the --scale-down-delay-after-add parameter when you configure the CA deployment manually (see the example after this list).
  • This post only demonstrates a single-ASG configuration; for multiple ASGs, you need to plan the number of nodegroups when the EKS cluster is first created.
  • To keep the code less intrusive to the project, the monitor-service is not deployed as a DaemonSet on every node.
  • Several AWS services are involved, but no CloudFormation or CDK templates have been created to automate the deployment.
  • The sh script may behave differently across Linux environments, and the EKS version matters as well (this post uses 1.8); take care when installing.
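
For the first limitation, the relevant flags go into the cluster-autoscaler container command shown earlier; the values below are illustrative only and should be tuned to your traffic pattern:

            - --scale-down-delay-after-add=2m
            - --scale-down-unneeded-time=2m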

Conclusion

With the approach described in this post, the Cluster Autoscaler and FleetIQ (GameLift) can be used together. The advantages include:

  • Bottom-up: FleetIQ's intelligent prediction and multi-node balancing determine each node's viability, which in turn decides where the upper-layer Agones pods are placed, avoiding Spot interruptions.
  • Top-down: Agones labels game rooms by purpose, making it quick and easy to manage room states (Ready, Allocated) in the upper layer without touching the lifecycle of the underlying nodes.


About the Author

Wan Xi

Solutions Architect at Amazon Web Services, responsible for consulting on and designing cloud computing architectures based on AWS, and a firm embracer of the AWS Builder culture. He has more than 12 years of game development experience, has taken part in the management and development of several game projects, and has a deep understanding of and insight into the game industry.