aws dynamodb create-table ` --table-name SlackMessageTracking ` --attribute-definitions AttributeName=source_ts,AttributeType=S ` --key-schema AttributeName=source_ts,KeyType=HASH ` --billing-mode PAY_PER_REQUEST |
–table-name SlackMessageTracking :テーブル名
–attribute-definitions :キーのデータ型を定義(source_ts は文字列 S)
–key-schema :パーティションキーの指定(source_ts をHASHキーとして使用)
–billing-mode PAY_PER_REQUEST :オンデマンド料金で運用(推奨)
aws iam attach-role-policy ` --role-name ラムダのポリシー名 ` --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess |
import json import boto3 from slack_sdk import WebClient from slack_sdk.errors import SlackApiError # Slack Bot Token SLACK_BOT_TOKEN = "xoxb-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" client = WebClient(token=SLACK_BOT_TOKEN) # 監視対象のチャンネル(複数) SOURCE_CHANNELS = ["CYYYYYYYYY", "CZZZZZZZZ", "CAAAAAAAAA", "CBBBBBBBBB"] # 投稿先チャンネル(単独) TARGET_CHANNEL = "CXXXXXXXXXX" # 監視するリアクション(絵文字) TARGET_EMOJI = "thumbs-up" # DynamoDB クライアント dynamodb = boto3.client("dynamodb") TABLE_NAME = "SlackMessageTracking" def get_reaction_count(channel, ts): """ 指定のメッセージのリアクション数を取得 """ try: response = client.reactions_get(channel=channel, timestamp=ts) reactions = response["message"].get("reactions", []) for r in reactions: if r["name"] == TARGET_EMOJI: # 指定の絵文字 return r["count"] except SlackApiError as e: print(f"Error fetching reactions: {e.response['error']}") return 0 # リアクションがない場合は0を返す def get_thread_message_text(channel, ts): """ スレッド内のメッセージを取得(子スレッドのメッセージを検索) """ try: response = client.conversations_replies(channel=channel, ts=ts) messages = response.get("messages", []) for message in messages: if message["ts"] == ts: return message.get("text", "") except SlackApiError as e: print(f"Error fetching thread messages: {e.response['error']}") return "" def get_previous_post_ts(source_ts): """ DynamoDB から過去の投稿TSを取得 """ try: response = dynamodb.get_item( TableName=TABLE_NAME, Key={"source_ts": {"S": source_ts}} ) if "Item" in response: return response["Item"]["target_ts"]["S"] except Exception as e: print(f"Error fetching from DynamoDB: {str(e)}") return None def save_message_tracking(source_ts, target_ts): """ DynamoDB に転送済みメッセージ情報を保存 """ try: dynamodb.put_item( TableName=TABLE_NAME, Item={ "source_ts": {"S": source_ts}, "target_ts": {"S": target_ts} } ) print(f"Saved tracking info: {source_ts} -> {target_ts}") except Exception as e: print(f"Error saving to DynamoDB: {str(e)}") def lambda_handler(event, context): try: # 受信データをログ出力 print("Received event:", json.dumps(event, indent=2)) # JSON デコード body = json.loads(event['body']) slack_event = body.get("event", {}) # イベントタイプを確認 (reaction_added のみ処理) event_type = slack_event.get("type", "") if event_type != "reaction_added": print(f"Ignoring event type: {event_type}") return {"statusCode": 200, "body": "Ignoring event type"} # 監視対象のチャンネルか確認 item_channel = slack_event.get("item", {}).get("channel") if item_channel not in SOURCE_CHANNELS: print(f"Ignoring event from different channel: {item_channel}") return {"statusCode": 200, "body": "Ignoring event from different channel"} # 指定の絵文字か確認 reaction = slack_event.get("reaction", "") if reaction != TARGET_EMOJI: print(f"Ignoring reaction: {reaction}") return {"statusCode": 200, "body": "Ignoring reaction"} # メッセージのtimestamp取得 message_ts = slack_event["item"]["ts"] # リアクションの数を取得 reaction_count = get_reaction_count(item_channel, message_ts) if reaction_count == 0: return {"statusCode": 200, "body": "No reactions found"} # スレッド内の押されたメッセージの内容を取得 message_text = get_thread_message_text(item_channel, message_ts) if not message_text: return {"statusCode": 200, "body": "Message has no text"} # 転送するメッセージ内容 forwarded_message = f"{message_text}\n\n {reaction_count}回" # DynamoDB から過去の投稿を取得 previous_target_ts = get_previous_post_ts(message_ts) # 以前のメッセージを削除 if previous_target_ts: try: client.chat_delete(channel=TARGET_CHANNEL, ts=previous_target_ts) except SlackApiError as e: print(f"Failed to delete old message: {e.response['error']}") # 新しいメッセージを投稿 post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=forwarded_message) new_target_ts = post_response["ts"] # DynamoDB に保存 save_message_tracking(message_ts, new_target_ts) print(f"Message posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message forwarded"} except Exception as e: print(f"Unexpected error: {e}") return {"statusCode": 500, "body": f"Unexpected error: {str(e)}"} |
1 2 3 4 5 6 7 8 9 10 11 12 |
# markdown-mermaidプラグインをインストール code --install-extension bierner.markdown-mermaid code --install-extension bpruitt-goddard.mermaid-markdown-syntax-highlighting # Mermaid CLI をインストール(画像生成用) npm install -g @mermaid-js/mermaid-cli # SVG画像に変換 mmdc -i diagram.mmd -o diagram.svg # PNG画像は小さいので3倍拡大 mmdc -i diagram.mmd -o diagram.png --scale 3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
sequenceDiagram participant Slack participant Lambda participant DynamoDB Note right of DynamoDB: source_ts = 親投稿のタイムスタンプ Note right of DynamoDB: target_ts = コピー投稿のタイムスタンプ Slack->>Lambda: 親投稿に絵文字を追加 (reaction_added) Lambda->>DynamoDB: 過去の親投稿データを検索 (source_ts) DynamoDB-->>Lambda: あれば、過去のコピー投稿TSを取得(target_ts) Lambda->>Slack: あれば、過去のコピー投稿を削除 (chat_delete) Lambda->>Slack: 新しいコピー投稿 (chat_postMessage) Slack-->>Lambda: 新しいメッセージのTSを返す(target_ts) Lambda->>DynamoDB: 新しいTSを保存 (source_ts, target_ts) |
1, 絵文字が押される度に、全公開チャンネルを取得して、投稿先チャンネルリストとする。
2, 投稿先チャンネルとアーカイブ済チャネルは除外
3, slackアプリ追加されていないチャネルなら、追加する
1, 新規チャンネルを作成
2, 他のチャンネルで絵文字を押す。lambda実行されて、新規チャンネルにslackアプリ追加される。
3, この状態になったら、新規チャンネルでも絵文字を押されれれば、lambda実行がキックされる。
import json import boto3 from slack_sdk import WebClient from slack_sdk.errors import SlackApiError import time # Slack Bot Token SLACK_BOT_TOKEN = "" client = WebClient(token=SLACK_BOT_TOKEN) # 監視対象のチャンネル(複数)はAPIで、実行される度に取得する(join_all_public_channels) # SOURCE_CHANNELS # 投稿先チャンネル(単独) TARGET_CHANNEL = "" # 監視するリアクション(絵文字) TARGET_EMOJI = "" # DynamoDB クライアント dynamodb = boto3.client("dynamodb") TABLE_NAME = "SlackMessageTracking" # 数字を絵文字に変換するマッピング NUMERIC_EMOJI = { 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "keycap_ten" } def get_numeric_emoji(count): """ リアクション数に対応する絵文字を取得(11回以上は "keycap_ten" に固定) """ return NUMERIC_EMOJI.get(count, "keycap_ten") # 11以上は "keycap_ten" def get_all_public_channels(): """ 全ての公開チャンネルを取得(ページネーション対応) """ all_channels = [] cursor = None while True: try: response = client.conversations_list(types="public_channel", limit=1000, cursor=cursor) channels = response.get("channels", []) all_channels.extend(channels) cursor = response.get("response_metadata", {}).get("next_cursor") if not cursor: # 次のページがない場合、ループを抜ける break except SlackApiError as e: print(f"Error fetching channels: {e.response['error']}") break return all_channels def join_all_public_channels(): """ アプリを全公開チャンネルに参加させる(ページネーション対応 & レート制限回避) """ channels = get_all_public_channels() # すべての公開チャンネルを取得 for index, channel in enumerate(channels): channel_id = channel["id"] if channel["is_archived"]: # アーカイブ済みのチャンネルはスキップ continue if channel["is_member"]: # すでに参加済みならスキップ continue if channel_id == TARGET_CHANNEL: continue # 投稿先チャンネルは除外(無限ループ防止) try: client.conversations_join(channel=channel_id) print(f"Joined channel: {channel_id}") # **1リクエストごとに1.2秒スリープ(レート制限回避)** time.sleep(1.2) # **10回ごとに追加の10秒スリープ(1分50回制限を超えないため)** if (index + 1) % 10 == 0: print("Pausing for 10 seconds to prevent rate limiting...") time.sleep(10) except SlackApiError as e: if e.response["error"] == "ratelimited": print("Rate limit reached. Waiting before retrying...") time.sleep(30) # 30秒待機 continue print(f"Error joining channel {channel_id}: {e.response['error']}") def get_reaction_count(channel, ts): """ 指定のメッセージのリアクション数を取得 """ try: response = client.reactions_get(channel=channel, timestamp=ts) reactions = response["message"].get("reactions", []) for r in reactions: if r["name"] == TARGET_EMOJI: # 指定の絵文字 return r["count"] except SlackApiError as e: print(f"Error fetching reactions: {e.response['error']}") return 0 # リアクションがない場合は0を返す def get_thread_message_text(channel, ts): """ スレッド内のメッセージを取得(子スレッドのメッセージを検索) """ try: response = client.conversations_replies(channel=channel, ts=ts) messages = response.get("messages", []) for message in messages: if message["ts"] == ts: return message.get("text", "") except SlackApiError as e: print(f"Error fetching thread messages: {e.response['error']}") return "" def get_previous_post_ts(source_ts): """ DynamoDB から過去の投稿TSを取得 """ try: response = dynamodb.get_item( TableName=TABLE_NAME, Key={"source_ts": {"S": source_ts}} ) if "Item" in response: return response["Item"]["target_ts"]["S"] except Exception as e: print(f"Error fetching from DynamoDB: {str(e)}") return None def save_message_tracking(source_ts, target_ts): """ DynamoDB に転送済みメッセージ情報を保存 """ try: dynamodb.put_item( TableName=TABLE_NAME, Item={ "source_ts": {"S": source_ts}, "target_ts": {"S": target_ts} } ) print(f"Saved tracking info: {source_ts} -> {target_ts}") except Exception as e: print(f"Error saving to DynamoDB: {str(e)}") def lambda_handler(event, context): try: # 全公開チャンネルにアプリを追加 join_all_public_channels() # 監視対象のチャンネルを更新(全公開チャンネルを取得、投稿先チャネルは除外) all_channels = get_all_public_channels() SOURCE_CHANNELS = [channel["id"] for channel in all_channels if channel["id"] != TARGET_CHANNEL] # **監視対象のチャンネルをカンマ区切りでログ出力** # print(f"Monitoring {len(SOURCE_CHANNELS)} channels: " + ", ".join(SOURCE_CHANNELS)) # 受信データをログ出力 print("Received event:", json.dumps(event, indent=2)) # JSON デコード body = json.loads(event['body']) slack_event = body.get("event", {}) # イベントタイプを確認 (reaction_added のみ処理) event_type = slack_event.get("type", "") if event_type != "reaction_added": print(f"Ignoring event type: {event_type}") return {"statusCode": 200, "body": "Ignoring event type"} # 監視対象のチャンネルか確認 item_channel = slack_event.get("item", {}).get("channel") if item_channel not in SOURCE_CHANNELS: print(f"Ignoring event from different channel: {item_channel}") return {"statusCode": 200, "body": "Ignoring event from different channel"} # 投稿先チャンネルは除外 if item_channel == TARGET_CHANNEL: return {"statusCode": 200, "body": "Ignoring target channel to prevent loop"} # 指定の絵文字か確認 reaction = slack_event.get("reaction", "") if reaction != TARGET_EMOJI: print(f"Ignoring reaction: {reaction}") return {"statusCode": 200, "body": "Ignoring reaction"} # メッセージのtimestamp取得 message_ts = slack_event["item"]["ts"] # リアクションの数を取得 reaction_count = get_reaction_count(item_channel, message_ts) if reaction_count == 0: return {"statusCode": 200, "body": "No reactions found"} # スレッド内の押されたメッセージの内容を取得 message_text = get_thread_message_text(item_channel, message_ts) if not message_text: return {"statusCode": 200, "body": "Message has no text"} # DynamoDB から過去の投稿を取得 previous_target_ts = get_previous_post_ts(message_ts) if reaction_count == 1 and not previous_target_ts: # 初回の投稿 post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=message_text) new_target_ts = post_response["ts"] # 絵文字を追加 try: client.reactions_add( channel=TARGET_CHANNEL, name=NUMERIC_EMOJI[1], timestamp=new_target_ts ) except SlackApiError as e: print(f"Failed to add reaction: {e.response['error']}") # DynamoDB に保存 save_message_tracking(message_ts, new_target_ts) print(f"Message initially posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message posted"} elif reaction_count >= 3 and reaction_count % 3 == 0: # 3の倍数回目でのみ新規投稿(それ以外は更新のみ) if previous_target_ts: try: client.chat_delete(channel=TARGET_CHANNEL, ts=previous_target_ts) except SlackApiError as e: print(f"Failed to delete old message: {e.response['error']}") post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=message_text) new_target_ts = post_response["ts"] emoji_name = NUMERIC_EMOJI.get(reaction_count, "keycap_ten") try: client.reactions_add( channel=TARGET_CHANNEL, name=emoji_name, timestamp=new_target_ts ) except SlackApiError as e: print(f"Failed to add reaction: {e.response['error']}") save_message_tracking(message_ts, new_target_ts) print(f"Message posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message forwarded"} elif previous_target_ts: # 既存のメッセージの絵文字を更新(3の倍数回目でない場合) try: prev_emoji = get_numeric_emoji(reaction_count - 1) new_emoji = get_numeric_emoji(reaction_count) # **前回の絵文字を削除(存在する場合)** if prev_emoji: client.reactions_remove( channel=TARGET_CHANNEL, name=prev_emoji, timestamp=previous_target_ts ) # **新しい絵文字を追加** client.reactions_add( channel=TARGET_CHANNEL, name=new_emoji, timestamp=previous_target_ts ) except SlackApiError as e: print(f"Failed to update reaction: {e.response['error']}") return {"statusCode": 200, "body": "Reaction updated"} return {"statusCode": 200, "body": "Reaction processed"} except Exception as e: print(f"Unexpected error: {e}") return {"statusCode": 500, "body": f"Unexpected error: {str(e)}"} |