slack投稿をs3にjson形式で保存したので、athenaでデータ分析してみた
slack投稿毎に、ファイル名は日時にして、JSONファイルとしてS3に保存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import json import boto3 import urllib.parse from datetime import datetime # S3 クライアントを初期化 s3_client = boto3.client('s3') bucket_name = 'slack-post-data' # S3 バケット名を指定 def lambda_handler(event, context): try: # Slack からのイベントを処理 slack_event = json.loads(event['body']) # 現在のタイムスタンプを取得し、ファイル名として使用 timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') file_name = f"{timestamp}.json" # S3 にSlackイベントの全体をJSONフォーマットで保存 s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=json.dumps(slack_event)) # Slack に処理成功を通知 return { 'statusCode': 200, 'body': json.dumps('Message saved to S3') } except Exception as e: print(f"Error saving message to S3: {str(e)}") # エラーが発生した場合は、Lambda関数の実行結果としてエラーメッセージを返す return { 'statusCode': 500, 'body': json.dumps(f"Error saving message to S3: {str(e)}") } |
athenaって、S3にあるファイルを、なんでもSQLで検索できるみたいなイメージがあったけど、最初にテーブル定義しないと駄目なのね~。
さらに、クエリ結果を保存するためのS3バケットも必要。
athenaのクエリエディタに行って
テーブルとビューの作成ボタンを押す。SQLから作成で、以下のSQLでテーブル生成
eventカラムの中身が入れ子になっているので、mysqlなら別テーブルにするけど、structで表現するらしい。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
CREATE EXTERNAL TABLE IF NOT EXISTS slack_messages ( token string, team_id string, context_team_id string, context_enterprise_id string, api_app_id string, type string, event_id string, event_time string, authorizations array<struct<enterprise_id:string, team_id:string, user_id:string, is_bot:boolean>>, is_ext_shared_channel boolean, event_context string, event struct< type:string, subtype:string, message:struct< type:string, text:string, user:string, ts:string >, previous_message:struct< type:string, text:string, user:string, ts:string >, channel:string, hidden:boolean, ts:string, event_ts:string, channel_type:string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) LOCATION 's3://slack-post-data/'; |
後は、通常のSELCT文を実行できる。
結果はブラウザ上に表示され、S3バケットにも保存される。
1 2 |
-- レコード数をカウント SELECT count(*) FROM slack_messages; |
キュー内の時間: 104 ミリ秒
実行時間: 1.023 秒
スキャンしたデータ: 4.35 MB
athenaの料金は、1TBで5USD(750円)だから、1GBで0.75円。
だいたい、1円で1GB弱ファイルのスキャンが出来るって感じか?
struct内カラムへのアクセスは、ドットでつなげる
1 2 3 4 5 |
-- ユーザID毎の投稿数を、多い順に表示 SELECT event.message.user AS user_id, COUNT(*) AS message_count FROM slack_messages GROUP BY event.message.user ORDER BY message_count DESC; |
1 2 3 4 5 |
-- 特定のキーワードを含むメッセージの検索 SELECT event.message.text AS message_text, event.message.user AS user_id, event_time FROM slack_messages WHERE LOWER(event.message.text) LIKE '%プロジェクト%' ORDER BY event_time DESC; |
1 2 3 4 5 6 7 8 9 10 |
-- 24時間別のメッセージ投稿数 SELECT HOUR(FROM_UNIXTIME(CAST(event_time AS double))) AS hour_of_day, COUNT(*) AS message_count FROM slack_messages GROUP BY HOUR(FROM_UNIXTIME(CAST(event_time AS double))) ORDER BY hour_of_day; |
レコード数が少ない&単純なSELECT文なのに、1~2秒と結構時間がかかる…。