Обученная модель машинного обучения сама по себе пользу бизнесу не принесет. Модель должна быть интегрирована в IT инфраструктуру компании. Рассмотрим реализацию REST API микросервиса на примере задачи классификации цветов Ирисов. Набор данных состоит из длины и ширины двух типов лепестков Ириса: sepal и petal. Целевая переменная — это сорт Ириса: 0 — Setosa, 1 — Versicolor, 2 — Virginica.
Прежде чем переходить к реализации нашего API надо обучить и сохранить модель. Возьмем модель RandomForestClassifier. Теперь сохраним модель в файл и загрузим, чтобы делать прогнозы. Это можно сделать с помощью pickle или joblib. Рассмотрим pickle, вариант с joblib останется для самостоятельного разбора.
12 | import pickle filename = 'model.pkl' pickle.dump(clf, open(filename, 'wb')) |
Для загрузки и проверки модели воспользуемся pickle.load
12 | loaded_model = pickle.load(open(filename, 'rb')) result = loaded_model.score(X_test, y_test) print(result) |
Код обучения, сохранения и загрузки модели доступен в репозитории — ссылка
Микросервис — веб-сервис, который выполняет одну бизнес-потребность и может взаимодействовать с другими сервисами в IT инфраструктуре использую, например HTTP. Архитектура, состоящая из нескольких микросервисов называется микросервисной.
REST (Representational State Transfer) — принципы организации взаимодействия сервисов посредством протокола HTTP. Клиенты отправляют запросы, используя методы представленные протоколом HTTP, и выполняют операцию. Например: получение, добавление, модификация или удаление данных.
API (Application Programming Interface) — интерфейс для общения сервисов.
Перейдем к практике, что бы стало понятнее. Спроектируем наш сервис. Структура сервиса:
Доступ к API будет осуществляться по-следующему URL — http://[hostname]/iris/api/v1.0/getpred
В URL включено имя приложения и версия API. Имя приложения позволяет идентифицировать сервис. Версия API пригодится если будут новые версии сервиса, но нужно сохранить старый вызов. Это может быть нужно при тестировании или API различаются для разных систем.
Еще создадим http://[hostname]/iris/api/v1.0/status/ для проверки статуса выполнения запроса к сервису и http://[hostname]/iris/api/v1.0/result/ для получения результаты работы модели.
Cоздадим каркас нашего сервиса.
import os
from flask import Flask, jsonify, abort, make_response, request
import requests
import json
import time
import sys
import pandas as pd
app = Flask(__name__)
def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
print(sepal_length, sepal_width, petal_length, petal_width, api)
if api == 'v1.0':
res_dict = {'Done': 'API exist'}
return res_dict
else:
res_dict = {'error': 'API doesnt exist'}
return res_dict
@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
result = launch_task(request.args.get('sepal_length'), request.args.get('sepal_width'), \
request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0')
return make_response(jsonify(result), 200)
if __name__ == '__main__':
app.run(port=5000, debug=True)
Пока модель не используется. Отмечу несколько моментов — функция get_task использует метод GET и на вход получает необходимые для работы модели признаки. Таким образом обращение к нашему сервису выглядит следующим образом: http://[hostname]/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2
Функция launch_task будет уже работать с моделью, а пока просто проверяет доступность версии API и выводит в консоль переданные в неё параметры.
Убедимся, что всё работает. Запустим в консоли наше приложение:
python rest_api.py
Обратимся в браузере по адресу http://127.0.0.1:5000/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2 и видим в браузере и консоли, что всё прекрасно работает.
Каркас готов, будем его дорабатывать.
Обратите внимание, на строку return make_response(jsonify(result), 200). 200 — это код состояния HTTP — «Ок».
В реальности, в работе сервиса могут возникнуть ошибки, поэтому создадим обработку ошибок. Будем обрабатывать две часто встречающихся ошибки: 404 — «Не найдено» и 500 — «внутренняя ошибка сервера».
@app.errorhandler(404)
def not_found(error):
return make_response(jsonify({'code': 'PAGE_NOT_FOUND'}), 404)
@app.errorhandler(500)
def server_error(error):
return make_response(jsonify({'code': 'INTERNAL_SERVER_ERROR'}), 500)
Создадим файл model.py для загрузки обученной модели.
import pickle
PATH_TO_MODELS = 'models/'
filename = 'model.pkl'
model = PATH_TO_MODELS + filename
def load_model():
loaded_model = pickle.load(open(model, 'rb'))
return loaded_model
Теперь пора дописать функцию launch_task, что бы возвращались предсказания сделанные моделью.
model = M.load_model()
targets = ['setosa', 'versicolor', 'virginica']
def get_pred(sepal_length, sepal_width, petal_length, petal_width):
all_columns = ['sepal length', 'sepal width', 'petal length', 'petal width']
lst = [sepal_length, sepal_width, petal_length, petal_width]
df = pd.DataFrame([lst], columns = all_columns)
df = df.astype(float)
result = model.predict_proba(df)
predx = ['%.3f' % elem for elem in result[0]]
preds_concat = pd.concat([pd.Series(targets), pd.Series(predx)], axis=1)
preds = pd.DataFrame(data=preds_concat)
preds.columns = ["class", "probability"]
return preds.reset_index(drop=True)
def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
pred_model = get_pred(sepal_length, sepal_width, petal_length, petal_width)
if api == 'v1.0':
res_dict = {'result': json.loads( pd.DataFrame(pred_model).to_json(orient='records'))}
return res_dict
else:
res_dict = {'error': 'API doesnt exist'}
return res_dict
Добавлена загрузка модели и функция get_pred, которая по полученным значениям признака возвращает датафрейм c именем сорта Ириса и вероятностью принадлежности к классу. В launch_task теперь возвращается сериализованный ответ в формате JSON.
Казалось бы сервис готов. Да это действительно так. Но давайте еще поработаем над ним.
Реализуем логирование в микросервисе. В лог будут фиксироваться ключевые моменты и ошибки при работе микросервис. Логирование реализуется с помощью библиотеки logging.
import logging
logging.basicConfig(filename='logs/logs.log',level=logging.DEBUG)
Далее в необходимых местах сервиса проставьте запись в лог.
logging.debug('Ошибка')
logging.info('Информационное сообщение')
logging.warning('Предупреждение')
Лог выглядит следующим образом
В нашем примере модель отрабатывает быстро. А представьте, что модель работает с изображением, видео или текстом. Выполнение модели потребует чуть большего времени. Например: 3–10 секунд. Это значит клиент дожидается ответа сервиса. Поэтому выполнение нужно сделать асинхронным. То есть сервис не ждёт завершения процесса, а продолжает работу независимо. К сожалению, Flask не поддерживает асинхронную работу, поэтому будем использовать инструмент Python RQ. RQ обозначает Redis Queue, инструмент работает на Redis. Помните — под Windows RQ работать не будет.
Как это будет работать? Клиент обращается в микросервис, сервис фиксирует job_id, модель в фоновом режиме обрабатывает запрос. Узнать о статусе запроса можно по адресу http://[hostname]/iris/api/v1.0/status/, используя job_id. В случае если статус — success, то по адресу http://[hostname]/iris/api/v1.0/result, так же по job_id получить результат выполнения модели.
from rq import Queue, get_current_job
from redis import Redis
redis_conn = Redis(host='app-redis', port=6379)
queue = Queue('rest_api', connection=redis_conn, default_timeout=1200)
Таймаут на выполнение задается с помощью default_timeout. Тут заданы 1200 секунд на выполнение для задач, попадающих в очередь с названием rest_api.
Запустим очередь командой
rq worker rest_api
Для работы микросервиса запустите достаточное количество воркеров для обработки. В случае если воркеров мало, задачи будут вставать в очередь и выполняться по мере освобождения.
Модифицируем нашу функцию get_task для запуска launch_task с использованием очереди.
def get_response(dict, status=200):
return make_response(jsonify(dict), status)
def get_job_response(job_id):
return get_response({'job_id': job_id})
@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
job_id = request.args.get('job_id')
job = queue.enqueue('rest_api.launch_task', request.args.get('sepal_length'), request.args.get('sepal_width'), \
request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0', job_id, result_ttl=60 * 60 * 24, \
job_id=job_id)
return get_job_response(job.get_id())
Обратите внимание, что теперь в launch_task передаётся еще один дополнительный параметр — job_id. Параметр result_ttl отвечает за срок хранения результата. Значение передаётся в секундах. В примере срок хранения одни сутки.
Теперь при запуске микросервиса в браузере возвращается job_id.
Теперь реализуем проверку статуса выполнения модели. Возвращаться JSON будет в следующем формате:
def get_process_response(code, process_status, status=200):
return get_response({
'code': code,
'status': process_status
}, status)
@app.route('/iris/api/status/<id>')
def status(id):
job = queue.fetch_job(id)
if (job is None):
return get_process_response('NOT_FOUND', 'error', 404)
if (job.is_failed):
return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
if (job.is_finished):
return get_process_response('READY', 'success')
return get_process_response('NOT_READY', 'running', 202)
Используя job_id можно узнать статус выполнения модели. Например: http://[hostname]/iris/api/v1.0/status/[job_id]
Осталось это реализовать получение результата работы модели. В случае, если обработка не завершена, функция возвращает Not ready, 202. Если обработаны возвращает JSON с информацией об имени сорта Ириса и вероятностью принадлежности к классу.
@app.route('/iris/api/result/<id>')
def result(id):
job = queue.fetch_job(id)
if job is None:
return get_process_response('NOT_FOUND', 'error', 404)
if job.is_failed:
return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
if job.is_finished:
job_result = copy.deepcopy(job.result)
result = {
'result': job_result['result']
}
return get_response(result)
return get_process_response('NOT_FOUND', 'error', 404)
Таким же образом с помощью job_id получим результат выполнения модели. Например: http://[hostname]/iris/api/v1.0/result/[job_id]
Теперь микросервис готов для интеграции с IT инфраструктурой.
Сгенерируем файл requirements.txt на основе модулей и пакетов, которые вы импортируете в свой проект. Конечно, можно сделать это вручную. Но лучше мы воспользуемся pipreqs для автоматического создания зависимостей Python.
Устанавливаем pipreqs
pip install pipreqs
Запускаем
pipreqs /<your_project_path>/
Готово
К сожалению, придется изменить файл вручную. Добавим туда gunicorn, который еще пригодится. Но об этом позже. scikit_learn тоже добавим для работы модели. Версия scikit_learn должна быть той, на которой модель обучена. Проверить версию можно так:
import sklearn
print('The scikit-learn version is {}.'.format(sklearn.__version__))
Содержание файла выглядит так:
Теперь упакуем весь микросервис в Docker. Docker — это программное обеспечение для автоматизации развёртывания и управления приложениями в средах с поддержкой контейнеризации.
Почему Docker? Основной плюс, быстрое развертывание. Docker создает контейнер для каждого процесса и не загружает ОС. Всё происходит за секунды.
Изоляция и безопасность. При использовании Docker ресурсы изолированы и разделены. Можно не боятся удалять контейнеры, удаление будет полным и чистым. Используются только назначеные ресурсы.
Еще одно преимущество де-факто стандартизация подхода. Почти у всех крупных компаний инфраструктура строится с использованием Docker. Благодаря стандартизации уменьшается количество времени, потраченного на дефекты, и увеличивает количество времени, доступного для разработки функций.
Создаем Dockerfile следующего содержания:
FROM python:3.7-buster
RUN apt-get update -y
WORKDIR /usr/src/app
ENV LANG C.UTF-8
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
Разберем каждую строчку:
Теперь создадим файл docker-compose.yml для определения набора сервисов.
version: '3'
services:
iris:
build: .
image: iris:1.0
container_name: iris
ports:
- 5000:5000
extra_hosts:
- "app-redis:[your IP]"
command: /usr/src/app/start.sh
На что тут стоит обратить внимание: command — запускает файл start.sh О нём чуть позже. extra_hosts добавляет сопоставления имен хостов.Это нужно для работы с Redis. Если хотите протестировать локально, то укажите IP вашего компьютера.
Переходим к файлу start.sh
#!/bin/bash
run_rq() {
rq worker rest_api -u 'redis://app-redis:6379' 2>&1 | tee -a &
}
run_gunicorn() {
gunicorn rest_api:app -b 0.0.0.0:5000 --workers=2 2>&1 | tee -a
}
run_rq
run_gunicorn
В этом скрипте запускаются уже знакомый воркер для очереди и запускаем наш микросервис с помощью Gunicorn. Gunicorn — это WSGI-сервер, созданный для использования в UNIX-системах. Этот сервер относительно быстрый, ресурсоёмкий, легко запускается и работает с широким спектром веб-фреймворков.
Протестируем, запускаем команду для создания контейнера
docker-compose build
Теперь запускаем
docker-compose up
Работает и готово к запуску.
Таким образом запускался Docker, что бы протестировать работоспособность контейнера. В продакшене сделайте сразу так:
docker-compose up -d --build --force-recreate
Флаг -d предназначен для запуска контейнера в фоновом режиме.
В этой заметке получилось рассмотреть большой стэк технологий. Познакомились с микросервисной архитектурой, создали каркас для микросервиса Flask, логируем работу сервиса и познакомились с очередью задач на базе Redis. Отдельно рассмотрели интеграцию решения в ИТ инфрастуктуру с помощью Docker.
Эта заметка не претендует на полноту, но позволяет быстро создавать из этого каркаса микросервисы. Этот каркас подходит как для работы с табличными данными, так и для задач компьютерного зрения.
Дополнительный материал
Для отправки комментария необходимо войти на сайт.