forked from oracle-samples/oracle-functions-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunc.py
76 lines (66 loc) · 2.68 KB
/
func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#
# oci-vault-get-secret-python version 1.0.
#
# Copyright (c) 2020 Oracle, Inc.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
#
import io
import json
import base64
import oci
import logging
import hashlib
from fdk import response
def get_text_secret(secret_ocid):
#decrypted_secret_content = ""
signer = oci.auth.signers.get_resource_principals_signer()
try:
client = oci.secrets.SecretsClient({}, signer=signer)
secret_content = client.get_secret_bundle(secret_ocid).data.secret_bundle_content.content.encode('utf-8')
decrypted_secret_content = base64.b64decode(secret_content).decode("utf-8")
except Exception as ex:
print("ERROR: failed to retrieve the secret content", ex, flush=True)
raise
return {"secret content": decrypted_secret_content}
def get_binary_secret_into_file(secret_ocid, filepath):
#decrypted_secret_content = ""
signer = oci.auth.signers.get_resource_principals_signer()
try:
client = oci.secrets.SecretsClient({}, signer=signer)
secret_content = client.get_secret_bundle(secret_ocid).data.secret_bundle_content.content.encode('utf-8')
except Exception as ex:
print("ERROR: failed to retrieve the secret content", ex, flush=True)
raise
try:
with open(filepath, 'wb') as secretfile:
decrypted_secret_content = base64.decodebytes(secret_content)
secretfile.write(decrypted_secret_content)
except Exception as ex:
print("ERROR: cannot write to file " + filepath, ex, flush=True)
raise
secret_md5 = hashlib.md5(decrypted_secret_content).hexdigest()
return {"secret md5": secret_md5}
def handler(ctx, data: io.BytesIO=None):
logging.getLogger().info("function start")
secret_ocid = secret_type = resp = ""
try:
cfg = dict(ctx.Config())
secret_ocid = cfg["secret_ocid"]
logging.getLogger().info("Secret ocid = " + secret_ocid)
secret_type = cfg["secret_type"]
logging.getLogger().info("Secret type = " + secret_type)
except Exception as e:
print('ERROR: Missing configuration keys, secret ocid and secret_type', e, flush=True)
raise
if secret_type == "text":
resp = get_text_secret(secret_ocid)
elif secret_type == "binary":
resp = get_binary_secret_into_file(secret_ocid, "/tmp/secret")
else:
raise ValueError('the value of the configuration parameter "secret_type" has to be either "text" or "binary"')
logging.getLogger().info("function end")
return response.Response(
ctx,
response_data=resp,
headers={"Content-Type": "application/json"}
)