ROSでFace++を使って顔認識する

動機

せっかくROSの勉強をしているのでFace++をROSで使ってみました.
ROSでHTTP通信してるノード?パッケージ?が少ない気がするので,参考になれば.
目標は顔認識をするノードとサービス通信するクライアントノードを作成して顔画像を識別する.(検出じゃないよ!!)

開発環境

パッケージ作成

. ~catkin_ws/devel/setup.bash
cd ~/catkin_ws/src
catkin_create_pkg face_recognition_service std_msgs rospy message_generation
  1. ROSコマンドの有効化(ターミナルを起動する毎に実行)
  2. catkinワークスペースのソースディレクトリに移動
  3. face_recognition_serviceパッケージをstd_msgs rospyライブラリに依存するとして作成

/////////////////////////////////////////////
なくても実行できたんだけど何故?

roscd face_recognition_service
nano package.xml
  1. face_recognition_serviceパッケージのフォルダに移動
  2. package.xmlを編集
- <!--   <exec_depend>message_runtime</exec_depend> -->
+ <exec_depend>message_runtime</exec_depend>

この辺よく分かっていないので勉強します.
Cのコンパイル関係なのかな.
///////////////////////////////////////////////

mkdir srv
cd srv
touch Protocol.srv
nano CMakeList.txt
  1. サービスファイル用フォルダの作成
  2. srvフォルダに移動
  3. サービスProtocol.srvを作成
  4. CMakeList.txtを編集
# add_service_files(
#   FILES
#   Service1.srv
#   Service2.srv
# )
--->
add_service_files(
  FILES
  Protocol.srv
)

# generate_messages(
#   DEPENDENCIES
#   std_msgs
# )
--->
generate_messages(
  DEPENDENCIES
  std_msgs
)

この辺もよく分かってない.
サービス用のライブラリを作るための作法という認識.
~/catkin_ws/devel/lib/python2.7/dist-packages/test/srv/内にファイルが作成される.

cd ~/catkin_ws
catkin_make

ワークフォルダに移動してmakeする.

サービス作成

roscd face_recognition_service
cd srv
nano Protocol.srv

Protocol.srvを編集.

string function
string img
string id
---
string result_id
float64 confidence

---の上がリクエスト,下がレスポンス
リクエス

  • function 学習・識別のスイッチ
  • img 画像データ
  • id 画像のタグ

レスポンス

  • result_id 識別された画像のタグ
  • confidence タグの確率

ノード作成

server側ノード
cd ..
cd src
touch service_server.py
chmod +x service_server.py
nano service_server.py
  1. ノードservice_server.pyを作成
  2. 実行可能な状態に変更
#!/usr/bin/env python
import rospy
from face_recognition_service.srv import Protocol, ProtocolResponse
import requests
from face_pp import FacePP

face_api = FacePP('API_KEY', 'API_SECRET')

def handler(req):
    if req.function == 'learn':
        face_api.learn_face(req.img, req.id)
        return ProtocolResponse("learn", 0.)
    elif req.function == 'recognize':
        print face_api.search_face_id(req.img)
        ret = face_api.search_face_id(req.img)
        return ProtocolResponse(ret[0], ret[1])
    else:
        print "Error"
        return 0

if __name__ == '__main__':
    rospy.init_node('face_recognizer_server')
    face_api.get_face_set("FaceSet")

    s = rospy.Service('face_recognition', Protocol, handler)
    rospy.spin()
client側ノード
cd ..
cd src
touch test_client.py
chmod +x test_client.py
nano test_client.py
#!/usr/bin/env python
import rospy
from face_recognition_service.srv import *
import base64
import glob

train_dataset_paths = "/home/pi/Pictures/face-dataset/train"
test_dataset_paths = "/home/pi/Pictures/face-dataset/test"
face_ids = ["a", "b"]

if __name__ == "__main__":
    rospy.init_node('test_client')
    rospy.wait_for_service('face_recognition')
    protocol = rospy.ServiceProxy('face_recognition', Protocol)

    for index, face_id in enumerate(face_ids):
        files_path = train_dataset_paths + '/' + face_id
        # files = os.listdir(files_path)
        files = glob.glob(files_path + '/*.jpg')
        for file in files:
            print face_id + ':' + file
            with open(file, 'rb') as f:
                img_file = base64.b64encode(f.read())
                res = protocol('learn', img_file, face_id)
                print(res.result_id, res.confidence)

    for index, face_id in enumerate(face_ids):
        files_path = test_dataset_paths + '/' + face_id
        # files = os.listdir(files_path)
        files = glob.glob(files_path + '/*.jpg')
        for file in files:
            print face_id + ':' + file
            with open(file, 'rb') as f:
                img_file = base64.b64encode(f.read())
                res = protocol('recognize', img_file, face_id)
                print(res.result_id, res.confidence)

face_datasetは前回と同じところから取得.

実行

roscore
rosrun face_recognition_service service_server.py
rosrun face_recognition_service test_client.py

それぞれ別のターミナルで実行.

実行結果

service_serverノード
# 学習中
a:/home/pi/Pictures/face-dataset/train/a/0000.jpg
('learn', 0.0)
~~~
# 識別中
a:/home/pi/Pictures/face-dataset/test/a/test.jpg
('a', 93.048)
test_clientノード
# 識別中
('a', 93.048)

タグがaである確率が93 %

終わりに

. ~catkin_ws/devel/setup.bash
chmod +x node.py
これらの実行し忘れが多かった.

付録

face_recgnition_service/src内に記述
face_pp.py

import requests
import pprint

class FacePP:
    def __init__(self, api_key, api_secret):
        self.API_KEY = api_key
        self.API_SECRET = api_secret
        self.FACE_SET_TOKEN = ""

    def get_face_set(self, faceset_name):
        create_url = 'https://api-us.faceplusplus.com/facepp/v3/faceset/create'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   'outer_id': faceset_name
                   }
        res = requests.post(create_url, data=payload)
        try:
            self.FACE_SET_TOKEN = res.json()['faceset_token']
        except Exception as ex:
            msg = res.json()['error_message']
            if msg == "FACESET_EXIST":
                getdetail_url = 'https://api-us.faceplusplus.com/facepp/v3/faceset/getdetail'
                res = requests.post(getdetail_url, data=payload)
                self.FACE_SET_TOKEN = res.json()['faceset_token']
        # print('Get face token')
        # pprint.pprint(res.json())

    def learn_face(self, face_image, face_id):
        # get face token
        detect_url = 'https://api-us.faceplusplus.com/facepp/v3/detect'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   'image_base64': face_image
                   }
        res = requests.post(detect_url, data=payload)
        # print('Detect')
        # pprint.pprint(res.json())
        token = res.json()['faces'][0]['face_token']
        # set face id
        set_url = 'https://api-us.faceplusplus.com/facepp/v3/face/setuserid'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   'face_token': token,
                   'user_id': face_id
                   }
        res = requests.post(set_url, data=payload)
        # print('Set')
        # pprint.pprint(res.json())
        # add face token to faceset
        add_url = 'https://api-us.faceplusplus.com/facepp/v3/faceset/addface'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   'faceset_token': self.FACE_SET_TOKEN,
                   'face_tokens': token
                   }
        res = requests.post(add_url, data=payload)
        # print('Add')
        # pprint.pprint(res.json())

    def search_face_id(self, face_image):
        search_url = 'https://api-us.faceplusplus.com/facepp/v3/search'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   'image_base64': face_image,
                   'faceset_token': self.FACE_SET_TOKEN
                   }
        res = requests.post(search_url, data=payload)
        # print('Search')
        # pprint.pprint(res.json())
        return res.json()['results'][0]['user_id'].encode(), res.json()['results'][0]['confidence']
        # return res.json()['results'][0]['user_id'].encode()

    # other function
    def check_face_sets_name(self):
        getfacesets_url = 'https://api-us.faceplusplus.com/facepp/v3/faceset/getfacesets'
        payload = {'api_key': self.API_KEY,
                   'api_secret': self.API_SECRET,
                   }
        res = requests.post(getfacesets_url, data=payload)
        return [d.get('outer_id') for d in res.json()['facesets']]