bat.apis.imagga

 1import requests
 2import concurrent.futures
 3
 4class Imagga:
 5
 6    def __init__(self, api_key, api_secret, concurrency=1):
 7        self.api_key = api_key
 8        self.api_secret = api_secret
 9        self.concurrency = concurrency
10
11        self.url = 'https://api.imagga.com/v2/tags'
12
13    def predict(self, image_path):
14        try:
15            response = requests.post(
16                self.url,
17                auth=(self.api_key, self.api_secret),
18                files={'image': open(image_path, 'rb')}
19                )
20            response = response.json()
21        except Exception as e:
22            print(e)
23            return
24
25        if 'result' in response:
26            return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']]
27        else:
28            return
29
30    def predictX(self, image_paths):
31        y_preds = []
32        y_index = []
33        y_executors = {}
34
35        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
36            for i, image_path in enumerate(image_paths):
37                # Load the input image and construct the payload for the request
38                y_executors[executor.submit(self.predict, image_path)] = i
39
40            for y_executor in concurrent.futures.as_completed(y_executors):
41                y_index.append(y_executors[y_executor])
42                y_preds.append(y_executor.result())
43
44            y_preds = [y for _, y in sorted(zip(y_index, y_preds))]
45
46        return y_preds
47
48    def print(self, y_preds):
49        max_len = 0
50
51        for y in y_preds:
52            if len(y[0]) > max_len:
53                max_len = len(y[0])
54        
55        for y in y_preds:
56            print('{:<{w}s}{:.5f}'.format(y[0], y[1], w=max_len+1))
class Imagga:
 5class Imagga:
 6
 7    def __init__(self, api_key, api_secret, concurrency=1):
 8        self.api_key = api_key
 9        self.api_secret = api_secret
10        self.concurrency = concurrency
11
12        self.url = 'https://api.imagga.com/v2/tags'
13
14    def predict(self, image_path):
15        try:
16            response = requests.post(
17                self.url,
18                auth=(self.api_key, self.api_secret),
19                files={'image': open(image_path, 'rb')}
20                )
21            response = response.json()
22        except Exception as e:
23            print(e)
24            return
25
26        if 'result' in response:
27            return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']]
28        else:
29            return
30
31    def predictX(self, image_paths):
32        y_preds = []
33        y_index = []
34        y_executors = {}
35
36        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
37            for i, image_path in enumerate(image_paths):
38                # Load the input image and construct the payload for the request
39                y_executors[executor.submit(self.predict, image_path)] = i
40
41            for y_executor in concurrent.futures.as_completed(y_executors):
42                y_index.append(y_executors[y_executor])
43                y_preds.append(y_executor.result())
44
45            y_preds = [y for _, y in sorted(zip(y_index, y_preds))]
46
47        return y_preds
48
49    def print(self, y_preds):
50        max_len = 0
51
52        for y in y_preds:
53            if len(y[0]) > max_len:
54                max_len = len(y[0])
55        
56        for y in y_preds:
57            print('{:<{w}s}{:.5f}'.format(y[0], y[1], w=max_len+1))
Imagga(api_key, api_secret, concurrency=1)
 7    def __init__(self, api_key, api_secret, concurrency=1):
 8        self.api_key = api_key
 9        self.api_secret = api_secret
10        self.concurrency = concurrency
11
12        self.url = 'https://api.imagga.com/v2/tags'
api_key
api_secret
concurrency
url
def predict(self, image_path):
14    def predict(self, image_path):
15        try:
16            response = requests.post(
17                self.url,
18                auth=(self.api_key, self.api_secret),
19                files={'image': open(image_path, 'rb')}
20                )
21            response = response.json()
22        except Exception as e:
23            print(e)
24            return
25
26        if 'result' in response:
27            return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']]
28        else:
29            return
def predictX(self, image_paths):
31    def predictX(self, image_paths):
32        y_preds = []
33        y_index = []
34        y_executors = {}
35
36        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
37            for i, image_path in enumerate(image_paths):
38                # Load the input image and construct the payload for the request
39                y_executors[executor.submit(self.predict, image_path)] = i
40
41            for y_executor in concurrent.futures.as_completed(y_executors):
42                y_index.append(y_executors[y_executor])
43                y_preds.append(y_executor.result())
44
45            y_preds = [y for _, y in sorted(zip(y_index, y_preds))]
46
47        return y_preds
def print(self, y_preds):
49    def print(self, y_preds):
50        max_len = 0
51
52        for y in y_preds:
53            if len(y[0]) > max_len:
54                max_len = len(y[0])
55        
56        for y in y_preds:
57            print('{:<{w}s}{:.5f}'.format(y[0], y[1], w=max_len+1))