Skip to content

Commit

Permalink
Merge pull request #422 from FederatedAI/develop-1.11.0
Browse files Browse the repository at this point in the history
Develop 1.11.0
  • Loading branch information
zhihuiwan authored Apr 11, 2023
2 parents 7452a51 + 5fe8a1f commit 55a5b80
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 143 deletions.
11 changes: 11 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# Release 1.11.0
## Major Features and Improvements
* Add data table preview query interface

## Bug Fixes
* Fix performance problems of upload and reader when processing large amounts of data
* Fix bug where online inference could not be performed after model migration
* Fix bug where the model could not be saved to the specified database
* Fix reader data preview display bug


# Release 1.10.1
## Major Features and Improvements
* Optimize table info API
Expand Down
51 changes: 51 additions & 0 deletions doc/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,57 @@ paths:
type: string
example: no find table

'/table/preview':
post:
summary: table data preview
tags:
- table
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- name
- namespace
properties:
name:
type: string
example: "guest"
namespace:
type: string
example: "data"
responses:
'200':
description: table data preview retrieved successfully
content:
application/json:
schema:
type: object
properties:
retcode:
type: integer
example: 0
retmsg:
type: string
example: success
data:
type: object
'404':
description: table not found
content:
application/json:
schema:
type: object
properties:
retcode:
type: integer
example: 210
retmsg:
type: string
example: no find table

'/job/submit':
post:
summary: submit job
Expand Down
5 changes: 3 additions & 2 deletions python/fate_flow/apps/job_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ def update_job():

@manager.route('/report', methods=['POST'])
def job_report():
    """Return a per-task report for the job matching the request filters.

    Both the job and at least one task must be found; otherwise the
    endpoint answers with retcode 101.
    """
    jobs = JobSaver.query_job(**request.json)
    tasks = JobSaver.query_task(**request.json)
    if not tasks or not jobs:
        return get_json_result(retcode=101, retmsg='find task failed')
    return get_json_result(retcode=0, retmsg='success', data=job_utils.task_report(jobs, tasks))


@manager.route('/parameter/update', methods=['POST'])
Expand Down
6 changes: 6 additions & 0 deletions python/fate_flow/apps/model_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,12 @@ def query_model():
return get_json_result(retcode=retcode, retmsg=retmsg, data=data)


@manager.route('/query/detail', methods=['POST'])
def query_model_detail():
    # Delegate the lookup to model_utils and relay its
    # (retcode, retmsg, data) triple straight back as the JSON response.
    code, message, detail = model_utils.query_model_detail(**request.json)
    return get_json_result(retcode=code, retmsg=message, data=detail)


@manager.route('/deploy', methods=['POST'])
@validate_request('model_id', 'model_version')
def deploy():
Expand Down
13 changes: 13 additions & 0 deletions python/fate_flow/apps/table_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ def table_download():
)


@manager.route('/preview', methods=['post'])
def table_data_preview():
    """Preview the contents of a stored data table identified by name/namespace."""
    req = request.json
    name, namespace = req.get("name"), req.get("namespace")
    # Deliberately lazy import: component-output dependencies are only
    # pulled in when this endpoint is actually hit.
    from fate_flow.component_env_utils.env_utils import import_component_output_depend
    import_component_output_depend()
    table_meta = storage.StorageTableMeta(name=name, namespace=namespace)
    if not table_meta:
        return error_response(response_code=210, retmsg=f'no found table:{namespace}, {name}')
    preview = TableStorage.read_table_data(table_meta, limit=req.get("limit"))
    return get_json_result(retcode=0, retmsg='success', data=preview)


@manager.route('/delete', methods=['post'])
def table_delete():
request_data = request.json
Expand Down
15 changes: 14 additions & 1 deletion python/fate_flow/components/api_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,14 @@ def upload_data(self):
)
upload_registry_info = self.service_info.get("upload")
logger.info(f"upload info:{upload_registry_info.to_dict()}")
params = self.parameters.get("parameters", {})
params.update({"job_id": self.tracker.job_id, })
en_content = self.encrypt_content()
if en_content:
params.update({"sign": en_content})
response = getattr(requests, upload_registry_info.f_method.lower(), None)(
url=upload_registry_info.f_url,
params={"requestBody": json.dumps(self.parameters.get("parameters", {}))},
params={"requestBody": json.dumps(params)},
data=data,
headers={'Content-Type': data.content_type}
)
Expand All @@ -206,3 +211,11 @@ def set_service_registry_info(self):
if key == info.f_service_name:
self.service_info[key] = info
logger.info(f"set service registry info:{self.service_info}")

def encrypt_content(self, job_id=None):
    """Return the hex MD5 digest of *job_id*, used to sign upload requests.

    Falls back to ``self.tracker.job_id`` when *job_id* is not supplied.
    NOTE(review): MD5 is not collision-resistant; acceptable as a request
    fingerprint, but do not rely on it for real authentication.
    """
    import hashlib  # local import kept deliberately: only needed on this path
    if not job_id:
        job_id = self.tracker.job_id
    return hashlib.md5(job_id.encode()).hexdigest()
21 changes: 18 additions & 3 deletions python/fate_flow/components/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from fate_flow.manager.data_manager import DataTableTracker, TableStorage, AnonymousGenerator
from fate_flow.operation.job_tracker import Tracker
from fate_flow.utils import data_utils
from federatedml.feature.instance import Instance

LOGGER = log.getLogger()
MAX_NUM = 10000
Expand Down Expand Up @@ -305,8 +306,22 @@ def data_info_display(output_table_meta):
data_list[0].extend(headers)
LOGGER.info(f"data info header: {data_list[0]}")
for data in output_table_meta.get_part_of_data():
delimiter = schema.get("meta", {}).get("delimiter") or output_table_meta.id_delimiter
data_list.append(data[1].split(delimiter))
if isinstance(data[1], str):
delimiter = schema.get("meta", {}).get(
"delimiter") or output_table_meta.id_delimiter
data_list.append(data[1].split(delimiter))
elif isinstance(data[1], Instance):
table_data = []
if data[1].inst_id:
table_data = table_data.append(data[1].inst_id)
if not data[1].label is None:
table_data.append(data[1].label)

table_data.extend(data[1].features)
data_list.append([str(v) for v in table_data])
else:
data_list.append(data[1])

data = np.array(data_list)
Tdata = data.transpose()
for data in Tdata:
Expand All @@ -317,7 +332,7 @@ def data_info_display(output_table_meta):
if schema.get("label_name"):
anonymous_info[schema.get("label_name")] = schema.get("anonymous_label")
attribute_info[schema.get("label_name")] = "label"
if schema.get("meta").get("id_list"):
if schema.get("meta", {}).get("id_list"):
for id_name in schema.get("meta").get("id_list"):
if id_name in attribute_info:
attribute_info[id_name] = "match_id"
Expand Down
93 changes: 49 additions & 44 deletions python/fate_flow/components/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,54 +287,59 @@ def get_count(input_file):
count += 1
return count

def kv_generator(self, input_feature_count, fp, job_id, part_of_data):
    """Yield (key, value) pairs parsed from the upload file stream.

    Reads *fp* in blocks of at most ``upload_block_max_bytes`` bytes so an
    arbitrarily large file never has to fit in memory.  Side effects: the
    first 100 pairs are appended to *part_of_data* (kept for the table's
    data preview) and upload progress is reported to the controller after
    each block.

    :param input_feature_count: expected number of data lines; used only
        for the progress percentage.
    :param fp: open text file positioned after the header line (if any).
    :param job_id: id of the job whose progress is updated.
    :param part_of_data: list collecting the first 100 (k, v) samples.
    """
    fate_uuid = uuid.uuid1().hex
    get_line = self.get_line()
    line_index = 0
    LOGGER.info(input_feature_count)
    while True:
        lines = fp.readlines(JobDefaultConfig.upload_block_max_bytes)
        LOGGER.info(JobDefaultConfig.upload_block_max_bytes)
        if not lines:
            return
        for line in lines:
            values = line.rstrip().split(self.parameters["id_delimiter"])
            k, v = get_line(
                values=values,
                line_index=line_index,
                extend_sid=self.parameters["extend_sid"],
                auto_increasing_sid=self.parameters["auto_increasing_sid"],
                id_delimiter=self.parameters["id_delimiter"],
                fate_uuid=fate_uuid,
            )
            yield k, v
            line_index += 1
            if line_index <= 100:
                part_of_data.append((k, v))
        # Guard the denominator: an empty or header-only file would give
        # input_feature_count == 0 and raise ZeroDivisionError here.
        save_progress = line_index / max(input_feature_count, 1) * 100 // 1
        job_info = {
            "progress": save_progress,
            "job_id": job_id,
            "role": self.parameters["local"]["role"],
            "party_id": self.parameters["local"]["party_id"],
        }
        ControllerClient.update_job(job_info=job_info)

def update_schema(self, head, fp):
    """Push the table schema, consuming the header line when present.

    Returns True when a line was read from *fp* (the file had a header
    row), so the caller can discount it from the data line count.
    """
    if head is True:
        self.update_table_schema(fp.readline())
        return True
    self.update_table_schema()
    return False

def upload_file(self, input_file, head, job_id=None, input_feature_count=None, table=None):
    """Stream *input_file* into the destination storage table.

    :param input_file: path of the local file to ingest.
    :param head: True when the first line of the file is a header row.
    :param job_id: job id used for progress reporting.
    :param input_feature_count: expected number of data lines.
    :param table: destination storage table; defaults to ``self.table``.
    """
    if not table:
        table = self.table
    part_of_data = []
    with open(input_file, "r") as fp:
        if self.update_schema(head, fp):
            # The header line was consumed and is not a data row.
            input_feature_count -= 1
        # Bug fix: write into the resolved *table* rather than always
        # self.table, so an explicitly supplied destination is honoured.
        table.put_all(self.kv_generator(input_feature_count, fp, job_id, part_of_data))
        table.meta.update_metas(part_of_data=part_of_data)

def get_computing_table(self, name, namespace, schema=None):
storage_table_meta = storage.StorageTableMeta(name=name, namespace=namespace)
Expand Down
9 changes: 8 additions & 1 deletion python/fate_flow/db/service_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,14 @@ def save(cls, service_config):
cls.parameter_check(server_info)
api_info = server_info.pop("api", {})
for service_name, info in api_info.items():
ServiceRegistry.save_service_info(server_name, service_name, uri=info.get('uri'), method=info.get('method', 'POST'), server_info=server_info)
ServiceRegistry.save_service_info(
server_name, service_name, uri=info.get('uri'),
method=info.get('method', 'POST'),
server_info=server_info,
data=info.get("data", {}),
headers=info.get("headers", {}),
params=info.get("params", {})
)
cls.save_server_info_to_db(server_name, server_info.get("host"), server_info.get("port"), protocol="http")
setattr(cls, server_name.upper(), server_info)
return update_server
Expand Down
2 changes: 1 addition & 1 deletion python/fate_flow/external/storage/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def save(self):
self._con.commit()
sql = None
LOGGER.info(f"save data count:{count}")
if count > 0:
if count > 0 and sql:
sql = ",".join(sql.split(",")[:-1]) + ";"
self._cur.execute(sql)
self._con.commit()
Expand Down
8 changes: 6 additions & 2 deletions python/fate_flow/hook/api/site_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ def signature(parm: SignatureParameters) -> SignatureReturn:
if not service_list:
raise Exception(f"signature error: no found server {HOOK_SERVER_NAME} service signature")
service = service_list[0]
data = service.f_data if service.f_data else {}
data.update(parm.to_dict())
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
json=data
)
if response.status_code == 200:
if response.json().get("code") == 0:
Expand All @@ -37,9 +39,11 @@ def authentication(parm: AuthenticationParameters) -> AuthenticationReturn:
raise Exception(
f"site authentication error: no found server {HOOK_SERVER_NAME} service site_authentication")
service = service_list[0]
data = service.f_data if service.f_data else {}
data.update(parm.to_dict())
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
json=data
)
if response.status_code != 200:
raise Exception(
Expand Down
6 changes: 3 additions & 3 deletions python/fate_flow/hook/flow/site_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import MD5
from Crypto.Hash import SHA256

from fate_flow.db.key_manager import RsaKeyManager
from fate_flow.entity import RetCode
Expand All @@ -19,7 +19,7 @@ def signature(parm: SignatureParameters) -> SignatureReturn:
private_key = RsaKeyManager.get_key(parm.party_id, key_name=SiteKeyName.PRIVATE.value)
if not private_key:
raise Exception(f"signature error: no found party id {parm.party_id} private key")
sign= PKCS1_v1_5.new(RSA.importKey(private_key)).sign(MD5.new(json.dumps(parm.body).encode()))
sign = PKCS1_v1_5.new(RSA.importKey(private_key)).sign(SHA256.new(json.dumps(parm.body).encode()))
return SignatureReturn(site_signature=base64.b64encode(sign).decode())


Expand All @@ -30,7 +30,7 @@ def authentication(parm: AuthenticationParameters) -> AuthenticationReturn:
if not public_key:
raise Exception(f"signature error: no found party id {party_id} public key")
verifier = PKCS1_v1_5.new(RSA.importKey(public_key))
if verifier.verify(MD5.new(json.dumps(parm.body).encode()), base64.b64decode(parm.site_signature)) is True:
if verifier.verify(SHA256.new(json.dumps(parm.body).encode()), base64.b64decode(parm.site_signature)) is True:
return AuthenticationReturn()
else:
return AuthenticationReturn(code=RetCode.AUTHENTICATION_ERROR, message="authentication failed")
Loading

0 comments on commit 55a5b80

Please sign in to comment.