@Satoh_D no blog

大分にUターンしたので記念に。調べたこととか作ったこととか食べたこととか

Zoom APIをDjangoで扱ってみる

Zoom APIを利用して部屋の作成とかする案件がありそうなので勉強のために調べてみたことメモ。
Djangoなのはこれも勉強のため。。

前提

ゴール

  • DjangoからZoomのOAuth認証ができること
  • DjangoからZoomの部屋を作成できること(予約情報の変更もできること)

手順

1. Zoomにてアプリ登録を行う

  • OAuth - Build an App - Documentationにアクセス
  • 右上「Create App」をクリック
    • Zoomアカウントでのログインを求められるのでログインする
  • 「Choose your app type」にて「OAuth > Create」をクリック
  • 「Create on OAuth app」というモーダルが表示されるので必要に応じて情報を入力する
    • App Name: 任意(今回はSamplaApp)
    • Choose app type: 任意(今回はAccount-level appを選択)
    • Would you like to publish this app on Zoom App Marketplace?: Offを選択
  • モーダル内「Create」をクリックするとアプリが作成され、以下情報が取得できる
    • 取得できる情報
      • Client ID
      • Client Secret
    • またOAuthでの認証後にリダイレクトされるページのURLを指定する(今回はhttp://localhost:8000/zoom/auth/completeとする)
    • Whitelist URLも登録しておく(今回はhttp://localhost:8000
  • 画面左側メニュー「Scopes」をクリック
  • 「Add scopes+」をクリック
    • 必要に応じて認証のスコープを設定する(今回はMeetingを作成できればよいのでuser:read:admin, meeting:write:admin, meeting:writeを選択)
      • APIのリファレンス(API Reference)に必要なスコープが書かれているので参考にする
    • スコープの選択が完了したらモーダル内「Done」をクリック

2. Djangoで実装を行う

2-1. デモアプリを作成する

アプリ名はわかりやすくzoomとする

$ python manage.py startapp zoom

settings.pyにてzoomを有効化する

# <project-root>/<project-app>/settings.py
INSTALLED_APPS = [
    ...,
    # My Applications
    'zoom.apps.ZoomConfig',
]

2-1. 認証, アクセストークンの取得

認証に関するルーティングを設定する

# <project-root>/<zoom/urls.py
from django.urls import path

from . import views

urlpatterns = [
    # /auth: 認証ページにリダイレクト
    # /auth/complete: 認証完了
    # /meeting: ミーティング一覧
    # /meeting/add: ミーティング作成
    path('', views.index, name='zoom_index'),
    path('auth/', views.auth, name='zoom_auth'),
    path('auth/complete', views.index, name='zoom_auth_complete'),
]

# <project-root>/<project-app>/urls.py
...
urlpattenrs = [
    ...,
    path('zoom/', include('zoom.urls')),
    ...,
]

ルーティングに対応するTemplate, Viewを作成する
とりあえず必要な箇所以外は一旦処理を省略する

<!-- <project-root>/zoom/templates/auth/auth.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Zoom - Auth</title>
</head>
<body>
    
    <h1>Zoom Auth</h1>

    <a href="{{ auth_href }}">Zoomで認証する</a>
</body>
</html>

<!-- <project-root>/zoom/templates/auth/complete.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Authentication complete</title>
</head>
<body>
    Zoom Authentication Complete
</body>
</html>
# <project-root>/zoom/views.py
import requests, base64, json

from django.shortcuts import render, redirect
from django.urls import reverse

# Create your views here.
def auth(request):
    '''認証ページへリダイレクトさせる'''

    client_id = 'SnSyF4YFQOGpJlXXTbIE4w'
    client_secret = 'ofqwMjnKuuQba4tmmTQVOlfgmj447uHw'

    if 'code' not in request.GET:
        print('get code')
        auth_url = 'https://zoom.us/oauth/authorize'
        response_type = 'code'
        # ngrokでlocalhostをSSL形式でアクセスできるようにする
        redirect_uri = 'https://XXXXXX.ngrok.io/zoom/auth'

        auth_href = auth_url + '?response_type=' + response_type + '&client_id=' + client_id + '&redirect_uri=' + redirect_uri
        
        return render(request, 'auth/auth.html', {
            'auth_href': auth_href
        })
    else:
        print('get token')
        auth_url = 'https://zoom.us/oauth/token'
        code = request.GET.get('code')
        grant_type = 'authorization_code'
        redirect_uri = 'https://XXXXXX.ngrok.io/zoom/auth'

        # basic認証用のコードを作成(client_ID:Client_Secretをbase64エンコード)
        client_basic = base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode())

        # POST用のパラメータとカスタムヘッダを作成する
        post_payload = {
            'code': code,
            'grant_type': grant_type,
            'redirect_uri': redirect_uri
        }
        post_header = {
            'Authorization': 'Basic {0}'.format(client_basic.decode())
        }

        # # Exec POST
        response = requests.post(auth_url, data=post_payload, headers=post_header)
        response_text = json.loads(response.text)

        if 'access_token' in response_text:
            # 認証結果をセッションに保存
            request.session['zoom_access_token'] = response_text['access_token']
            request.session['zoom_token_type'] = response_text['token_type']
            request.session['zoom_refresh_token'] = response_text['refresh_token']
            request.session['zoom_expires_in'] = response_text['expires_in']
            request.session['zoom_scope'] = response_text['scope']

            return redirect('zoom_auth_complete')
        else:
            return render(request, 'auth/auth.html', {
                'auth_href': 'hoge'
            })

def auth_complete(request):
    '''認証完了ページ'''
    pass

ビルトインサーバを起動し、https://XXXXXX.ngrok.io/zoom/auth/にアクセスする リンクをクリックするとZoomのOAuth画面が表示される
※ Zoom APIを利用する場合はhttpsが必須らしくlocalhostを利用する場合はngrokなどのサービスを使ってhttps環境を作成する必要がある

OAuthを許可したら再度https://XXXXXX.ngrok.io/zoom/auth/にリダイレクトされればOk
この時、URLパラメータにcode=XXXXXXXXXXXというものがついている。これはアクセストークンの取得に必要となる

取得したコードをもとにアクセストークンを取得する
アクセストークンの取得にはZoomにアプリ登録した際のClientID,Client_Secretを使いBasic認証経由でPOSTする必要がある
Basic認証状はCientID:Client__Secretbase64エンコードして送信する

またPOSTの際に設定するredirect_uriパラメータだが、ここはZoomのアプリ登録時に設定したRedirect URL for OAuthの文字列を設定していたがそれではエラー(redirect uri mismatch)が出る
どうやらPOSTするページのURLでないとダメらしいので注意

# basic認証用のコードを作成(client_ID:Client_Secretをbase64エンコード)
client_basic = base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode())

# POST用のパラメータとカスタムヘッダを作成する
post_payload = {
    'code': code,
    'grant_type': grant_type,
    'redirect_uri': redirect_uri
}
post_header = {
    # base64エンコードした状態だとbyte形式となるので.decode()でString形式に変換する
    'Authorization': 'Basic {0}'.format(client_basic.decode())
}

アクセストークンが取得できたらセッションに情報を追加して完了ページへリダイレクトする
簡略化のためにセッションにしているが本来はcookieあたりがいいと思われる

2-2. 部屋の作成(時間の変更)

アクセストークンが取得できたら試しにミーティング部屋を作ってみる
Viewsを以下の通り編集する

# <project-root>/zoom/views.py
def auth_complete(request):
    '''認証完了ページ'''

    get_user_url = 'https://api.zoom.us/v2/users'
    access_token = request.session['zoom_access_token']

    # ユーザー情報の取得
    get_user_headers = {
        'Authorization': 'Bearer {0}'.format(access_token)
    }
    get_user_response = requests.get(get_user_url, headers=get_user_headers)
    get_user_response_text = json.loads(get_user_response.text)
    user_info = get_user_response_text['users'][0]

    # 部屋の作成
    create_meeting_url = 'https://api.zoom.us/v2/users/{0}/meetings'.format(user_info['id'])
    create_meeting_params = {
        'topic': 'Sample Meeting',
        'type': 2, # scheduled meeting
        'start_time': '2020-11-02 T 12:00:00',
        'duration': 180,
        'timezone': user_info['timezone'],
    }
    create_meeting_params_json = json.dumps(create_meeting_params).encode('utf-8')
    create_meeting_headers = {
        'Authorization': 'Bearer {0}'.format(access_token),
        'Content-Type': 'application/json'
    }
    create_meeting_response = requests.post(create_meeting_url, data=create_meeting_params_json.decode(), headers=create_meeting_headers)
    create_meeting_response_text = json.loads(create_meeting_response.text)
    
    return render(request, 'auth/complete.html')

ミーティング部屋を作成するためにはユーザーIDが必要となるため、API経由で取得する
APIの詳細はList Users - Users - Zoom API - API Referenceを参照
アクセストークンはAuthrization: Bearer XXXXというヘッダをつけて送信する

# ユーザー情報の取得
get_user_headers = {
    'Authorization': 'Bearer {0}'.format(access_token)
}
get_user_response = requests.get(get_user_url, headers=get_user_headers)
get_user_response_text = json.loads(get_user_response.text)
user_info = get_user_response_text['users'][0]

ユーザーIDが取得できたらミーティング部屋を作成する
基本的に「ミーティング名称」「ミーティングタイプ」「開始日」「ミーティング時間」「タイムゾーン」あたりを設定していればよい
他にも様々なパラメータを渡すことができるので詳細は Create a Meeting - Meetings - Zoom API - API Reference を参照
この時、パラメータはdictではなくjsonを利用して渡すことになるため、ヘッダに「Content-Type: application/json」が必要となる(ない場合は300 - Unsupported Content Typeというエラーが返ってくる)

# 部屋の作成
create_meeting_url = 'https://api.zoom.us/v2/users/{0}/meetings'.format(user_info['id'])
create_meeting_params = {
    'topic': 'Sample Meeting',
    'type': 2, # scheduled meeting
    'start_time': '2020-11-02 T 12:00:00',
    'duration': 180,
    'timezone': user_info['timezone'],
}
create_meeting_params_json = json.dumps(create_meeting_params).encode('utf-8')
create_meeting_headers = {
    'Authorization': 'Bearer {0}'.format(access_token),
    'Content-Type': 'application/json'
}
create_meeting_response = requests.post(create_meeting_url, data=create_meeting_params_json.decode(), headers=create_meeting_headers)
create_meeting_response_text = json.loads(create_meeting_response.text)

リクエストが正常に完了したらZoomを立ち上げ、スケジュールに追加されていれば処理完了

参考サイト

PyDriveを使ってGoogle DriveにDjangoからアクセスしてみる

PyDriveを使ってGoogle Driveを操作する必要があったのでメモ
調べてみると素のPythonのコードはたくさんあるんだけどDjangoを利用してのサンプルが見つからなかったので書いてみた。
とりあえずはOAuthの認証とファイル/フォルダの一覧取得まで。

前提

手順

1. GCPコンソールで認証情報を作成する

  • プロジェクト「PyDrive-Django」を作成(プロジェクト名は何でもよい)
  • プロジェクト「PyDrive-Django」の管理画面を開く
  • 画面中央「APIの概要に移動」をクリック(APIとサービスページに移動)
  • 左側メニュー「認証情報」をクリック
  • 画面上部「認証情報を作成」をクリック
    • 「OAuthクライアントID」を選択
  • OAuthクライアントIDの作成画面にて以下入力する
  • 作成後、認証情報画面にリダイレクトされる
    • OAuth2.0クライアントIDの一覧に先程追加した認証情報が表示される
  • 先程作成した認証情報をダウンロードする(画面右端の矢印ボタン)
    • 認証情報はclient_secrets.jsonとリネームしておく

2. Djangoの実装をしていく

デモ用のアプリをDjangoにて作成する

$ python manage.py startapp gdrive

作成したアプリをsettings.pyで有効化する

# <project-app>/settings.py
INSTALLED_APPS = [
    ...,
    # My Applications
    'gdrive.apps.Gdriveconfig',
    ....
]

gdriveアプリ内に認証情報を入れるディレクトリを作成し、client_secrets.jsonを入れておく

$ cd gdrive
$ mkdir certs
$ cd certs
$ mv /path/to/client_secrets.json .

認証方式の設定ファイルsettings.yamlを作成しclient_secrets.jsonと同じ場所に入れておく
詳細な項目は OAuth made easy — PyDrive 1.3.0 documentation を参照

# <project-root>/gdrive/settings.yaml
# 認証情報をどこから読み込むか(settings or file: default)
client_config_backend: file
# client_config_backend: fileの場合に指定するファイルのパス
client_config_file: /path/to/<project-root>/gdrive/certs/client_secrets.json

# 認証情報をファイルに保存するか
save_credentials: True
# 認証情報をどの形式で保存するか(file固定)
save_credentials_backend: file
# 認証情報の保存先ファイルパス(ファイルが存在しない場合は作成される / 2回目以降は更新)
save_credentials_file: /path/to/<project-root>/gdrive/certs/saved_credentials.json

# 認証情報を自動で更新するか?
get_refresh_token: True

Viewを作成する
とりあえずはQuickstart — PyDrive 1.3.0 documentationに記載されているコードを参考に作成する

import os
from django.shortcuts import render, redirect
from django.http import HttpResponse

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

def index(request):
    current_dir_path = os.path.dirname(os.path.abspath(__file__))
    settings_file_path = os.path.join(current_dir_path,'certs/settings.yaml')
    # settings.yamlを元にGoogleAuthを生成する
    gauth = GoogleAuth(settings_file=settings_file_path)
    auth_url = gauth.GetAuthUrl()

    # OAuthの認証がない場合はauth_url(認証用URL)に遷移する
    # 認証済みの場合はfile一覧に遷移する
    if 'code' in request.GET:
        code = request.GET.get('code')
        gauth.Auth(code)
        gauth.SaveCredentials('file')
        return redirect('files')
    else:
        return redirect(auth_url)

def files(request):
    current_dir_path = os.path.dirname(os.path.abspath(__file__))
    settings_file_path = os.path.join(current_dir_path,'certs/settings.yaml')
    gauth = GoogleAuth(settings_file=settings_file_path)

    # Google Driveのroot直下のファイル, フォルダ一覧を取得
    drive = GoogleDrive(gauth)
    file_list = drive.ListFile({'q': "'root' in parents and trashed=false"}).GetList()
    files = []
    for file_item in file_list:
        file = {}
        file['id'] = file_item['id']
        file['title'] = file_item['title']
        file['mimeType'] = file_item['mimeType']
        files.append(file.copy())

    return render(request, 'gdrive/files.html', {
        'files': files
    })

Viewに対応するルーティングを設定する

$ touch <project-root>/gdrive/urls.py
# <project-root>/gdrive/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('', views.index, name='index'),
    path('files/', views.files, name='files'),
]

# <project-root>/<project-app>/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    ...
    path('gdrive/', include('gdrive.urls'))
]

ビルトインサーバを起動しブラウザでhttp://localhost:8000/gdriveにアクセスする
OAuthの認証ページが表示され、許可をするとファイル一覧が表示されればOK 認証さえできればあとはファイル操作はdocumentみればなんとかなりそう。

参考サイト

Django Channlesで投票アプリ(簡易版)を作る

先日やったDjango Channelsチュートリアルを応用して投票アプリを作成してみる。
WebSocketの練習にもぴったりですね。

前提

  • Django: 2.2.8
  • Channels: 2.4.0

手順

投票用アプリの作成

次のコマンドを叩いて投票用アプリを作成する

$ python manage startapp vote

テンプレートを作成する

問題作成用HTML, 回答用HTMLを作る

$ mkdir <project-root>/vote/templates/question
$ touch <project-root>/vote/templates/question/question.html
$ mkdir <project-root>/vote/templates/answer
$ touch <project-root>/vote/templates/answer/answer.html
<!-- question.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Create Question</title>
    <style>
        lavel {
            width: 100%;
            display: flex;
        }
    </style>
</head>
<body>

    <h1>Create Question [ Room Name: {{ room_name }} ]</h1>
    <label>
        <span>Question: </span>
        <input type="text" id="vote-question-input" size="100">
    </label>
    <input type="button" id="vote-question-send" value="Send">

    <div>
        回答状況: <span id="vote-results"></span>
    </div>

    <script>
        const roomName = '{{ room_name }}'
        // WebSocketの定義
        const questionSocket = new WebSocket('ws://' + window.location.host + '/ws/vote/' + roomName + '/')
        let answers = []

        // サーバからのメッセージ受信時処理
        questionSocket.onmessage = (e) => {
            const data = JSON.parse(e.data)

            if(data.type !== 'send_answer') return;

            // let resultsData = document.getElementById('vote-resutls').innerHTML || ''
            // resultsData += data.answer + ','
            // document.getElementById('vote-results').innerHTML = resultsData
            answers.push(data.answer)
            document.getElementById('vote-results').innerHTML = answers.join(',')
        }

        // WebSocket終了時処理
        questionSocket.onclose = (e) => {
            console.error('Question socket closed.')
        }

        // 問題送信処理
        document.getElementById('vote-question-send').onclick = (e) => {
            const questionInputDom = document.getElementById('vote-question-input')
            const question = questionInputDom.value;

            if (question === '') return;

            let questionResult = {}
            questionResult.question = question

            questionSocket.send(JSON.stringify({
                'question': question
            }))
            questionInputDom.value = ''
        }
    </script>
</body>
</html>

<!-- answer.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Vote Answer</title>
    <style>
        lavel {
            width: 100%;
            display: flex;
        }
        #vote-question-answers {
            display: none;
        }
        #vote-question-answers.show {
            display: block;
        }
    </style>
</head>
<body>

    <h1>Create Question [ Room Name: {{ room_name }} ]</h1>

    <label>
        <span>Question: </span>
        <div id="vote-question-text">投票前です</div>
    </label>
    <div id="vote-question-answers">
        <!-- 今回はデモなので選択肢の文言を固定とする -->
        <label><input type="radio" name="vote-answer" value="大分県"> 大分県</label>
        <label><input type="radio" name="vote-answer" value="大分県以外"> 大分県以外</label>
        <input type="button" id="vote-question-send" value="Send">
    </div>

    <script>
        const roomName = '{{ room_name }}'
        // WebSocketの定義
        const answerSocket = new WebSocket('ws://' + window.location.host + '/ws/vote/' + roomName + '/')

        // サーバからのメッセージ受信時処理
        answerSocket.onmessage = (e) => {
            const data = JSON.parse(e.data);

            if(data.type !== 'send_question') return;

            document.getElementById('vote-question-text').innerHTML = data.question
            document.getElementById('vote-question-answers').classList.toggle('show')
        }

        // WebSocket終了時処理
        answerSocket.onclose = (e) => {
            console.error('Question socket closed.')
        }

        // 問題送信処理
        document.getElementById('vote-question-send').onclick = (e) => {
            const answerInputDoms = document.querySelectorAll('[name=vote-answer]')
            const answers = {}

            for(let i = 0; i < answerInputDoms.length; i++) {
                if(!answerInputDoms[i].checked) continue

                answers.answer = answerInputDoms[i].value
            }

            if (Object.keys(answers).length == 0) return;

            answerSocket.send(JSON.stringify(answers))
            document.getElementById('vote-question-text').innerHTML = '回答しました'
            document.getElementById('vote-question-answers').classList.toggle('show')
        }
    </script>
</body>
</html>

Views, Rouinの設定を行う

question.html, answer.htmlを利用できるよう viewsの設定を行う

# <project-root>/vote/views.py
from django.shortcuts import render

def index(request):
    pass

def question(request, room_name):
    return render(request, 'question/question.html', {
        'room_name': room_name
    })

def answer(request, room_name):
    return render(request, 'answer/answer.html', {
        'room_name': room_name
    })

viewに対する routingの設定を行う

# <project-root>/<project-app>/urls.py
urlpatterns = [
    ...
    path('vote/', include('vote.urls')),
]

# <project-root>/vote/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('question/<str:room_name>/', views.question, name='question'),
    path('answer/<str:room_name>/', views.answer, name='answer'),
]

Websocketの処理を作成する

WebSocketを利用するために consumerを作成する
複数のグループに属する場合のうまい書き方がわからなくてself.channel_layer.group_add()を複数回書いてるけどこんなんでいいのかな…

import json
from channels.generic.websocket import AsyncWebsocketConsumer

class VoteConsumer(AsyncWebsocketConsumer):
    # 接続時処理
    async def connect(self):
        # チャンネル名とグループ名を定義
        self.room_group_name_question = 'question_%s' % self.scope['url_route']['kwargs']['room_name']
        self.room_group_name_answer = 'answer_%s' % self.scope['url_route']['kwargs']['room_name']

        # グループに追加
        await self.channel_layer.group_add(
            self.room_group_name_question,
            self.channel_name
        )
        await self.channel_layer.group_add(
            self.room_group_name_answer,
            self.channel_name
        )
        await self.accept()

    # 切断時処理
    async def disconnect(self, close_code):
        # グループから削除
        await self.channel_layer.group_discard(
            self.room_group_name_question,
            self.channel_name
        )
        await self.channel_layer.group_discard(
            self.room_group_name_answer,
            self.channel_name
        )
        await self.close()
    
    # メッセージ受信時処理
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)

        if 'question' in text_data_json.keys():
            await self.channel_layer.group_send(
                self.room_group_name_answer,
                {
                    'type': 'send_question',
                    'question': text_data_json['question']
                }
            )

        if 'answer' in text_data_json.keys():
            await self.channel_layer.group_send(
                self.room_group_name_question,
                {
                    'type': 'send_answer',
                    'answer': text_data_json['answer']
                }
            )

    # 質問の送信
    async def send_question(self, event):
        question = event['question']

        await self.send(text_data=json.dumps({
            'type': 'send_question',
            'question': question
        }))

    # 回答の送信
    async def send_answer(self, event):
        answer = event['answer']

        await self.send(text_data=json.dumps({
            'type': 'send_answer',
            'answer': answer
        }))

WebSocket用のルーティングを設定する

前述のConsumerに対する routingを設定する
これを設定することでWebSocketのルーティングが有効化される

# <project-root>/<project-app>/asgi.py
...
import django.urls import re_path
import vote.consumers

application = ProtocolTypeRouter({
    "http": AsgiHandler,
    "websocket": AuthMiddlewareStack(
        URLRouter([
            ...,
            re_path(r'ws/vote/(?P<room_name>\w+)/$', vote.consumers.VoteConsumer),
        ])
    ),
})

動作確認

ここまでできたらサーバを起動する(Channels用のRedisも起動しておく)

$ redis-server &
$ python manage.py runserver 0.0.0.0:8000

ブラウザでhttp://localhost:8000/vote/question/lobby, http://localhost:8000/vote/answer/lobbyにアクセスしてみる。

/vote/question/lobbyで質問を入力し送信すると、/vote/answer/lobby側で質問が反映される。質問に回答すると/vote/question/lobby側に回答が追加されるのがわかる。

f:id:Satoh_D:20201028151112g:plain

今回は回答を固定したり、回答者の情報は送信しないなど本当に必要最低限の機能しか実装していないが、あとは作り込みの問題なので割愛とする

Django ChannelsがあるとWebSocketの利用が楽になっていいですね。

Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 4: Automated Testing)

Channelsのチュートリアルシリーズ最後!
今回は前章までで作成したもののテストコードを書く

前章までの記事は下記

satoh-d.hatenablog.com

https://satoh-d.hatenablog.com/entry/2020/10/26/180000satoh-d.hatenablog.com

https://satoh-d.hatenablog.com/entry/2020/10/27/180000satoh-d.hatenablog.com

前提

  • Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
  • Channels: 2.4.0
  • Redis-server: 5.0.9
  • Google Chrome: 86.0.4240.111(Headless Chromeで利用)
  • ChromeDriver: 86.0.4240.22
  • Selenium: 3.141.0

Chrome, ChromeDriver, Seleniumのインストール

それぞれのインストール方法は次の通り(今回はDockerコンテナ上で実施)

# Google Chroome
$ wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add
$ echo 'deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main' | tee /etc/apt/sources.list.d/google-chrome.list
$ apt-get update
$ apt-get install -y google-chrome-stable
$ google-chrome --version
Google Chrome 86.0.4240.111 

# Chrome Driver
$ curl -OL https://chromedriver.storage.googleapis.com/76.0.3809.126/chromedriver_linux64.zip
$ unzip chromedriver_linux64.zip chromedriver
$ mv chromedriver /usr/bin/chromedriver

# Selenium
$ pip install selenium

テストコードは下記の通り
Headless Chromeを定義する際にOptionを指定しないとテスト実施時にエラーとなるため注意すること

# chat/tests.py
from channels.testing import ChannelsLiveServerTestCase
from selenium import webdriver
# Tutorialに載ってないけどOptionsを定義しないとテスト実施時にエラーとなる
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait

class ChatTests(ChannelsLiveServerTestCase):
    serve_static = True  # emulate StaticLiveServerTestCase

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # optionsはTutorialに載ってないけど定義しないとテスト実施時にエラーとなる
        options = Options()
        options.add_argument('--no-sandbox')
        options.add_argument('--headless')
        try:
            # NOTE: Requires "chromedriver" binary to be installed in $PATH
            cls.driver = webdriver.Chrome(options=options)
        except:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        cls.driver.quit()
        super().tearDownClass()

    def test_when_chat_message_posted_then_seen_by_everyone_in_same_room(self):
        try:
            self._enter_chat_room('room_1')

            self._open_new_window()
            self._enter_chat_room('room_1')

            self._switch_to_window(0)
            self._post_message('hello')
            WebDriverWait(self.driver, 2).until(lambda _:
                'hello' in self._chat_log_value,
                'Message was not received by window 1 from window 1')
            self._switch_to_window(1)
            WebDriverWait(self.driver, 2).until(lambda _:
                'hello' in self._chat_log_value,
                'Message was not received by window 2 from window 1')
        finally:
            self._close_all_new_windows()

    def test_when_chat_message_posted_then_not_seen_by_anyone_in_different_room(self):
        try:
            self._enter_chat_room('room_1')

            self._open_new_window()
            self._enter_chat_room('room_2')

            self._switch_to_window(0)
            self._post_message('hello')
            WebDriverWait(self.driver, 2).until(lambda _:
                'hello' in self._chat_log_value,
                'Message was not received by window 1 from window 1')

            self._switch_to_window(1)
            self._post_message('world')
            WebDriverWait(self.driver, 2).until(lambda _:
                'world' in self._chat_log_value,
                'Message was not received by window 2 from window 2')
            self.assertTrue('hello' not in self._chat_log_value,
                'Message was improperly received by window 2 from window 1')
        finally:
            self._close_all_new_windows()

    # === Utility ===

    def _enter_chat_room(self, room_name):
        self.driver.get(self.live_server_url + '/chat/')
        ActionChains(self.driver).send_keys(room_name + '\n').perform()
        WebDriverWait(self.driver, 2).until(lambda _:
            room_name in self.driver.current_url)

    def _open_new_window(self):
        self.driver.execute_script('window.open("about:blank", "_blank");')
        self.driver.switch_to_window(self.driver.window_handles[-1])

    def _close_all_new_windows(self):
        while len(self.driver.window_handles) > 1:
            self.driver.switch_to_window(self.driver.window_handles[-1])
            self.driver.execute_script('window.close();')
        if len(self.driver.window_handles) == 1:
            self.driver.switch_to_window(self.driver.window_handles[0])

    def _switch_to_window(self, window_index):
        self.driver.switch_to_window(self.driver.window_handles[window_index])

    def _post_message(self, message):
        ActionChains(self.driver).send_keys(message + '\n').perform()

    @property
    def _chat_log_value(self):
        return self.driver.find_element_by_css_selector('#chat-log').get_property('value')

ChannelsLiveServerTestCaseDjangoのE2Eテストツール(StaticLiveServerTestCaseLiveServerTestCase)を拡張したもの
Channlesで定義した内部のルーティング(/ws/room/ROOM_NAMEのような)をいい感じにテストしてくれるとか

テストコード実行時にテスト用DBを自動で作成するため、権限を付けてあげる
今回は(MySQL, DB名: test_django(デフォルト))

mysql> grant all privileges on test_django.* to 'user'@'%';

実際にテストを動かしてみる
「OK」とでればテストが通ったことになる

$ python manage.py test chat.tests
Creating test database for alias 'default'...
System check identified no issues (0 silenced).
..
----------------------------------------------------------------------
Ran 2 tests in 2.818s

OK
Destroying test database for alias 'default'...

最後に

Tutorialと環境が違う(そもそもDjangoのバージョンがちがったり色々...)ことでハマってしまうところが多々あった。
とはいえなんとか終わらせることができてよかった。
もうすこしWebSocketでできることをしらべないといけないかなーと...。

参考サイト

Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 3: Rewrite Chat Server as Asynchronous)

前々回前回の続き

今回は前章までで作成したチャットサーバを非同期処理で書き直す
非同期処理にするとリクエストごとにスレッドが追加されないから同期処理に比べてパフォーマンスが高いとのこと

前章までで作成したChatConsumerを非同期で書き直すと以下の通り

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    # 接続時処理
    async def connect(self):
        # チャンネル名とグループ名を定義
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    # 切断時処理
    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # メッセージ受信時処理
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )
    
    # メッセージの受信→送信
    async def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

非同期処理に書き換えた場合に変わる点は以下の通り

  • WebsocketConsumerではなくAsyncWebsocketConsumerを継承する
  • 全てのメソッドがdefではなくasync defに変更となる
  • I/Oが発生する処理は全てawaitを付与する
  • async_to_syncを利用しなくなる

参考サイト

Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 2: Implement a Chat Server)

前回の続きでDjangoとChannelsについて頑張る

satoh-d.hatenablog.com

今回はいよいよChannelsを利用してチャットのデモを作るところ
チュートリアルと環境が違うこと、Redis用に新たにdockerコンテナを作るところがあったけど敢えてコンテナを作らず直接インストールしようとしたらすんごいハマってしまった。
でもうまく行ったからよしとしよう

前提

  • Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
  • Channels: 2.4.0
  • Redis-server: 5.0.9

3. tutorial

3-1. Channlesのセットアップ

channles用アプリを作る

$ python manage.py startapp chat

今回はchat/views.py, chhat/__init__.pyしか使わないので他のファイルは削除する <project-app>/settings.pyに先程追加したchatを有効化する処理を追記する

INSTALLED_APPS = [
    ...
    # My applications
    'chat',
    ...
]

chat/templates/chat/index.htmlを作成する

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Rooms</title>
</head>
<body>
    What chat room would you like to enter?<br>
    <input id="room-name-input" type="text" size="100"><br>
    <input id="room-name-submit" type="button" value="Enter">

    <script>
        document.querySelector('#room-name-input').focus();
        document.querySelector('#room-name-input').onkeyup = function(e) {
            if (e.keyCode === 13) {  // enter, return
                document.querySelector('#room-name-submit').click();
            }
        };

        document.querySelector('#room-name-submit').onclick = function(e) {
            var roomName = document.querySelector('#room-name-input').value;
            window.location.pathname = '/chat/' + roomName + '/';
        };
    </script>
</body>
</html>

chat/views.pyに以下を追記する

def index(request):
    # chat/templates/chat/index.html をビューとして返す
    return render(request, 'chat/index.html')

chat/urls.py, <project-app>/urls.pyにルーティングを追加する

# chat/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('', views.index, name='index')
]
# <project-app>/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    ...
    path('chat/', include('chat.urls')),
]

3-2. チャットサーバーの実装(Implement a Chat Server)

チャット部屋用のviewを追加する

$ touch <project-app>/chat/templates/chat/room.html
<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Room</title>
</head>
<body>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br>
    <input id="chat-message-input" type="text" size="100"><br>
    <input id="chat-message-submit" type="button" value="Send">
    {{ room_name|json_script:"room-name" }}
    <script>
        const roomName = JSON.parse(document.getElementById('room-name').textContent);

        const chatSocket = new WebSocket(
            'ws://'
            + window.location.host
            + '/ws/chat/'
            + roomName
            + '/'
        );

        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);
            document.querySelector('#chat-log').value += (data.message + '\n');
        };

        chatSocket.onclose = function(e) {
            console.error('Chat socket closed unexpectedly');
        };

        document.querySelector('#chat-message-input').focus();
        document.querySelector('#chat-message-input').onkeyup = function(e) {
            if (e.keyCode === 13) {  // enter, return
                document.querySelector('#chat-message-submit').click();
            }
        };

        document.querySelector('#chat-message-submit').onclick = function(e) {
            const messageInputDom = document.querySelector('#chat-message-input');
            const message = messageInputDom.value;
            chatSocket.send(JSON.stringify({
                'message': message
            }));
            messageInputDom.value = '';
        };
    </script>
</body>
</html>

追加したテンプレートに対応するview, ルーティングを<project-root>/chat/views.py, urls.pyに追記する

def room(request, room_name):
    return render(request, 'chat/room.html', {
        'room_name': room_name
    })
urlpatterns = [
    path('', views.index, name='index'),
    path('<str:room_name>/', views.room, name='room'), # 追記
]

http://localhost:8000/chat/lobbyにアクセスしてみるとチャット用画面が表示される
試しにテキストボックスにHelloと入力しsendを押してみるとWebSocketのエラーが発生する
これはWebSocketのルーティングが定義されていないためである

Websocketの処理を記述するために<project-root>/chat/consumers.pyを作成する

$ touch <project-root>/chat/consumers.py
# chat/consumers.py
import json
from channels.generic.websocket import WebsocketConsumer

class ChatConsumer(WebsocketConsumer):
    # 接続時処理
    def connect(self):
        self.accept()

    # 切断時処理
    def disconnect(self, close_code):
        pass

    # メッセージ受信時処理
    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        self.send(text_data=json.dumps({
            'message': message
        }))

consumers.py用に新規でルーティングを<project-root>/chat/routing.pyに記載する

$ touch <project-root>/chat/routing.py
# chat/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

# Django 2.2以下では次のように記載する
# `ChatConsumer` has no attribute `as_asgi()`とエラーが出るため
websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
]

上記のas_asgi()はユーザの接続ごとにconsumerのインスタンスを生成するASGIアプリケーションである
Djangoas_view()のようにユーザの接続ごとにviewを生成する動きにと同じようなもの

次に<project-root>/<project-app>/asgi.pyを以下のように編集する

# mysite/asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import chat.routing

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

application = ProtocolTypeRouter({
  "http": get_asgi_application(),
  "websocket": AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

# Django 2.2以下は以下のようなコードになる(get_asgi_applicationあたりが変わっている)
import os

import django
from channels.auth import AuthMiddlewareStack
from channels.http import AsgiHandler
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()

application = ProtocolTypeRouter({
    "http": AsgiHandler,
    "websocket": AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

AuthMiddlewareStackWebSocketの接続情報を定義することでWebSocketの接続プロトコルws://, wss://)で接続できるようになる

改めてhttp://localhost:8000/chat/lobbyにアクセスする
テキストボックスにHelloと入力しsendを押すとエラーなく上のテキストエリアにHellloと表示される

しかしタブを複数開きそれぞれでテキストボックスに文字を入力しても、別のタブには反映されない
これはChatConsumerがそれぞれの接続ごとにインスタンス化されているが、お互いに会話はできていない状態となっている
Channelsにはchannel layerというレイヤーが提供されており、これを利用することでChatConsumer同士で会話ができるようになる

チャンネルレイヤーとは

各コンシューマインスタンス同士やDjangoの他のパーツとの会話ができるようになるシステム
チャンネルレイヤーは以下を提供している

  • channel: メールボックス的なもの。各チャンネルには名前があり、その名前があれば(所属していれば?)誰でもメッセージを送ることができる
  • group: チャンネルの関連的なもの。各グループには名前があり、その名前があれば(所属していれば?)グループの追加/削除に、グループに所属している全てのチャンネルにメッセージを送ることができる

チャンネルレイヤーを利用するにはRedisが動いている必要がある
Redisをインストールする(Tutorialではdockerを起動しているが、PythonのDockerに手動でインストール → 起動してみる)

$ cd /code
# redis-serverのインストール
$ wget http://download.redis.io/releases/redis-5.0.9.tar.gz
$ tar xzf redis-5.0.9.tar.gz
$ cd redis-5.0.9
$ make install
# redis-serverの起動(続けて操作するために&をつけることでBG起動)
$ /usr/loca/bin/redis-server &
# ダウンロードしたファイルの削除
$ cd ..
$ rm redis-5.0.9.tar.gz
$ rm -r redis-5.0.9

RedisPythonから利用するためのパッケージをインストールしておく

$ pip install channels_redis

チャンネルレイヤーを使うための設定を<project-root>/<project-app>/settings.pyに追記する

# mysite/settings.py
# Channels
ASGI_APPLICATION = 'mysite.routing.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

チャンネルレイヤーがRedisの元で動いているか確認をするためにshellで実行してみる

$ cd <project-root>
$ python manage.py shell

# チャンネルレイヤーをimport
>>> import channels.layers
>>> channel_layer = channels.layers.get_channel_layer()
>>> from asgiref.sync import async_to_sync
# test_channelに`hello`とメッセージを送信
>>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'})
# test_channelに送信されたメッセージを受信
>>> async_to_sync(channel_layer.receive)('test_channel')
# helloが取得できれば成功
{'type': 'hello'}

これでチャンネルレイヤーを使う準備ができたので、ChatCoonsumerを以下の通り修正する

# chat/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

class ChatConsumer(WebsocketConsumer):
    # 接続時処理
    def connect(self):
        # チャンネル名とグループ名を定義
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )

        self.accept()

    # 切断時処理
    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    # メッセージ受信時処理
    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )
    
    # メッセージの受信→送信
    def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
  • ユーザがPOSTしたメッセージはJavaScriptにてWebSocketを通じてChatConsumerに届く
  • ChatConsumerはメッセージに対応するグループに属するChatConsumerに転送する
    • グループ名はURLの部屋名称がChatConsumer.connectにて設定されている
  • 同じグループに属するChatConsumerは受け取ったメッセージをWebSocketを通じてJavaScriptに転送する(chatsocket.onmessage部分)

ソースコードの修正が完了したら http://localhost:8000/chat/lobby を2窓開き、それぞれでメッセージを送信してみる
お互いの窓でメッセージが表示されれば成功

参考サイト

Django と Channelsでチャット作成のチュートリアルをやってみる(Tutorial Part 1: Basic Setup)

案件でチャットを作ることになりそうなので勉強中のDjangoでどこまでできるか確認メモ
調べてみるとChannelsを使えばできそうというということがわかったのでチュートリアルをやってみる

とりあえず今回は Tutorial Part 1: Basic Setup — Channels 2.4.0 documentation までやってみた

channels.readthedocs.io

前提

  • Django: 2.2.8(本当はDjang 3.0+がいいけどローカルがこれだったので。。)
  • Channels: 2.4.0

1. Django Channelsとは

Channels wraps Django’s native asynchronous view support, allowing Django projects to handle not only HTTP, but protocols that require long-running connections too - WebSockets, MQTT, chatbots, amateur radio, and more.

DjangoがHTTPだけでなくWebsockeet、MQTT、チャットボット、アマチュア無線など長時間の接続を必要とするプロトコルを扱えるようにする
ASGIがそれを可能にしているそうな

ASGI(the Asynchronous Server Gateway Interface)

Channelsで作られてたアプリケーションをアプリケーションサーバから切り離し、アプリケーションとミドルウェアとの間の標準インタフェース
WSGI拡張らしい

2. インストール

pipを利用してインストールを行う

$ pip install channels
$ pip list | grep channels
channels 2.4.0

<project-app>/settings.pyを編集しchannelsを有効化する

INSTALLED_APPS = [
    ...
    # 3rd party apps
    'channels',
    ...
]

<project-app>/asgi.pyを新規作成する
※ 今回は config/asgi.py とする

import os

from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    # Just HTTP for now. (We can add other protocols later.)
})

# Django 2.2はASGIをサポートしていないので代替手段となる以下コードを記載する
import os

import django
from channels.http import AsgiHandler
from channels.routing import ProtocolTypeRouter

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()

application = ProtocolTypeRouter({
  # Tutorialは "http": AsgiHandler() となっているが AsgiHandler でないと動かない
  "http": AsgiHandler,
  # Just HTTP for now. (We can add other protocols later.)
})

<projcect-app>/settings.pyASGI_APPLICATIONの設定を追記する

...
# Channles and ASGI Settings
ASGI_APPLICATION = 'config.asgi.application'
...

3. tutorial

3-1. Channlesのセットアップ

channles用アプリを作る

$ python manage.py startapp chat

今回はchat/views.py, chhat/__init__.pyしか使わないので他のファイルは削除する <project-app>/settings.pyに先程追加したchatを有効化する処理を追記する

INSTALLED_APPS = [
    ...
    # My applications
    'chat',
    ...
]

chat/templates/chat/index.htmlを作成する

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Rooms</title>
</head>
<body>
    What chat room would you like to enter?<br>
    <input id="room-name-input" type="text" size="100"><br>
    <input id="room-name-submit" type="button" value="Enter">

    <script>
        document.querySelector('#room-name-input').focus();
        document.querySelector('#room-name-input').onkeyup = function(e) {
            if (e.keyCode === 13) {  // enter, return
                document.querySelector('#room-name-submit').click();
            }
        };

        document.querySelector('#room-name-submit').onclick = function(e) {
            var roomName = document.querySelector('#room-name-input').value;
            window.location.pathname = '/chat/' + roomName + '/';
        };
    </script>
</body>
</html>

chat/views.pyに以下を追記する

def index(request):
    # chat/templates/chat/index.html をビューとして返す
    return render(request, 'chat/index.html')

chat/urls.py, <project-app>/urls.pyにルーティングを追加する

# chat/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('', views.index, name='index')
]
# <project-app>/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    ...
    path('chat/', include('chat.urls')),
]

参考サイト