Twitter のタイムラインで流れる話題を AWS の AI サービスで分析してみる ~後編

2020-09-01
日常生活で楽しむクラウドテクノロジー

Author : 松尾 将幸

こんにちは。Amazon Web Services Japan でアプリケーションエンジニアをしている松尾です。

引き続き今の自分のタイムラインではどういった話題が多いのか、ポジティブな Tweet とネガティブな Tweet だとどちらが多いのかを調べるためのシステムを AWS のサービスを使って構築していきます。

前編では、データストアの構築まで進めましたが、いよいよ Tweet の解析結果の可視化を行っていきます。

ご注意

本記事で紹介する AWS サービスを起動する際には、料金がかかります。builders.flash メールメンバー特典の、クラウドレシピ向けクレジットコードプレゼントの入手をお勧めします。

builders.flash メールメンバーへの登録・特典の入手はこちら »

*ハンズオン記事およびソースコードにおける免責事項 »

このクラウドレシピ (ハンズオン記事) を無料でお試しいただけます »

毎月提供されるクラウドレシピのアップデート情報とともに、クレジットコードを受け取ることができます。 


4. Step 2 : AI サービスを使って Tweet を解析

Tweet データを溜めていくことができるようになりましたので、次はそのデータを AI サービスを使って分析します。と言っても、方法は非常に簡単で、AWS サービスの API を呼び出すだけです。

今回は Tweet に含まれる話題、感情を分析したいので、Amazon Comprehend を使用します。これにより、自分で機械学習モデルを作らずとも、簡単にデータ分析をすることができます。

アーキテクチャとしては、S3 バケットの raw フォルダに Tweet が入ったファイルが作成された時、その内容を Comprehend の API に渡して解析してもらいます。S3 と Comprehend の橋渡しは Lambda 関数で行います。

では早速、開発に取り掛かりましょう。

4-1. 必要な IAM ロールの作成

これまで同様、まずは Lambda 関数に割り当てる IAM ロールを作成します。
以下にロール作成に必要な設定値を示します。手順は前編の 3-2. と同じですので、そちらを参照しながら進めてみてください。異なる点だけ以下に示します。

  • ユースケースの選択 : Lambda
  • ロール名 : TweetAnalysisLambdaFuntionRole (任意)
  • ポリシー名 : TweetAnalysisLambdaFuntionPolicy (任意)
  • JSON ポリシーの内容
{
    
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket",
                "s3:PutBucketNotification",
                "s3:DeleteBucket"
            ],
            "Resource": [
                "<バケットのARN>",
                "<バケットのARN>/*",
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<リージョン>:<AWSアカウントID>:log-group:/aws/lambda/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:<リージョン>:<AWSアカウントID>:deliverystream/Twitter-Dashboard-Entities",
                "arn:aws:firehose:<リージョン>:<AWSアカウントID>:deliverystream/Twitter-Dashboard-Sentiment"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "comprehend:DetectEntities",
                "comprehend:DetectSentiment"
            ],
            "Resource": "*"
        }
    ]
}

4-2. S3 バケットのデータを Comprehend に渡す Lambda 関数を作成

まず、Lambda のコンソールを開き 関数の作成 ボタンをクリックします。

関数の作成画面では以下を入力し、関数の作成 ボタンを押します。

  • 関数名 : tweet_analysis
  • ランタイム : Python 3.8
  • 実行ロール : 既存のロールを使用する > TweetAnalysisLambdaFuntionRole

関数が無事に作成されたら、トリガーを追加 をクリックします。

トリガーの設定 で S3 を選択し、以下のように設定します。

  • バケット : 前編の Step 3-1. で作成したバケット名
  • イベントタイプ : すべてのオブジェクト作成イベント
  • プレフィックス : raw/

入力が終わったら、再帰呼び出し のチェックを入れて 追加 を押します。
図のようになっていれば成功です。

ここで デザイナー tweet_analysis をクリックしてから、関数コード のエディタで以下のコードを入力します。サンプルコードの source/lambda_function/index.py にも同じ内容が書かれています。

入力が終わったら、画面右上の 保存 ボタンをクリックしておきます。

import json
import boto3
import os
import re

s3 = boto3.resource('s3')
comprehend = boto3.client('comprehend')
firehose = boto3.client('firehose')

entity_should_be_filtered = re.compile('^[\d#@]$')

def lambda_handler(event, context):
    print(event)
    
    for record in event['Records']:
        s3_bucket = record['s3']['bucket']['name']
        s3_key = record['s3']['object']['key']
        
        obj = s3.Object(s3_bucket, s3_key)
        tweets_as_string = obj.get()['Body'].read().decode('utf-8') 
        
        # S3に置かれているファイルは、改行コードでレコードが区切られる形式の構造となっています。
        # そのため、改行コードでsplitしてからJSONとしてパースします。
        tweets = tweets_as_string.split('\n')
        for tweet_string in tweets:
            if len(tweet_string) < 1:
                continue
            
            tweet = json.loads(tweet_string)

            # Tweetの感情分析をComprehendに依頼します。
            sentiment_response = comprehend.detect_sentiment(
                    Text=tweet['text'],
                    LanguageCode=tweet['lang']
                )
            print(sentiment_response)

            sentiment_record = {
                'tweetid': tweet['id'],
                'text': tweet['text'],
                'sentiment': sentiment_response['Sentiment'],
                'sentimentposscore': sentiment_response['SentimentScore']['Positive'],
                'sentimentnegscore': sentiment_response['SentimentScore']['Negative'],
                'sentimentneuscore': sentiment_response['SentimentScore']['Neutral'],
                'sentimentmixedscore': sentiment_response['SentimentScore']['Mixed']
            }
            
            firehose.put_record(
                DeliveryStreamName=os.environ['SENTIMENT_STREAM'],
                Record={
                    'Data': json.dumps(sentiment_record) + '\n'
                }
            )
            
            # Tweetの名前付きエンティティ分析(人名、場所など)をComprehendに依頼します。
            entities_response = comprehend.detect_entities(
                    Text=tweet['text'],
                    LanguageCode=tweet['lang']
                )
            print(entities_response)
            
            seen_entities = []
            for entity in entities_response['Entities']:
                # 数字の0-9, 記号の#, @の1文字だけだった場合は意味を見出しにくいので除外します。
                if (entity_should_be_filtered.match(entity['Text'])):
                    continue

                id = entity['Text'] + '-' + entity['Type']
                if (id in seen_entities) == False:
                    entity_record = {
                        'tweetid': tweet['id'],
                        'entity': entity['Text'],
                        'type': entity['Type'],
                        'score': entity['Score']
                    }
                    
                    firehose.put_record(
                        DeliveryStreamName=os.environ['ENTITY_STREAM'],
                        Record={
                            'Data': json.dumps(entity_record) + '\n'
                        }
                    )
                    seen_entities.append(id)

    return 'true'

次に関数の中で参照している Firehose の配信ストリームを示す環境変数を設定します。
環境変数を管理 ボタンをクリックします。

環境変数の追加 ボタンをクリックして項目を 2 つに増やし、環境変数を入力します。
入力が終わったら 保存 ボタンをクリックします。

キー
SENTIMENT_STREAM Twitter-Dashboard-Sentiment
ENTITY_STREAM Twitter-Dashboard-Entities

最後にタイムアウト値を 5 分 (任意) にしておきます。

これで S3 バケットの sentiment フォルダ、entities フォルダの中に、Comprehend が分析した結果のファイルがどんどん蓄積されていくようになります。

数分待って、ファイルが作成されることを確認してください。

ここまでの作業で、以下の部分まで完成しました。

データを AI サービスで分析する部分はこれだけで完了です。AWS サービスを使うことで簡単にデータ分析できますね。


5. Step 3 : 解析結果を可視化

それでは最後のステップとして、分析した Tweet を SQL クエリで取得したり、グラフとして可視化する部分を作っていきます。

5-1. AWS Glue でテーブルスキーマを作成

Athena で利用するテーブルスキーマを Glue を使って作成します。
今回はサンプルコードとして用意した CloudFormation テンプレートを使い、事前に準備したスキーマを作ります。

前編の 3-5. と同じ要領で CloudFormation のコンソールを開き、サンプルコードの deployment/glue_data_catalog.yaml を使って新規スタックを作成してください。

スタック名は Twitter-Dashboard-Table-Schema (任意)、パラメータの TweetsBucket には 前編の 3-1. で作成したバケット名を入力します。

作成を実施したら、スタックのステータスが CREATE_IN_PROGRESS から CREATE_COMPLETE になるまで待ってから次の手順に進みます。

5-2. Amazon Athena でクエリを実行してみる

CloudFormation で構築したテーブルスキーマ通りに分析結果を参照することができるのか確認するため、まず Athena を使って SQL クエリを実行してみます。

Athena のコンソールに移動します。

Getting Started ボタンを押して次に進むと Query editor の画面が開きます。

ここで左メニューの Database が twitter_timeline_dashboard、Tables に tweet_entitiestweet_sentimentstweets が表示されていることを確認します。

なお、初めて Athena を使う場合、画面上に Before you run your first query, you need to set up a query result location in Amazon S3 と表示されており、Run query ボタンが押せない状態になっています。そのため、このリンクをクリックして設定をおこないます。

リンクをクリックするとポップアップが出てきますので、前編 3-1. で作成した S3 バケットの名前を入力します。この時、最初に "s3://" を、最後に "/" を付ける必要がある点に注意してください。

入力後は Save ボタンを押します。

設定が終わったら、以下のクエリを実行してみます。

それぞれ図のような結果が表示されれば OK です。
されない場合、S3 バケットにファイルが格納されているか確認してみてください。

select * from tweets limit 10;
select * from tweet_sentiments limit 10;
select * from tweet_entities limit 10;

5-3. Amazon QuickSight でグラフにする

いよいよ最後のステップとなります。
Quicksight を使用して、グラフで分析結果を表現します。

まずは QuickSight のコンソールに移動します。

もし QuickSight を使ったことがなければ、図のように Sign up を求められます。
ここで QuickSight 用のアカウントを作成する必要がありますので、 Sign up for Quicksight をクリックして次に進みます。

QuickSight アカウントの作成画面では最初からエンタープライズが選択されていますので、そのまま 続行 ボタンを押して進みます。

次の画面では、リージョンQuickSight アカウント名通知の E メールアドレス を入力する必要があります。

今回はリージョンは US East のまま、アカウント名は自分の名前、E メールアドレスは自分が受信できる任意のアドレスを入力します。

画面下のチェックボックスはそのままとしておきます。ただし、S3 バケットへのアクセス権を求められていますので、Choose S3 buckets のリンクをクリックします。 

このアクセス権の付与を忘れると、この先の手順で以下のようなエラーに遭遇します。

An error has been thrown from the AWS Athena client. Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;

Amazon S3 バケットの選択のポップアップが表示されますので、これまで通り 前編の 3-1. で作成したバケットにチェックして バケットの選択 をクリックします。

元の画面に戻ってきますので、そのまま 完了 を押します。
アカウントの作成が無事に完了すれば、図のような表示になりますので、QuickSight に移動する を押してトップページに進みます。

途中でアカウント名が無効などのエラーが出た場合、ユニークになるような名前に直してみてください。

右上にある New analysis を押して次に進みましょう。

データセットの選択画面に遷移しますので、左上の New dataset を押します。

データセット新規作成画面に遷移します。
様々なデータソースから選ぶことができますが、今回は Athena を選びます。

New Athena data source のポップアップが表示されますので、Data source name tweet_analysis (任意) を入力します。

また、左下の Validate connection を押して接続確認しておきましょう。問題なければ表示が Validated に変わります。

Create data source を押して次に進みます。

Choose your table の画面では下の Use custom SQL を押します。

すると、SQL 入力画面になりますので、以下の SQL 文をコピー&ペーストしてください。

SELECT  s.*,
    e.entity,
    e.type,
    e.score,
    t.lang as language,
    coordinates.coordinates[1] AS lon,
    coordinates.coordinates[2] AS lat ,
    place.name,
    place.country,
    (t.timestamp_ms / 1000) + (9 * 60 * 60) AS timestamp_in_seconds,
    regexp_replace(source,
    '\<.+?\>', '') AS src
FROM twitter_timeline_dashboard.tweets t
JOIN twitter_timeline_dashboard.tweet_sentiments s
    ON (s.tweetid = t.id)
JOIN twitter_timeline_dashboard.tweet_entities e
    ON (e.tweetid = t.id) 

そのあと、Confirm query ボタンを押して先に進みます。

Finish data set creation の画面が表示されたら、左下の Edit/Preview data ボタンを押します。

スキーマとサンプルデータが表示される画面に遷移しますので、ここでカラムの右の方にある timestamp_in_seconds の型を Date に変更します。

変更できたら画面上の Save & Visualize ボタンを押してグラフ作成画面に進みます。

ここからは、自由にグラフを作って Tweet を分析してみてください!
試しにいくつかグラフを作ってみます。

まず Visual types から Pie Chart をクリックし、次に Field list から entity をクリックします。そのあと、Field list tweetid をドラッグ&ドロップで Field wells Value まで持っていきます。そうすると、こちらのように、自分のタイムライン上で多い話題を示す円グラフが表示されるはずです。

円グラフの Other の項目が非常に多く見辛いため、その部分で右クリックして Hide others を選んで非表示にします。こうすると見やすくなります。

新しいグラフを作りたい時は画面左上の Add > New visual を押しましょう。グラフを作成するための箱が 1 つ追加されます。

次は話題の移り変わりを示す時系列グラフを作ってみます。

Visual types Line chartX axis timestamp_in_secondsValue tweetidColor entity とします。
さらに、timestamp_in_seconds の右の下矢印をクリックして、時間の粒度を Day から Hour に変更します。

すると、どの時間帯に自分のタイムラインで何が話題になっていたかを表す時系列グラフが表示されます。

最初はやはり Other が多いため、Entity の最下部にある Others を非表示にして見やすくします。

最終的にすっきりとしたグラフになりました。このグラフを見ると、ある時間帯から急に何らかの話題が発生していた時、何かイベントなどがあったのかな、という Insight を得ることができます。


6. まとめ

これで Twitter のタイムラインを分析して可視化するシステムが完成しました。
手順を見るとボリュームがすごく多いように感じられますが、実際に試してみると何も入力せずに 次へ のボタンを押すだけの作業が多く、思ったより時間をかけずに構築できると思います。

ぜひ QuickSight で色んなグラフを作って遊んでみてください。

最後に注意事項として、構築したあとに使わないまま放置すると余計なコストがかかってしまうため、その場合はリソースを削除するかアプリケーションが動かないようにしてください。今回のシステムの場合、Comprehend の API を呼び出している部分で特にコストがかかるため、少なくとも Comprehend の API を実行する Lambda は停止、もしくは実行頻度を減らすようにした方が安全です。

それでは Happy Coding !


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます

筆者プロフィール

松尾 将幸 (まつお まさゆき)
アマゾン ウェブ サービス ジャパン合同会社
アプリケーションエンジニア

SIer で働く SE として IT 業界入りし、人材サービス会社やメガベンチャーの内製エンジニアを経て 2019 年に AWS に入社。
入社してからはプロトタイピング ソリューションアーキテクトとしてお客様のクラウド利用促進に従事。その後、社内異動制度を使って再びソフトウェアエンジニア職に。
趣味はランニングとアニメ、漫画を見ること。野球も好き。(DeNA ベイスターズファン)

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する