pandas big-data analysis performance optimization example: the read_csv engine, grouping, and more

Requirements

The names enrolled for face recognition are as follows, 452 people in total:

$ head i_enroll.txt 
output/enroll_list/D23
output/enroll_list/I54
output/enroll_list/G23
output/enroll_list/G43
output/enroll_list/F38
output/enroll_list/I20
output/enroll_list/J19
output/enroll_list/E42
output/enroll_list/F31
output/enroll_list/F22

The list of face recognition images is as follows, 269796 images in total:

$ head i_enroll.txt 
output/enroll_list/D23
output/enroll_list/I54
output/enroll_list/G23
output/enroll_list/G43
output/enroll_list/F38
output/enroll_list/I20
output/enroll_list/J19
output/enroll_list/E42
output/enroll_list/F31
output/enroll_list/F22

The face recognition results are as follows: a 452 × 269796 matrix, which is more than a hundred million scores.

$ head verify452-3.53.csv
-1.000000,-1.000000,0.346309,0.366479...

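The task is to collect two kinds of records: a person matched against their own images with a score below 0.7, and a person matched against someone else's images with a score above 0.7.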
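
As a small illustration of this rule, the sketch below applies it to a toy score matrix. The person IDs, file names, and scores are made up for the example, and it counts a score of exactly 0.7 as a cross-person error, which is an assumption about the boundary case.

```python
import numpy as np
import pandas as pd

THRESHOLD = 0.7  # threshold taken from the requirement

# Hypothetical toy data: 2 enrolled persons, 2 images each.
persons = ["A1", "B2"]
photos = ["A1/Real/img_1.ir", "A1/Real/img_2.ir",
          "B2/Real/img_1.ir", "B2/Real/img_2.ir"]
scores = pd.DataFrame(
    [[0.91, 0.55, 0.10, 0.72],
     [0.20, 0.30, 0.88, 0.95]],
    index=persons, columns=photos)

# The owner of each image is the first path component of its file name.
owners = np.array([p.split("/")[0] for p in photos])
# same[i, j] is True when image j belongs to enrolled person i.
same = np.array(persons)[:, None] == owners[None, :]

values = scores.to_numpy()
self_mask = same & (values < THRESHOLD)      # own image, score too low
other_mask = ~same & (values >= THRESHOLD)   # someone else's image, score too high


def to_records(mask):
    """Turn a boolean mask into (person, filename, score) tuples."""
    rows, cols = np.nonzero(mask)
    return [(persons[r], photos[c], float(values[r, c]))
            for r, c in zip(rows, cols)]


self_errors = to_records(self_mask)
other_errors = to_records(other_mask)
print(self_errors)
print(other_errors)
```

With these toy values, A1/Real/img_2.ir (0.55) is reported as a self error for A1, and B2/Real/img_2.ir (0.72) is reported as an other error for A1.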

Initial implementation

import pandas as pd
import time

enroll_file = r'i_enroll.txt'
real_file = r'i_real.txt'
score_file = r'verify452-3.53.csv'

real_photos = pd.read_csv(real_file,names=['filename'])
real_photos['filename'] = real_photos['filename'].apply(
    lambda x:x.replace("/home/andrew/code/data/tof/base_test_data/vivo-verify-452/./", ''))


persons = pd.read_csv(enroll_file,names=['person'])
persons['person'] = persons['person'].apply(
    lambda x:x.replace("output/enroll_list/", ''))

# Print timestamps before and after loading to time read_csv.
print(time.ctime())
df = pd.read_csv(score_file, names=real_photos['filename'], engine='c',
                 na_filter=False, low_memory=False)
df.index = persons['person']
print(time.ctime())

person_errors = []  # a person's own images that score below 0.7
other_errors = []   # other people's images that score 0.7 or higher, or -1
for person in df.index:
    scores = df.loc[person]
    for item in scores.index:
        # File names start with the owner's ID, e.g. "D23/Real/...".
        if person + '/' in item:
            if scores[item] < 0.7 or scores[item] == -1:
                print(person, item, scores[item])
                person_errors.append((person, item, scores[item]))
        else:
            if scores[item] >= 0.7 or scores[item] == -1:
                print(person, item, scores[item])
                other_errors.append((person, item, scores[item]))

df_person_errors = pd.DataFrame(person_errors,columns=['person','filename','score'])
df_other_errors = pd.DataFrame(other_errors,columns=['person','filename','score'])

df_person_errors.to_csv('person_errors.csv', index=False)
df_other_errors.to_csv('other_errors.csv', index=False)

Problems at run time:

  • Loading the data takes about 20 minutes
  • Analyzing the data takes about 40 minutes
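
The loop in the first script visits all 452 × 269796 = 121,947,792 cells, and each visit performs at least one label lookup such as scores[item]. That per-cell overhead is a likely source of the long analysis time, although the post does not profile it. The sketch below compares the two styles on a single synthetic row; the row length, labels, and random scores are made up for the example, and the timings will vary by machine.

```python
import time

import numpy as np
import pandas as pd

THRESHOLD = 0.7
N = 20_000  # hypothetical row length; the real rows have 269796 entries

rng = np.random.default_rng(0)
labels = [f"P{i % 50}/Real/img_{i}.ir" for i in range(N)]
row = pd.Series(rng.random(N), index=labels)


def slow_scan(row):
    """Per-cell loop with label lookups, in the style of the first script."""
    hits = []
    for item in row.index:
        if row[item] >= THRESHOLD:
            hits.append(item)
    return hits


def fast_scan(row):
    """One vectorized comparison followed by boolean selection."""
    return row.index[(row >= THRESHOLD).to_numpy()].tolist()


start = time.perf_counter()
slow = slow_scan(row)
middle = time.perf_counter()
fast = fast_scan(row)
end = time.perf_counter()

print(f"loop: {middle - start:.4f}s, vectorized: {end - middle:.4f}s")
print("same hits:", slow == fast)
```

Both functions return the same labels in the same order, so the vectorized form can replace the loop without changing the result.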

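Preliminary optimization

I tried to speed things up with multiprocessing, but the improvement was not noticeable. The code is as follows: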

# -*- coding: utf-8 -*-
# Author:    china-testing#126.com  Tech support QQ group: 630011153
# CreateDate: 2018-04-17
import pandas as pd
import multiprocessing
import time

enroll_file = r'i_enroll.txt'
real_file = r'i_real.txt'
score_file = r'verify452-3.53.csv'
#score_file = r'verfify.csv'


real_photos = pd.read_csv(real_file,names=['filename'])
real_photos['filename'] = real_photos['filename'].apply(
    lambda x:x.replace("/home/andrew/code/data/tof/base_test_data/vivo-verify-452/./", ''))


persons = pd.read_csv(enroll_file,names=['person'])
persons['person'] = persons['person'].apply(
    lambda x:x.replace("output/enroll_list/", ''))

print(time.ctime())
df = pd.read_csv(score_file, names=real_photos['filename'], engine='c',
                 na_filter=False, low_memory=False)
df.index = persons['person']
print(time.ctime())


def consumer(queue, person_errors, other_errors, lock):
    # Worker: take (person, scores) rows off the queue until the None sentinel arrives.
    while True:
        record = queue.get()
        if record is None:
            break
        person, scores = record
        for item in scores.index:
            if person + '/' in item:
                if scores[item] < 0.7 or scores[item] == -1:
                    print(person, item, scores[item])
                    with lock:
                        person_errors.append((person, item, scores[item]))
            else:
                if scores[item] >= 0.7 or scores[item] == -1:
                    print(person, item, scores[item])
                    with lock:
                        other_errors.append((person, item, scores[item]))

queue = multiprocessing.Queue()
process = []
person_errors = multiprocessing.Manager().list()
other_errors = multiprocessing.Manager().list()
lock = multiprocessing.Lock()
if multiprocessing.cpu_count() < 3:
    number = multiprocessing.cpu_count()
else:
    number = multiprocessing.cpu_count() - 1

# Launch the consumer process
for i in range(number):
    t = multiprocessing.Process(
        target=consumer,args=(queue, person_errors, other_errors, lock))
    t.daemon=True
    process.append(t)

for i in range(number):
    process[i].start()

for person in df.index:
    queue.put((person, df.loc[person]))

for i in range(number):
    queue.put(None) 

for i in range(number):
    process[i].join()                    

df_person_errors = pd.DataFrame(person_errors,columns=['person','filename','score'])
df_other_errors = pd.DataFrame(other_errors,columns=['person','filename','score'])

df_person_errors.to_csv('person_errors.csv', index=False)
df_other_errors.to_csv('other_errors.csv', index=False)
print(time.ctime())
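
One plausible reason for the small gain, which the post does not measure: every queue.put((person, df.loc[person])) pickles a whole 269796-element Series together with its string labels, and every append on a Manager().list() is a round trip to the manager process. The sketch below is one way to reduce that traffic, under stated assumptions: the labels are sent to each worker once through a Pool initializer, rows travel as plain lists of floats, and each worker returns its hits in bulk. The names init_worker, scan_row, and scan_all are hypothetical, and the ownership test uses startswith where the script above uses a substring test.

```python
from multiprocessing import Pool

THRESHOLD = 0.7
_LABELS = None  # per-process copy of the image file names


def init_worker(labels):
    """Store the shared labels once in each worker process."""
    global _LABELS
    _LABELS = labels


def scan_row(task):
    """Return (self_errors, other_errors) for one enrolled person."""
    person, scores = task
    prefix = person + "/"
    self_errors, other_errors = [], []
    for label, score in zip(_LABELS, scores):
        if label.startswith(prefix):
            # Own image: anything below the threshold (including -1) is an error.
            if score < THRESHOLD:
                self_errors.append((person, label, score))
        elif score >= THRESHOLD or score == -1:
            # Someone else's image: a high score, or -1, is an error.
            other_errors.append((person, label, score))
    return self_errors, other_errors


def scan_all(labels, tasks, workers=2):
    """Scan all rows in a process pool and merge the per-row results."""
    self_errors, other_errors = [], []
    with Pool(workers, initializer=init_worker, initargs=(labels,)) as pool:
        for own, others in pool.imap(scan_row, tasks, chunksize=8):
            self_errors.extend(own)
            other_errors.extend(others)
    return self_errors, other_errors
```

To try it, build tasks as (person, list_of_scores) pairs and call scan_all(labels, tasks) from under an if __name__ == "__main__": guard, which multiprocessing needs on platforms that spawn worker processes.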

The next version drops multiprocessing and lets pandas do the filtering instead: each row gets a two-level (person, filename) index, and boolean selection replaces the per-cell loop.

# -*- coding: utf-8 -*-
# Author:    china-testing#126.com  Tech support QQ group: 630011153
# CreateDate: 2018-04-17
import pandas as pd
import time

enroll_file = r'i_enroll.txt'
real_file = r'i_real.txt'
score_file = r'verify452-3.53.csv'
#score_file = r'verfify.csv'


real_photos = pd.read_csv(real_file,names=['filename'])
real_photos['filename'] = real_photos['filename'].apply(
    lambda x:x.replace("/home/andrew/code/data/tof/base_test_data/vivo-verify-452/./", ''))
real_photos['person'] =  real_photos['filename'].apply(
    lambda x:x.split('/')[0])


persons = pd.read_csv(enroll_file,names=['person'])
persons['person'] = persons['person'].apply(
    lambda x:x.replace("output/enroll_list/", ''))

df = pd.read_csv(score_file, header=None, engine='c',
                 na_filter=False, low_memory=False)
df.index = persons['person']

self_errors = []
other_errors = []
for person in df.index:
    print("index:", person)
    print(time.ctime())
    row = df.loc[person]
    # Two-level index (owner, filename) so one person's images can be selected at once.
    row.index = [real_photos['person'], real_photos['filename']]
    # Scores of this person's own images; entries equal to -1 are excluded here.
    self = row[person]
    self_error = self[(self < 0.7) & (self > -1)]
    for item in self_error.index:
        self_errors.append((person, item, self_error[item]))
    print(self_error)
    # Everyone else's images: drop this person's block from index level 0.
    others = row.drop(person, level=0)
    other_error = others[others >= 0.7]
    for item in other_error.index:
        other_errors.append([person, item[0], item[1], other_error.loc[item]])
    print(other_error)

df_person_errors = pd.DataFrame(self_errors,columns=['person','filename','score'])
df_other_errors = pd.DataFrame(other_errors,columns=['person','other', 'filename','score'])
df_person_errors.to_csv('self_errors.csv', index=False)
df_other_errors.to_csv('other_errors.csv', index=False)
print(time.ctime())

Results after optimization:

  • Loading the data takes about 30 s
  • Analyzing the data takes about 4 minutes
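
The post does not say which change shortened the load. The visible difference in the read_csv call is that the faster script passes header=None, where the first script passes 269796 column names through names=, and attaches labels to each row later. The sketch below shows that pattern on a tiny in-memory CSV; the CSV text, person IDs, and file names are made up for the example.

```python
import io

import pandas as pd

# Hypothetical stand-in for the score file: 2 persons x 3 images.
csv_text = (
    "-1.000000,0.346309,0.366479\n"
    "0.812345,-1.000000,0.100000\n"
)
persons = ["D23", "I54"]
photos = ["D23/Real/img_1.ir", "I54/Real/img_1.ir", "I54/Real/img_2.ir"]

# header=None gives plain integer column labels 0..n-1.
df = pd.read_csv(io.StringIO(csv_text), header=None, engine="c",
                 na_filter=False, low_memory=False)
df.index = persons

# Labels are attached only to the row being analyzed.
row = df.loc["D23"].copy()
row.index = photos
print(df.shape)
print(row)
```

At full size the matrix holds 121,947,792 float64 values, close to 1 GB in memory, so it is worth keeping any extra per-column bookkeeping to a minimum.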

Generated files:

$ head other_errors.csv 
person,other,filename,score
G43,B40,B40/Real/image_1515289683799.ir,0.706021
E42,E39,E39/Real/image_1515286582859.ir,0.778084
E42,E39,E39/Real/image_1515286579323.ir,0.745422
A23,D1,D1/Real/image_1514960322975.ir,0.7184189999999999
A21,I45,I45/Real/image_1521946590192.ir,0.759404
A21,I45,I45/Real/image_1521946146939.ir,0.7444069999999999
A21,I45,I45/Real/image_1521946590553.ir,0.807068
A21,I45,I45/Real/image_1521946446169.ir,0.804136
A21,I45,I45/Real/image_1521946593359.ir,0.705128
$ head self_errors.csv 
person,filename,score
D23,D23/Real/image_1515223221370.ir,0.590427
D23,D23/Real/image_1515223218995.ir,0.6863020000000001
D23,D23/Real/image_1515223352416.ir,0.370125
D23,D23/Real/image_1515223218455.ir,0.697141
I54,I54/Real/image_1521960624882.ir,0.6420319999999999
I54,I54/Real/image_1521961164122.ir,0.486989
I54,I54/Real/image_1521961162557.ir,0.548539
I54,I54/Real/image_1521961162208.ir,0.468434
I54,I54/Real/image_1521960695016.ir,0.587555

Further optimization

See the code updates.