对于 DynamoDB 和 NoSQL 新手用户而言,面临的其中一个最大调整是如何进行数据建模以在整个数据集中对数据进行筛选。例如,在我们的游戏中,我们需要找到具有人员空当的游戏会话,以便向用户显示他们可以加入哪些游戏会话。
在关系数据库中,您将编写一些 SQL 来查询数据。
SELECT * FROM games WHERE status = “OPEN”
DynamoDB 可以筛选 Query 或 Scan 操作的结果,但 DynamoDB 不能像关系数据库那样运行。在检索到与 Query 或 Scan 操作匹配的初始项目之后,将应用 DynamoDB 筛选条件。筛选条件减小了从 DynamoDB 服务发送的有效负载的大小,但是最初检索的项目数受 DynamoDB 大小的限制。
幸运的是,有许多方法可以允许对 DynamoDB 中的数据集进行筛选查询。为了在 DynamoDB 表上提供有效的筛选条件,您需要从一开始就将筛选条件规划到表的数据模型中。记住我们在本实验的第二个模块中吸取的教训:考虑您的访问模式,然后再设计表。
在以下步骤中,我们使用全局二级索引来查找开放游戏。具体来说,我们将使用稀疏索引技术来处理此访问模式。
完成模块所需时间:40 分钟
-
第 1 步:为稀疏二级索引建模
二级索引是 DynamoDB 中至关重要的数据建模工具。它们支持您对数据进行改造以允许其他查询模式。要创建二级索引,请指定索引的主键,就像之前创建表时一样。请注意,全局二级索引的主键不必对每个项目具有唯一性。然后,DynamoDB 会根据指定的属性将项目复制到索引中,您可以像对表进行查询一样对其进行查询。
使用稀疏二级索引是 DynamoDB 中的高级策略。对于二级索引,DynamoDB 仅在项目具有二级索引中的主键元素的情况下才从原始表复制项目。没有主键元素的项目不会被复制,这就是为什么这些二级索引称为“稀疏”的原因。
让我们来看看这对我们带来的影响。您可能还记得我们有两种查找开放游戏的访问模式:
- 查找开放游戏(读取)
- 通过地图查找开放游戏(读取)
我们可以使用复合主键创建二级索引,其中 HASH 键是游戏的 map 属性,而 RANGE 键是游戏的 open_timestamp 属性,指示游戏的开放时间。
对我们来说重要的是,当游戏满员时,open_timestamp 属性将被删除。删除属性后,满员游戏将从二级索引中删除,因为它没有 RANGE 键属性的值。这就是使索引稀疏的原因:它仅包含具有 open_timestamp 属性的开放游戏。
在下一步中,我们创建二级索引。
-
第 2 步:创建稀疏二级索引
在此步骤中,我们为开放游戏(尚未满员的游戏)创建稀疏二级索引。
创建二级索引与创建表类似。在下载的代码中,您将在 scripts/ 目录中找到一个名为 add_secondary_index.py 的脚本文件。该文件的内容如下。
import boto3 dynamodb = boto3.client('dynamodb') try: dynamodb.update_table( TableName='battle-royale', AttributeDefinitions=[ { "AttributeName": "map", "AttributeType": "S" }, { "AttributeName": "open_timestamp", "AttributeType": "S" } ], GlobalSecondaryIndexUpdates=[ { "Create": { "IndexName": "OpenGamesIndex", "KeySchema": [ { "AttributeName": "map", "KeyType": "HASH" }, { "AttributeName": "open_timestamp", "KeyType": "RANGE" } ], "Projection": { "ProjectionType": "ALL" }, "ProvisionedThroughput": { "ReadCapacityUnits": 1, "WriteCapacityUnits": 1 } } } ], ) print("Table updated successfully.") except Exception as e: print("Could not update table. Error:") print(e)
每当在表或二级索引的主键中使用属性时,都必须在 AttributeDefinitions 中定义它们。然后,我们在 GlobalSecondaryIndexUpdates 属性中创建一个新的二级索引。对于此二级索引,我们指定索引名称、主键架构、预配置的吞吐量以及我们要投影的属性。
请注意,我们不必指定二级索引用作稀疏索引。这纯粹是您输入的数据的函数。如果将项目写入不具有二级索引属性的表中,则这些项目将不会包含在二级索引中。
通过运行以下命令来创建二级索引。
python scripts/add_secondary_index.py
您应该在控制台中看到以下消息:“表已成功更新。”
在下一步中,我们将使用稀疏索引按地图查找开放游戏。
-
第 3 步:查询稀疏二级索引
现在我们已经配置了二级索引,让我们使用它来满足某些访问模式的要求。
要使用二级索引,您有两个可用的 API 调用:Query 和 Scan。使用 Query 时,您必须指定 HASH 键,它会返回目标结果。使用 Scan 时,您无需指定 HASH 键,该操作将遍历整个表。除特殊情况外,不建议在 DynamoDB 中运行 Scan,因为它们会访问数据库中的每个项目。如果表中有大量数据,则扫描可能需要花费很长时间。在下一步中,我们向您介绍为什么将 Scan 与稀疏索引一起使用时,Scan 会成为一款强大的工具。
我们可以针对上一步中创建的二级索引使用 Query API,以按地图名称查找所有开放游戏。二级索引按地图名称划分,这使我们可以进行有针对性的查询以查找开放游戏。
在您下载的代码中,在 application/ 目录中有一个 find_open_games_by_map.py 文件。此脚本的内容如下。
import boto3 from entities import Game dynamodb = boto3.client('dynamodb') def find_open_games_by_map(map_name): resp = dynamodb.query( TableName='battle-royale', IndexName="OpenGamesIndex", KeyConditionExpression="#map = :map", ExpressionAttributeNames={ "#map": "map" }, ExpressionAttributeValues={ ":map": { "S": map_name }, }, ScanIndexForward=True ) games = [Game(item) for item in resp['Items']] return games games = find_open_games_by_map("Green Grasslands") for game in games: print(game)
在前面的脚本中,find_open_games_by_map 函数类似于您在应用程序中拥有的函数。该函数接受地图名称,并查询 OpenGamesIndex 以查找该地图的所有开放游戏。然后,它将返回的实体集合在一起,组成可在您的应用程序中使用的 Game 对象。
通过在终端中运行以下命令来执行此脚本。
python application/find_open_games_by_map.py
终端将显示以下输出以及 Green Grasslands 地图的四款开放游戏。
Open games for Green Grasslands: Game<14c7f97e-8354-4ddf-985f-074970818215 -- Green Grasslands> Game<3d4285f0-e52b-401a-a59b-112b38c4a26b -- Green Grasslands> Game<683680f0-02b0-4e5e-a36a-be4e00fc93f3 -- Green Grasslands> Game<0ab37cf1-fc60-4d93-b72b-89335f759581 -- Green Grasslands> sudo cp -r wordpress/* /var/www/html/
在下一步中,我们使用 Scan API 扫描稀疏二级索引。
-
第 4 步:扫描稀疏二级索引
在上一步中,我们了解了如何查找特定地图的游戏。一些玩家可能更喜欢玩特定的地图,因此这项操作很有用。其他玩家可能愿意玩任何地图的游戏。在本节中,我们将介绍如何在应用程序中查找任何开放游戏,而不管地图类型如何。为此,我们使用 Scan API。
通常,您不希望将表设计为使用 DynamoDB Scan 操作,因为 DynamoDB 是为获取所需确切实体的外科查询而构建的。Scan 操作会在表中获取随机的实体集合,因此要找到所需的实体可能需要多次往返数据库。
但是,有时 Scan 会很有用。像我们这种情况,我们拥有稀疏二级索引,这意味着索引中不应包含太多实体。此外,索引仅包括那些开放游戏,而这正是我们所需要的。
对于此用例,Scan 效果很好。让我们看看它是如何运行的。在您下载的代码中,在 application/ 目录中有一个 find_open_games.py 文件。该文件的内容如下。
import boto3 from entities import Game dynamodb = boto3.client('dynamodb') def find_open_games(): resp = dynamodb.scan( TableName='battle-royale', IndexName="OpenGamesIndex", ) games = [Game(item) for item in resp['Items']] return games games = find_open_games() print("Open games:") for game in games: print(game)
该代码与上一步中的代码相似。但是,我们不使用 DynamoDB 客户端上的 query() 方法,而使用 scan() 方法。由于我们使用的是 scan(),因此不需要像对 query() 那样指定任何关键条件。我们只是让 DynamoDB 不按特定顺序返回一堆项目。
在终端中使用以下命令运行脚本。
python application/find_open_games.py
您的终端应打印对多种地图开放的九款游戏的列表。
Open games: Game<c6f38a6a-d1c5-4bdf-8468-24692ccc4646 -- Urban Underground> Game<d06af94a-2363-441d-a69b-49e3f85e748a -- Dirty Desert> Game<873aaf13-0847-4661-ba26-21e0c66ebe64 -- Dirty Desert> Game<fe89e561-8a93-4e08-84d8-efa88bef383d -- Dirty Desert> Game<248dd9ef-6b17-42f0-9567-2cbd3dd63174 -- Juicy Jungle> Game<14c7f97e-8354-4ddf-985f-074970818215 -- Green Grasslands> Game<3d4285f0-e52b-401a-a59b-112b38c4a26b -- Green Grasslands> Game<683680f0-02b0-4e5e-a36a-be4e00fc93f3 -- Green Grasslands> Game<0ab37cf1-fc60-4d93-b72b-89335f759581 -- Green Grasslands>
在这一步中,我们了解了在特定情况下如何使用 Scan 操作是正确的选择。我们使用 Scan 从稀疏二级索引中获取各种实体,以向玩家展示开放游戏。
在接下来的步骤中,我们满足两种访问模式的要求:
- 将用户加入游戏(写入)
- 开始游戏(写入)
为了满足以下步骤中的“将用户加入游戏”访问模式,我们将使用 DynamoDB 事务。事务在操作一次影响多个数据元素的关系系统中很流行。例如,假设您正在经营一家银行。客户 Alejandra 将 100 美元转账给另一位客户 Ana。在记录此事务时,您将使用事务来确保将更改应用于两个客户的余额,而不仅仅是一个。
通过 DynamoDB 事务,您可以更轻松地构建仅一次操作即可进行多个项目更改的应用程序。借助事务,您可以通过一次交易请求处理多达 10 个项目。
在 TransactWriteItem API 调用中,可以使用以下操作:
- 输入:用于插入或覆盖项目。
- 更新:用于更新现有项目。
- 删除:用于移除项目。
- ConditionCheck:用于在不更改项目的情况下声明现有项目的条件。
在下一步中,我们在向游戏添加新用户同时防止游戏变得过满时使用 DynamoDB 事务。
-
第 5 步:将用户加入游戏
我们在本模块中解决的第一个访问模式是向游戏添加新用户。
在向游戏添加新用户时,我们需要:
- 确认游戏中没有 50 位玩家(每个游戏最多可以有 50 位玩家)。
- 确认用户尚未加入游戏。
- 创建一个新的 UserGameMapping 实体以将该用户添加到游戏。
- 增加 Game 实体上的 people 属性以跟踪游戏中有多少玩家。
请注意,完成所有这些操作需要跨现有 Game 实体和新的 UserGameMapping 实体以及每个实体的条件逻辑进行写操作。这种操作非常适合 DynamoDB 事务,因为您需要在同一请求中处理多个实体,并且希望整个请求一起成功或失败。
在您下载的代码中,application/ 目录中有一个 join_game.py 脚本。该脚本中的函数使用 DynamoDB 事务将用户添加到游戏。
该脚本的内容如下。
import boto3 from entities import Game, UserGameMapping dynamodb = boto3.client('dynamodb') GAME_ID = "c6f38a6a-d1c5-4bdf-8468-24692ccc4646" USERNAME = 'vlopez' def join_game_for_user(game_id, username): try: resp = dynamodb.transact_write_items( TransactItems=[ { "Put": { "TableName": "battle-royale", "Item": { "PK": {"S": "GAME#{}".format(game_id) }, "SK": {"S": "USER#{}".format(username) }, "game_id": {"S": game_id }, "username": {"S": username } }, "ConditionExpression": "attribute_not_exists(SK)", "ReturnValuesOnConditionCheckFailure": "ALL_OLD" }, }, { "Update": { "TableName": "battle-royale", "Key": { "PK": { "S": "GAME#{}".format(game_id) }, "SK": { "S": "#METADATA#{}".format(game_id) }, }, "UpdateExpression": "SET people = people + :p", "ConditionExpression": "people <= :limit", "ExpressionAttributeValues": { ":p": { "N": "1" }, ":limit": { "N": "50" } }, "ReturnValuesOnConditionCheckFailure": "ALL_OLD" } } ] ) print("Added {} to game {}".format(username, game_id)) return True except Exception as e: print("Could not add user to game") join_game_for_user(GAME_ID, USERNAME)
在此脚本的 join_game_for_user 函数中,transact_write_items() 方法执行写事务。此事务有两个操作。
在事务的第一个操作中,我们使用输入操作插入一个新的 UserGameMapping 实体。作为该操作的一部分,我们指定一个条件,即该实体不应存在 SK 属性。这样可确保不存在具有此 PK 和 SK 的实体。如果确实存在这样的实体,则意味着该用户已经加入了游戏。
第二个操作是对 Game 实体的更新操作,用于将 people 属性增加一个。作为此操作的一部分,我们添加了条件检查,以确保 people 的当前值不大于 50。一旦有 50 个人加入游戏,该游戏已满员,便可以开始游戏了。
在终端中使用以下命令运行此脚本。
python application/join_game.py
您终端的输出应表明该用户已添加到游戏中。
Added vlopez to game c6f38a6a-d1c5-4bdf-8468-24692ccc4646
请注意,如果您尝试再次运行该脚本,该函数将失败。用户 vlopez 已被添加到游戏中,因此尝试再次添加用户不符合我们指定的条件。
DynamoDB 事务的添加大大简化了围绕此类复杂操作的工作流程。如果没有事务,这将需要具有复杂条件的多个 API 调用,以及在发生冲突时执行手动回滚。现在,我们可以用少于 50 行的代码来实现这样的复杂操作。
在下一步中,我们将处理“开始游戏(写入)”访问模式。
-
第 6 步:开始游戏
只要游戏拥有 50 个用户,游戏的创建者就可以开始游戏以启动游戏。在这一步中,我们介绍如何处理这种访问模式。
当我们的应用程序后端收到开始游戏的请求时,我们会检查三个方面:
- 游戏有 50 人注册。
- 发出请求的用户是游戏的创建者。
- 游戏尚未开始。
我们可以在请求更新游戏的条件表达式中处理所有这些检查。如果所有这些检查均通过,则我们需要通过以下方式更新实体:
- 移除 open_timestamp 属性,以使其在上一个模块的稀疏二级索引中不显示为开放游戏。
- 添加一个 start_time 属性以指示游戏何时开始。
在您下载的代码中,application/ 目录中有一个 start_game.py 脚本。该文件的内容如下。
import datetime import boto3 from entities import Game dynamodb = boto3.client('dynamodb') GAME_ID = "c6f38a6a-d1c5-4bdf-8468-24692ccc4646" CREATOR = "gstanley" def start_game(game_id, requesting_user, start_time): try: resp = dynamodb.update_item( TableName='battle-royale', Key={ "PK": { "S": "GAME#{}".format(game_id) }, "SK": { "S": "#METADATA#{}".format(game_id) } }, UpdateExpression="REMOVE open_timestamp SET start_time = :time", ConditionExpression="people = :limit AND creator = :requesting_user AND attribute_not_exists(start_time)", ExpressionAttributeValues={ ":time": { "S": start_time.isoformat() }, ":limit": { "N": "50" }, ":requesting_user": { "S": requesting_user } }, ReturnValues="ALL_NEW" ) return Game(resp['Attributes']) except Exception as e: print('Could not start game') return False game = start_game(GAME_ID, CREATOR, datetime.datetime(2019, 4, 16, 10, 15, 35)) if game: print("Started game: {}".format(game))
在此脚本中,start_game 函数类似于您在应用程序中将拥有的函数。它采用 game_id、requesting_user 和 start_time,并运行一个请求来更新 Game 实体以开始游戏。
update_item() 调用中的 ConditionExpression 参数指定了我们在此步骤前面列出的三项检查中的每一项 – 游戏必须有 50 人,请求游戏开始的用户必须是游戏的创建者,游戏不能具有表明游戏已经开始的 start_time 属性。
在 UpdateExpression 参数中,您可以看到我们要对实体进行的更改。首先,我们从实体中移除 open_timestamp 属性,然后将 start_time 属性设置为游戏的开始时间。
在终端中使用以下命令运行此脚本。
python application/start_game.py
您应该在终端上看到表明游戏已成功开始的输出。
Started game: Game<c6f38a6a-d1c5-4bdf-8468-24692ccc4646 -- Urban Underground>
尝试再次在终端中运行脚本。这次,您应该看到一条错误消息,指示您无法开始游戏。这是因为您已经开始游戏,所以存在 start_time 属性。造成的结果是,该请求未能对实体进行条件检查。
您可能还记得,游戏实体和关联的用户实体之间存在多对多关系,并且该关系由 UserGameMapping 实体表示。
通常,您想查询关系的双方。通过主键设置,我们可以找到游戏中的所有 User 实体。我们可以通过使用倒排索引来查询 User 的所有 Game 实体。
在 DynamoDB 中,倒排索引是二级索引,它与主键是反向的。RANGE 键成为您的 HASH 键,反之亦然。这种模式可以翻转您的表,并允许您在多对多关系的另一方进行查询。
在以下步骤中,我们将倒排索引添加到表中,并介绍如何使用它来检索特定 User 的所有 Game 实体。
-
第 7 步:添加倒排索引
在这一步中,我们向表中添加一个倒排索引。像任何其他二级索引一样,将创建一个倒排索引。
在您下载的代码中,scripts/ 目录中有一个 add_inverted_index.py 脚本。该 Python 脚本会向您的表添加一个倒排索引。
该文件的内容如下。
import boto3 dynamodb = boto3.client('dynamodb') try: dynamodb.update_table( TableName='battle-royale', AttributeDefinitions=[ { "AttributeName": "PK", "AttributeType": "S" }, { "AttributeName": "SK", "AttributeType": "S" } ], GlobalSecondaryIndexUpdates=[ { "Create": { "IndexName": "InvertedIndex", "KeySchema": [ { "AttributeName": "SK", "KeyType": "HASH" }, { "AttributeName": "PK", "KeyType": "RANGE" } ], "Projection": { "ProjectionType": "ALL" }, "ProvisionedThroughput": { "ReadCapacityUnits": 1, "WriteCapacityUnits": 1 } } } ], ) print("Table updated successfully.") except Exception as e: print("Could not update table. Error:") print(e)
在此脚本中,我们在 DynamoDB 客户端上调用 update_table() 方法。在该方法中,我们传递有关要创建的二级索引的详细信息,包括索引的关键架构、预配置的吞吐量以及要投影到索引中的属性。
在终端中键入以下命令运行脚本。
python scripts/add_inverted_index.py
您的终端将显示索引已成功创建的输出。
Table updated successfully.
在下一步中,我们将使用倒排索引来检索特定 User 的所有 Game 实体。
-
第 8 步:为用户检索游戏
现在,我们已经创建了倒排索引,可以使用它来检索 User 玩过的 Game 实体。要处理此问题,需要针对我们要查看其 Game 实体的User 查询倒排索引。
在您下载的代码中,application/ 目录中有一个 find_games_for_user.py 脚本。该文件的内容如下。
import boto3 from entities import UserGameMapping dynamodb = boto3.client('dynamodb') USERNAME = "carrpatrick" def find_games_for_user(username): try: resp = dynamodb.query( TableName='battle-royale', IndexName='InvertedIndex', KeyConditionExpression="SK = :sk", ExpressionAttributeValues={ ":sk": { "S": "USER#{}".format(username) } }, ScanIndexForward=True ) except Exception as e: print('Index is still backfilling. Please try again in a moment.') return None return [UserGameMapping(item) for item in resp['Items']] games = find_games_for_user(USERNAME) if games: print("Games played by {}:".format(USERNAME)) for game in games: print(game)
在此脚本中,我们有一个名为 find_games_for_user() 的函数,该函数与您在游戏中将拥有的函数相似。此函数使用用户名并返回给定用户玩过的所有游戏。
使用以下命令在终端中运行脚本。
python application/find_games_for_user.py
该脚本应打印用户 carrpatrick 玩过的所有游戏。
Games played by carrpatrick: UserGameMapping<25cec5bf-e498-483e-9a00-a5f93b9ea7c7 -- carrpatrick -- SILVER> UserGameMapping<c6f38a6a-d1c5-4bdf-8468-24692ccc4646 -- carrpatrick> UserGameMapping<c9c3917e-30f3-4ba4-82c4-2e9a0e4d1cfd -- carrpatrick>
在本模块中,我们向表添加了二级索引。这满足了两个附加的访问模式的要求:
- 通过地图查找开放游戏(读取)
- 查找开放游戏(读取)
为此,我们使用了稀疏索引,该索引仅包括仍对其他玩家开放的游戏。我们针对索引使用 Query 和 Scan API 来查找开放游戏。
另外,我们看到了如何满足应用程序中的两项高级写操作。首先,当用户加入游戏时,我们使用了 DynamoDB 事务。借助事务,我们在一次请求中处理了跨多个实体的复杂条件写入。
其次,我们实施了游戏创建者在游戏准备就绪时开始游戏的函数。在此访问模式中,我们进行了更新操作,该操作需要检查三个属性的值并更新两个属性。您可以通过条件表达式和更新表达式的功能在一次请求中表达这种复杂的逻辑。
第三,我们通过检索 User 玩过的所有 Game 实体来满足最终访问模式。为了处理这种访问模式,我们使用倒排索引模式创建了二级索引,以允许对 User 实体与 Game 实体之间的多对多关系的另一方进行查询。
在下一个模块中,我们将清理我们创建的资源。