Zoom APIをDjangoで扱ってみる
Zoom APIを利用して部屋の作成とかする案件がありそうなので勉強のために調べてみたことメモ。
Django
なのはこれも勉強のため。。
前提
- Django: 2.2.x
ゴール
手順
1. Zoomにてアプリ登録を行う
- OAuth - Build an App - Documentationにアクセス
- 右上「Create App」をクリック
- Zoomアカウントでのログインを求められるのでログインする
- 「Choose your app type」にて「OAuth > Create」をクリック
- 「Create on OAuth app」というモーダルが表示されるので必要に応じて情報を入力する
- App Name: 任意(今回はSamplaApp)
- Choose app type: 任意(今回はAccount-level appを選択)
- Would you like to publish this app on Zoom App Marketplace?: Offを選択
- モーダル内「Create」をクリックするとアプリが作成され、以下情報が取得できる
- 取得できる情報
- Client ID
- Client Secret
- またOAuthでの認証後にリダイレクトされるページのURLを指定する(今回は
http://localhost:8000/zoom/auth/complete
とする) - Whitelist URLも登録しておく(今回は
http://localhost:8000
)
- 取得できる情報
- 画面左側メニュー「Scopes」をクリック
- 「Add scopes+」をクリック
- 必要に応じて認証のスコープを設定する(今回はMeetingを作成できればよいので
user:read:admin
,meeting:write:admin
,meeting:write
を選択)- APIのリファレンス(API Reference)に必要なスコープが書かれているので参考にする
- スコープの選択が完了したらモーダル内「Done」をクリック
- 必要に応じて認証のスコープを設定する(今回はMeetingを作成できればよいので
2. Djangoで実装を行う
2-1. デモアプリを作成する
アプリ名はわかりやすくzoom
とする
$ python manage.py startapp zoom
settings.py
にてzoom
を有効化する
# <project-root>/<project-app>/settings.py INSTALLED_APPS = [ ..., # My Applications 'zoom.apps.ZoomConfig', ]
2-1. 認証, アクセストークンの取得
認証に関するルーティングを設定する
# <project-root>/<zoom/urls.py from django.urls import path from . import views urlpatterns = [ # /auth: 認証ページにリダイレクト # /auth/complete: 認証完了 # /meeting: ミーティング一覧 # /meeting/add: ミーティング作成 path('', views.index, name='zoom_index'), path('auth/', views.auth, name='zoom_auth'), path('auth/complete', views.index, name='zoom_auth_complete'), ] # <project-root>/<project-app>/urls.py ... urlpattenrs = [ ..., path('zoom/', include('zoom.urls')), ..., ]
ルーティングに対応するTemplate
, View
を作成する
とりあえず必要な箇所以外は一旦処理を省略する
<!-- <project-root>/zoom/templates/auth/auth.html --> <!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Zoom - Auth</title> </head> <body> <h1>Zoom Auth</h1> <a href="{{ auth_href }}">Zoomで認証する</a> </body> </html> <!-- <project-root>/zoom/templates/auth/complete.html --> <!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Authentication complete</title> </head> <body> Zoom Authentication Complete </body> </html>
# <project-root>/zoom/views.py import requests, base64, json from django.shortcuts import render, redirect from django.urls import reverse # Create your views here. def auth(request): '''認証ページへリダイレクトさせる''' client_id = 'SnSyF4YFQOGpJlXXTbIE4w' client_secret = 'ofqwMjnKuuQba4tmmTQVOlfgmj447uHw' if 'code' not in request.GET: print('get code') auth_url = 'https://zoom.us/oauth/authorize' response_type = 'code' # ngrokでlocalhostをSSL形式でアクセスできるようにする redirect_uri = 'https://XXXXXX.ngrok.io/zoom/auth' auth_href = auth_url + '?response_type=' + response_type + '&client_id=' + client_id + '&redirect_uri=' + redirect_uri return render(request, 'auth/auth.html', { 'auth_href': auth_href }) else: print('get token') auth_url = 'https://zoom.us/oauth/token' code = request.GET.get('code') grant_type = 'authorization_code' redirect_uri = 'https://XXXXXX.ngrok.io/zoom/auth' # basic認証用のコードを作成(client_ID:Client_Secretをbase64エンコード) client_basic = base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode()) # POST用のパラメータとカスタムヘッダを作成する post_payload = { 'code': code, 'grant_type': grant_type, 'redirect_uri': redirect_uri } post_header = { 'Authorization': 'Basic {0}'.format(client_basic.decode()) } # # Exec POST response = requests.post(auth_url, data=post_payload, headers=post_header) response_text = json.loads(response.text) if 'access_token' in response_text: # 認証結果をセッションに保存 request.session['zoom_access_token'] = response_text['access_token'] request.session['zoom_token_type'] = response_text['token_type'] request.session['zoom_refresh_token'] = response_text['refresh_token'] request.session['zoom_expires_in'] = response_text['expires_in'] request.session['zoom_scope'] = response_text['scope'] return redirect('zoom_auth_complete') else: return render(request, 'auth/auth.html', { 'auth_href': 'hoge' }) def auth_complete(request): '''認証完了ページ''' pass
ビルトインサーバを起動し、https://XXXXXX.ngrok.io/zoom/auth/
にアクセスする
リンクをクリックするとZoomのOAuth画面が表示される
※ Zoom APIを利用する場合はhttps
が必須らしくlocalhost
を利用する場合はngrok
などのサービスを使ってhttps
環境を作成する必要がある
OAuthを許可したら再度https://XXXXXX.ngrok.io/zoom/auth/
にリダイレクトされればOk
この時、URLパラメータにcode=XXXXXXXXXXX
というものがついている。これはアクセストークンの取得に必要となる
取得したコードをもとにアクセストークンを取得する
アクセストークンの取得にはZoomにアプリ登録した際のClientID
,Client_Secret
を使いBasic認証経由でPOST
する必要がある
Basic認証状はCientID:Client__Secret
をbase64エンコード
して送信する
またPOST
の際に設定するredirect_uri
パラメータだが、ここはZoomのアプリ登録時に設定したRedirect URL for OAuth
の文字列を設定していたがそれではエラー(redirect uri mismatch
)が出る
どうやらPOST
するページのURLでないとダメらしいので注意
# basic認証用のコードを作成(client_ID:Client_Secretをbase64エンコード) client_basic = base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode()) # POST用のパラメータとカスタムヘッダを作成する post_payload = { 'code': code, 'grant_type': grant_type, 'redirect_uri': redirect_uri } post_header = { # base64エンコードした状態だとbyte形式となるので.decode()でString形式に変換する 'Authorization': 'Basic {0}'.format(client_basic.decode()) }
アクセストークンが取得できたらセッションに情報を追加して完了ページへリダイレクトする
簡略化のためにセッションにしているが本来はcookie
あたりがいいと思われる
2-2. 部屋の作成(時間の変更)
アクセストークンが取得できたら試しにミーティング部屋を作ってみる
Views
を以下の通り編集する
# <project-root>/zoom/views.py def auth_complete(request): '''認証完了ページ''' get_user_url = 'https://api.zoom.us/v2/users' access_token = request.session['zoom_access_token'] # ユーザー情報の取得 get_user_headers = { 'Authorization': 'Bearer {0}'.format(access_token) } get_user_response = requests.get(get_user_url, headers=get_user_headers) get_user_response_text = json.loads(get_user_response.text) user_info = get_user_response_text['users'][0] # 部屋の作成 create_meeting_url = 'https://api.zoom.us/v2/users/{0}/meetings'.format(user_info['id']) create_meeting_params = { 'topic': 'Sample Meeting', 'type': 2, # scheduled meeting 'start_time': '2020-11-02 T 12:00:00', 'duration': 180, 'timezone': user_info['timezone'], } create_meeting_params_json = json.dumps(create_meeting_params).encode('utf-8') create_meeting_headers = { 'Authorization': 'Bearer {0}'.format(access_token), 'Content-Type': 'application/json' } create_meeting_response = requests.post(create_meeting_url, data=create_meeting_params_json.decode(), headers=create_meeting_headers) create_meeting_response_text = json.loads(create_meeting_response.text) return render(request, 'auth/complete.html')
ミーティング部屋を作成するためにはユーザーID
が必要となるため、API経由で取得する
APIの詳細はList Users - Users - Zoom API - API Referenceを参照
アクセストークンはAuthrization: Bearer XXXX
というヘッダをつけて送信する
# ユーザー情報の取得 get_user_headers = { 'Authorization': 'Bearer {0}'.format(access_token) } get_user_response = requests.get(get_user_url, headers=get_user_headers) get_user_response_text = json.loads(get_user_response.text) user_info = get_user_response_text['users'][0]
ユーザーID
が取得できたらミーティング部屋を作成する
基本的に「ミーティング名称」「ミーティングタイプ」「開始日」「ミーティング時間」「タイムゾーン」あたりを設定していればよい
他にも様々なパラメータを渡すことができるので詳細は Create a Meeting - Meetings - Zoom API - API Reference を参照
この時、パラメータはdict
ではなくjson
を利用して渡すことになるため、ヘッダに「Content-Type: application/json
」が必要となる(ない場合は300 - Unsupported Content Type
というエラーが返ってくる)
# 部屋の作成 create_meeting_url = 'https://api.zoom.us/v2/users/{0}/meetings'.format(user_info['id']) create_meeting_params = { 'topic': 'Sample Meeting', 'type': 2, # scheduled meeting 'start_time': '2020-11-02 T 12:00:00', 'duration': 180, 'timezone': user_info['timezone'], } create_meeting_params_json = json.dumps(create_meeting_params).encode('utf-8') create_meeting_headers = { 'Authorization': 'Bearer {0}'.format(access_token), 'Content-Type': 'application/json' } create_meeting_response = requests.post(create_meeting_url, data=create_meeting_params_json.decode(), headers=create_meeting_headers) create_meeting_response_text = json.loads(create_meeting_response.text)
リクエストが正常に完了したらZoomを立ち上げ、スケジュールに追加されていれば処理完了
参考サイト
PyDriveを使ってGoogle DriveにDjangoからアクセスしてみる
PyDriveを使ってGoogle Driveを操作する必要があったのでメモ
調べてみると素のPythonのコードはたくさんあるんだけどDjango
を利用してのサンプルが見つからなかったので書いてみた。
とりあえずはOAuthの認証とファイル/フォルダの一覧取得まで。
前提
- Django 2.2.x
- PyDrive 1.3.1
手順
1. GCPコンソールで認証情報を作成する
- プロジェクト「PyDrive-Django」を作成(プロジェクト名は何でもよい)
- プロジェクト「PyDrive-Django」の管理画面を開く
- 画面中央「APIの概要に移動」をクリック(APIとサービスページに移動)
- 左側メニュー「認証情報」をクリック
- 画面上部「認証情報を作成」をクリック
- 「OAuthクライアントID」を選択
- OAuthクライアントIDの作成画面にて以下入力する
- アプリケーションの種類: ウェブアプリケーション
- 名前: 任意(SeminaRise-OAuth)
- 承認済みのJavaScript生成元
- 承認済みのリダイレクトURI
- 作成後、認証情報画面にリダイレクトされる
- OAuth2.0クライアントIDの一覧に先程追加した認証情報が表示される
- 先程作成した認証情報をダウンロードする(画面右端の矢印ボタン)
- 認証情報は
client_secrets.json
とリネームしておく
- 認証情報は
2. Djangoの実装をしていく
デモ用のアプリをDjangoにて作成する
$ python manage.py startapp gdrive
作成したアプリをsettings.py
で有効化する
# <project-app>/settings.py INSTALLED_APPS = [ ..., # My Applications 'gdrive.apps.Gdriveconfig', .... ]
gdrive
アプリ内に認証情報を入れるディレクトリを作成し、client_secrets.json
を入れておく
$ cd gdrive $ mkdir certs $ cd certs $ mv /path/to/client_secrets.json .
認証方式の設定ファイルsettings.yaml
を作成しclient_secrets.json
と同じ場所に入れておく
詳細な項目は OAuth made easy — PyDrive 1.3.0 documentation を参照
# <project-root>/gdrive/settings.yaml # 認証情報をどこから読み込むか(settings or file: default) client_config_backend: file # client_config_backend: fileの場合に指定するファイルのパス client_config_file: /path/to/<project-root>/gdrive/certs/client_secrets.json # 認証情報をファイルに保存するか save_credentials: True # 認証情報をどの形式で保存するか(file固定) save_credentials_backend: file # 認証情報の保存先ファイルパス(ファイルが存在しない場合は作成される / 2回目以降は更新) save_credentials_file: /path/to/<project-root>/gdrive/certs/saved_credentials.json # 認証情報を自動で更新するか? get_refresh_token: True
View
を作成する
とりあえずはQuickstart — PyDrive 1.3.0 documentationに記載されているコードを参考に作成する
import os from django.shortcuts import render, redirect from django.http import HttpResponse from pydrive.auth import GoogleAuth from pydrive.drive import GoogleDrive def index(request): current_dir_path = os.path.dirname(os.path.abspath(__file__)) settings_file_path = os.path.join(current_dir_path,'certs/settings.yaml') # settings.yamlを元にGoogleAuthを生成する gauth = GoogleAuth(settings_file=settings_file_path) auth_url = gauth.GetAuthUrl() # OAuthの認証がない場合はauth_url(認証用URL)に遷移する # 認証済みの場合はfile一覧に遷移する if 'code' in request.GET: code = request.GET.get('code') gauth.Auth(code) gauth.SaveCredentials('file') return redirect('files') else: return redirect(auth_url) def files(request): current_dir_path = os.path.dirname(os.path.abspath(__file__)) settings_file_path = os.path.join(current_dir_path,'certs/settings.yaml') gauth = GoogleAuth(settings_file=settings_file_path) # Google Driveのroot直下のファイル, フォルダ一覧を取得 drive = GoogleDrive(gauth) file_list = drive.ListFile({'q': "'root' in parents and trashed=false"}).GetList() files = [] for file_item in file_list: file = {} file['id'] = file_item['id'] file['title'] = file_item['title'] file['mimeType'] = file_item['mimeType'] files.append(file.copy()) return render(request, 'gdrive/files.html', { 'files': files })
View
に対応するルーティングを設定する
$ touch <project-root>/gdrive/urls.py
# <project-root>/gdrive/urls.py from django.urls import path from . import views urlpatterns = [ path('', views.index, name='index'), path('files/', views.files, name='files'), ] # <project-root>/<project-app>/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ ... path('gdrive/', include('gdrive.urls')) ]
ビルトインサーバを起動しブラウザでhttp://localhost:8000/gdrive
にアクセスする
OAuthの認証ページが表示され、許可をするとファイル一覧が表示されればOK
認証さえできればあとはファイル操作はdocumentみればなんとかなりそう。
参考サイト
Django Channlesで投票アプリ(簡易版)を作る
先日やったDjango Channels
のチュートリアルを応用して投票アプリを作成してみる。
WebSocketの練習にもぴったりですね。
前提
- Django: 2.2.8
- Channels: 2.4.0
手順
投票用アプリの作成
次のコマンドを叩いて投票用アプリを作成する
$ python manage startapp vote
テンプレートを作成する
問題作成用HTML, 回答用HTMLを作る
$ mkdir <project-root>/vote/templates/question $ touch <project-root>/vote/templates/question/question.html $ mkdir <project-root>/vote/templates/answer $ touch <project-root>/vote/templates/answer/answer.html
<!-- question.html --> <!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Create Question</title> <style> lavel { width: 100%; display: flex; } </style> </head> <body> <h1>Create Question [ Room Name: {{ room_name }} ]</h1> <label> <span>Question: </span> <input type="text" id="vote-question-input" size="100"> </label> <input type="button" id="vote-question-send" value="Send"> <div> 回答状況: <span id="vote-results"></span> </div> <script> const roomName = '{{ room_name }}' // WebSocketの定義 const questionSocket = new WebSocket('ws://' + window.location.host + '/ws/vote/' + roomName + '/') let answers = [] // サーバからのメッセージ受信時処理 questionSocket.onmessage = (e) => { const data = JSON.parse(e.data) if(data.type !== 'send_answer') return; // let resultsData = document.getElementById('vote-resutls').innerHTML || '' // resultsData += data.answer + ',' // document.getElementById('vote-results').innerHTML = resultsData answers.push(data.answer) document.getElementById('vote-results').innerHTML = answers.join(',') } // WebSocket終了時処理 questionSocket.onclose = (e) => { console.error('Question socket closed.') } // 問題送信処理 document.getElementById('vote-question-send').onclick = (e) => { const questionInputDom = document.getElementById('vote-question-input') const question = questionInputDom.value; if (question === '') return; let questionResult = {} questionResult.question = question questionSocket.send(JSON.stringify({ 'question': question })) questionInputDom.value = '' } </script> </body> </html> <!-- answer.html --> <!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Vote Answer</title> <style> lavel { width: 100%; display: flex; } #vote-question-answers { display: none; } #vote-question-answers.show { display: block; } </style> </head> <body> <h1>Create Question [ Room Name: {{ room_name }} ]</h1> <label> <span>Question: </span> <div id="vote-question-text">投票前です</div> </label> <div id="vote-question-answers"> <!-- 今回はデモなので選択肢の文言を固定とする --> <label><input type="radio" name="vote-answer" value="大分県"> 大分県</label> <label><input type="radio" name="vote-answer" value="大分県以外"> 大分県以外</label> <input type="button" id="vote-question-send" value="Send"> </div> <script> const roomName = '{{ room_name }}' // WebSocketの定義 const answerSocket = new WebSocket('ws://' + window.location.host + '/ws/vote/' + roomName + '/') // サーバからのメッセージ受信時処理 answerSocket.onmessage = (e) => { const data = JSON.parse(e.data); if(data.type !== 'send_question') return; document.getElementById('vote-question-text').innerHTML = data.question document.getElementById('vote-question-answers').classList.toggle('show') } // WebSocket終了時処理 answerSocket.onclose = (e) => { console.error('Question socket closed.') } // 問題送信処理 document.getElementById('vote-question-send').onclick = (e) => { const answerInputDoms = document.querySelectorAll('[name=vote-answer]') const answers = {} for(let i = 0; i < answerInputDoms.length; i++) { if(!answerInputDoms[i].checked) continue answers.answer = answerInputDoms[i].value } if (Object.keys(answers).length == 0) return; answerSocket.send(JSON.stringify(answers)) document.getElementById('vote-question-text').innerHTML = '回答しました' document.getElementById('vote-question-answers').classList.toggle('show') } </script> </body> </html>
Views, Rouinの設定を行う
question.html, answer.htmlを利用できるよう views
の設定を行う
# <project-root>/vote/views.py from django.shortcuts import render def index(request): pass def question(request, room_name): return render(request, 'question/question.html', { 'room_name': room_name }) def answer(request, room_name): return render(request, 'answer/answer.html', { 'room_name': room_name })
viewに対する routing
の設定を行う
# <project-root>/<project-app>/urls.py urlpatterns = [ ... path('vote/', include('vote.urls')), ] # <project-root>/vote/urls.py from django.urls import path from . import views urlpatterns = [ path('question/<str:room_name>/', views.question, name='question'), path('answer/<str:room_name>/', views.answer, name='answer'), ]
Websocketの処理を作成する
WebSocketを利用するために consumer
を作成する
複数のグループに属する場合のうまい書き方がわからなくてself.channel_layer.group_add()
を複数回書いてるけどこんなんでいいのかな…
import json from channels.generic.websocket import AsyncWebsocketConsumer class VoteConsumer(AsyncWebsocketConsumer): # 接続時処理 async def connect(self): # チャンネル名とグループ名を定義 self.room_group_name_question = 'question_%s' % self.scope['url_route']['kwargs']['room_name'] self.room_group_name_answer = 'answer_%s' % self.scope['url_route']['kwargs']['room_name'] # グループに追加 await self.channel_layer.group_add( self.room_group_name_question, self.channel_name ) await self.channel_layer.group_add( self.room_group_name_answer, self.channel_name ) await self.accept() # 切断時処理 async def disconnect(self, close_code): # グループから削除 await self.channel_layer.group_discard( self.room_group_name_question, self.channel_name ) await self.channel_layer.group_discard( self.room_group_name_answer, self.channel_name ) await self.close() # メッセージ受信時処理 async def receive(self, text_data): text_data_json = json.loads(text_data) if 'question' in text_data_json.keys(): await self.channel_layer.group_send( self.room_group_name_answer, { 'type': 'send_question', 'question': text_data_json['question'] } ) if 'answer' in text_data_json.keys(): await self.channel_layer.group_send( self.room_group_name_question, { 'type': 'send_answer', 'answer': text_data_json['answer'] } ) # 質問の送信 async def send_question(self, event): question = event['question'] await self.send(text_data=json.dumps({ 'type': 'send_question', 'question': question })) # 回答の送信 async def send_answer(self, event): answer = event['answer'] await self.send(text_data=json.dumps({ 'type': 'send_answer', 'answer': answer }))
WebSocket用のルーティングを設定する
前述のConsumer
に対する routing
を設定する
これを設定することでWebSocketのルーティングが有効化される
# <project-root>/<project-app>/asgi.py ... import django.urls import re_path import vote.consumers application = ProtocolTypeRouter({ "http": AsgiHandler, "websocket": AuthMiddlewareStack( URLRouter([ ..., re_path(r'ws/vote/(?P<room_name>\w+)/$', vote.consumers.VoteConsumer), ]) ), })
動作確認
ここまでできたらサーバを起動する(Channels用のRedisも起動しておく)
$ redis-server & $ python manage.py runserver 0.0.0.0:8000
ブラウザでhttp://localhost:8000/vote/question/lobby
, http://localhost:8000/vote/answer/lobby
にアクセスしてみる。
/vote/question/lobby
で質問を入力し送信すると、/vote/answer/lobby
側で質問が反映される。質問に回答すると/vote/question/lobby
側に回答が追加されるのがわかる。
今回は回答を固定したり、回答者の情報は送信しないなど本当に必要最低限の機能しか実装していないが、あとは作り込みの問題なので割愛とする
Django ChannelsがあるとWebSocketの利用が楽になっていいですね。
Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 4: Automated Testing)
Channelsのチュートリアルシリーズ最後!
今回は前章までで作成したもののテストコードを書く
前章までの記事は下記
https://satoh-d.hatenablog.com/entry/2020/10/26/180000satoh-d.hatenablog.com
https://satoh-d.hatenablog.com/entry/2020/10/27/180000satoh-d.hatenablog.com
前提
- Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
- Channels: 2.4.0
- Redis-server: 5.0.9
- Google Chrome: 86.0.4240.111(Headless Chromeで利用)
- ChromeDriver: 86.0.4240.22
- Selenium: 3.141.0
Chrome, ChromeDriver, Seleniumのインストール
それぞれのインストール方法は次の通り(今回はDockerコンテナ上で実施)
# Google Chroome $ wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add $ echo 'deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main' | tee /etc/apt/sources.list.d/google-chrome.list $ apt-get update $ apt-get install -y google-chrome-stable $ google-chrome --version Google Chrome 86.0.4240.111 # Chrome Driver $ curl -OL https://chromedriver.storage.googleapis.com/76.0.3809.126/chromedriver_linux64.zip $ unzip chromedriver_linux64.zip chromedriver $ mv chromedriver /usr/bin/chromedriver # Selenium $ pip install selenium
テストコードは下記の通り
Headless Chromeを定義する際にOptionを指定しないとテスト実施時にエラーとなるため注意すること
# chat/tests.py from channels.testing import ChannelsLiveServerTestCase from selenium import webdriver # Tutorialに載ってないけどOptionsを定義しないとテスト実施時にエラーとなる from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.wait import WebDriverWait class ChatTests(ChannelsLiveServerTestCase): serve_static = True # emulate StaticLiveServerTestCase @classmethod def setUpClass(cls): super().setUpClass() # optionsはTutorialに載ってないけど定義しないとテスト実施時にエラーとなる options = Options() options.add_argument('--no-sandbox') options.add_argument('--headless') try: # NOTE: Requires "chromedriver" binary to be installed in $PATH cls.driver = webdriver.Chrome(options=options) except: super().tearDownClass() raise @classmethod def tearDownClass(cls): cls.driver.quit() super().tearDownClass() def test_when_chat_message_posted_then_seen_by_everyone_in_same_room(self): try: self._enter_chat_room('room_1') self._open_new_window() self._enter_chat_room('room_1') self._switch_to_window(0) self._post_message('hello') WebDriverWait(self.driver, 2).until(lambda _: 'hello' in self._chat_log_value, 'Message was not received by window 1 from window 1') self._switch_to_window(1) WebDriverWait(self.driver, 2).until(lambda _: 'hello' in self._chat_log_value, 'Message was not received by window 2 from window 1') finally: self._close_all_new_windows() def test_when_chat_message_posted_then_not_seen_by_anyone_in_different_room(self): try: self._enter_chat_room('room_1') self._open_new_window() self._enter_chat_room('room_2') self._switch_to_window(0) self._post_message('hello') WebDriverWait(self.driver, 2).until(lambda _: 'hello' in self._chat_log_value, 'Message was not received by window 1 from window 1') self._switch_to_window(1) self._post_message('world') WebDriverWait(self.driver, 2).until(lambda _: 'world' in self._chat_log_value, 'Message was not received by window 2 from window 2') self.assertTrue('hello' not in self._chat_log_value, 'Message was improperly received by window 2 from window 1') finally: self._close_all_new_windows() # === Utility === def _enter_chat_room(self, room_name): self.driver.get(self.live_server_url + '/chat/') ActionChains(self.driver).send_keys(room_name + '\n').perform() WebDriverWait(self.driver, 2).until(lambda _: room_name in self.driver.current_url) def _open_new_window(self): self.driver.execute_script('window.open("about:blank", "_blank");') self.driver.switch_to_window(self.driver.window_handles[-1]) def _close_all_new_windows(self): while len(self.driver.window_handles) > 1: self.driver.switch_to_window(self.driver.window_handles[-1]) self.driver.execute_script('window.close();') if len(self.driver.window_handles) == 1: self.driver.switch_to_window(self.driver.window_handles[0]) def _switch_to_window(self, window_index): self.driver.switch_to_window(self.driver.window_handles[window_index]) def _post_message(self, message): ActionChains(self.driver).send_keys(message + '\n').perform() @property def _chat_log_value(self): return self.driver.find_element_by_css_selector('#chat-log').get_property('value')
ChannelsLiveServerTestCase
はDjangoのE2Eテストツール(StaticLiveServerTestCase
やLiveServerTestCase
)を拡張したもの
Channles
で定義した内部のルーティング(/ws/room/ROOM_NAME
のような)をいい感じにテストしてくれるとか
テストコード実行時にテスト用DBを自動で作成するため、権限を付けてあげる
今回は(MySQL, DB名: test_django(デフォルト))
mysql> grant all privileges on test_django.* to 'user'@'%';
実際にテストを動かしてみる
「OK」とでればテストが通ったことになる
$ python manage.py test chat.tests Creating test database for alias 'default'... System check identified no issues (0 silenced). .. ---------------------------------------------------------------------- Ran 2 tests in 2.818s OK Destroying test database for alias 'default'...
最後に
Tutorialと環境が違う(そもそもDjangoのバージョンがちがったり色々...)ことでハマってしまうところが多々あった。
とはいえなんとか終わらせることができてよかった。
もうすこしWebSocketでできることをしらべないといけないかなーと...。
参考サイト
Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 3: Rewrite Chat Server as Asynchronous)
今回は前章までで作成したチャットサーバを非同期処理で書き直す
非同期処理にするとリクエストごとにスレッドが追加されないから同期処理に比べてパフォーマンスが高いとのこと
前章までで作成したChatConsumer
を非同期で書き直すと以下の通り
# chat/consumers.py import json from channels.generic.websocket import AsyncWebsocketConsumer class ChatConsumer(AsyncWebsocketConsumer): # 接続時処理 async def connect(self): # チャンネル名とグループ名を定義 self.room_name = self.scope['url_route']['kwargs']['room_name'] self.room_group_name = 'chat_%s' % self.room_name # Join room group await self.channel_layer.group_add( self.room_group_name, self.channel_name ) await self.accept() # 切断時処理 async def disconnect(self, close_code): # Leave room group await self.channel_layer.group_discard( self.room_group_name, self.channel_name ) # メッセージ受信時処理 async def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] # Send message to room group await self.channel_layer.group_send( self.room_group_name, { 'type': 'chat_message', 'message': message } ) # メッセージの受信→送信 async def chat_message(self, event): message = event['message'] # Send message to WebSocket await self.send(text_data=json.dumps({ 'message': message }))
非同期処理に書き換えた場合に変わる点は以下の通り
WebsocketConsumer
ではなくAsyncWebsocketConsumer
を継承する- 全てのメソッドが
def
ではなくasync def
に変更となる - I/Oが発生する処理は全て
await
を付与する async_to_sync
を利用しなくなる
参考サイト
Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 2: Implement a Chat Server)
今回はいよいよChannelsを利用してチャットのデモを作るところ
チュートリアルと環境が違うこと、Redis用に新たにdockerコンテナを作るところがあったけど敢えてコンテナを作らず直接インストールしようとしたらすんごいハマってしまった。
でもうまく行ったからよしとしよう
前提
- Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
- Channels: 2.4.0
- Redis-server: 5.0.9
3. tutorial
3-1. Channlesのセットアップ
channles用アプリを作る
$ python manage.py startapp chat
今回はchat/views.py
, chhat/__init__.py
しか使わないので他のファイルは削除する
<project-app>/settings.py
に先程追加したchat
を有効化する処理を追記する
INSTALLED_APPS = [ ... # My applications 'chat', ... ]
chat/templates/chat/index.html
を作成する
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Chat Rooms</title> </head> <body> What chat room would you like to enter?<br> <input id="room-name-input" type="text" size="100"><br> <input id="room-name-submit" type="button" value="Enter"> <script> document.querySelector('#room-name-input').focus(); document.querySelector('#room-name-input').onkeyup = function(e) { if (e.keyCode === 13) { // enter, return document.querySelector('#room-name-submit').click(); } }; document.querySelector('#room-name-submit').onclick = function(e) { var roomName = document.querySelector('#room-name-input').value; window.location.pathname = '/chat/' + roomName + '/'; }; </script> </body> </html>
chat/views.py
に以下を追記する
def index(request): # chat/templates/chat/index.html をビューとして返す return render(request, 'chat/index.html')
chat/urls.py
, <project-app>/urls.py
にルーティングを追加する
# chat/urls.py from django.urls import path from . import views urlpatterns = [ path('', views.index, name='index') ]
# <project-app>/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ ... path('chat/', include('chat.urls')), ]
3-2. チャットサーバーの実装(Implement a Chat Server)
チャット部屋用のviewを追加する
$ touch <project-app>/chat/templates/chat/room.html
<!-- chat/templates/chat/room.html --> <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Chat Room</title> </head> <body> <textarea id="chat-log" cols="100" rows="20"></textarea><br> <input id="chat-message-input" type="text" size="100"><br> <input id="chat-message-submit" type="button" value="Send"> {{ room_name|json_script:"room-name" }} <script> const roomName = JSON.parse(document.getElementById('room-name').textContent); const chatSocket = new WebSocket( 'ws://' + window.location.host + '/ws/chat/' + roomName + '/' ); chatSocket.onmessage = function(e) { const data = JSON.parse(e.data); document.querySelector('#chat-log').value += (data.message + '\n'); }; chatSocket.onclose = function(e) { console.error('Chat socket closed unexpectedly'); }; document.querySelector('#chat-message-input').focus(); document.querySelector('#chat-message-input').onkeyup = function(e) { if (e.keyCode === 13) { // enter, return document.querySelector('#chat-message-submit').click(); } }; document.querySelector('#chat-message-submit').onclick = function(e) { const messageInputDom = document.querySelector('#chat-message-input'); const message = messageInputDom.value; chatSocket.send(JSON.stringify({ 'message': message })); messageInputDom.value = ''; }; </script> </body> </html>
追加したテンプレートに対応するview, ルーティングを<project-root>/chat/views.py, urls.py
に追記する
def room(request, room_name): return render(request, 'chat/room.html', { 'room_name': room_name })
urlpatterns = [ path('', views.index, name='index'), path('<str:room_name>/', views.room, name='room'), # 追記 ]
http://localhost:8000/chat/lobby
にアクセスしてみるとチャット用画面が表示される
試しにテキストボックスにHello
と入力しsendを押してみるとWebSocketのエラー
が発生する
これはWebSocket
のルーティングが定義されていないためである
Websocket
の処理を記述するために<project-root>/chat/consumers.py
を作成する
$ touch <project-root>/chat/consumers.py
# chat/consumers.py import json from channels.generic.websocket import WebsocketConsumer class ChatConsumer(WebsocketConsumer): # 接続時処理 def connect(self): self.accept() # 切断時処理 def disconnect(self, close_code): pass # メッセージ受信時処理 def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] self.send(text_data=json.dumps({ 'message': message }))
consumers.py
用に新規でルーティングを<project-root>/chat/routing.py
に記載する
$ touch <project-root>/chat/routing.py
# chat/routing.py from django.urls import re_path from . import consumers websocket_urlpatterns = [ re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()), ] # Django 2.2以下では次のように記載する # `ChatConsumer` has no attribute `as_asgi()`とエラーが出るため websocket_urlpatterns = [ re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer), ]
上記のas_asgi()
はユーザの接続ごとにconsumerのインスタンスを生成するASGIアプリケーションである
Django
のas_view()
のようにユーザの接続ごとにviewを生成する動きにと同じようなもの
次に<project-root>/<project-app>/asgi.py
を以下のように編集する
# mysite/asgi.py import os from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from django.core.asgi import get_asgi_application import chat.routing os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") application = ProtocolTypeRouter({ "http": get_asgi_application(), "websocket": AuthMiddlewareStack( URLRouter( chat.routing.websocket_urlpatterns ) ), }) # Django 2.2以下は以下のようなコードになる(get_asgi_applicationあたりが変わっている) import os import django from channels.auth import AuthMiddlewareStack from channels.http import AsgiHandler from channels.routing import ProtocolTypeRouter, URLRouter import chat.routing os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') django.setup() application = ProtocolTypeRouter({ "http": AsgiHandler, "websocket": AuthMiddlewareStack( URLRouter( chat.routing.websocket_urlpatterns ) ), })
AuthMiddlewareStack
でWebSocket
の接続情報を定義することでWebSocket
の接続プロトコル(ws://
, wss://
)で接続できるようになる
改めてhttp://localhost:8000/chat/lobby
にアクセスする
テキストボックスにHello
と入力しsend
を押すとエラーなく上のテキストエリアにHelllo
と表示される
しかしタブを複数開きそれぞれでテキストボックスに文字を入力しても、別のタブには反映されない
これはChatConsumer
がそれぞれの接続ごとにインスタンス化されているが、お互いに会話はできていない状態となっている
Channels
にはchannel layer
というレイヤーが提供されており、これを利用することでChatConsumer
同士で会話ができるようになる
チャンネルレイヤーとは
各コンシューマインスタンス同士やDjangoの他のパーツとの会話ができるようになるシステム
チャンネルレイヤーは以下を提供している
channel
: メールボックス的なもの。各チャンネルには名前があり、その名前があれば(所属していれば?)誰でもメッセージを送ることができるgroup
: チャンネルの関連的なもの。各グループには名前があり、その名前があれば(所属していれば?)グループの追加/削除に、グループに所属している全てのチャンネルにメッセージを送ることができる
チャンネルレイヤーを利用するにはRedis
が動いている必要がある
Redis
をインストールする(Tutorialではdockerを起動しているが、PythonのDockerに手動でインストール → 起動してみる)
$ cd /code # redis-serverのインストール $ wget http://download.redis.io/releases/redis-5.0.9.tar.gz $ tar xzf redis-5.0.9.tar.gz $ cd redis-5.0.9 $ make install # redis-serverの起動(続けて操作するために&をつけることでBG起動) $ /usr/loca/bin/redis-server & # ダウンロードしたファイルの削除 $ cd .. $ rm redis-5.0.9.tar.gz $ rm -r redis-5.0.9
Redis
をPythonから利用するためのパッケージをインストールしておく
$ pip install channels_redis
チャンネルレイヤーを使うための設定を<project-root>/<project-app>/settings.py
に追記する
# mysite/settings.py # Channels ASGI_APPLICATION = 'mysite.routing.application' CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('127.0.0.1', 6379)], }, }, }
チャンネルレイヤーがRedis
の元で動いているか確認をするためにshell
で実行してみる
$ cd <project-root> $ python manage.py shell # チャンネルレイヤーをimport >>> import channels.layers >>> channel_layer = channels.layers.get_channel_layer() >>> from asgiref.sync import async_to_sync # test_channelに`hello`とメッセージを送信 >>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'}) # test_channelに送信されたメッセージを受信 >>> async_to_sync(channel_layer.receive)('test_channel') # helloが取得できれば成功 {'type': 'hello'}
これでチャンネルレイヤーを使う準備ができたので、ChatCoonsumer
を以下の通り修正する
# chat/consumers.py import json from asgiref.sync import async_to_sync from channels.generic.websocket import WebsocketConsumer class ChatConsumer(WebsocketConsumer): # 接続時処理 def connect(self): # チャンネル名とグループ名を定義 self.room_name = self.scope['url_route']['kwargs']['room_name'] self.room_group_name = 'chat_%s' % self.room_name # Join room group async_to_sync(self.channel_layer.group_add)( self.room_group_name, self.channel_name ) self.accept() # 切断時処理 def disconnect(self, close_code): # Leave room group async_to_sync(self.channel_layer.group_discard)( self.room_group_name, self.channel_name ) # メッセージ受信時処理 def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.room_group_name, { 'type': 'chat_message', 'message': message } ) # メッセージの受信→送信 def chat_message(self, event): message = event['message'] # Send message to WebSocket self.send(text_data=json.dumps({ 'message': message }))
- ユーザがPOSTしたメッセージはJavaScriptにてWebSocketを通じて
ChatConsumer
に届く ChatConsumer
はメッセージに対応するグループに属するChatConsumer
に転送する- グループ名はURLの部屋名称が
ChatConsumer.connect
にて設定されている
- グループ名はURLの部屋名称が
- 同じグループに属する
ChatConsumer
は受け取ったメッセージをWebSocket
を通じてJavaScriptに転送する(chatsocket.onmessage
部分)
ソースコードの修正が完了したら http://localhost:8000/chat/lobby
を2窓開き、それぞれでメッセージを送信してみる
お互いの窓でメッセージが表示されれば成功
参考サイト
Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 1: Basic Setup)
案件でチャットを作ることになりそうなので勉強中のDjango
でどこまでできるか確認メモ
調べてみるとChannels
を使えばできそうというということがわかったのでチュートリアルをやってみる
とりあえず今回は Tutorial Part 1: Basic Setup — Channels 2.4.0 documentation までやってみた
前提
- Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
- Channels: 2.4.0
1. Django Channelsとは
Channels wraps Django’s native asynchronous view support, allowing Django projects to handle not only HTTP, but protocols that require long-running connections too - WebSockets, MQTT, chatbots, amateur radio, and more.
DjangoがHTTPだけでなくWebsockeet、MQTT、チャットボット、アマチュア無線など長時間の接続を必要とするプロトコルを扱えるようにする
ASGI
がそれを可能にしているそうな
ASGI(the Asynchronous Server Gateway Interface)
Channels
で作られてたアプリケーションをアプリケーションサーバから切り離し、アプリケーションとミドルウェアとの間の標準インタフェース
WSGI
拡張らしい
2. インストール
pip
を利用してインストールを行う
$ pip install channels $ pip list | grep channels channels 2.4.0
<project-app>/settings.py
を編集しchannels
を有効化する
INSTALLED_APPS = [ ... # 3rd party apps 'channels', ... ]
<project-app>/asgi.py
を新規作成する
※ 今回は config/asgi.py
とする
import os from channels.routing import ProtocolTypeRouter from django.core.asgi import get_asgi_application os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') application = ProtocolTypeRouter({ "http": get_asgi_application(), # Just HTTP for now. (We can add other protocols later.) }) # Django 2.2はASGIをサポートしていないので代替手段となる以下コードを記載する import os import django from channels.http import AsgiHandler from channels.routing import ProtocolTypeRouter os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') django.setup() application = ProtocolTypeRouter({ # Tutorialは "http": AsgiHandler() となっているが AsgiHandler でないと動かない "http": AsgiHandler, # Just HTTP for now. (We can add other protocols later.) })
<projcect-app>/settings.py
にASGI_APPLICATION
の設定を追記する
... # Channles and ASGI Settings ASGI_APPLICATION = 'config.asgi.application' ...
3. tutorial
3-1. Channlesのセットアップ
channles用アプリを作る
$ python manage.py startapp chat
今回はchat/views.py
, chhat/__init__.py
しか使わないので他のファイルは削除する
<project-app>/settings.py
に先程追加したchat
を有効化する処理を追記する
INSTALLED_APPS = [ ... # My applications 'chat', ... ]
chat/templates/chat/index.html
を作成する
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Chat Rooms</title> </head> <body> What chat room would you like to enter?<br> <input id="room-name-input" type="text" size="100"><br> <input id="room-name-submit" type="button" value="Enter"> <script> document.querySelector('#room-name-input').focus(); document.querySelector('#room-name-input').onkeyup = function(e) { if (e.keyCode === 13) { // enter, return document.querySelector('#room-name-submit').click(); } }; document.querySelector('#room-name-submit').onclick = function(e) { var roomName = document.querySelector('#room-name-input').value; window.location.pathname = '/chat/' + roomName + '/'; }; </script> </body> </html>
chat/views.py
に以下を追記する
def index(request): # chat/templates/chat/index.html をビューとして返す return render(request, 'chat/index.html')
chat/urls.py
, <project-app>/urls.py
にルーティングを追加する
# chat/urls.py from django.urls import path from . import views urlpatterns = [ path('', views.index, name='index') ]
# <project-app>/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ ... path('chat/', include('chat.urls')), ]