diff --git a/README_CN.md b/README_CN.md index 051a4041..ed5e314a 100755 --- a/README_CN.md +++ b/README_CN.md @@ -975,97 +975,6 @@ res.log_print(); 通过`delete_external_store`删除一个快速查询. - - -## 投递管理 -投递的配置一般称为Job, 包含了投递的具体配置以及调度日程安排. 而某一个具体时间的运行实例称为Task. - -- 获取配置列表 - - 通过`list_shipper`获取投递配置的列表 - -- 创建配置 - - 通过`create_shipper`创建一个投递配置. 可以传入一个json字符串(如下)或者对应的dict对象(如get_shipper获得的对象结果),如下: - -OSS以CSV格式投递: -```json -{ - "shipperName": "oss-ship-test1", - "targetConfiguration": { - "bufferInterval": 300, - "bufferSize": 256, - "compressType": "none", - "enable": true, - "ossBucket": "bucket1", - "ossPrefix": "bucket_folder1", - "pathFormat": "%Y/%m/%d/%H/%M", - "roleArn": "acs:ram::1234:role/aliyunlogdefaultrole", - "storage": { - "detail": { - "columns": [ - "k3", - "k4" - ], - "delimiter": "|", - "header": false, - "nullIdentifier": "", - "quote": "\"" - }, - "format": "csv" - } - }, - "targetType": "oss" -} -``` - -OSS以JSON格式投递(注意storage.detail.columns为空是必须的,这个和get_shipper有所差别): -```json -{ - "shipperName": "oss-ship-test1", - "targetConfiguration": { - "bufferInterval": 300, - "bufferSize": 256, - "compressType": "snappy", - "enable": true, - "ossBucket": "bucket1", - "ossPrefix": "bucket_folder1", - "pathFormat": "%Y/%m/%d/%H/%M", - "roleArn": "acs:ram::1234:role/aliyunlogdefaultrole", - "storage": { - "detail": { - "enableTag": true, - "columns": [ - ] - }, - "format": "json" - } - }, - "targetType": "oss" -} - -``` - -- 获取配置 - - 通过`get_shipper`获取一个投递配置的具体信息. - -- 更新配置 - - 通过`update_shipper`更新一个投递配置. - -- 删除配置 - - 通过`delete_shipper`删除一个投递配置. - -- 获取运行实例列表 - - 通过`get_shipper_tasks`获取投递运行实例. - -- 重试运行实例 - - 通过`retry_shipper_tasks`重试某一个运行实例. - ## Elasticsearch 数据迁移 用于将 Elasticsearch 中的数据迁移至日志服务。 diff --git a/aliyun/log/__init__.py b/aliyun/log/__init__.py index 1b090013..1ec9d739 100755 --- a/aliyun/log/__init__.py +++ b/aliyun/log/__init__.py @@ -9,7 +9,6 @@ from .logtail_config_detail import * from .machine_group_detail import MachineGroupDetail from .putlogsrequest import PutLogsRequest -from .shipper_config import ShipperTask, OssShipperConfig, OdpsShipperConfig from .version import __version__ from .logitem import LogItem from .consumer_group_request import * @@ -30,7 +29,6 @@ from .project_response import * from .pulllog_response import PullLogResponse from .shard_response import * -from .shipper_response import * from .common_response import * from .external_store_config_response import * from .proto import LogGroupRaw as LogGroup diff --git a/aliyun/log/logclient.py b/aliyun/log/logclient.py index 48c7d326..bd0086ea 100644 --- a/aliyun/log/logclient.py +++ b/aliyun/log/logclient.py @@ -6,13 +6,8 @@ :Author: Aliyun """ -import json import requests -import six import time -import zlib -from datetime import datetime -import logging from .credentials import StaticCredentialsProvider from .scheduled_sql import ScheduledSQLConfiguration from .scheduled_sql_response import * @@ -32,7 +27,6 @@ from .listtopicsresponse import ListTopicsResponse from .logclient_operator import copy_project, list_more, query_more, pull_log_dump, copy_logstore, copy_data, \ get_resource_usage, arrange_shard, transform_data -from .logexception import LogException from .logstore_config_response import * from .substore_config_response import * from .logtail_config_response import * @@ -42,19 +36,16 @@ from .pulllog_response import PullLogResponse from .putlogsresponse import PutLogsResponse from .shard_response import * -from .shipper_response import * from .resource_response import * from .resource_params import * from .tag_response import GetResourceTagsResponse from .topostore_response import * from .topostore_params import * -from .util import base64_encodestring as b64e -from .util import base64_encodestring as e64, base64_decodestring as d64, Util +from .util import base64_encodestring as e64, base64_decodestring as d64 from .version import API_VERSION, USER_AGENT from .proto import LogGroupRaw as LogGroup from .external_store_config_response import * -import struct from .logresponse import LogResponse from copy import copy from .pluralize import pluralize @@ -2258,85 +2249,6 @@ def get_config_applied_machine_groups(self, project_name, config_name): (resp, header) = self._send("GET", project_name, None, resource, params, headers) return GetConfigAppliedMachineGroupsResponse(resp, header) - def get_shipper_tasks(self, project_name, logstore_name, shipper_name, start_time, end_time, status_type='', - offset=0, size=100): - """ get odps/oss shipper tasks in a certain time range - Unsuccessful operation will cause an LogException. - - :type project_name: string - :param project_name: the Project name - - :type logstore_name: string - :param logstore_name: the logstore name - - :type shipper_name: string - :param shipper_name: the shipper name - - :type start_time: int - :param start_time: the start timestamp - - :type end_time: int - :param end_time: the end timestamp - - :type status_type: string - :param status_type: support one of ['', 'fail', 'success', 'running'] , if the status_type = '' , return all kinds of status type - - :type offset: int - :param offset: the begin task offset - - :type size: int - :param size: the needed tasks count, -1 means all - - :return: GetShipperTasksResponse - - :raise: LogException - """ - # need to use extended method to get more - if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE: - return list_more(self.get_shipper_tasks, int(offset), int(size), MAX_LIST_PAGING_SIZE, - project_name, logstore_name, shipper_name, start_time, end_time, status_type) - - headers = {} - params = {"from": str(int(start_time)), - "to": str(int(end_time)), - "status": status_type, - "offset": str(int(offset)), - "size": str(int(size))} - - resource = "/logstores/" + logstore_name + "/shipper/" + shipper_name + "/tasks" - (resp, header) = self._send("GET", project_name, None, resource, params, headers) - return GetShipperTasksResponse(resp, header) - - def retry_shipper_tasks(self, project_name, logstore_name, shipper_name, task_list): - """ retry failed tasks , only the failed task can be retried - Unsuccessful operation will cause an LogException. - - :type project_name: string - :param project_name: the Project name - - :type logstore_name: string - :param logstore_name: the logstore name - - :type shipper_name: string - :param shipper_name: the shipper name - - :type task_list: string array - :param task_list: the failed task_id list, e.g ['failed_task_id_1', 'failed_task_id_2',...], currently the max retry task count 10 every time - - :return: RetryShipperTasksResponse - - :raise: LogException - """ - headers = {} - params = {} - body = six.b(json.dumps(task_list)) - headers['Content-Type'] = 'application/json' - headers['x-log-bodyrawsize'] = str(len(body)) - resource = "/logstores/" + logstore_name + "/shipper/" + shipper_name + "/tasks" - - (resp, header) = self._send("PUT", project_name, body, resource, params, headers) - return RetryShipperTasksResponse(header, resp) - def check_upsert_scheduled_sql(self, project_name, scheduled_sql, method, resource): config = scheduled_sql.getConfiguration() if not isinstance(config, ScheduledSQLConfiguration): @@ -5667,161 +5579,6 @@ def create_savedsearch(self, project, detail): (resp, header) = self._send("POST", project, body_str, resource, params, headers) return CreateEntityResponse(header, resp) - def list_shipper(self, project, logstore, offset=0, size=100): - """list shippers - Unsuccessful opertaion will cause an LogException. - - :type project: string - :param project: the project name - - :type logstore: string - :param logstore: the logstore name - - :type offset: int - :param offset: the offset of all the matched shippers - - :type size: int - :param size: the max return shipper count, -1 means all - - :return: ListEntityResponse - - :raise: LogException - """ - if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE: - return list_more(self.list_shipper, int(offset), int(size), MAX_LIST_PAGING_SIZE, - project, logstore) - - headers = {} - params = {} - params["offset"] = str(offset) - params["size"] = str(size) - resource = "/logstores/" + logstore + "/shipper" - (resp, header) = self._send("GET", project, None, resource, params, headers) - return ListEntityResponse(header, resp, resource_name="shipper", entities_key="shipper") - - def get_shipper(self, project, logstore, entity): - """get shipper - Unsuccessful opertaion will cause an LogException. - - :type project: string - :param project: the project name - - :type logstore: string - :param logstore: the logstore name - - :type entity: string - :param entity: the shipper name - - :return: GetEntityResponse - - :raise: LogException - """ - headers = {} - params = {} - resource = "/logstores/" + logstore + "/shipper/" + entity - (resp, header) = self._send("GET", project, None, resource, params, headers) - return GetEntityResponse(header, resp) - - def delete_shipper(self, project, logstore, entity): - """delete shipper - Unsuccessful opertaion will cause an LogException. - - :type project: string - :param project: the project name - - :type logstore: string - :param logstore: the logstore name - - :type entity: string - :param entity: the shipper name - - :return: DeleteEntityResponse - - :raise: LogException - """ - headers = {} - params = {} - resource = "/logstores/" + logstore + "/shipper/" + entity - (resp, header) = self._send("DELETE", project, None, resource, params, headers) - return DeleteEntityResponse(header, resp) - - def update_shipper(self, project, logstore, detail): - """update shipper - Unsuccessful opertaion will cause an LogException. - - :type project: string - :param project: the project name - - :type logstore: string - :param logstore: the logstore name - - :type detail: dict/string - :param detail: json string of shipper config details - - :return: UpdateEntityResponse - - :raise: LogException - """ - params = {} - headers = {} - shipper_name = None - if hasattr(detail, "to_json"): - detail = detail.to_json() - body_str = six.b(json.dumps(detail)) - shipper_name = detail.get("shipperName" or "name", "") - elif isinstance(detail, six.binary_type): - body_str = detail - elif isinstance(detail, six.text_type): - body_str = detail.encode("utf8") - else: - body_str = six.b(json.dumps(detail)) - shipper_name = detail.get("shipperName" or "name", "") - - if shipper_name is None: - shipper_name = json.loads(body_str).get("shipperName", "") - - assert shipper_name, LogException( - "InvalidParameter", - 'unknown shipper name in "{0}"'.format(detail), - ) - headers["Content-Type"] = "application/json" - headers["x-log-bodyrawsize"] = str(len(body_str)) - resource = "/logstores/" + logstore + "/shipper/" + shipper_name - (resp, headers) = self._send("PUT", project, body_str, resource, params, headers) - return UpdateEntityResponse(headers, resp) - - def create_shipper(self, project, logstore, detail): - """create shipper - Unsuccessful opertaion will cause an LogException. - - :type project: string - :param project: the project name - - :type logstore: string - :param logstore: the logstore name - - :type detail: dict/string - :param detail: json string of shipper config details - - :return: CreateEntityResponse - - :raise: LogException - """ - params = {} - headers = {"x-log-bodyrawsize": "0", "Content-Type": "application/json"} - if hasattr(detail, "to_json"): - detail = detail.to_json() - body_str = six.b(json.dumps(detail)) - elif isinstance(detail, six.binary_type): - body_str = detail - elif isinstance(detail, six.text_type): - body_str = detail.encode("utf8") - else: - body_str = six.b(json.dumps(detail)) - resource = "/logstores/" + logstore + "/shipper" - (resp, header) = self._send("POST", project, body_str, resource, params, headers) - return CreateEntityResponse(header, resp) - def create_rebuild_index(self, project, logstore, job_name, display_name, from_time, to_time): """Create a job that rebuild index for a logstore type: (string, string, string, string, int, int) -> CreateRebuildIndexResponse @@ -5893,4 +5650,3 @@ def get_rebuild_index(self, project, job_name): # make_lcrud_methods(LogClient, 'dashboard', name_field='dashboardName') # make_lcrud_methods(LogClient, 'alert', name_field='name', root_resource='/jobs', entities_key='results', job_type="Alert") # make_lcrud_methods(LogClient, 'savedsearch', name_field='savedsearchName') -# make_lcrud_methods(LogClient, 'shipper', logstore_level=True, root_resource='/shipper', name_field='shipperName', raw_resource_name='shipper') diff --git a/aliyun/log/shipper_config.py b/aliyun/log/shipper_config.py deleted file mode 100755 index 5c8cb232..00000000 --- a/aliyun/log/shipper_config.py +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 - -# Copyright (C) Alibaba Cloud Computing -# All rights reserved. - - -class OdpsShipperConfig(object): - """ Odps shipper config - - :type odps_endpoint: string - :param odps_endpoint: the odps endpoint - - :type odps_project: string - :param odps_project: the odps project name - - :type odps_table: string - :param odps_table: the odps table name - - :type log_fields_list: string array - :param log_fields_list: the log field(keys in log) list mapping to the odps table column. e.g log_fields_list=['__time__', 'key_a', 'key_b'], the $log_time, $log_key_a, $log_key_b will mapping to odps table column No.1, No.2, No.3 - - :type partition_column: string array - :param partition_column: the log fields mapping to odps table partition column - - :type partition_time_format: string - :param partition_time_format: the time format of __partition_time__, e.g yyyy_MM_dd_HH_mm - - """ - - def __init__(self, odps_endpoint, odps_project, odps_table, log_fields_list, partition_column, - partition_time_format, bufferInterval=1800): - self.odps_endpoint = odps_endpoint - self.odps_project = odps_project - self.odps_table = odps_table - self.log_fields_list = log_fields_list - self.partition_column = partition_column - self.partition_time_format = partition_time_format - self.buffer_interval = bufferInterval - - def to_json(self): - json_value = {"odpsEndpoint": self.odps_endpoint, "odpsProject": self.odps_project, - "odpsTable": self.odps_table, "fields": self.log_fields_list, - "partitionColumn": self.partition_column, "partitionTimeFormat": self.partition_time_format, - "bufferInterval": self.buffer_interval} - return json_value - - -class OssShipperConfig(object): - """A oss ship config - - :type oss_bucket: string - :param oss_bucket: the oss bucket name - - :type oss_prefix: string - :param oss_prefix: the the prefix path where to save the log - - :type oss_role_arn: string - :param oss_role_arn: the ram arn used to get the temporary write permission to the oss bucket - - :type buffer_interval: int - :param buffer_interval: the time(seconds) to buffer before save to oss - - :type buffer_mb: int - :param buffer_mb: the data size(MB) to buffer before save to oss - - :type compress_type: string - :param compress_type: the compress type, only support 'snappy' or 'none' - """ - - def __init__(self, oss_bucket, oss_prefix, oss_role_arn, buffer_interval=300, buffer_mb=128, - compress_type='snappy'): - self.oss_bucket = oss_bucket - self.oss_prefix = oss_prefix - self.oss_role_arn = oss_role_arn - self.buffer_interval = buffer_interval - self.buffer_mb = buffer_mb - self.compress_type = compress_type - - def to_json(self): - json_value = {'ossBucket': self.oss_bucket, 'ossPrefix': self.oss_prefix, 'roleArn': self.oss_role_arn, - 'bufferInterval': self.buffer_interval, 'bufferSize': self.buffer_mb, - 'compressType': self.compress_type} - print(json_value) - return json_value - - -class ShipperTask(object): - """A shipper task - - :type task_id: string - :param task_id: the task id - - :type task_status: string - :param task_status: one of ['success', 'running', 'fail'] - - :type task_message: string - :param task_message: the error message of task_status is 'fail' - - :type task_create_time: int - :param task_create_time: the task create time (timestamp from 1970.1.1) - - :type task_last_data_receive_time: int - :param task_last_data_receive_time: last log data receive time (timestamp) - - :type task_finish_time: int - :param task_finish_time: the task finish time (timestamp) - """ - - def __init__(self, task_id, task_status, task_message, task_create_time, task_last_data_receive_time, - task_finish_time): - self.task_id = task_id - self.task_status = task_status - self.task_message = task_message - self.task_create_time = task_create_time - self.task_last_data_receive_time = task_last_data_receive_time - self.task_finish_time = task_finish_time - - def to_json(self): - json_value = {'id': self.task_id, 'taskStatus': self.task_status, 'taskMessage': self.task_message, - 'taskCreateTime': self.task_create_time, - 'taskLastDataReceiveTime': self.task_last_data_receive_time, - 'taskFinishTime': self.task_finish_time} - return json_value diff --git a/aliyun/log/shipper_response.py b/aliyun/log/shipper_response.py deleted file mode 100755 index dd1dd7bf..00000000 --- a/aliyun/log/shipper_response.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 - -# Copyright (C) Alibaba Cloud Computing -# All rights reserved. - -__all__ = ['CreateShipperResponse', 'UpdateShipperResponse', 'DeleteShipperResponse', - 'GetShipperConfigResponse', 'ListShipperResponse', 'GetShipperTasksResponse', - 'RetryShipperTasksResponse'] - - -from .logresponse import LogResponse -from .shipper_config import OdpsShipperConfig -from .shipper_config import OssShipperConfig -from .shipper_config import ShipperTask - - -class CreateShipperResponse(LogResponse): - def __init__(self, header, resp=''): - LogResponse.__init__(self, header, resp) - - def log_print(self): - print('CreateShipperResponse:') - print('headers:', self.get_all_headers()) - - -class UpdateShipperResponse(LogResponse): - def __init__(self, header, resp=''): - LogResponse.__init__(self, header, resp) - - def log_print(self): - print('UpdateShipperResponse:') - print('headers:', self.get_all_headers()) - - -class DeleteShipperResponse(LogResponse): - def __init__(self, header, resp=''): - LogResponse.__init__(self, header, resp) - - def log_print(self): - print('DeleteShipperResponse:') - print('headers:', self.get_all_headers()) - - -class GetShipperConfigResponse(LogResponse): - def __init__(self, resp, header): - LogResponse.__init__(self, header, resp) - self.create_time = resp['createTime'] - self.last_modify_time = resp['lastModifyTime'] - self.type = resp['targetType'] - target_config = resp['targetConfiguration'] - if self.type == 'odps': - self.config = OdpsShipperConfig(target_config["odpsEndpoint"], - target_config["odpsProject"], - target_config["odpsTable"], - target_config["fields"], - target_config["partitionColumn"], - target_config["partitionTimeFormat"], - target_config["bufferInterval"]) - elif self.type == 'oss': - self.config = OssShipperConfig(target_config['ossBucket'], - target_config['ossPrefix'], - target_config['roleArn'], - target_config['bufferInterval'], - target_config['bufferSize'], - target_config['compressType']) - - def get_config(self): - """ - - :return: - """ - return self.config - - def get_create_time(self): - """ - - :return: - """ - return self.create_time - - def get_last_modify_time(self): - """ - - :return: - """ - return self.last_modify_time - - def log_print(self): - """ - - :return: - """ - print('GetShipperConfigResponse:') - print('type:' + self.type) - print('config:' + str(self.config.to_json())) - - -class ListShipperResponse(LogResponse): - def __init__(self, resp, header): - LogResponse.__init__(self, header, resp) - self.count = resp['count'] - self.total = resp['total'] - self.shipper_names = resp['shipper'] - - def get_shipper_count(self): - """ - - :return: - """ - return self.count - - def get_shipper_total(self): - """ - - :return: - """ - return self.total - - def get_shipper_names(self): - """ - - :return: - """ - return self.shipper_names - - def log_print(self): - """ - - :return: - """ - print('ListShipperResponse:') - print('shipper count : ' + str(self.count)) - print('shipper total : ' + str(self.total)) - print('shipper names : ' + str(self.shipper_names)) - - -class GetShipperTasksResponse(LogResponse): - def __init__(self, resp, header): - LogResponse.__init__(self, header, resp) - self.count = resp['count'] - self.total = resp['total'] - self.running_count = resp['statistics']['running'] - self.success_count = resp['statistics']['success'] - self.fail_count = resp['statistics']['fail'] - self.tasks = [] - for task_res in resp['tasks']: - task = ShipperTask(task_res['id'], task_res['taskStatus'], task_res['taskMessage'], - task_res['taskCreateTime'], - task_res['taskLastDataReceiveTime'], task_res['taskFinishTime']) - self.tasks.append(task) - - def get_task_count(self): - """ - - :return: - """ - return self.count - - def get_count(self): - return self.count - - def get_task_total(self): - """ - - :return: - """ - return self.total - - def get_total(self): - return self.total - - def get_running_task_count(self): - """ - - :return: - """ - return self.running_count - - def get_success_task_count(self): - """ - - :return: - """ - return self.success_count - - def get_fail_task_count(self): - """ - - :return: - """ - return self.fail_count - - def _get_task_ids(self, status): - task_ids = [] - for task in self.tasks: - if task.task_status == status: - task_ids.append(task.task_id) - return task_ids - - def get_fail_task_ids(self): - """ - - :return: - """ - return self._get_task_ids("fail") - - def get_running_task_ids(self): - """ - - :return: - """ - return self._get_task_ids("running") - - def get_success_task_ids(self): - """ - - :return: - """ - return self._get_task_ids("success") - - def get_tasks(self): - """ - - :return: - """ - return self.tasks - - def log_print(self): - """ - - :return: - """ - print('GetShipperTasksResponse:') - print('ship count : ' + str(self.count)) - print('ship total : ' + str(self.total)) - print('ship running_count : ' + str(self.running_count)) - print('ship success_count : ' + str(self.success_count)) - print('ship fail_count : ' + str(self.fail_count)) - print('ship taks : ') - for task in self.tasks: - print(str(task.to_json())) - - def merge(self, response): - if not isinstance(response, GetShipperTasksResponse): - raise ValueError("passed response is not a GetShipperTasksResponse: " + str(type(response))) - - self.count += response.get_count() - self.total = response.get_total() # use the latest total count - self.running_count += response.get_running_task_count() - self.success_count += response.get_success_task_count() - self.fail_count += response.get_fail_task_count() - self.tasks.extend(response.get_tasks()) - - # update body - self.body['count'] = self.count - self.body['total'] = self.total - self.body['statistics']['running'] = self.running_count - self.body['statistics']['success'] = self.success_count - self.body['statistics']['fail'] = self.fail_count - - return self - - -class RetryShipperTasksResponse(LogResponse): - def __init__(self, header, resp=''): - LogResponse.__init__(self, header, resp) - - def log_print(self,): - print('RetryShipperTasksResponse') diff --git a/doc/source/api.rst b/doc/source/api.rst index 114c9b00..00d0f87b 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -45,9 +45,6 @@ Request and Config Class SyslogConfigDetail MachineGroupDetail IndexConfig - OssShipperConfig - OdpsShipperConfig - ShipperTask Response Class @@ -112,16 +109,6 @@ Response Class GetMachineGroupAppliedConfigResponse GetConfigAppliedMachineGroupsResponse -.. py:currentmodule:: aliyun.log -.. autosummary:: - CreateShipperResponse - UpdateShipperResponse - DeleteShipperResponse - GetShipperConfigResponse - ListShipperResponse - GetShipperTasksResponse - RetryShipperTasksResponse - .. py:currentmodule:: aliyun.log .. autosummary:: ConsumerGroupEntity @@ -285,18 +272,6 @@ Alert delete_alert -Shipper ----------- -.. autosummary:: - create_shipper - update_shipper - delete_shipper - get_shipper - list_shipper - get_shipper_tasks - retry_shipper_tasks - - Definitions ------------- @@ -321,7 +296,6 @@ Definitions .. autoclass:: SyslogConfigDetail .. autoclass:: MachineGroupDetail .. autoclass:: PutLogsRequest -.. autoclass:: ShipperTask .. autoclass:: LogResponse :members: @@ -403,11 +377,6 @@ Definitions .. autoclass:: GetConfigAppliedMachineGroupsResponse :members: -.. autoclass:: GetShipperTasksResponse - :members: -.. autoclass:: RetryShipperTasksResponse - :members: - .. autoclass:: ConsumerGroupEntity :members: .. autoclass:: CreateConsumerGroupResponse diff --git a/tests/sample_shipper.py b/tests/sample_shipper.py deleted file mode 100644 index f128f743..00000000 --- a/tests/sample_shipper.py +++ /dev/null @@ -1,41 +0,0 @@ -from aliyun.log import LogClient - - -# TODO: change me -project = "test-project" -logstore = "test-logstore" -role = "acs:ram::00000000:role/aliyunlogdefaultrole" - -bucket = "test-bucket" -prefix = "test-data" - -client = LogClient( - endpoint='cn-chengdu.log.aliyuncs.com', - accessKeyId='***', - accessKey='***', -) - -shipper = { - "shipperName": "test-shipper", - "targetConfiguration": { - "bufferInterval": 300, - "bufferSize": 32, - "compressType": "none", - "enable": True, - "ossBucket": bucket, - "ossPrefix": prefix, - "pathFormat": "%Y/%m/%d/%H/%M", - "roleArn": role, - "storage": { - "detail": { - "enableTag": False - }, - "format": "json" - }, - "timeZone": "" - }, - "targetType": "oss" -} - -resp = client.create_shipper(project, logstore, shipper) -resp.log_print()