slackで、特定の絵文字を押されたら、指定のチャンネルに同じ投稿をして絵文字の押された回数も表示する。
DynamoDBで、絵文字を押された投稿のタイムスタンプを保持(ID代わり)
1 2 3 4 5 |
aws dynamodb create-table ` --table-name SlackMessageTracking ` --attribute-definitions AttributeName=source_ts,AttributeType=S ` --key-schema AttributeName=source_ts,KeyType=HASH ` --billing-mode PAY_PER_REQUEST |
–table-name SlackMessageTracking :テーブル名
–attribute-definitions :キーのデータ型を定義(source_ts は文字列 S)
–key-schema :パーティションキーの指定(source_ts をHASHキーとして使用)
–billing-mode PAY_PER_REQUEST :オンデマンド料金で運用(推奨)
lambdaに、DynamoDBへの権限を付与。
1 2 3 |
aws iam attach-role-policy ` --role-name ラムダのポリシー名 ` --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import json import boto3 from slack_sdk import WebClient from slack_sdk.errors import SlackApiError # Slack Bot Token SLACK_BOT_TOKEN = "xoxb-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" client = WebClient(token=SLACK_BOT_TOKEN) # 監視対象のチャンネル(複数) SOURCE_CHANNELS = ["CYYYYYYYYY", "CZZZZZZZZ", "CAAAAAAAAA", "CBBBBBBBBB"] # 投稿先チャンネル(単独) TARGET_CHANNEL = "CXXXXXXXXXX" # 監視するリアクション(絵文字) TARGET_EMOJI = "thumbs-up" # DynamoDB クライアント dynamodb = boto3.client("dynamodb") TABLE_NAME = "SlackMessageTracking" def get_reaction_count(channel, ts): """ 指定のメッセージのリアクション数を取得 """ try: response = client.reactions_get(channel=channel, timestamp=ts) reactions = response["message"].get("reactions", []) for r in reactions: if r["name"] == TARGET_EMOJI: # 指定の絵文字 return r["count"] except SlackApiError as e: print(f"Error fetching reactions: {e.response['error']}") return 0 # リアクションがない場合は0を返す def get_thread_message_text(channel, ts): """ スレッド内のメッセージを取得(子スレッドのメッセージを検索) """ try: response = client.conversations_replies(channel=channel, ts=ts) messages = response.get("messages", []) for message in messages: if message["ts"] == ts: return message.get("text", "") except SlackApiError as e: print(f"Error fetching thread messages: {e.response['error']}") return "" def get_previous_post_ts(source_ts): """ DynamoDB から過去の投稿TSを取得 """ try: response = dynamodb.get_item( TableName=TABLE_NAME, Key={"source_ts": {"S": source_ts}} ) if "Item" in response: return response["Item"]["target_ts"]["S"] except Exception as e: print(f"Error fetching from DynamoDB: {str(e)}") return None def save_message_tracking(source_ts, target_ts): """ DynamoDB に転送済みメッセージ情報を保存 """ try: dynamodb.put_item( TableName=TABLE_NAME, Item={ "source_ts": {"S": source_ts}, "target_ts": {"S": target_ts} } ) print(f"Saved tracking info: {source_ts} -> {target_ts}") except Exception as e: print(f"Error saving to DynamoDB: {str(e)}") def lambda_handler(event, context): try: # 受信データをログ出力 print("Received event:", json.dumps(event, indent=2)) # JSON デコード body = json.loads(event['body']) slack_event = body.get("event", {}) # イベントタイプを確認 (reaction_added のみ処理) event_type = slack_event.get("type", "") if event_type != "reaction_added": print(f"Ignoring event type: {event_type}") return {"statusCode": 200, "body": "Ignoring event type"} # 監視対象のチャンネルか確認 item_channel = slack_event.get("item", {}).get("channel") if item_channel not in SOURCE_CHANNELS: print(f"Ignoring event from different channel: {item_channel}") return {"statusCode": 200, "body": "Ignoring event from different channel"} # 指定の絵文字か確認 reaction = slack_event.get("reaction", "") if reaction != TARGET_EMOJI: print(f"Ignoring reaction: {reaction}") return {"statusCode": 200, "body": "Ignoring reaction"} # メッセージのtimestamp取得 message_ts = slack_event["item"]["ts"] # リアクションの数を取得 reaction_count = get_reaction_count(item_channel, message_ts) if reaction_count == 0: return {"statusCode": 200, "body": "No reactions found"} # スレッド内の押されたメッセージの内容を取得 message_text = get_thread_message_text(item_channel, message_ts) if not message_text: return {"statusCode": 200, "body": "Message has no text"} # 転送するメッセージ内容 forwarded_message = f"{message_text}\n\n {reaction_count}回" # DynamoDB から過去の投稿を取得 previous_target_ts = get_previous_post_ts(message_ts) # 以前のメッセージを削除 if previous_target_ts: try: client.chat_delete(channel=TARGET_CHANNEL, ts=previous_target_ts) except SlackApiError as e: print(f"Failed to delete old message: {e.response['error']}") # 新しいメッセージを投稿 post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=forwarded_message) new_target_ts = post_response["ts"] # DynamoDB に保存 save_message_tracking(message_ts, new_target_ts) print(f"Message posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message forwarded"} except Exception as e: print(f"Unexpected error: {e}") return {"statusCode": 500, "body": f"Unexpected error: {str(e)}"} |
vscodeでmermaid記法でシーケンス図を書いて、画像出力
1 2 3 4 5 6 7 8 9 10 11 12 |
# markdown-mermaidプラグインをインストール code --install-extension bierner.markdown-mermaid code --install-extension bpruitt-goddard.mermaid-markdown-syntax-highlighting # Mermaid CLI をインストール(画像生成用) npm install -g @mermaid-js/mermaid-cli # SVG画像に変換 mmdc -i diagram.mmd -o diagram.svg # PNG画像は小さいので3倍拡大 mmdc -i diagram.mmd -o diagram.png --scale 3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
sequenceDiagram participant Slack participant Lambda participant DynamoDB Note right of DynamoDB: source_ts = 親投稿のタイムスタンプ Note right of DynamoDB: target_ts = コピー投稿のタイムスタンプ Slack->>Lambda: 親投稿に絵文字を追加 (reaction_added) Lambda->>DynamoDB: 過去の親投稿データを検索 (source_ts) DynamoDB-->>Lambda: あれば、過去のコピー投稿TSを取得(target_ts) Lambda->>Slack: あれば、過去のコピー投稿を削除 (chat_delete) Lambda->>Slack: 新しいコピー投稿 (chat_postMessage) Slack-->>Lambda: 新しいメッセージのTSを返す(target_ts) Lambda->>DynamoDB: 新しいTSを保存 (source_ts, target_ts) |
監視対象チャンネルを、全ての公開チャンネルに変更
以下の処理を追加
1, 絵文字が押される度に、全公開チャンネルを取得して、投稿先チャンネルリストとする。
2, 投稿先チャンネルとアーカイブ済チャネルは除外
3, slackアプリ追加されていないチャネルなら、追加する
※時間がかかるのでlambdaタイムアウトを10分に変更。conversations_join(slackAPI)は1分間に50リクエストまでなのでsleepで調整。
なので、新規チャンネルの場合は
1, 新規チャンネルを作成
2, 他のチャンネルで絵文字を押す。lambda実行されて、新規チャンネルにslackアプリ追加される。
3, この状態になったら、新規チャンネルでも絵文字を押されれれば、lambda実行がキックされる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
import json import boto3 from slack_sdk import WebClient from slack_sdk.errors import SlackApiError import time # Slack Bot Token SLACK_BOT_TOKEN = "" client = WebClient(token=SLACK_BOT_TOKEN) # 監視対象のチャンネル(複数)はAPIで、実行される度に取得する(join_all_public_channels) # SOURCE_CHANNELS # 投稿先チャンネル(単独) TARGET_CHANNEL = "" # 監視するリアクション(絵文字) TARGET_EMOJI = "" # DynamoDB クライアント dynamodb = boto3.client("dynamodb") TABLE_NAME = "SlackMessageTracking" # 数字を絵文字に変換するマッピング NUMERIC_EMOJI = { 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "keycap_ten" } def get_numeric_emoji(count): """ リアクション数に対応する絵文字を取得(11回以上は "keycap_ten" に固定) """ return NUMERIC_EMOJI.get(count, "keycap_ten") # 11以上は "keycap_ten" def get_all_public_channels(): """ 全ての公開チャンネルを取得(ページネーション対応) """ all_channels = [] cursor = None while True: try: response = client.conversations_list(types="public_channel", limit=1000, cursor=cursor) channels = response.get("channels", []) all_channels.extend(channels) cursor = response.get("response_metadata", {}).get("next_cursor") if not cursor: # 次のページがない場合、ループを抜ける break except SlackApiError as e: print(f"Error fetching channels: {e.response['error']}") break return all_channels def join_all_public_channels(): """ アプリを全公開チャンネルに参加させる(ページネーション対応 & レート制限回避) """ channels = get_all_public_channels() # すべての公開チャンネルを取得 for index, channel in enumerate(channels): channel_id = channel["id"] if channel["is_archived"]: # アーカイブ済みのチャンネルはスキップ continue if channel["is_member"]: # すでに参加済みならスキップ continue if channel_id == TARGET_CHANNEL: continue # 投稿先チャンネルは除外(無限ループ防止) try: client.conversations_join(channel=channel_id) print(f"Joined channel: {channel_id}") # **1リクエストごとに1.2秒スリープ(レート制限回避)** time.sleep(1.2) # **10回ごとに追加の10秒スリープ(1分50回制限を超えないため)** if (index + 1) % 10 == 0: print("Pausing for 10 seconds to prevent rate limiting...") time.sleep(10) except SlackApiError as e: if e.response["error"] == "ratelimited": print("Rate limit reached. Waiting before retrying...") time.sleep(30) # 30秒待機 continue print(f"Error joining channel {channel_id}: {e.response['error']}") def get_reaction_count(channel, ts): """ 指定のメッセージのリアクション数を取得 """ try: response = client.reactions_get(channel=channel, timestamp=ts) reactions = response["message"].get("reactions", []) for r in reactions: if r["name"] == TARGET_EMOJI: # 指定の絵文字 return r["count"] except SlackApiError as e: print(f"Error fetching reactions: {e.response['error']}") return 0 # リアクションがない場合は0を返す def get_thread_message_text(channel, ts): """ スレッド内のメッセージを取得(子スレッドのメッセージを検索) """ try: response = client.conversations_replies(channel=channel, ts=ts) messages = response.get("messages", []) for message in messages: if message["ts"] == ts: return message.get("text", "") except SlackApiError as e: print(f"Error fetching thread messages: {e.response['error']}") return "" def get_previous_post_ts(source_ts): """ DynamoDB から過去の投稿TSを取得 """ try: response = dynamodb.get_item( TableName=TABLE_NAME, Key={"source_ts": {"S": source_ts}} ) if "Item" in response: return response["Item"]["target_ts"]["S"] except Exception as e: print(f"Error fetching from DynamoDB: {str(e)}") return None def save_message_tracking(source_ts, target_ts): """ DynamoDB に転送済みメッセージ情報を保存 """ try: dynamodb.put_item( TableName=TABLE_NAME, Item={ "source_ts": {"S": source_ts}, "target_ts": {"S": target_ts} } ) print(f"Saved tracking info: {source_ts} -> {target_ts}") except Exception as e: print(f"Error saving to DynamoDB: {str(e)}") def lambda_handler(event, context): try: # 全公開チャンネルにアプリを追加 join_all_public_channels() # 監視対象のチャンネルを更新(全公開チャンネルを取得、投稿先チャネルは除外) all_channels = get_all_public_channels() SOURCE_CHANNELS = [channel["id"] for channel in all_channels if channel["id"] != TARGET_CHANNEL] # **監視対象のチャンネルをカンマ区切りでログ出力** # print(f"Monitoring {len(SOURCE_CHANNELS)} channels: " + ", ".join(SOURCE_CHANNELS)) # 受信データをログ出力 print("Received event:", json.dumps(event, indent=2)) # JSON デコード body = json.loads(event['body']) slack_event = body.get("event", {}) # イベントタイプを確認 (reaction_added のみ処理) event_type = slack_event.get("type", "") if event_type != "reaction_added": print(f"Ignoring event type: {event_type}") return {"statusCode": 200, "body": "Ignoring event type"} # 監視対象のチャンネルか確認 item_channel = slack_event.get("item", {}).get("channel") if item_channel not in SOURCE_CHANNELS: print(f"Ignoring event from different channel: {item_channel}") return {"statusCode": 200, "body": "Ignoring event from different channel"} # 投稿先チャンネルは除外 if item_channel == TARGET_CHANNEL: return {"statusCode": 200, "body": "Ignoring target channel to prevent loop"} # 指定の絵文字か確認 reaction = slack_event.get("reaction", "") if reaction != TARGET_EMOJI: print(f"Ignoring reaction: {reaction}") return {"statusCode": 200, "body": "Ignoring reaction"} # メッセージのtimestamp取得 message_ts = slack_event["item"]["ts"] # リアクションの数を取得 reaction_count = get_reaction_count(item_channel, message_ts) if reaction_count == 0: return {"statusCode": 200, "body": "No reactions found"} # スレッド内の押されたメッセージの内容を取得 message_text = get_thread_message_text(item_channel, message_ts) if not message_text: return {"statusCode": 200, "body": "Message has no text"} # DynamoDB から過去の投稿を取得 previous_target_ts = get_previous_post_ts(message_ts) if reaction_count == 1 and not previous_target_ts: # 初回の投稿 post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=message_text) new_target_ts = post_response["ts"] # 絵文字を追加 try: client.reactions_add( channel=TARGET_CHANNEL, name=NUMERIC_EMOJI[1], timestamp=new_target_ts ) except SlackApiError as e: print(f"Failed to add reaction: {e.response['error']}") # DynamoDB に保存 save_message_tracking(message_ts, new_target_ts) print(f"Message initially posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message posted"} elif reaction_count >= 3 and reaction_count % 3 == 0: # 3の倍数回目でのみ新規投稿(それ以外は更新のみ) if previous_target_ts: try: client.chat_delete(channel=TARGET_CHANNEL, ts=previous_target_ts) except SlackApiError as e: print(f"Failed to delete old message: {e.response['error']}") post_response = client.chat_postMessage(channel=TARGET_CHANNEL, text=message_text) new_target_ts = post_response["ts"] emoji_name = NUMERIC_EMOJI.get(reaction_count, "keycap_ten") try: client.reactions_add( channel=TARGET_CHANNEL, name=emoji_name, timestamp=new_target_ts ) except SlackApiError as e: print(f"Failed to add reaction: {e.response['error']}") save_message_tracking(message_ts, new_target_ts) print(f"Message posted to {TARGET_CHANNEL}: {new_target_ts}") return {"statusCode": 200, "body": "Message forwarded"} elif previous_target_ts: # 既存のメッセージの絵文字を更新(3の倍数回目でない場合) try: prev_emoji = get_numeric_emoji(reaction_count - 1) new_emoji = get_numeric_emoji(reaction_count) # **前回の絵文字を削除(存在する場合)** if prev_emoji: client.reactions_remove( channel=TARGET_CHANNEL, name=prev_emoji, timestamp=previous_target_ts ) # **新しい絵文字を追加** client.reactions_add( channel=TARGET_CHANNEL, name=new_emoji, timestamp=previous_target_ts ) except SlackApiError as e: print(f"Failed to update reaction: {e.response['error']}") return {"statusCode": 200, "body": "Reaction updated"} return {"statusCode": 200, "body": "Reaction processed"} except Exception as e: print(f"Unexpected error: {e}") return {"statusCode": 500, "body": f"Unexpected error: {str(e)}"} |