ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

django简单接口请求平台

2022-02-10 17:34:25  阅读:124  来源: 互联网

标签:请求 get app send django 接口 import id channel


背景

消息服务使用websocket发送消息,其他同事测试过程中发送消息不太方便,编写了有页面发送消息的脚本

 

项目结构

 

  • urls.py
  • 路由配置
"""ims_send_msg URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/3.0/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path

from send_msg import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/', views.index),
]

 

  • tools.py
  • 一些工具方法
import json
import requests
import re
import time
from websocket import create_connection

import urllib3

urllib3.disable_warnings()

requests.packages.urllib3.disable_warnings()

def get_send_id():
    send_id = 'send_id_' + str(int(round(time.time() * 10000000)))
    return send_id

def get_target_id():
    target_id = 'target_id_' + str(int(round(time.time() * 10000000)))
    return target_id

def get_message():
    message = 'message_' + str(int(round(time.time() * 10000000)))
    return message

def msg_id():
    user_id = str(int(round(time.time() * 10000000)))
    # print(user_id)
    return user_id

# 10位时间戳
def get_timestamp_ten():
    timestamp_ten = str(int(round(time.time())))
    # print(timestamp_ten)
    return timestamp_ten

# 13位时间戳
def get_timestamp_thirteen():
    timestamp_thirteen = str(int(round(time.time() * 1000)))
    # print(timestamp_thirteen)
    return timestamp_thirteen

def send_msg(url, app_id, channel_id, send_id, target_id, message):
    global socket
    url = 'http://{}'.format(url)
    data = {'app_id': app_id,
            'channel_id': channel_id,
            'access_token': '',
            'client': 'pc_browser',
            'package_check': 'package_check',
            'third_party_user_id': send_id,
            'context': 'e30',
            'hide': '0'}
    try:
        r = requests.post(url, data, verify= False)
        if r.json().get('code') == 200:
            print('\nNode: {}'.format(r.json()))
            socket_domain = ''.join(re.findall(r'connect_str\":\"https://(.*?com)', r.text))
            ip = ''.join(re.findall(r'ip=(.*?)&', r.text))
            request_id = r.json()['request_id']
            socket_dict = {'ip': ip,
                           'channelId': channel_id,
                           'appId': app_id,
                           'userId': send_id,
                           'context': 'e30',
                           'version': 'v4',
                           'hide': '0',
                           'client': 'pc_browser',
                           'requestId': request_id,
                           'EIO': '3',
                           'transport': 'websocket',
                           }

            socket_path = ''
            for i in socket_dict:
                socket_path = socket_path + i + '=' + socket_dict[i] + '&'
            socket_path = socket_path[:-1]
            socket = 'wss://' + socket_domain + '/socket.io/?' + socket_path
            print('Socket: {}'.format(socket))

            msg = [
                "message",
                {
                    "userId": send_id,
                    "targetId": target_id,
                    "channelId": channel_id,
                    "msgData": {
                        "type": "text",
                        "text_content": message
                    },
                    "terminal": "pc_browser",
                    "context": {},
                    "version": "v4",
                    "appId": app_id,
                    "msgType": "chat",
                    "msgId": msg_id(),
                    "dataTime": get_timestamp_thirteen()
                }
            ]

            wss = create_connection(socket, verify= False)
            status = wss.getstatus()
            if status == 101:
                wss.send('40/' + channel_id)
                msg_json = json.dumps(msg).replace('["message",','')[1:-1]
                socket_msg = "42/" + channel_id + "," + json.dumps(msg)
                if wss.recv() is not None:
                    wss.send(socket_msg)
                    print('Message: {}\n'.format(msg_json))
                    return msg_json
            else:
                return 'ERROR: {}'.format(wss.recv())
        else:
            return 'ERROR: {}'.format(r.json())
    except Exception as error:
        print('ERROR: {}\n'.format(error))
        return error

def query_msg(url, app_id, channel_id, send_id):
    url = url + ''
    data = {'app_id': app_id,
            'signed_at' : get_timestamp_ten(),
            'channel_id': channel_id,
            'sign': 'vhall',
            'send_id': send_id
            }
    r = requests.post('http://' + url, data, verify= False)
    if r.json().get('code') == 200:
        # print('Query : {}'.format(r.json()))
        return r.text
    else:
        return 'ERROR: {}'.format(r.text)

 

  • views.py
  • 视图层逻辑
import json

from django.shortcuts import render
from send_msg.tools import *
import time

# Create your views here.

def index(request):
    env = request.POST.get('env')
    if env == 'online': env = ''

    if env == 'standby': env = ''

    if env == 'pre': env = ''

    if env == 'test': env = ''

    if env == 'b3': env = ''

    app_id = request.POST.get('app_id')
    channel_id = request.POST.get('channel_id')

    if request.POST.get('send_id'): send_id = request.POST.get('send_id')
    else: send_id = get_send_id()

    if request.POST.get('target_id'): target_id = request.POST.get('target_id')
    else: target_id = get_target_id()

    if request.POST.get('message'): message = request.POST.get('message')
    else: message = get_message()

    if request.POST.get('num'): num = request.POST.get('num')
    else: num = 1

    respoen = {}

    respoen['env'] = env
    respoen['app_id'] = app_id
    respoen['channel_id'] = channel_id
    respoen['send_id'] = send_id
    respoen['target_id'] = target_id
    respoen['message'] = message
    respoen['num'] = num

    for i in range(int(num)):
        try:
            respoen['result'] = send_msg(env, app_id, channel_id, send_id, target_id, message)
            print('Send Request parameters : {}'.format(respoen))
        except Exception as requ_error:
            respoen['result'] = requ_error

        time.sleep(1)

        try:
            respoen['query_result'] = query_msg(env, app_id, channel_id, send_id)
            print('Query Request parameters : {}'.format(respoen))
        except Exception as query_error:
            respoen['query_result'] = query_error

    return render(request, 'index.html', respoen)

 

  • index.html
  • 主页面html代码
{% load static %}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>ims</title>
    <style>
        *{
            margin:0;
            padding:0;
        }
        .body{
            margin:0 auto;
            width:1650px;
        }
    </style>

    <style>
        .div{
            width: 150px; 
            height: 25px; 
            font-weight:bold; 
            font-size:16px; 
            text-align:center;
        }
    </style>
</head>
<body class="body">
    <form action="/index/" method="post">
        {% csrf_token %}   <!--加入这行 -->
        <div>
            <br><br>
            <label>
                <select style="height: 35px; width: 350px; font-weight:bold; font-size:18px; text-align:left;" name="env">
                    <option value="online">线上: </option>
                    <option value="standby">备用集群 2: </option>
                    <option value="b3">备用集群 3: </option>
                    <option value="pre">预发布: </option>
                    <option value="test">测试: </option>
                </select>
            </label>

            <div>
                <br>
                <label class="div">
                    app_id : 
                    <input type="text" value="" class="div" name="app_id"/>
                </label>
                <br><br>
                
                <label class="div">
                    channel_id : 
                    <input type="text" value="" class="div" name="channel_id"/>
                </label>
                <br><br>

                <label class="div">
                    send_id : 
                    <input type="text" value="" class="div" name="send_id"/>
                </label>
                <br><br>

                <label class="div">
                    target_id : 
                    <input type="text" value="" class="div" name="target_id"/>
                </label>
                <br><br>

                <label class="div">
                    message : 
                    <input type="text" value="" class="div" name="message"/>
                </label>
                <br><br>
                <label class="div">
                    num : 
                    <input type="text" value="1" class="div" name="num"/>
                </label>
                <br><br>
            </div>

            <input type="submit" style="width: 90px; height: 30px" value="提交" />
        </div>
    </form>
    <br>
    <h3>请求参数</h3>
    <label>
        <textarea wrap="soft" type="text" style="width: 1600px; height: 210px; font-size:16px; color: blueviolet;">
            
 环境: {{ env }}
 app_id: {{ app_id }}
 channel_id: {{ channel_id }}
 消息发送者: {{ send_id }}
 消息接收者: {{ target_id }}
 消息体:
{{ result }}
        </textarea>
        <br><br>
        <h3>查询结果</h3>
        <textarea wrap="soft" type="text" style="width: 1600px; height: 210px; font-size:16px; color: blueviolet;">

 环境: {{ env }}
 app_id: {{ app_id }}
 channel_id: {{ channel_id }}
 消息发送者: {{ send_id }}
 查询结果:
{{ query_result }}
        </textarea>
    </label>
</body>
</html>

 

  • run.bat
  • Windows平台启动脚本
if "%1"=="hide" goto CmdBegin
start mshta vbscript:createobject("wscript.shell").run("""%~0"" hide",0)(window.close)&&exit
:CmdBegin
python manage.py runserver 127.0.0.1:8888

  

  • run.sh
  • Linux平台启动脚本
nohup python3 -u manage.py runserver 0.0.0.0:80 > out.log 2>&1 &

  

 

页面

 

标签:请求,get,app,send,django,接口,import,id,channel
来源: https://www.cnblogs.com/Echo-Mikasa/p/15880065.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有