bat.apis.imagga
1import requests 2import concurrent.futures 3 4class Imagga: 5 6 def __init__(self, api_key, api_secret, concurrency=1): 7 self.api_key = api_key 8 self.api_secret = api_secret 9 self.concurrency = concurrency 10 11 self.url = 'https://api.imagga.com/v2/tags' 12 13 def predict(self, image_path): 14 try: 15 response = requests.post( 16 self.url, 17 auth=(self.api_key, self.api_secret), 18 files={'image': open(image_path, 'rb')} 19 ) 20 response = response.json() 21 except Exception as e: 22 print(e) 23 return 24 25 if 'result' in response: 26 return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']] 27 else: 28 return 29 30 def predictX(self, image_paths): 31 y_preds = [] 32 y_index = [] 33 y_executors = {} 34 35 with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: 36 for i, image_path in enumerate(image_paths): 37 # Load the input image and construct the payload for the request 38 y_executors[executor.submit(self.predict, image_path)] = i 39 40 for y_executor in concurrent.futures.as_completed(y_executors): 41 y_index.append(y_executors[y_executor]) 42 y_preds.append(y_executor.result()) 43 44 y_preds = [y for _, y in sorted(zip(y_index, y_preds))] 45 46 return y_preds 47 48 def print(self, y_preds): 49 max_len = 0 50 51 for y in y_preds: 52 if len(y[0]) > max_len: 53 max_len = len(y[0]) 54 55 for y in y_preds: 56 print('{:<{w}s}{:.5f}'.format(y[0], y[1], w=max_len+1))
class
Imagga:
5class Imagga: 6 7 def __init__(self, api_key, api_secret, concurrency=1): 8 self.api_key = api_key 9 self.api_secret = api_secret 10 self.concurrency = concurrency 11 12 self.url = 'https://api.imagga.com/v2/tags' 13 14 def predict(self, image_path): 15 try: 16 response = requests.post( 17 self.url, 18 auth=(self.api_key, self.api_secret), 19 files={'image': open(image_path, 'rb')} 20 ) 21 response = response.json() 22 except Exception as e: 23 print(e) 24 return 25 26 if 'result' in response: 27 return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']] 28 else: 29 return 30 31 def predictX(self, image_paths): 32 y_preds = [] 33 y_index = [] 34 y_executors = {} 35 36 with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: 37 for i, image_path in enumerate(image_paths): 38 # Load the input image and construct the payload for the request 39 y_executors[executor.submit(self.predict, image_path)] = i 40 41 for y_executor in concurrent.futures.as_completed(y_executors): 42 y_index.append(y_executors[y_executor]) 43 y_preds.append(y_executor.result()) 44 45 y_preds = [y for _, y in sorted(zip(y_index, y_preds))] 46 47 return y_preds 48 49 def print(self, y_preds): 50 max_len = 0 51 52 for y in y_preds: 53 if len(y[0]) > max_len: 54 max_len = len(y[0]) 55 56 for y in y_preds: 57 print('{:<{w}s}{:.5f}'.format(y[0], y[1], w=max_len+1))
def
predict(self, image_path):
14 def predict(self, image_path): 15 try: 16 response = requests.post( 17 self.url, 18 auth=(self.api_key, self.api_secret), 19 files={'image': open(image_path, 'rb')} 20 ) 21 response = response.json() 22 except Exception as e: 23 print(e) 24 return 25 26 if 'result' in response: 27 return [(item['tag']['en'], item['confidence']) for item in response['result']['tags']] 28 else: 29 return
def
predictX(self, image_paths):
31 def predictX(self, image_paths): 32 y_preds = [] 33 y_index = [] 34 y_executors = {} 35 36 with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: 37 for i, image_path in enumerate(image_paths): 38 # Load the input image and construct the payload for the request 39 y_executors[executor.submit(self.predict, image_path)] = i 40 41 for y_executor in concurrent.futures.as_completed(y_executors): 42 y_index.append(y_executors[y_executor]) 43 y_preds.append(y_executor.result()) 44 45 y_preds = [y for _, y in sorted(zip(y_index, y_preds))] 46 47 return y_preds