Machine learning в продакшн — Flask REST API


Обученная модель машинного обучения сама по себе пользу бизнесу не принесет. Модель должна быть интегрирована в IT инфраструктуру компании. Рассмотрим реализацию REST API микросервиса на примере задачи классификации цветов Ирисов. Набор данных состоит из длины и ширины двух типов лепестков Ириса: sepal и petal. Целевая переменная — это сорт Ириса: 0 — Setosa, 1 — Versicolor, 2 — Virginica.

Сохранение и загрузка модели

Прежде чем переходить к реализации нашего API надо обучить и сохранить модель. Возьмем модель RandomForestClassifier. Теперь сохраним модель в файл и загрузим, чтобы делать прогнозы. Это можно сделать с помощью pickle или joblib. Рассмотрим pickle, вариант с joblib останется для самостоятельного разбора.

12import pickle filename = 'model.pkl' pickle.dump(clf, open(filename, 'wb'))

Для загрузки и проверки модели воспользуемся pickle.load

12loaded_model = pickle.load(open(filename, 'rb')) result = loaded_model.score(X_test, y_test) print(result)

Код обучения, сохранения и загрузки модели доступен в репозитории — ссылка

Что такое REST API и микросервисы

Микросервис — веб-сервис, который выполняет одну бизнес-потребность и может взаимодействовать с другими сервисами в IT инфраструктуре использую, например HTTP. Архитектура, состоящая из нескольких микросервисов называется микросервисной.

REST (Representational State Transfer) — принципы организации взаимодействия сервисов посредством протокола HTTP. Клиенты отправляют запросы, используя методы представленные протоколом HTTP, и выполняют операцию. Например: получение, добавление, модификация или удаление данных.

API (Application Programming Interface) — интерфейс для общения сервисов.

Проектируем микросервис

Перейдем к практике, что бы стало понятнее. Спроектируем наш сервис. Структура сервиса:

  • rest_api.py — приложение Flask, которое взаимодействует с клиентом и возвращает предсказание модели
  • model.py — файл с функциями загрузки моделей
  • models/ — папка с сохранёнными моделями
  • logs/ — папка с логами

Доступ к API будет осуществляться по-следующему URL — http://[hostname]/iris/api/v1.0/getpred

В URL включено имя приложения и версия API. Имя приложения позволяет идентифицировать сервис. Версия API пригодится если будут новые версии сервиса, но нужно сохранить старый вызов. Это может быть нужно при тестировании или API различаются для разных систем.

Еще создадим http://[hostname]/iris/api/v1.0/status/ для проверки статуса выполнения запроса к сервису и http://[hostname]/iris/api/v1.0/result/ для получения результаты работы модели.

Cоздадим каркас нашего сервиса.

import os

from flask import Flask, jsonify, abort, make_response, request
import requests
import json
import time
import sys
import pandas as pd

app = Flask(__name__)

def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
    
    print(sepal_length, sepal_width, petal_length, petal_width, api)
    if api == 'v1.0':
        res_dict = {'Done': 'API exist'}
        return res_dict
    else:
        res_dict = {'error': 'API doesnt exist'}
        return res_dict

@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
    result = launch_task(request.args.get('sepal_length'), request.args.get('sepal_width'), \
                        request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0')
	
    return make_response(jsonify(result), 200)

if __name__ == '__main__':
    app.run(port=5000, debug=True)

Пока модель не используется. Отмечу несколько моментов — функция get_task использует метод GET и на вход получает необходимые для работы модели признаки. Таким образом обращение к нашему сервису выглядит следующим образом: http://[hostname]/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2

Функция launch_task будет уже работать с моделью, а пока просто проверяет доступность версии API и выводит в консоль переданные в неё параметры.

Убедимся, что всё работает. Запустим в консоли наше приложение:

python rest_api.py

Обратимся в браузере по адресу http://127.0.0.1:5000/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2 и видим в браузере и консоли, что всё прекрасно работает.

Тестируем наш API
Тестируем наш API
Тестируем наш API в консоли
Тестируем наш API в консоли

Каркас готов, будем его дорабатывать.

Обработка ошибок

Обратите внимание, на строку return make_response(jsonify(result), 200). 200 — это код состояния HTTP — «Ок».

В реальности, в работе сервиса могут возникнуть ошибки, поэтому создадим обработку ошибок. Будем обрабатывать две часто встречающихся ошибки: 404 — «Не найдено» и 500 — «внутренняя ошибка сервера».

@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'code': 'PAGE_NOT_FOUND'}), 404)
 
@app.errorhandler(500)
def server_error(error):
    return make_response(jsonify({'code': 'INTERNAL_SERVER_ERROR'}), 500)

Работаем с моделью

Создадим файл model.py для загрузки обученной модели.

import pickle
 
PATH_TO_MODELS = 'models/'
filename = 'model.pkl'
 
model = PATH_TO_MODELS + filename
 
def load_model():
    loaded_model = pickle.load(open(model, 'rb'))
    return loaded_model

Теперь пора дописать функцию launch_task, что бы возвращались предсказания сделанные моделью.

model = M.load_model()
targets = ['setosa', 'versicolor', 'virginica']
 
def get_pred(sepal_length, sepal_width, petal_length, petal_width):
    
    all_columns = ['sepal length', 'sepal width', 'petal length', 'petal width']
    lst = [sepal_length, sepal_width, petal_length, petal_width]
    df = pd.DataFrame([lst], columns = all_columns)
    
    df = df.astype(float)
    result = model.predict_proba(df)
    predx = ['%.3f' % elem for elem in result[0]]
    preds_concat = pd.concat([pd.Series(targets), pd.Series(predx)], axis=1)
    preds = pd.DataFrame(data=preds_concat)
    preds.columns = ["class", "probability"]
    return preds.reset_index(drop=True)
 
def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
    
    pred_model = get_pred(sepal_length, sepal_width, petal_length, petal_width)
 
    if api == 'v1.0':
        res_dict = {'result':  json.loads( pd.DataFrame(pred_model).to_json(orient='records'))}
        return res_dict
    else:
        res_dict = {'error': 'API doesnt exist'}
        return res_dict

Добавлена загрузка модели и функция get_pred, которая по полученным значениям признака возвращает датафрейм c именем сорта Ириса и вероятностью принадлежности к классу. В launch_task теперь возвращается сериализованный ответ в формате JSON.

Сериализованный ответ в формате JSON
Сериализованный ответ в формате JSON

Казалось бы сервис готов. Да это действительно так. Но давайте еще поработаем над ним.

Логирование

Реализуем логирование в микросервисе. В лог будут фиксироваться ключевые моменты и ошибки при работе микросервис. Логирование реализуется с помощью библиотеки logging.

import logging
logging.basicConfig(filename='logs/logs.log',level=logging.DEBUG)

Далее в необходимых местах сервиса проставьте запись в лог.

logging.debug('Ошибка')
logging.info('Информационное сообщение')
logging.warning('Предупреждение')

Лог выглядит следующим образом

Лог
Лог

Очередь задач на базе Redis

В нашем примере модель отрабатывает быстро. А представьте, что модель работает с изображением, видео или текстом. Выполнение модели потребует чуть большего времени. Например: 3–10 секунд. Это значит клиент дожидается ответа сервиса. Поэтому выполнение нужно сделать асинхронным. То есть сервис не ждёт завершения процесса, а продолжает работу независимо. К сожалению, Flask не поддерживает асинхронную работу, поэтому будем использовать инструмент Python RQ. RQ обозначает Redis Queue, инструмент работает на Redis. Помните — под Windows RQ работать не будет.

Как это будет работать? Клиент обращается в микросервис, сервис фиксирует job_id, модель в фоновом режиме обрабатывает запрос. Узнать о статусе запроса можно по адресу http://[hostname]/iris/api/v1.0/status/, используя job_id. В случае если статус — success, то по адресу http://[hostname]/iris/api/v1.0/result, так же по job_id получить результат выполнения модели.

from rq import Queue, get_current_job
from redis import Redis
redis_conn = Redis(host='app-redis', port=6379)
queue = Queue('rest_api', connection=redis_conn, default_timeout=1200)

Таймаут на выполнение задается с помощью default_timeout. Тут заданы 1200 секунд на выполнение для задач, попадающих в очередь с названием rest_api.

Запустим очередь командой

rq worker rest_api

Для работы микросервиса запустите достаточное количество воркеров для обработки. В случае если воркеров мало, задачи будут вставать в очередь и выполняться по мере освобождения.

Модифицируем нашу функцию get_task для запуска launch_task с использованием очереди.

def get_response(dict, status=200):
    return make_response(jsonify(dict), status)
 
def get_job_response(job_id):
    return get_response({'job_id': job_id})
 
@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
 
    job_id = request.args.get('job_id')
    job = queue.enqueue('rest_api.launch_task', request.args.get('sepal_length'), request.args.get('sepal_width'), \
                         request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0', job_id, result_ttl=60 * 60 * 24, \
                        job_id=job_id)
 
    return get_job_response(job.get_id())

Обратите внимание, что теперь в launch_task передаётся еще один дополнительный параметр — job_id. Параметр result_ttl отвечает за срок хранения результата. Значение передаётся в секундах. В примере срок хранения одни сутки.

Теперь при запуске микросервиса в браузере возвращается job_id.

job_id при запуске сервиса
job_id при запуске сервиса

Теперь реализуем проверку статуса выполнения модели. Возвращаться JSON будет в следующем формате:

  • code — код ответа. 404 — OT_FOUND, PAGE_NOT_FOUND. 505 — INTERNAL_SERVER_ERROR. 200 — READY. 202 — NOT_READY
  • status — success/error/running
def get_process_response(code, process_status, status=200):
    return get_response({
        'code': code,
        'status': process_status
    }, status)
 
@app.route('/iris/api/status/<id>')
def status(id):
    job = queue.fetch_job(id)
 
    if (job is None):
        return get_process_response('NOT_FOUND', 'error', 404)
 
    if (job.is_failed):
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
 
    if (job.is_finished):
        return get_process_response('READY', 'success')
 
    return get_process_response('NOT_READY', 'running', 202)

Используя job_id можно узнать статус выполнения модели. Например: http://[hostname]/iris/api/v1.0/status/[job_id]

Проверка статуса
Проверка статуса

Осталось это реализовать получение результата работы модели. В случае, если обработка не завершена, функция возвращает Not ready, 202. Если обработаны возвращает JSON с информацией об имени сорта Ириса и вероятностью принадлежности к классу.

@app.route('/iris/api/result/<id>')
def result(id):
    job = queue.fetch_job(id)
    
    if job is None:
        return get_process_response('NOT_FOUND', 'error', 404)
 
    if job.is_failed:
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
 
    if job.is_finished:
        job_result = copy.deepcopy(job.result)
        result = {
            'result': job_result['result']
        }
 
        return get_response(result)
 
    return get_process_response('NOT_FOUND', 'error', 404)

Таким же образом с помощью job_id получим результат выполнения модели. Например: http://[hostname]/iris/api/v1.0/result/[job_id]

Результат работы микросервиса
Результат работы микросервиса

Теперь микросервис готов для интеграции с IT инфраструктурой.

Интеграция с IT инфраструктурой

Сгенерируем файл requirements.txt на основе модулей и пакетов, которые вы импортируете в свой проект. Конечно, можно сделать это вручную. Но лучше мы воспользуемся pipreqs для автоматического создания зависимостей Python.

Устанавливаем pipreqs

pip install pipreqs

Запускаем

pipreqs /<your_project_path>/

Готово

Результат раброты pipreqs
Результат раброты pipreqs

К сожалению, придется изменить файл вручную. Добавим туда gunicorn, который еще пригодится. Но об этом позже. scikit_learn тоже добавим для работы модели. Версия scikit_learn должна быть той, на которой модель обучена. Проверить версию можно так:

import sklearn
 
print('The scikit-learn version is {}.'.format(sklearn.__version__))

Содержание файла выглядит так:

Содержание файла requirements.txt
Содержание файла requirements.txt

Теперь упакуем весь микросервис в Docker. Docker — это программное обеспечение для автоматизации развёртывания и управления приложениями в средах с поддержкой контейнеризации.

Почему Docker? Основной плюс, быстрое развертывание. Docker создает контейнер для каждого процесса и не загружает ОС. Всё происходит за секунды.

Изоляция и безопасность. При использовании Docker ресурсы изолированы и разделены. Можно не боятся удалять контейнеры, удаление будет полным и чистым. Используются только назначеные ресурсы.

Еще одно преимущество де-факто стандартизация подхода. Почти у всех крупных компаний инфраструктура строится с использованием Docker. Благодаря стандартизации уменьшается количество времени, потраченного на дефекты, и увеличивает количество времени, доступного для разработки функций.

Создаем Dockerfile следующего содержания:

FROM python:3.7-buster
 
RUN apt-get update -y
 
WORKDIR /usr/src/app
 
ENV LANG C.UTF-8
 
COPY requirements.txt ./
RUN pip install -r requirements.txt
 
COPY . .

Разберем каждую строчку:

  • FROM python:3.7-buster — базовый образ
  • RUN apt-get update -y — Обновить информацию о репозиториях внутри контейнера
  • WORKDIR /usr/src/app — Сменить рабочую директорию внутри контейнера. Команды далее будут запускаться внутри директории /usr/src/app внутри контейнера
  • ENV LANG C.UTF-8 — Устанавливаем языковой стандарт внутри контейнера
  • COPY requirements.txt ./ — копируем наш файл с зависимостями
  • RUN pip install -r requirements.txt — Установить зависимости, сохраненные в requirements.txt.
  • COPY . . — копируем новый код в файловую систему контейнера

Теперь создадим файл docker-compose.yml для определения набора сервисов.

version: '3'
 
services:
  iris:
    build: .
    image: iris:1.0
    container_name: iris
    ports:
      - 5000:5000
    extra_hosts:
        - "app-redis:[your IP]"
    command: /usr/src/app/start.sh

На что тут стоит обратить внимание: command — запускает файл start.sh О нём чуть позже. extra_hosts добавляет сопоставления имен хостов.Это нужно для работы с Redis. Если хотите протестировать локально, то укажите IP вашего компьютера.

Переходим к файлу start.sh

#!/bin/bash
 
run_rq() {
  rq worker rest_api -u 'redis://app-redis:6379' 2>&1 | tee -a &
}
 
run_gunicorn() {
  gunicorn rest_api:app -b 0.0.0.0:5000 --workers=2 2>&1 | tee -a 
}
 
run_rq
run_gunicorn

В этом скрипте запускаются уже знакомый воркер для очереди и запускаем наш микросервис с помощью Gunicorn. Gunicorn — это WSGI-сервер, созданный для использования в UNIX-системах. Этот сервер относительно быстрый, ресурсоёмкий, легко запускается и работает с широким спектром веб-фреймворков.

Протестируем, запускаем команду для создания контейнера

docker-compose build

Теперь запускаем

docker-compose up

Работает и готово к запуску.

Тестирование контейнера Docker
Тестирование контейнера Docker

Таким образом запускался Docker, что бы протестировать работоспособность контейнера. В продакшене сделайте сразу так:

docker-compose up -d --build --force-recreate

Флаг -d предназначен для запуска контейнера в фоновом режиме.

Заключение

В этой заметке получилось рассмотреть большой стэк технологий. Познакомились с микросервисной архитектурой, создали каркас для микросервиса Flask, логируем работу сервиса и познакомились с очередью задач на базе Redis. Отдельно рассмотрели интеграцию решения в ИТ инфрастуктуру с помощью Docker.

Эта заметка не претендует на полноту, но позволяет быстро создавать из этого каркаса микросервисы. Этот каркас подходит как для работы с табличными данными, так и для задач компьютерного зрения.

Дополнительный материал

Ссылка на репозиторий с кодом из заметки