В последнее время веб-проект компании должен использовать отправку сообщений в режиме реального времени, которая может своевременно передавать важные сведения ответственному лицу. Сначала я подумал об использовании более зрелого решения WS. Но после анализа спроса я думаю, что в этом сценарии использование SSE более целесообразно.
Основные причины заключаются в следующем:
Среди них самый главный: наше требование — односторонняя рассылка сообщений.
В настоящее время бизнес-код реализуется студентами JAVA, но я мог бы также использовать для его реализации Python, а вдруг он понадобится в будущем?
pip install "fastapi[all]"
pip install sse-starlette
Полная реализация кода:
# -*- coding: utf-8 -*-
import random
import asyncio
import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
# Междоменная настройка, поскольку тест требует внешнего доступа, доступ разрешен всем доменам.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get('/stream')
async def stream(request: Request):
def new_count():
return random.randint(1, 100)
async def event_generator():
index = 0
while True:
index += 1
if await request.is_disconnected():
break
# Тест берет случайные данные, каждый раз принимая случайное число.
if count := new_count():
yield {'data': count}
await asyncio.sleep(1)
return EventSourceResponse(event_generator())
if __name__ == '__main__':
uvicorn.run('main:app', reload=True)
Далее запустите службу и получите к ней доступ в браузере:
http://127.0.0.1:8000/stream
Как можно заметить, случайные числа выводятся с интервалом в одну секунду.
Теперь, когда мы включили скачок, давайте попробуем подключить этот интерфейс к службе веб-интерфейса.
По сравнению с fastapi, flask более минималистичный.
pip install flask
# -*- coding: utf-8 -*-
import random
import time
from flask import Flask, Response
app = Flask(__name__)
@app.route('/stream')
def stream():
def new_count():
return random.randint(1, 100)
def eventStream():
while True:
if count := new_count():
yield 'id: %d\n' % id + 'data: %s\n\n' % count
time.sleep(1)
return Response(eventStream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run(debug=True, port=8888)
Чтобы непосредственно ощутить прелесть SSE, введите следующий код в консоли браузера для подключения к серверу.
const source = new EventSource("http://127.0.0.1:8000/stream")
source.addEventListener('message', function (event) {
console.log(event.data)
}, false);
Выше приведен основной базовый код Python для реализации sse, но в реальных проектах требуется ряд операций, таких как работа с базами данных, включая рассылку сообщений назначенным получателям и т. д. В этой статье основное внимание уделяется бизнес-сценариям и выбору подходящих технологий для достижения требований.
end.
ссылка: