trifonova commited on
Commit
de34e36
·
1 Parent(s): ed2b581

Refactor code to use class for api client

Browse files
Files changed (1) hide show
  1. fair.py +116 -128
fair.py CHANGED
@@ -13,178 +13,166 @@ SERVER_ADRESS="https://faircompute.com:8000/api/v1"
13
  DOCKER_IMAGE="faircompute/stable-diffusion:pytorch-1.13.1-cu116"
14
  #DOCKER_IMAGE="sha256:e06453fe869556ea3e63572a935aed4261337b261fdf7bda370472b0587409a9"
15
 
16
- def authenticate(email: str, password: str):
17
- url = f'{SERVER_ADRESS}/auth/login'
18
- json_obj = {"email": email, "password": password}
19
- resp = requests.post(url, json=json_obj)
20
- token = resp.json()["token"]
21
- return token
22
-
23
- def get(url, token, **kwargs):
24
- headers = {
25
- 'Authorization': f'Bearer {token}'
26
- }
27
- response = requests.get(url, headers=headers, **kwargs)
28
-
29
- if not response.ok:
30
- raise Exception(f"Error! status: {response.status_code}")
31
- return response
32
-
33
-
34
- def put(url, token, data):
35
- headers = {
36
- 'Content-Type': 'application/json',
37
- 'Authorization': f'Bearer {token}'
38
- }
39
- if not isinstance(data, str):
40
- data = json.dumps(data)
41
- response = requests.put(url, headers=headers, data=data)
42
-
43
- if not response.ok and response.status_code != 206:
44
- raise Exception(f"Error! status: {response.status_code}")
45
- return response
46
-
47
-
48
- def put_program(token, launcher: str, image: str, runtime: str, command: List[str]):
49
- url = f"{SERVER_ADRESS}/programs"
50
- data = {
51
- launcher: {
52
- "image": image,
53
- "command": command,
54
- "runtime": runtime
55
  }
56
- }
57
- response = put(url=url, token=token, data=data)
58
 
59
- return int(response.text)
 
60
 
 
61
 
62
- def put_job(token, program_id, input_files, output_files):
63
- url = f"{SERVER_ADRESS}/jobs?program={program_id}"
64
- data = {
65
- 'input_files': input_files,
66
- 'output_files': output_files
67
- }
68
-
69
- response = put(url=url, token=token, data=data)
70
-
71
- return int(response.text)
72
-
73
-
74
- def get_job_status(token, job_id):
75
- url = f"{SERVER_ADRESS}/jobs/{job_id}/status"
76
- response = get(url=url, token=token)
77
- return response.text
78
-
79
-
80
- def get_cluster_summary(token):
81
- url = f"{SERVER_ADRESS}/nodes/summary"
82
-
83
- response = get(token=token, url=url)
84
-
85
- return response.json()
86
 
 
87
 
88
- def put_job_stream_data(token, job_id, name, data):
89
- url = f"{SERVER_ADRESS}/jobs/{job_id}/data/streams/{name}"
90
- response = put(url=url, token=token, data=data)
 
 
 
91
 
92
- return response.text
93
 
 
94
 
95
- def put_job_stream_eof(token, job_id, name):
96
- url = f"{SERVER_ADRESS}/jobs/{job_id}/data/streams/{name}/eof"
 
 
97
 
98
- response = put(url=url, token=token, data=None)
 
 
99
 
100
- return response.text
101
 
 
 
 
 
102
 
103
- def wait_for_file(token, job_id, path, attempts=10) -> BinaryIO:
104
- headers = {
105
- 'Authorization': f'Bearer {token}'
106
- }
107
- for i in range(attempts):
108
- url = f"{SERVER_ADRESS}/jobs/{job_id}/data/files/{path}"
109
- print(f"Waiting for file {path}...")
110
- try:
111
- with requests.get(url=url, headers=headers, stream=True) as r:
112
- r.raise_for_status()
113
- f = tempfile.TemporaryFile()
114
- for chunk in r.iter_content(chunk_size=8192):
115
- f.write(chunk)
116
 
117
- print(f"File {path} ready")
118
- f.seek(0, 0)
119
- return f
120
- except Exception as e:
121
- print(e)
122
- time.sleep(0.5)
123
 
124
- print(f"Failed to receive {path}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
 
126
 
127
  def text_to_image(text):
128
  email = os.getenv('FAIRCOMPUTE_EMAIL')
129
  password = os.environ.get('FAIRCOMPUTE_PASSWORD')
130
- token = authenticate(email=email, password=password)
131
-
132
- logger.info(token)
133
-
134
- summary = get_cluster_summary(token=token)
135
- logger.info("Summary:")
136
- logger.info(summary)
137
- program_id = put_program(token=token,
138
- launcher="Docker",
139
- image=DOCKER_IMAGE,
140
- runtime="nvidia",
141
- command=[])
142
  logger.info(program_id)
143
 
144
- job_id = put_job(token=token,
145
- program_id=program_id,
146
- input_files=[],
147
- output_files=["/workspace/result.png"])
148
 
149
  logger.info(job_id)
150
 
151
- status = get_job_status(token=token,
152
- job_id=job_id)
153
  logger.info(status)
154
 
155
  while status != "Processing" and status != "Completed":
156
- status = get_job_status(token=token,
157
- job_id=job_id)
158
  logger.info(status)
159
  time.sleep(0.5)
160
 
161
- res = put_job_stream_data(token=token,
162
- job_id=job_id,
163
- name="stdin",
164
- data=text + "\n")
165
  logger.info(res)
166
 
167
- res = put_job_stream_eof(token=token,
168
- job_id=job_id,
169
- name="stdin")
170
  logger.info(res)
171
 
172
- status = get_job_status(token=token,
173
- job_id=job_id)
174
  logger.info(status)
175
 
176
  while status == "Processing":
177
- status = get_job_status(token=token,
178
- job_id=job_id)
179
  logger.info(status)
180
  time.sleep(0.5)
181
  if status == "Completed":
182
  logger.info("Done!")
183
  else:
184
  logger.info("Job Failed")
185
- file = wait_for_file(token=token,
186
- job_id=job_id,
187
- path="%2Fworkspace%2Fresult.png")
188
  return file
189
 
190
 
 
13
  DOCKER_IMAGE="faircompute/stable-diffusion:pytorch-1.13.1-cu116"
14
  #DOCKER_IMAGE="sha256:e06453fe869556ea3e63572a935aed4261337b261fdf7bda370472b0587409a9"
15
 
16
+ class FairApiClient:
17
+ def __init__(self, server_address: str):
18
+ self.server_address = server_address
19
+ self.token = None
20
+
21
+ def authenticate(self, email: str, password: str):
22
+ url = f'{self.server_address}/auth/login'
23
+ json_obj = {"email": email, "password": password}
24
+ resp = requests.post(url, json=json_obj)
25
+ self.token = resp.json()["token"]
26
+
27
+ def get(self, url, **kwargs):
28
+ headers = {
29
+ 'Authorization': f'Bearer {self.token}'
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  }
31
+ response = requests.get(url, headers=headers, **kwargs)
 
32
 
33
+ if not response.ok:
34
+ raise Exception(f"Error! status: {response.status_code}")
35
 
36
+ return response
37
 
38
+ def put(self, url, data):
39
+ headers = {
40
+ 'Content-Type': 'application/json',
41
+ 'Authorization': f'Bearer {self.token}'
42
+ }
43
+ if not isinstance(data, str):
44
+ data = json.dumps(data)
45
+ response = requests.put(url, headers=headers, data=data)
46
+
47
+ if not response.ok and response.status_code != 206:
48
+ raise Exception(f"Error! status: {response.status_code}")
49
+
50
+ return response
51
+
52
+ def put_program(self, launcher: str, image: str, runtime: str, command: List[str]):
53
+ url = f"{self.server_address}/programs"
54
+ data = {
55
+ launcher: {
56
+ "image": image,
57
+ "command": command,
58
+ "runtime": runtime
59
+ }
60
+ }
61
+ response = self.put(url=url, data=data)
62
 
63
+ return int(response.text)
64
 
65
+ def put_job(self, program_id, input_files, output_files):
66
+ url = f"{self.server_address}/jobs?program={program_id}"
67
+ data = {
68
+ 'input_files': input_files,
69
+ 'output_files': output_files
70
+ }
71
 
72
+ response = self.put(url=url, data=data)
73
 
74
+ return int(response.text)
75
 
76
+ def get_job_status(self, job_id):
77
+ url = f"{self.server_address}/jobs/{job_id}/status"
78
+ response = self.get(url=url)
79
+ return response.text
80
 
81
+ def get_cluster_summary(self):
82
+ url = f"{self.server_address}/nodes/summary"
83
+ response = self.get(url=url)
84
 
85
+ return response.json()
86
 
87
+ def put_job_stream_data(self, job_id, name, data):
88
+ url = f"{self.server_address}/jobs/{job_id}/data/streams/{name}"
89
+ response = self.put(url=url, data=data)
90
+ return response.text
91
 
92
+ def put_job_stream_eof(self, job_id, name):
93
+ url = f"{self.server_address}/jobs/{job_id}/data/streams/{name}/eof"
94
+ response = self.put(url=url, data=None)
95
+ return response.text
 
 
 
 
 
 
 
 
 
96
 
 
 
 
 
 
 
97
 
98
+ def wait_for_file(self, job_id, path, attempts=10) -> BinaryIO:
99
+ headers = {
100
+ 'Authorization': f'Bearer {self.token}'
101
+ }
102
+ for i in range(attempts):
103
+ url = f"{self.server_address}/jobs/{job_id}/data/files/{path}"
104
+ print(f"Waiting for file {path}...")
105
+ try:
106
+ with requests.get(url=url, headers=headers, stream=True) as r:
107
+ r.raise_for_status()
108
+ f = tempfile.TemporaryFile()
109
+ for chunk in r.iter_content(chunk_size=8192):
110
+ f.write(chunk)
111
+
112
+ print(f"File {path} ready")
113
+ f.seek(0, 0)
114
+ return f
115
+ except Exception as e:
116
+ print(e)
117
+ time.sleep(0.5)
118
+
119
+ print(f"Failed to receive {path}")
120
 
121
 
122
  def text_to_image(text):
123
  email = os.getenv('FAIRCOMPUTE_EMAIL')
124
  password = os.environ.get('FAIRCOMPUTE_PASSWORD')
125
+ client = FairApiClient(SERVER_ADRESS)
126
+ client.authenticate(email=email, password=password)
127
+
128
+ program_id = client.put_program(
129
+ launcher="Docker",
130
+ image=DOCKER_IMAGE,
131
+ runtime="nvidia",
132
+ command=[])
 
 
 
 
133
  logger.info(program_id)
134
 
135
+ job_id = client.put_job(
136
+ program_id=program_id,
137
+ input_files=[],
138
+ output_files=["/workspace/result.png"])
139
 
140
  logger.info(job_id)
141
 
142
+ status = client.get_job_status(job_id=job_id)
 
143
  logger.info(status)
144
 
145
  while status != "Processing" and status != "Completed":
146
+ status = client.get_job_status(job_id=job_id)
 
147
  logger.info(status)
148
  time.sleep(0.5)
149
 
150
+ res = client.put_job_stream_data(
151
+ job_id=job_id,
152
+ name="stdin",
153
+ data=text + "\n")
154
  logger.info(res)
155
 
156
+ res = client.put_job_stream_eof(
157
+ job_id=job_id,
158
+ name="stdin")
159
  logger.info(res)
160
 
161
+ status = client.get_job_status(job_id=job_id)
 
162
  logger.info(status)
163
 
164
  while status == "Processing":
165
+ status = client.get_job_status(
166
+ job_id=job_id)
167
  logger.info(status)
168
  time.sleep(0.5)
169
  if status == "Completed":
170
  logger.info("Done!")
171
  else:
172
  logger.info("Job Failed")
173
+ file = client.wait_for_file(
174
+ job_id=job_id,
175
+ path="%2Fworkspace%2Fresult.png")
176
  return file
177
 
178