| import json |
| import os |
| import time |
|
|
| import numpy as np |
| import redis |
| import settings |
| from tensorflow.keras.applications import ResNet50 |
| from tensorflow.keras.applications.resnet50 import decode_predictions, preprocess_input |
| from tensorflow.keras.preprocessing import image |
|
|
| |
# Shared Redis client for this worker: the job queue is read from it and the
# prediction results are written back to it. Connection parameters come from
# the project settings module.
db = redis.Redis(
    host=settings.REDIS_IP,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB_ID,
)
|
|
| |
# Build the classifier once at import time so that every call to predict()
# reuses the same model instance instead of reloading the weights per job.
model = ResNet50(weights="imagenet", include_top=True)
|
|
|
|
def predict(image_name):
    """
    Classify an uploaded image with the module-level ResNet50 model.

    The file is looked up under ``settings.UPLOAD_FOLDER``, loaded at
    224x224, converted to an array, wrapped into a batch of one image,
    passed through ``preprocess_input`` and fed to ``model``. Only the
    top-1 decoded prediction is reported.

    Parameters
    ----------
    image_name : str
        Image filename, joined onto ``settings.UPLOAD_FOLDER``.

    Returns
    -------
    class_name, pred_probability : tuple(str, float)
        Label of the top-1 prediction and its score, converted to a plain
        Python float and rounded to 4 decimal places.
    """
    path = os.path.join(settings.UPLOAD_FOLDER, image_name)
    pil_img = image.load_img(path, target_size=(224, 224))

    # The model is called on a batch, so add a leading axis of size one
    # before applying the ResNet50 preprocessing.
    pixels = image.img_to_array(pil_img)
    batch = preprocess_input(np.expand_dims(pixels, axis=0))

    # top=1 keeps a single entry per image; [0][0] selects that entry for
    # the only image in the batch. Its first field is not used.
    decoded = decode_predictions(model.predict(batch), top=1)
    _, class_name, pred_probability = decoded[0][0]

    # Convert to a built-in float before rounding so the value is
    # JSON-serializable by the caller.
    return class_name, round(float(pred_probability), 4)
|
|
|
|
def classify_process():
    """
    Loop indefinitely asking Redis for new jobs.

    When a new job arrives, take it from the Redis queue, use the loaded ML
    model to get predictions and store the results back in Redis under the
    original job ID so other services can see it was processed and access
    the results.

    Each job is a JSON object with an ``"id"`` and an ``"image_name"`` key.
    The stored result is a JSON object with the keys ``"prediction"`` and
    ``"score"``.

    A job that cannot be processed (malformed JSON, missing key, unreadable
    image, ...) is logged with its traceback and skipped, so a single bad
    job does not terminate the worker. No result is written for such a job.
    Errors raised while waiting on the queue itself are not caught and
    propagate to the caller.
    """
    # Imported here so this function does not rely on a module-level import.
    import logging

    logger = logging.getLogger(__name__)

    while True:
        # brpop returns a (queue_name, payload) pair; only the payload is
        # needed. Kept outside the try block on purpose: a failure to reach
        # the queue is not a per-job error and should not be swallowed.
        raw_job = db.brpop(settings.REDIS_QUEUE)[1]

        try:
            job = json.loads(raw_job.decode("utf-8"))
            job_id = job["id"]

            prediction, score = predict(job["image_name"])
            output = {"prediction": prediction, "score": score}

            # Publish the result under the job ID so that whoever enqueued
            # the job can fetch it.
            db.set(job_id, json.dumps(output))
        except Exception:
            # Worker-loop boundary: record the failure and move on to the
            # next job instead of letting one bad job kill the service.
            logger.exception("Failed to process job %r; skipping it", raw_job)

        # Pause between iterations, after failures as well as successes.
        time.sleep(settings.SERVER_SLEEP)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: announce start-up, then hand control to the
    # worker loop, which does not return.
    print("Launching ML service...")
    classify_process()
|
|