# Source code for ntnx_iam_py_client.api_client

# coding: utf-8

"""
IGNORE:
    Nutanix Iam Versioned APIs

    iam desc placeholder  # noqa: E501

    OpenAPI spec version: 4.0.1-alpha-1
    
    Generated by: https://github.com/swagger-api/swagger-codegen.git
IGNORE
"""
from __future__ import absolute_import

import datetime
import atexit
import json
import mimetypes
from multiprocessing.pool import ThreadPool
import os
import re
import tempfile
import sys
import uuid
import logging

# python 2 and python 3 compatibility library
import six
from six.moves.urllib.parse import quote

from ntnx_iam_py_client.configuration import Configuration
import ntnx_iam_py_client.models
from ntnx_iam_py_client import rest

PY2 = sys.version_info[0] < 3

logger = logging.getLogger(__name__)

class ApiClient(object):
    """API client to handle the client-server communication, and is invariant across implementations.

    :param configuration: Configuration object for this client. When omitted,
        a default :class:`~ntnx_iam_py_client.configuration.Configuration` is created.
    :type configuration: :class:`~ntnx_iam_py_client.configuration.Configuration`
    """  # noqa: E501

    SDK_VERSION = "v4.0.a1"

    PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
    NATIVE_TYPES_MAPPING = {
        'int': int,
        'long': int if six.PY3 else long,  # noqa: F821
        'float': float,
        'str': str,
        'bool': bool,
        'date': datetime.date,
        'datetime': datetime.datetime,
        'object': object,
    }
    DUPLICATE_SCHEMA_MAPPING = {
        # Populated if multiple schemas exist with the same name
        'iam.v4.authn.GroupType': 'GroupType',
        'iam.v4.authn.DirectoryServiceSearchEntity': 'DirectoryServiceSearchEntity',
        'OneOfiam.v4.authn.UpdateDirectoryServiceApiResponsedata': 'UpdateDirectoryServiceApiResponsedata',
        'iam.v4.authz.AccessPolicyType': 'AccessPolicyType',
        'iam.v4.authn.GetCertAuthProviderApiResponse': 'GetCertAuthProviderApiResponse',
        'iam.v4.authn.UserConfiguration': 'UserConfiguration',
        'iam.v4.authz.ListRoleApiResponse': 'ListRoleApiResponse',
        'iam.v4.authz.ViewAccessPolicyApiResponse': 'ViewAccessPolicyApiResponse',
        'iam.v4.authn.DirectoryServiceInfoGroup': 'DirectoryServiceInfoGroup',
        'iam.v4.authz.ViewOperationApiResponse': 'ViewOperationApiResponse',
        'iam.v4.authn.GetSamlSpMetadataApiResponse': 'GetSamlSpMetadataApiResponse',
        'iam.v4.authn.UpdateDirectoryServiceApiResponse': 'UpdateDirectoryServiceApiResponse',
        'common.v1.config.Flag': 'Flag',
        'iam.v4.authz.CreateRoleApiResponse': 'CreateRoleApiResponse',
        'OneOfiam.v4.authz.ListRoleApiResponsedata': 'ListRoleApiResponsedata',
        'iam.v4.authz.ListOperationApiResponse': 'ListOperationApiResponse',
        'iam.v4.authn.UpdateSamlIdentityProviderApiResponse': 'UpdateSamlIdentityProviderApiResponse',
        'iam.v4.authn.IdpProperties': 'IdpProperties',
        'iam.v4.authn.CreateSamlIdentityProviderApiResponse': 'CreateSamlIdentityProviderApiResponse',
        'iam.v4.authz.ViewRoleApiResponse': 'ViewRoleApiResponse',
        'iam.v4.authn.DirectoryServiceInfoOu': 'DirectoryServiceInfoOu',
        'OneOfiam.v4.authn.CreateDirectoryServiceApiResponsedata': 'CreateDirectoryServiceApiResponsedata',
        'OneOfiam.v4.authn.GetUserApiResponsedata': 'GetUserApiResponsedata',
        'OneOfiam.v4.authz.ViewRoleApiResponsedata': 'ViewRoleApiResponsedata',
        'iam.v4.authn.ListUserApiResponse': 'ListUserApiResponse',
        'iam.v4.authz.UpdateAccessPolicyApiResponse': 'UpdateAccessPolicyApiResponse',
        'iam.v4.authn.UserStateUpdate': 'UserStateUpdate',
        'iam.v4.authz.AccessPolicy': 'AccessPolicy',
        'iam.v4.authn.GetUserGroupApiResponse': 'GetUserGroupApiResponse',
        'iam.v4.authn.ConnectionDirectoryServiceApiResponse': 'ConnectionDirectoryServiceApiResponse',
        'OneOfiam.v4.authn.ListUserApiResponsedata': 'ListUserApiResponsedata',
        'iam.v4.authn.CreateDirectoryServiceApiResponse': 'CreateDirectoryServiceApiResponse',
        'OneOfiam.v4.authn.UpdateCertAuthProviderApiResponsedata': 'UpdateCertAuthProviderApiResponsedata',
        'OneOfiam.v4.authz.ViewOperationApiResponsedata': 'ViewOperationApiResponsedata',
        'iam.v4.authn.UserType': 'UserType',
        'OneOfiam.v4.authz.CreateAccessPolicyApiResponsedata': 'CreateAccessPolicyApiResponsedata',
        'iam.v4.authn.PasswordChangeRequest': 'PasswordChangeRequest',
        'OneOfiam.v4.authn.CreateCertAuthProviderApiResponsedata': 'CreateCertAuthProviderApiResponsedata',
        'OneOfiam.v4.authn.ActivateUserApiResponsedata': 'ActivateUserApiResponsedata',
        'OneOfiam.v4.authn.GetSamlSpMetadataApiResponsedata': 'GetSamlSpMetadataApiResponsedata',
        'OneOfiam.v4.authn.UpdateUserApiResponsedata': 'UpdateUserApiResponsedata',
        'OneOfiam.v4.authz.DeleteRoleApiResponsedata': 'DeleteRoleApiResponsedata',
        'iam.v4.authn.OpenLdapConfig': 'OpenLdapConfig',
        'OneOfiam.v4.error.ErrorResponseerror': 'ErrorResponseerror',
        'iam.v4.authn.ListDirectoryServiceApiResponse': 'ListDirectoryServiceApiResponse',
        'common.v1.response.ApiResponseMetadata': 'ApiResponseMetadata',
        'iam.v4.common.SortOrderType': 'SortOrderType',
        'iam.v4.authn.ListUserGroupApiResponse': 'ListUserGroupApiResponse',
        'iam.v4.authn.User': 'User',
        'iam.v4.authn.UserGroupConfiguration': 'UserGroupConfiguration',
        'iam.v4.authz.CreateAccessPolicyApiResponse': 'CreateAccessPolicyApiResponse',
        'iam.v4.authn.SearchDirectoryServiceApiResponse': 'SearchDirectoryServiceApiResponse',
        'common.v1.response.ExternalizableAbstractModel': 'ExternalizableAbstractModel',
        'iam.v4.authz.Filter': 'Filter',
        'iam.v4.authn.CreateUserGroupApiResponse': 'CreateUserGroupApiResponse',
        'OneOfiam.v4.authn.GetCertAuthProviderApiResponsedata': 'GetCertAuthProviderApiResponsedata',
        'common.v1.config.Message': 'Message',
        'iam.v4.authz.UpdateRoleApiResponse': 'UpdateRoleApiResponse',
        'OneOfiam.v4.authn.ConnectionDirectoryServiceApiResponsedata': 'ConnectionDirectoryServiceApiResponsedata',
        'iam.v4.authn.UpdateCertAuthProviderApiResponse': 'UpdateCertAuthProviderApiResponse',
        'OneOfiam.v4.authn.ListDirectoryServiceApiResponsedata': 'ListDirectoryServiceApiResponsedata',
        'iam.v4.authn.DirectoryServiceSearchResult': 'DirectoryServiceSearchResult',
        'OneOfiam.v4.authn.GetUserGroupApiResponsedata': 'GetUserGroupApiResponsedata',
        'OneOfcommon.v1.config.KVPairvalue': 'KVPairvalue',
        'OneOfiam.v4.authn.SearchDirectoryServiceApiResponsedata': 'SearchDirectoryServiceApiResponsedata',
        'iam.v4.authz.ListAccessPolicyApiResponse': 'ListAccessPolicyApiResponse',
        'OneOfiam.v4.authn.ResetUserPasswordApiResponsedata': 'ResetUserPasswordApiResponsedata',
        'iam.v4.authn.GetUserApiResponse': 'GetUserApiResponse',
        'iam.v4.authz.Role': 'Role',
        'OneOfiam.v4.authn.CreateUserGroupApiResponsedata': 'CreateUserGroupApiResponsedata',
        'iam.v4.authn.GetSamlIdentityProviderApiResponse': 'GetSamlIdentityProviderApiResponse',
        'iam.v4.authn.GroupSearchType': 'GroupSearchType',
        'OneOfiam.v4.authz.UpdateAccessPolicyApiResponsedata': 'UpdateAccessPolicyApiResponsedata',
        'iam.v4.authn.DirectoryServiceConnectionRequest': 'DirectoryServiceConnectionRequest',
        'OneOfiam.v4.authz.UpdateRoleApiResponsedata': 'UpdateRoleApiResponsedata',
        'OneOfiam.v4.authn.CreateSamlIdentityProviderApiResponsedata': 'CreateSamlIdentityProviderApiResponsedata',
        'common.v1.config.MessageSeverity': 'MessageSeverity',
        'OneOfiam.v4.authz.ViewAccessPolicyApiResponsedata': 'ViewAccessPolicyApiResponsedata',
        'iam.v4.authn.CertAuthProvider': 'CertAuthProvider',
        'iam.v4.authn.DirectoryServiceInfo': 'DirectoryServiceInfo',
        'iam.v4.authn.NameIdPolicyFormat': 'NameIdPolicyFormat',
        'OneOfiam.v4.authn.UpdateSamlIdentityProviderApiResponsedata': 'UpdateSamlIdentityProviderApiResponsedata',
        'iam.v4.authn.ListSamlIdentityProviderApiResponse': 'ListSamlIdentityProviderApiResponse',
        'iam.v4.authz.DeleteAccessPolicyApiResponse': 'DeleteAccessPolicyApiResponse',
        'common.v1.response.ApiLink': 'ApiLink',
        'iam.v4.authn.DirectoryServiceSearchAttribute': 'DirectoryServiceSearchAttribute',
        'iam.v4.error.ErrorResponse': 'ErrorResponse',
        'iam.v4.authn.CreateUserApiResponse': 'CreateUserApiResponse',
        'OneOfiam.v4.authz.ListOperationApiResponsedata': 'ListOperationApiResponsedata',
        'OneOfiam.v4.authn.ListSamlIdentityProviderApiResponsedata': 'ListSamlIdentityProviderApiResponsedata',
        'iam.v4.authn.DsServiceAccount': 'DsServiceAccount',
        'iam.v4.error.SchemaValidationErrorMessage': 'SchemaValidationErrorMessage',
        'OneOfiam.v4.authn.CreateUserApiResponsedata': 'CreateUserApiResponsedata',
        'iam.v4.authn.DirectoryType': 'DirectoryType',
        'iam.v4.authn.DirectoryService': 'DirectoryService',
        'iam.v4.authn.ResetUserPasswordApiResponse': 'ResetUserPasswordApiResponse',
        'iam.v4.authn.ListCertAuthProviderApiResponse': 'ListCertAuthProviderApiResponse',
        'OneOfiam.v4.authn.ListUserGroupApiResponsedata': 'ListUserGroupApiResponsedata',
        'iam.v4.authn.ActivateUserApiResponse': 'ActivateUserApiResponse',
        'iam.v4.authn.UserGroup': 'UserGroup',
        'iam.v4.authn.DirectoryServiceSearchQuery': 'DirectoryServiceSearchQuery',
        'iam.v4.authz.DeleteRoleApiResponse': 'DeleteRoleApiResponse',
        'OneOfiam.v4.authn.GetSamlIdentityProviderApiResponsedata': 'GetSamlIdentityProviderApiResponsedata',
        'iam.v4.authn.UpdateUserApiResponse': 'UpdateUserApiResponse',
        'iam.v4.authz.StatusType': 'StatusType',
        'iam.v4.authn.CertRevocationInfo': 'CertRevocationInfo',
        'iam.v4.authn.PasswordResetRequest': 'PasswordResetRequest',
        'OneOfiam.v4.authn.ChangeUserPasswordApiResponsedata': 'ChangeUserPasswordApiResponsedata',
        'OneOfiam.v4.authn.ListCertAuthProviderApiResponsedata': 'ListCertAuthProviderApiResponsedata',
        'iam.v4.authz.Operation': 'Operation',
        'iam.v4.error.SchemaValidationError': 'SchemaValidationError',
        'OneOfiam.v4.authz.ListAccessPolicyApiResponsedata': 'ListAccessPolicyApiResponsedata',
        'common.v1.config.TenantAwareModel': 'TenantAwareModel',
        'iam.v4.authn.GetDirectoryServiceApiResponse': 'GetDirectoryServiceApiResponse',
        'iam.v4.authn.CreateCertAuthProviderApiResponse': 'CreateCertAuthProviderApiResponse',
        'OneOfiam.v4.authz.DeleteAccessPolicyApiResponsedata': 'DeleteAccessPolicyApiResponsedata',
        'iam.v4.error.AppMessage': 'AppMessage',
        'iam.v4.authn.SamlIdentityProvider': 'SamlIdentityProvider',
        'iam.v4.authn.UserStatusType': 'UserStatusType',
        'OneOfiam.v4.authn.GetDirectoryServiceApiResponsedata': 'GetDirectoryServiceApiResponsedata',
        'OneOfiam.v4.authz.CreateRoleApiResponsedata': 'CreateRoleApiResponsedata',
        'iam.v4.authn.ChangeUserPasswordApiResponse': 'ChangeUserPasswordApiResponse',
        'common.v1.config.KVPair': 'KVPair',
    }

    def __init__(self, configuration=None):
        if configuration is None:
            configuration = Configuration()
        self.configuration = configuration

        # The thread pool is created lazily on the first asynchronous request.
        self.__pool = None
        self.rest_client = rest.RESTClientObject(
            configuration,
            max_retry_attempts=configuration.max_retry_attempts,
            backoff_factor=configuration.backoff_factor)
        self.__default_headers = {}
        self.__cookie = None
        self.__refresh_cookie = True
        # Overwritten with the raw response of every completed call.
        self.last_response = None

    def __close(self):
        """Shut down the thread pool, if one was created, and drop the atexit hook."""
        if self.__pool:
            self.__pool.close()
            self.__pool.join()
            self.__pool = None
            # atexit.unregister does not exist on Python 2.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self.__close)
            logger.info("Thread pool closed")

    def __initialize_threadpool(self):
        """Create the thread pool on first use and register its cleanup at exit."""
        if not self.__pool:
            logger.info("Initializing the thread pool")
            self.__pool = ThreadPool()
            atexit.register(self.__close)

    def add_default_header(self, header_name, header_value):
        """Add a default header to be included in each HTTP request.

        :param header_name: Name of a default header
        :type header_name: :class:`str`
        :param header_value: Value of the provided default header
        :type header_value: :class:`str`
        """
        # New credentials invalidate any cookie retained from earlier responses.
        if header_name == 'Authorization':
            self.__cookie = None
        self.__default_headers[header_name] = header_value

    def __get_request_timeout(self, request_timeout):
        """Gets the request timeout in seconds.

        The caller's list is never modified: the same list object is passed
        again when a request is retried, so converting it in place would apply
        the milliseconds-to-seconds conversion twice.

        :param request_timeout: A list of numbers for the [connect, read] timeouts in milliseconds for an operation
        :type request_timeout: :class:`list`

        :return: A new list holding the [connect, read] timeouts in seconds
        :rtype: :class:`list`
        """  # noqa: E501
        config = self.configuration
        connect_default_in_seconds = self.__get_valid_timeout(
            config.connect_timeout, config.default_connect_timeout)
        read_default_in_seconds = self.__get_valid_timeout(
            config.read_timeout, config.default_read_timeout)

        if not request_timeout or not isinstance(request_timeout, list) or len(request_timeout) != 2:
            return [connect_default_in_seconds, read_default_in_seconds]

        # __get_valid_timeout expects its fallback in milliseconds.
        return [
            self.__get_valid_timeout(request_timeout[0], connect_default_in_seconds * 1000),
            self.__get_valid_timeout(request_timeout[1], read_default_in_seconds * 1000),
        ]

    def __get_valid_timeout(self, timeout, default_timeout):
        """Validate a timeout given in milliseconds and convert it to seconds.

        :param timeout: Requested timeout in milliseconds
        :param default_timeout: Fallback in milliseconds, used when ``timeout``
            is falsy, not a number, or negative
        :return: The timeout divided by 1000, capped at 30 minutes
        """
        if not timeout or not isinstance(timeout, (int, float) if six.PY3 else (int, long, float)):  # noqa: E501,F821
            return default_timeout / 1000

        max_timeout_in_milliseconds = 30 * 60 * 1000  # 30 minutes
        if timeout <= 0:
            timeout = default_timeout
        elif timeout > max_timeout_in_milliseconds:
            timeout = max_timeout_in_milliseconds
        return timeout / 1000

    def __update_cookies(self, response_data):
        """Retain the cookies of a response for reuse in subsequent requests.

        :param response_data: Response object exposing ``getheader``
        """
        cookies = response_data.getheader('Set-Cookie')
        if cookies is not None:
            retained = []
            for raw_cookie in cookies.split(','):
                # Keep only the leading "name=value" pair of each piece and
                # discard attributes such as Path or Expires.
                name_value = raw_cookie.split(';', 1)[0]
                if '=' in name_value:
                    name_value = name_value.strip()
                    if name_value:
                        retained.append(name_value)
            cookie = ';'.join(retained)

            self.__cookie = cookie
            self.__refresh_cookie = False
            logger.debug("Retained cookie: %s", cookie)

    def __call_api(
            self, resource_path, method, path_params=None,
            query_params=None, header_params=None, body=None, post_params=None,
            files=None, response_type=None, auth_settings=None,
            _return_http_data_only=None, collection_formats=None,
            _preload_content=True, _request_timeout=None):
        """Build and perform one HTTP request and deserialize its response.

        The parameters have the same meaning as in :meth:`_call_api`.

        :raises: :class:`~ntnx_iam_py_client.rest.ApiException` if the server
            still answers 401 after the retry
        """
        config = self.configuration

        # Add User-Agent header from configuration
        if config.user_agent:
            self.add_default_header('User-Agent', config.user_agent)

        # header parameters
        header_params = header_params or {}
        header_params.update(self.__default_headers)

        # requestID for idempotence functionality
        if 'NTNX-Request-Id' not in header_params:
            header_params['NTNX-Request-Id'] = str(uuid.uuid4())

        if body and hasattr(body, "get_reserved") and 'ETag' in body.get_reserved():
            header_params['If-Match'] = body.get_reserved()['ETag']

        if self.__cookie:
            # A retained cookie replaces the Authorization header.
            header_params['Cookie'] = self.__cookie
            if "Authorization" in header_params:
                header_params.pop("Authorization")
        else:
            self.__update_params_for_auth(header_params, query_params, auth_settings)

        if header_params:
            header_params = self.__sanitize_for_serialization(header_params)
            header_params = dict(self.__parameters_to_tuples(header_params, collection_formats))

        # path parameters
        if path_params:
            path_params = self.__sanitize_for_serialization(path_params)
            path_params = self.__parameters_to_tuples(path_params, collection_formats)
            for k, v in path_params:
                # specified safe chars, encode everything
                resource_path = resource_path.replace(
                    '{%s}' % k,
                    quote(str(v), safe=config.safe_chars_for_path_param)
                )

        # query parameters
        if query_params:
            query_params = self.__sanitize_for_serialization(query_params)
            query_params = self.__parameters_to_tuples(query_params, collection_formats)

        # post parameters
        if post_params or files:
            post_params = self.__prepare_post_parameters(post_params, files)
            post_params = self.__sanitize_for_serialization(post_params)
            post_params = self.__parameters_to_tuples(post_params, collection_formats)

        # body
        if body:
            body = self.__sanitize_for_serialization(body)

        # request url
        url = config.scheme + '://' + config.host + ':' + str(config.port) + resource_path

        # perform request and return response
        response_data = self.request(
            method, url, query_params=query_params, headers=header_params,
            post_params=post_params, body=body,
            _preload_content=_preload_content,
            _request_timeout=_request_timeout)

        # Retry one more time for 401 response with basic auth header and no cookie
        if response_data.status == 401:
            logger.debug("Retrying for an unauthorized request")
            self.__refresh_cookie = True
            if 'Cookie' in header_params:
                # Restore the default headers (including any Authorization
                # header dropped above) and resend without the cookie.
                header_params.update(self.__default_headers)
                header_params.pop('Cookie')
                self.__update_params_for_auth(header_params, query_params, auth_settings)
                response_data = self.request(
                    method, url, query_params=query_params, headers=header_params,
                    post_params=post_params, body=body,
                    _preload_content=_preload_content,
                    _request_timeout=_request_timeout)

            # Raise error if 401 persists
            if response_data.status == 401:
                raise ntnx_iam_py_client.rest.ApiException(http_resp=response_data)

        if self.__refresh_cookie:
            self.__update_cookies(response_data)

        self.last_response = response_data

        return_data = response_data
        if _preload_content:
            # deserialize response data
            if response_data.status != 204:
                return_data = json.loads(response_data.data)
                return_data = self.__add_header_to_reserved(response_data, return_data, "ETag")
                if response_type is None and "$objectType" in return_data:
                    response_type = return_data.get("$objectType")
                # Only text names need encoding on Python 2; None or a class
                # literal is passed through untouched.
                if PY2 and isinstance(response_type, six.text_type):
                    inner_response_type = response_type.encode('utf-8', 'ignore')
                else:
                    inner_response_type = response_type
                return self.deserialize(return_data, inner_response_type)
            else:
                return_data = None

        if _return_http_data_only:
            return (return_data)
        else:
            return (return_data, response_data.status,
                    response_data.getheaders())

    def __sanitize_for_serialization(self, obj):
        """Builds a JSON POST object.

        :param obj: The object to sanitize
        :type obj: :class:`object`

        :return: The sanitized object with the following behavior::

            If obj is None, return None.
            If obj is str, int, long, float, bool, return directly.
            If obj is datetime.datetime or datetime.date, convert to string in ISO8601 format.
            If obj is list or tuple, sanitize each element.
            If obj is dict, sanitize each value.
            If obj is a model, sanitize its non-None properties keyed by JSON name.

        :rtype: :class:`object`
        """  # noqa: E501
        if obj is None:
            return None
        elif isinstance(obj, self.PRIMITIVE_TYPES):
            return obj
        elif isinstance(obj, list):
            return [self.__sanitize_for_serialization(sub_obj)
                    for sub_obj in obj]
        elif isinstance(obj, tuple):
            return tuple(self.__sanitize_for_serialization(sub_obj)
                         for sub_obj in obj)
        elif isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()

        if isinstance(obj, dict):
            obj_dict = obj
        else:
            # Convert a model to a dict of its attributes listed in
            # `swagger_types` whose value is not None, using the JSON key
            # from `attribute_map` as the dictionary key.
            obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
                        for attr, _ in six.iteritems(obj.swagger_types)
                        if getattr(obj, attr) is not None}

        return {key: self.__sanitize_for_serialization(val)
                for key, val in six.iteritems(obj_dict)}

    def __deserialize_response(self, response, response_type):
        """Deserializes response into an object based on the provided response type.

        :param response: A response object to be deserialized
        :type response: :class:`~ntnx_iam_py_client.rest.RESTResponse`, required
        :param response_type: A class literal for deserialized object, or string of class name
        :type response_type: required

        :return: Deserialized object
        :rtype: :class:`object`
        """  # noqa: E501
        # handle file downloading
        # save response body into a tmp file and return the instance
        if response_type == "file":
            return self.__deserialize_file(response)

        # fetch data from response object; fall back to the raw payload when
        # it is not valid JSON
        try:
            data = json.loads(response.data)
        except ValueError:
            data = response.data

        return self.deserialize(data, response_type)

    def deserialize(self, data, klass, discriminator=None):
        """Deserializes dict, list, str into an object.

        :param data: dict, list or str
        :param klass: class literal, or string of class name
        :param discriminator: Deprecated: Discriminator to deserialize into specific model object for a oneOf object. This is no longer needed and will be removed from the method definition soon.
        :type discriminator: :class:`str`

        :return: Deserialized object
        :rtype: :class:`object`
        """  # noqa: E501
        if data is None:
            return None

        # six.string_types also accepts unicode names on Python 2.
        if isinstance(klass, six.string_types):
            if klass.startswith('list['):
                sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
                return [self.deserialize(sub_data, sub_kls)
                        for sub_data in data]

            if klass.startswith('dict('):
                sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
                return {str(k): self.deserialize(v, sub_kls)
                        for k, v in six.iteritems(data)}

            # convert str to class
            if klass in self.NATIVE_TYPES_MAPPING:
                klass = self.NATIVE_TYPES_MAPPING[klass]
            else:
                klass = self.__getattr(klass)

        if klass in self.PRIMITIVE_TYPES:
            return self.__deserialize_primitive(data, klass)
        elif klass == object:
            return self.__deserialize_object(data)
        elif klass == datetime.date:
            return self.__deserialize_date(data)
        elif klass == datetime.datetime:
            return self.__deserialize_datatime(data)
        else:
            return self.__deserialize_model(data, klass)

    def _call_api(self, resource_path, method,
                  path_params=None, query_params=None, header_params=None,
                  body=None, post_params=None, files=None,
                  response_type=None, auth_settings=None, async_req=None,
                  _return_http_data_only=None, collection_formats=None,
                  _preload_content=True, _request_timeout=None):
        """Makes the HTTP request (synchronous) and returns deserialized data.

        To make an asynchronous request, set the async_req to True.

        :param resource_path: Path to API endpoint
        :type resource_path: :class:`str`, required
        :param method: HTTP method name
        :type method: :class:`str`, required
        :param path_params: A dictionary of path parameters in the url
        :type path_params: :class:`dict`
        :param query_params: A list of tuples denoting query parameters in the url
        :type query_params: :class:`list`
        :param header_params: A dictionary of header parameters to be placed in request headers
        :type header_params: :class:`dict`
        :param body: Request body
        :param post_params: Request's form parameters for application/x-www-form-urlencoded or multipart/form-data content type
        :type post_params: :class:`str`
        :param files: A dictionary in {key -> filename, value -> filepath} format for multipart/form-data.
        :type files: :class:`dict`
        :param response_type: Response data type
        :type response_type: :class:`str`
        :param auth_settings: A list of auth settings names for the request. The following values are accepted currently:
            ::

                ['basicAuthScheme']
        :type auth_settings: :class:`list`
        :param async_req: A flag to toggle between synchronous/asynchronous connection for HTTP request (**Default** False)
        :type async_req: :class:`bool`
        :param _return_http_data_only: A flag to return response data without the status code and response headers
        :type _return_http_data_only: :class:`bool`
        :param collection_formats: A dictionary of collection formats for path, query, header, and post parameters
        :type collection_formats: :class:`dict`
        :param _preload_content: If False, the urllib3.HTTPResponse object will be returned without reading/decoding response data (**Default** True)
        :type _preload_content: :class:`bool`
        :param _request_timeout: Timeout setting for this request. It needs to be a pair (list) of [connect, read] timeouts. If invalid list length or type provided, the timeouts will be derived from the :class:`~ntnx_iam_py_client.configuration.Configuration`.
        :type _request_timeout: :class:`list`

        :return:
            If async_req parameter is True, the request will be called asynchronously and returns the request thread.
            If async_req is False or missing, then the method will return the deserialized response.
        """  # noqa: E501
        call_args = (resource_path, method,
                     path_params, query_params, header_params,
                     body, post_params, files,
                     response_type, auth_settings,
                     _return_http_data_only, collection_formats,
                     _preload_content, _request_timeout)

        if not async_req:
            return self.__call_api(*call_args)

        self.__initialize_threadpool()
        thread = self.__pool.apply_async(self.__call_api, call_args)
        return thread

    def request(self, method, url, query_params=None, headers=None,
                post_params=None, body=None, _preload_content=True,
                _request_timeout=None):
        """Makes the HTTP request using RESTClient.

        :param method: One of ``GET``, ``HEAD``, ``OPTIONS``, ``POST``, ``PUT``, ``PATCH`` or ``DELETE``
        :param url: Full request url
        :param query_params: Query parameters for the url
        :param headers: Request headers
        :param post_params: Form parameters
        :param body: Request body
        :param _preload_content: Forwarded unchanged to the REST client
        :param _request_timeout: [connect, read] timeouts in milliseconds

        :raises ValueError: if ``method`` is not one of the supported HTTP methods
        :return: The response returned by the REST client
        """  # noqa: E501
        client_request_timeout = self.__get_request_timeout(_request_timeout)

        if method == "GET":
            return self.rest_client.GET(url,
                                        query_params=query_params,
                                        _preload_content=_preload_content,
                                        _request_timeout=client_request_timeout,
                                        headers=headers)
        elif method == "HEAD":
            return self.rest_client.HEAD(url,
                                         query_params=query_params,
                                         _preload_content=_preload_content,
                                         _request_timeout=client_request_timeout,
                                         headers=headers)
        elif method == "OPTIONS":
            return self.rest_client.OPTIONS(url,
                                            query_params=query_params,
                                            headers=headers,
                                            post_params=post_params,
                                            _preload_content=_preload_content,
                                            _request_timeout=client_request_timeout,
                                            body=body)
        elif method == "POST":
            return self.rest_client.POST(url,
                                         query_params=query_params,
                                         headers=headers,
                                         post_params=post_params,
                                         _preload_content=_preload_content,
                                         _request_timeout=client_request_timeout,
                                         body=body)
        elif method == "PUT":
            return self.rest_client.PUT(url,
                                        query_params=query_params,
                                        headers=headers,
                                        post_params=post_params,
                                        _preload_content=_preload_content,
                                        _request_timeout=client_request_timeout,
                                        body=body)
        elif method == "PATCH":
            return self.rest_client.PATCH(url,
                                          query_params=query_params,
                                          headers=headers,
                                          post_params=post_params,
                                          _preload_content=_preload_content,
                                          _request_timeout=client_request_timeout,
                                          body=body)
        elif method == "DELETE":
            return self.rest_client.DELETE(url,
                                           query_params=query_params,
                                           headers=headers,
                                           _preload_content=_preload_content,
                                           _request_timeout=client_request_timeout,
                                           body=body)
        else:
            error_message = "http method must be `GET`, `HEAD`, `OPTIONS`, `POST`, `PATCH`, `PUT` or `DELETE`."
            logger.error(error_message)
            raise ValueError(error_message)

    def __parameters_to_tuples(self, params, collection_formats):
        """Get parameters as list of tuples, formatting collections.

        :param params: Parameters as dict or list of two-tuples
        :type params: :class:`dict`
        :param collection_formats: Parameter collection formats
        :type collection_formats: :class:`dict`

        :returns: Parameters as list of tuples, collections formatted
        """  # noqa: E501
        new_params = []
        if collection_formats is None:
            collection_formats = {}
        for k, v in six.iteritems(params) if isinstance(params, dict) else params:  # noqa: E501
            if k in collection_formats:
                collection_format = collection_formats[k]
                if collection_format == 'multi':
                    # One (key, value) tuple per element.
                    new_params.extend((k, value) for value in v)
                else:
                    if collection_format == 'ssv':
                        delimiter = ' '
                    elif collection_format == 'tsv':
                        delimiter = '\t'
                    elif collection_format == 'pipes':
                        delimiter = '|'
                    else:  # csv is the default
                        delimiter = ','
                    new_params.append(
                        (k, delimiter.join(str(value) for value in v)))
            else:
                new_params.append((k, v))
        return new_params

    def __prepare_post_parameters(self, post_params=None, files=None):
        """Builds form parameters.

        :param post_params: Normal form parameters
        :param files: File parameters

        :return: Form parameters with files
        """  # noqa: E501
        params = []

        if post_params:
            params = post_params

        if files:
            for k, v in six.iteritems(files):
                if not v:
                    continue
                file_names = v if type(v) is list else [v]
                for n in file_names:
                    with open(n, 'rb') as f:
                        filename = os.path.basename(f.name)
                        filedata = f.read()
                        mimetype = (mimetypes.guess_type(filename)[0] or
                                    'application/octet-stream')
                        params.append(
                            tuple([k, tuple([filename, filedata, mimetype])]))

        return params

    def _select_header_accept(self, accepts):
        """Returns `Accept` based on an array of headers provided.

        :param accepts: List of headers
        :type accepts: :class:`list`

        :return: None if no headers are provided. If application/json is available, returns that. Otherwise, returns a comma separated string of all the provided Accept types
        """  # noqa: E501
        if not accepts:
            return

        accepts = [x.lower() for x in accepts]

        if 'application/json' in accepts:
            return 'application/json'
        else:
            return ', '.join(accepts)

    def _select_header_content_type(self, content_types):
        """Returns `Content-Type` based on an array of content_types provided.

        :param content_types: List of content-types
        :type content_types: :class:`list`

        :return: Content-Type (e.g. application/json)
        """  # noqa: E501
        if not content_types:
            return 'application/json'

        content_types = [x.lower() for x in content_types]

        if 'application/json' in content_types or '*/*' in content_types:
            return 'application/json'
        else:
            return content_types[0]

    def __update_params_for_auth(self, headers, queries, auth_settings):
        """Updates header and query params based on authentication setting.

        :param headers: Header parameters dict to be updated.
        :param queries: Query parameters tuple list to be updated.
        :param auth_settings: Authentication setting identifiers list.

        :raises ValueError: if an auth setting is located in neither `query` nor `header`
        """  # noqa: E501
        if not auth_settings:
            return

        for auth in auth_settings:
            auth_setting = self.configuration._auth_settings().get(auth)
            if auth_setting:
                # Drop any value already present under the auth key before
                # the configured value, if there is one, is applied.
                headers.pop(auth_setting['key'], None)
                if not auth_setting['value']:
                    continue
                elif auth_setting['in'] == 'header':
                    headers[auth_setting['key']] = auth_setting['value']
                elif auth_setting['in'] == 'query':
                    queries.append((auth_setting['key'], auth_setting['value']))
                else:
                    error_message = 'Authentication token must be in `query` or `header`'
                    logger.error(error_message)
                    raise ValueError(error_message)

    @staticmethod
    def get_etag(object):
        """Method to extract ETag from an object if exists

        The ETag is usually provided in the response of the GET API calls which can further be used in other HTTP operations.

        :param object: The object from which etag needs to be fetched
        :type object: :class:`object`

        :return: ETag header value for the object
        :rtype: :class:`str` or :class:`None`
        """  # noqa: E501
        etag = None
        if object:
            if hasattr(object, "get_reserved") and 'ETag' in object.get_reserved():
                etag = ApiClient.get_value_for_case_insensitive_key_match(object.get_reserved(), 'ETag')
            # Fall back to the reserved map of the wrapped `data` attribute.
            if not etag and hasattr(object, "data") and hasattr(object.data, "get_reserved") and 'ETag' in object.data.get_reserved():
                etag = ApiClient.get_value_for_case_insensitive_key_match(object.data.get_reserved(), 'ETag')
        return etag

    @staticmethod
    def get_value_for_case_insensitive_key_match(object, key):
        """Return the value of the first key of ``object`` equal to ``key`` ignoring case.

        :param object: Mapping to search
        :param key: Key to look for
        :type key: :class:`str`

        :return: The matching value, or None if either argument is empty or nothing matches
        """  # noqa: E501
        if object and key:
            key = key.lower()
            for k in object:
                if k.lower() == key:
                    return object[k]

    def __add_header_to_reserved(self, response_data, return_data, key):
        """Copy a response header into the ``$reserved`` map(s) of the payload.

        :param response_data: Response object exposing ``getheaders``/``getheader``
        :param return_data: Parsed response payload
        :param key: Name of the header to copy

        :return: ``return_data``, updated in place when it is a dict with a ``data`` entry
        """  # noqa: E501
        if hasattr(response_data, "getheaders") and key in response_data.getheaders():
            header_val = response_data.getheader(key)
            # A non-dict payload (e.g. a JSON list) has no "data" entry.
            if isinstance(return_data, dict) and "data" in return_data:
                payload = return_data["data"]
                if isinstance(payload, list) and len(payload) > 0:
                    for item in payload:
                        if isinstance(item, dict) and "$reserved" in item:
                            item["$reserved"][key] = header_val
                elif isinstance(payload, dict) and "$reserved" in payload:
                    payload["$reserved"][key] = header_val
        return return_data

    def __deserialize_file(self, response):
        """Deserializes body to file

        Saves response body into a file in a temporary folder,
        using the filename from the `Content-Disposition` header if provided.

        :param response: :class:`~ntnx_iam_py_client.rest.RESTResponse`

        :return: File path
        """
        fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
        os.close(fd)
        os.remove(path)

        content_disposition = response.getheader("Content-Disposition")
        if content_disposition:
            match = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?',
                              content_disposition)
            # The header is server-controlled: strip directory components so
            # the file cannot be written outside the temporary folder, and
            # keep the generated temp name when no usable filename is given.
            filename = os.path.basename(match.group(1)) if match else ''
            if filename not in ('', '.', '..'):
                path = os.path.join(os.path.dirname(path), filename)

        with open(path, "wb") as f:
            f.write(response.data)

        return path

    def __deserialize_primitive(self, data, klass):
        """Deserializes string to primitive type.

        :param data:
        :type data: :class:`str`
        :param klass: class literal

        :return: :class:`int` or :class:`long` or :class:`float` or :class:`str` or :class:`bool` value
        """  # noqa: E501
        try:
            return klass(data)
        except UnicodeEncodeError:
            return six.text_type(data)
        except TypeError:
            return data

    def __deserialize_object(self, value):
        """Return a original value.

        :return: :class:`object`
        """
        return value

    def __deserialize_date(self, string):
        """Deserializes string to date.

        :param string:
        :type string: str

        :raises :class:`~ntnx_iam_py_client.rest.ApiException` if provided string is not a valid date
        :return: date object, or the unchanged string when ``dateutil`` is not installed
        """  # noqa: E501
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string
        except ValueError:
            raise rest.ApiException(
                status=0,
                reason="Failed to parse `{0}` as date object".format(string)
            )

    def __deserialize_datatime(self, string):
        """Deserializes string to datetime.

        The string should be in ISO8601 datetime format.

        :param string:
        :type string: :class:`str`

        :raises :class:`~ntnx_iam_py_client.rest.ApiException` if provided string is not a valid datetime
        :return: :class:`datetime`, or the unchanged string when ``dateutil`` is not installed
        """  # noqa: E501
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string
        except ValueError:
            raise rest.ApiException(
                status=0,
                reason=(
                    "Failed to parse `{0}` as datetime object"
                    .format(string)
                )
            )

    def __hasattr(self, object, name):
        """Tell whether ``name`` is defined directly on the class of ``object``."""
        return name in object.__class__.__dict__

    def __getattr(self, name):
        """Resolve a schema name to a class of ``ntnx_iam_py_client.models``.

        Names listed in ``DUPLICATE_SCHEMA_MAPPING`` are looked up first, then
        the last dot-separated component of ``name``, then ``name`` itself.

        :param name: Schema or class name
        :type name: :class:`str`

        :return: The model class
        """
        models = ntnx_iam_py_client.models
        if name in self.DUPLICATE_SCHEMA_MAPPING:
            return getattr(models, self.DUPLICATE_SCHEMA_MAPPING[name])

        short_name = name.split('.')[-1]
        if hasattr(models, short_name):
            return getattr(models, short_name)
        else:
            return getattr(models, name)

    def __deserialize_model(self, data, klass):
        """Deserializes list or dict to model.

        Keys consumed for known attributes are removed from ``data``; whatever
        remains is treated as unknown fields.

        :param data:
        :type data: :class:`dict`, :class:`list`
        :param klass: class literal

        :return: Model object
        """
        if not hasattr(klass, 'swagger_types') and not self.__hasattr(klass, 'get_real_child_model'):
            return data

        kwargs = {}
        if klass.swagger_types is not None:
            for attr, attr_type in six.iteritems(klass.swagger_types):
                if data is not None and isinstance(data, (list, dict)):
                    value = None
                    json_key = klass.attribute_map[attr]
                    if json_key in data:
                        value = data[json_key]
                        data.pop(json_key)
                    elif attr in data:
                        value = data[attr]
                        data.pop(attr)

                    one_of_value = None
                    if attr_type.startswith('OneOf'):
                        if type(value) is list:
                            one_of_value = []
                            # OneOf of list type can have list of objects or list of primitive types
                            for value_item in value:
                                if type(value_item) is not dict:
                                    one_of_value.append(self.deserialize(value_item, type(value_item)))
                                else:
                                    one_of_value.append(self.deserialize(value_item, self.__getattr(value_item['$objectType'])))
                        elif type(value) is dict:
                            # OneOf of dict type can be a single response object or a map of primitive types
                            if '$objectType' in value:
                                one_of_value = self.deserialize(value, self.__getattr(value['$objectType']))
                            else:
                                one_of_value = {}
                                for item_key, item_value in six.iteritems(value):
                                    one_of_value[item_key] = self.deserialize(item_value, type(item_value))
                        else:
                            one_of_value = value

                    kwargs[attr] = self.deserialize(value, attr_type) if one_of_value is None else one_of_value

            if "_unknown_fields" in klass.swagger_types:
                if "_unknown_fields" not in kwargs or kwargs["_unknown_fields"] is None:
                    kwargs["_unknown_fields"] = {}
                # Only a dict can carry leftover keys.
                if isinstance(data, dict):
                    for key, value in data.items():
                        if value is not None:
                            kwargs["_unknown_fields"][key] = value

        instance = klass(**kwargs)

        if (isinstance(instance, dict) and
                klass.swagger_types is not None and
                isinstance(data, dict)):
            for key, value in data.items():
                if key not in klass.swagger_types:
                    instance[key] = value
        return instance