# coding: utf-8
"""
IGNORE:
Nutanix Iam Versioned APIs
iam desc placeholder # noqa: E501
OpenAPI spec version: 4.0.1-alpha-1
Generated by: https://github.com/swagger-api/swagger-codegen.git
IGNORE
"""
from __future__ import absolute_import
import datetime
import atexit
import json
import mimetypes
from multiprocessing.pool import ThreadPool
import os
import re
import tempfile
import sys
import uuid
import logging
# python 2 and python 3 compatibility library
import six
from six.moves.urllib.parse import quote
from ntnx_iam_py_client.configuration import Configuration
import ntnx_iam_py_client.models
from ntnx_iam_py_client import rest
PY2 = sys.version_info[0] < 3
logger = logging.getLogger(__name__)
[docs]class ApiClient(object):
"""API client to handle the client-server communication, and is invariant across implementations.
:param configuration: Configuration object for this client
:type configuration: :class:`~ntnx_vmm_py_client.configuration.Configuration`, required
""" # noqa: E501
SDK_VERSION = "v4.0.a1"
PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int if six.PY3 else long, # noqa: F821
'float': float,
'str': str,
'bool': bool,
'date': datetime.date,
'datetime': datetime.datetime,
'object': object,
}
DUPLICATE_SCHEMA_MAPPING = {
# Populated if multiple schemas exist with the same name
'iam.v4.authn.GroupType' : 'GroupType',
'iam.v4.authn.DirectoryServiceSearchEntity' : 'DirectoryServiceSearchEntity',
'OneOfiam.v4.authn.UpdateDirectoryServiceApiResponsedata' : 'UpdateDirectoryServiceApiResponsedata',
'iam.v4.authz.AccessPolicyType' : 'AccessPolicyType',
'iam.v4.authn.GetCertAuthProviderApiResponse' : 'GetCertAuthProviderApiResponse',
'iam.v4.authn.UserConfiguration' : 'UserConfiguration',
'iam.v4.authz.ListRoleApiResponse' : 'ListRoleApiResponse',
'iam.v4.authz.ViewAccessPolicyApiResponse' : 'ViewAccessPolicyApiResponse',
'iam.v4.authn.DirectoryServiceInfoGroup' : 'DirectoryServiceInfoGroup',
'iam.v4.authz.ViewOperationApiResponse' : 'ViewOperationApiResponse',
'iam.v4.authn.GetSamlSpMetadataApiResponse' : 'GetSamlSpMetadataApiResponse',
'iam.v4.authn.UpdateDirectoryServiceApiResponse' : 'UpdateDirectoryServiceApiResponse',
'common.v1.config.Flag' : 'Flag',
'iam.v4.authz.CreateRoleApiResponse' : 'CreateRoleApiResponse',
'OneOfiam.v4.authz.ListRoleApiResponsedata' : 'ListRoleApiResponsedata',
'iam.v4.authz.ListOperationApiResponse' : 'ListOperationApiResponse',
'iam.v4.authn.UpdateSamlIdentityProviderApiResponse' : 'UpdateSamlIdentityProviderApiResponse',
'iam.v4.authn.IdpProperties' : 'IdpProperties',
'iam.v4.authn.CreateSamlIdentityProviderApiResponse' : 'CreateSamlIdentityProviderApiResponse',
'iam.v4.authz.ViewRoleApiResponse' : 'ViewRoleApiResponse',
'iam.v4.authn.DirectoryServiceInfoOu' : 'DirectoryServiceInfoOu',
'OneOfiam.v4.authn.CreateDirectoryServiceApiResponsedata' : 'CreateDirectoryServiceApiResponsedata',
'OneOfiam.v4.authn.GetUserApiResponsedata' : 'GetUserApiResponsedata',
'OneOfiam.v4.authz.ViewRoleApiResponsedata' : 'ViewRoleApiResponsedata',
'iam.v4.authn.ListUserApiResponse' : 'ListUserApiResponse',
'iam.v4.authz.UpdateAccessPolicyApiResponse' : 'UpdateAccessPolicyApiResponse',
'iam.v4.authn.UserStateUpdate' : 'UserStateUpdate',
'iam.v4.authz.AccessPolicy' : 'AccessPolicy',
'iam.v4.authn.GetUserGroupApiResponse' : 'GetUserGroupApiResponse',
'iam.v4.authn.ConnectionDirectoryServiceApiResponse' : 'ConnectionDirectoryServiceApiResponse',
'OneOfiam.v4.authn.ListUserApiResponsedata' : 'ListUserApiResponsedata',
'iam.v4.authn.CreateDirectoryServiceApiResponse' : 'CreateDirectoryServiceApiResponse',
'OneOfiam.v4.authn.UpdateCertAuthProviderApiResponsedata' : 'UpdateCertAuthProviderApiResponsedata',
'OneOfiam.v4.authz.ViewOperationApiResponsedata' : 'ViewOperationApiResponsedata',
'iam.v4.authn.UserType' : 'UserType',
'OneOfiam.v4.authz.CreateAccessPolicyApiResponsedata' : 'CreateAccessPolicyApiResponsedata',
'iam.v4.authn.PasswordChangeRequest' : 'PasswordChangeRequest',
'OneOfiam.v4.authn.CreateCertAuthProviderApiResponsedata' : 'CreateCertAuthProviderApiResponsedata',
'OneOfiam.v4.authn.ActivateUserApiResponsedata' : 'ActivateUserApiResponsedata',
'OneOfiam.v4.authn.GetSamlSpMetadataApiResponsedata' : 'GetSamlSpMetadataApiResponsedata',
'OneOfiam.v4.authn.UpdateUserApiResponsedata' : 'UpdateUserApiResponsedata',
'OneOfiam.v4.authz.DeleteRoleApiResponsedata' : 'DeleteRoleApiResponsedata',
'iam.v4.authn.OpenLdapConfig' : 'OpenLdapConfig',
'OneOfiam.v4.error.ErrorResponseerror' : 'ErrorResponseerror',
'iam.v4.authn.ListDirectoryServiceApiResponse' : 'ListDirectoryServiceApiResponse',
'common.v1.response.ApiResponseMetadata' : 'ApiResponseMetadata',
'iam.v4.common.SortOrderType' : 'SortOrderType',
'iam.v4.authn.ListUserGroupApiResponse' : 'ListUserGroupApiResponse',
'iam.v4.authn.User' : 'User',
'iam.v4.authn.UserGroupConfiguration' : 'UserGroupConfiguration',
'iam.v4.authz.CreateAccessPolicyApiResponse' : 'CreateAccessPolicyApiResponse',
'iam.v4.authn.SearchDirectoryServiceApiResponse' : 'SearchDirectoryServiceApiResponse',
'common.v1.response.ExternalizableAbstractModel' : 'ExternalizableAbstractModel',
'iam.v4.authz.Filter' : 'Filter',
'iam.v4.authn.CreateUserGroupApiResponse' : 'CreateUserGroupApiResponse',
'OneOfiam.v4.authn.GetCertAuthProviderApiResponsedata' : 'GetCertAuthProviderApiResponsedata',
'common.v1.config.Message' : 'Message',
'iam.v4.authz.UpdateRoleApiResponse' : 'UpdateRoleApiResponse',
'OneOfiam.v4.authn.ConnectionDirectoryServiceApiResponsedata' : 'ConnectionDirectoryServiceApiResponsedata',
'iam.v4.authn.UpdateCertAuthProviderApiResponse' : 'UpdateCertAuthProviderApiResponse',
'OneOfiam.v4.authn.ListDirectoryServiceApiResponsedata' : 'ListDirectoryServiceApiResponsedata',
'iam.v4.authn.DirectoryServiceSearchResult' : 'DirectoryServiceSearchResult',
'OneOfiam.v4.authn.GetUserGroupApiResponsedata' : 'GetUserGroupApiResponsedata',
'OneOfcommon.v1.config.KVPairvalue' : 'KVPairvalue',
'OneOfiam.v4.authn.SearchDirectoryServiceApiResponsedata' : 'SearchDirectoryServiceApiResponsedata',
'iam.v4.authz.ListAccessPolicyApiResponse' : 'ListAccessPolicyApiResponse',
'OneOfiam.v4.authn.ResetUserPasswordApiResponsedata' : 'ResetUserPasswordApiResponsedata',
'iam.v4.authn.GetUserApiResponse' : 'GetUserApiResponse',
'iam.v4.authz.Role' : 'Role',
'OneOfiam.v4.authn.CreateUserGroupApiResponsedata' : 'CreateUserGroupApiResponsedata',
'iam.v4.authn.GetSamlIdentityProviderApiResponse' : 'GetSamlIdentityProviderApiResponse',
'iam.v4.authn.GroupSearchType' : 'GroupSearchType',
'OneOfiam.v4.authz.UpdateAccessPolicyApiResponsedata' : 'UpdateAccessPolicyApiResponsedata',
'iam.v4.authn.DirectoryServiceConnectionRequest' : 'DirectoryServiceConnectionRequest',
'OneOfiam.v4.authz.UpdateRoleApiResponsedata' : 'UpdateRoleApiResponsedata',
'OneOfiam.v4.authn.CreateSamlIdentityProviderApiResponsedata' : 'CreateSamlIdentityProviderApiResponsedata',
'common.v1.config.MessageSeverity' : 'MessageSeverity',
'OneOfiam.v4.authz.ViewAccessPolicyApiResponsedata' : 'ViewAccessPolicyApiResponsedata',
'iam.v4.authn.CertAuthProvider' : 'CertAuthProvider',
'iam.v4.authn.DirectoryServiceInfo' : 'DirectoryServiceInfo',
'iam.v4.authn.NameIdPolicyFormat' : 'NameIdPolicyFormat',
'OneOfiam.v4.authn.UpdateSamlIdentityProviderApiResponsedata' : 'UpdateSamlIdentityProviderApiResponsedata',
'iam.v4.authn.ListSamlIdentityProviderApiResponse' : 'ListSamlIdentityProviderApiResponse',
'iam.v4.authz.DeleteAccessPolicyApiResponse' : 'DeleteAccessPolicyApiResponse',
'common.v1.response.ApiLink' : 'ApiLink',
'iam.v4.authn.DirectoryServiceSearchAttribute' : 'DirectoryServiceSearchAttribute',
'iam.v4.error.ErrorResponse' : 'ErrorResponse',
'iam.v4.authn.CreateUserApiResponse' : 'CreateUserApiResponse',
'OneOfiam.v4.authz.ListOperationApiResponsedata' : 'ListOperationApiResponsedata',
'OneOfiam.v4.authn.ListSamlIdentityProviderApiResponsedata' : 'ListSamlIdentityProviderApiResponsedata',
'iam.v4.authn.DsServiceAccount' : 'DsServiceAccount',
'iam.v4.error.SchemaValidationErrorMessage' : 'SchemaValidationErrorMessage',
'OneOfiam.v4.authn.CreateUserApiResponsedata' : 'CreateUserApiResponsedata',
'iam.v4.authn.DirectoryType' : 'DirectoryType',
'iam.v4.authn.DirectoryService' : 'DirectoryService',
'iam.v4.authn.ResetUserPasswordApiResponse' : 'ResetUserPasswordApiResponse',
'iam.v4.authn.ListCertAuthProviderApiResponse' : 'ListCertAuthProviderApiResponse',
'OneOfiam.v4.authn.ListUserGroupApiResponsedata' : 'ListUserGroupApiResponsedata',
'iam.v4.authn.ActivateUserApiResponse' : 'ActivateUserApiResponse',
'iam.v4.authn.UserGroup' : 'UserGroup',
'iam.v4.authn.DirectoryServiceSearchQuery' : 'DirectoryServiceSearchQuery',
'iam.v4.authz.DeleteRoleApiResponse' : 'DeleteRoleApiResponse',
'OneOfiam.v4.authn.GetSamlIdentityProviderApiResponsedata' : 'GetSamlIdentityProviderApiResponsedata',
'iam.v4.authn.UpdateUserApiResponse' : 'UpdateUserApiResponse',
'iam.v4.authz.StatusType' : 'StatusType',
'iam.v4.authn.CertRevocationInfo' : 'CertRevocationInfo',
'iam.v4.authn.PasswordResetRequest' : 'PasswordResetRequest',
'OneOfiam.v4.authn.ChangeUserPasswordApiResponsedata' : 'ChangeUserPasswordApiResponsedata',
'OneOfiam.v4.authn.ListCertAuthProviderApiResponsedata' : 'ListCertAuthProviderApiResponsedata',
'iam.v4.authz.Operation' : 'Operation',
'iam.v4.error.SchemaValidationError' : 'SchemaValidationError',
'OneOfiam.v4.authz.ListAccessPolicyApiResponsedata' : 'ListAccessPolicyApiResponsedata',
'common.v1.config.TenantAwareModel' : 'TenantAwareModel',
'iam.v4.authn.GetDirectoryServiceApiResponse' : 'GetDirectoryServiceApiResponse',
'iam.v4.authn.CreateCertAuthProviderApiResponse' : 'CreateCertAuthProviderApiResponse',
'OneOfiam.v4.authz.DeleteAccessPolicyApiResponsedata' : 'DeleteAccessPolicyApiResponsedata',
'iam.v4.error.AppMessage' : 'AppMessage',
'iam.v4.authn.SamlIdentityProvider' : 'SamlIdentityProvider',
'iam.v4.authn.UserStatusType' : 'UserStatusType',
'OneOfiam.v4.authn.GetDirectoryServiceApiResponsedata' : 'GetDirectoryServiceApiResponsedata',
'OneOfiam.v4.authz.CreateRoleApiResponsedata' : 'CreateRoleApiResponsedata',
'iam.v4.authn.ChangeUserPasswordApiResponse' : 'ChangeUserPasswordApiResponse',
'common.v1.config.KVPair' : 'KVPair'
}
def __init__(self, configuration=None):
if configuration is None:
configuration = Configuration()
self.configuration = configuration
self.__pool = None
self.rest_client = rest.RESTClientObject(configuration, max_retry_attempts=configuration.max_retry_attempts, backoff_factor=configuration.backoff_factor)
self.__default_headers = {}
self.__cookie = None
self.__refresh_cookie = True
def __close(self):
if self.__pool:
self.__pool.close()
self.__pool.join()
self.__pool = None
if hasattr(atexit, 'unregister'):
atexit.unregister(self.__close)
logger.info("Thread pool closed")
def __initialize_threadpool(self):
if not self.__pool:
logger.info("Initializing the thread pool")
self.__pool = ThreadPool()
atexit.register(self.__close)
def __get_request_timeout(self, request_timeout):
"""Gets the Request timeout in seconds.
:param request_timeout: A list of integers for the [connect, read] timeouts in milliseconds for an operation
:type request_timeout: :class:`list`
:return: A final list of integers for the [connect, read] timeouts in milliseconds for an operation
:rtype: :class:`list`
"""
if not request_timeout or not isinstance(request_timeout, list) or len(request_timeout) != 2:
request_timeout = []
request_timeout.append(self.__get_valid_timeout(self.configuration.connect_timeout, self.configuration.default_connect_timeout))
request_timeout.append(self.__get_valid_timeout(self.configuration.read_timeout, self.configuration.default_read_timeout))
return request_timeout
else:
# Handle connect timeout
connect_configuration_timeout_in_seconds = self.__get_valid_timeout(self.configuration.connect_timeout, self.configuration.default_connect_timeout)
request_timeout[0] = self.__get_valid_timeout(request_timeout[0], connect_configuration_timeout_in_seconds * 1000)
# Handle read timeout
read_configuration_timeout_in_seconds = self.__get_valid_timeout(self.configuration.read_timeout, self.configuration.default_read_timeout)
request_timeout[1] = self.__get_valid_timeout(request_timeout[1], read_configuration_timeout_in_seconds * 1000)
return request_timeout
def __get_valid_timeout(self, timeout, default_timeout):
if not timeout or not isinstance(timeout, (int, float) if six.PY3 else (int, long, float)): # noqa: E501,F821
return default_timeout / 1000
max_timeout_in_milliseconds = 30 * 60 * 1000 # 30 minutes
if timeout <= 0:
timeout = default_timeout
elif timeout > max_timeout_in_milliseconds:
timeout = max_timeout_in_milliseconds
return timeout / 1000
def __update_cookies(self, response_data):
# Set Cookie information to reuse in subsequent requests for a valid response
cookies = response_data.getheader('Set-Cookie')
if cookies is not None:
cookie = ''
cookies_list = cookies.split(',')
for i in range(0, len(cookies_list)):
final_cookie = cookies_list[i].split(';', 1)[0]
final_cookie = final_cookie.strip() if '=' in final_cookie else ''
if final_cookie:
cookie += final_cookie + ';'
# Remove trailing ";"
if cookie:
cookie = cookie[:-1]
self.__cookie = cookie
self.__refresh_cookie = False
logger.debug("Retained cookie: %s", cookie)
def __call_api(
self, resource_path, method, path_params=None,
query_params=None, header_params=None, body=None, post_params=None,
files=None, response_type=None, auth_settings=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None):
config = self.configuration
# Add User-Agent header from configuration
if config.user_agent:
self.add_default_header('User-Agent', config.user_agent)
# header parameters
header_params = header_params or {}
header_params.update(self.__default_headers)
# requestID for idempotence functionality
if 'NTNX-Request-Id' not in header_params :
requestId = str(uuid.uuid4())
header_params['NTNX-Request-Id'] = requestId
if body and hasattr(body, "get_reserved") and 'ETag' in body.get_reserved():
header_params['If-Match'] = body.get_reserved()['ETag']
if self.__cookie:
header_params['Cookie'] = self.__cookie
if "Authorization" in header_params:
header_params.pop("Authorization")
else :
self.__update_params_for_auth(header_params, query_params, auth_settings)
if header_params:
header_params = self.__sanitize_for_serialization(header_params)
header_params = dict(self.__parameters_to_tuples(header_params, collection_formats))
# path parameters
if path_params:
path_params = self.__sanitize_for_serialization(path_params)
path_params = self.__parameters_to_tuples(path_params,
collection_formats)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
'{%s}' % k,
quote(str(v), safe=config.safe_chars_for_path_param)
)
# query parameters
if query_params:
query_params = self.__sanitize_for_serialization(query_params)
query_params = self.__parameters_to_tuples(query_params, collection_formats)
# post parameters
if post_params or files:
post_params = self.__prepare_post_parameters(post_params, files)
post_params = self.__sanitize_for_serialization(post_params)
post_params = self.__parameters_to_tuples(post_params, collection_formats)
# body
if body:
body = self.__sanitize_for_serialization(body)
# request url
url = config.scheme + '://' + config.host + ':' + str(config.port) + resource_path
# perform request and return response
response_data = self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_preload_content=_preload_content,
_request_timeout=_request_timeout)
# Retry one more time for 401 response with basic auth header and no cookie
if response_data.status == 401:
logger.debug("Retrying for an unauthorized request")
self.__refresh_cookie = True
if 'Cookie' in header_params:
header_params.update(self.__default_headers)
header_params.pop('Cookie')
self.__update_params_for_auth(header_params, query_params, auth_settings)
response_data = self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_preload_content=_preload_content,
_request_timeout=_request_timeout)
# Raise error if 401 persists
if response_data.status == 401:
raise ntnx_iam_py_client.rest.ApiException(http_resp=response_data)
if self.__refresh_cookie:
self.__update_cookies(response_data)
self.last_response = response_data
return_data = response_data
if _preload_content:
# deserialize response data
if response_data.status != 204:
return_data = json.loads(response_data.data)
return_data = self.__add_header_to_reserved(response_data, return_data, "ETag")
if response_type is None and "$objectType" in return_data:
response_type = return_data.get("$objectType")
if PY2:
inner_response_type = response_type.encode('utf-8', 'ignore')
else:
inner_response_type = response_type
return self.deserialize(return_data, inner_response_type)
else:
return_data = None
if _return_http_data_only:
return (return_data)
else:
return (return_data, response_data.status,
response_data.getheaders())
def __sanitize_for_serialization(self, obj):
"""Builds a JSON POST object.
:param obj: The object to sanitize
:type obj: :class:`object`
:return:
The sanitized object with the following behavior::
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime or datetime.date, convert to string in ISO8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is a model, return the properties dictionary.
:rtype: :class:`object`
""" # noqa: E501
if obj is None:
return None
elif isinstance(obj, self.PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self.__sanitize_for_serialization(sub_obj)
for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self.__sanitize_for_serialization(sub_obj)
for sub_obj in obj)
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
if isinstance(obj, dict):
obj_dict = obj
else:
# Convert model obj to dict except
# attributes `swagger_types`, `attribute_map`
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
for attr, _ in six.iteritems(obj.swagger_types)
if getattr(obj, attr) is not None}
return {key: self.__sanitize_for_serialization(val)
for key, val in six.iteritems(obj_dict)}
def __deserialize_response(self, response, response_type):
"""Deserializes response into an object based on the provided response type.
:param response: A response object to be deserialized
:type response: :class:`~ntnx_iam_py_client.rest.RESTResponse`, required
:param response_type: A class literal for deserialized object, or string of class name
:type response_type: required
:return: Deserialized object
:rtype: :class:`object`
""" # noqa: E501
# handle file downloading
# save response body into a tmp file and return the instance
if response_type == "file":
return self.__deserialize_file(response)
# fetch data from response object
try:
data = json.loads(response.data)
except ValueError:
data = response.data
return self.deserialize(data, response_type)
[docs] def deserialize(self, data, klass, discriminator = None):
"""Deserializes dict, list, str into an object.
:param data: dict, list or str
:param klass: class literal, or string of class name
:param discriminator: Deprecated: Discriminator to deserialize into specific model object for a oneOf object. This is no longer needed and will be removed from the method definition soon.
:type discriminator: :class:`str`
:return: Deserialized object
:rtype: :class:`object`
""" # noqa: E501
if data is None:
return None
if type(klass) == str:
if klass.startswith('list['):
sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
return [self.deserialize(sub_data, sub_kls)
for sub_data in data]
if klass.startswith('dict('):
sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
return {str(k): self.deserialize(v, sub_kls)
for k, v in six.iteritems(data)}
# convert str to class
if klass in self.NATIVE_TYPES_MAPPING:
klass = self.NATIVE_TYPES_MAPPING[klass]
else:
klass = self.__getattr(klass)
if klass in self.PRIMITIVE_TYPES:
return self.__deserialize_primitive(data, klass)
elif klass == object:
return self.__deserialize_object(data)
elif klass == datetime.date:
return self.__deserialize_date(data)
elif klass == datetime.datetime:
return self.__deserialize_datatime(data)
else:
return self.__deserialize_model(data, klass)
def _call_api(self, resource_path, method,
path_params=None, query_params=None, header_params=None,
body=None, post_params=None, files=None,
response_type=None, auth_settings=None, async_req=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None):
"""Makes the HTTP request (synchronous) and returns deserialized data. To make an asynchronous request, set the async_req to True.
:param resource_path: Path to API endpoint
:type resource_path: :class:`str`, required
:param method: HTTP method name
:type method: :class:`str`, required
:param path_params: A dictionary of path parameters in the url
:type path_params: :class:`dict`
:param query_params: A list of tuples denoting query parameters in the url
:type query_params: :class:`list`
:param header_params: A dictionary of header parameters to be placed in request headers
:type header_params: :class:`dict`
:param body: Request body
:param post_params: Request's form parameters for application/x-www-form-urlencoded or multipart/form-data content type
:type post_params: :class:`str`
:param files: A dictionary in {key -> filename, value -> filepath} format for multipart/form-data.
:type files: :class:`dict`
:param response_type: Response data type
:type response_type: :class:`str`
:param auth_settings: A list of auth settings names for the request. The following values are accepted currently: ::
['basicAuthScheme']
:type auth_settings: :class:`list`
:param async_req: A flag to toggle between synchronous/asynchronous connection for HTTP request (**Default** False)
:type async_req: :class:`bool`
:param _return_http_data_only: A flag to retrurn response data without the status code and response headers
:type _return_http_data_only: :class:`bool`
:param collection_formats: A dictionary of collection formats for path, query, header, and post parameters
:type collection_formats: :class:`dict`
:param _preload_content: If False, the urllib3.HTTPResponse object will be returned without reading/decoding response data (**Default** True)
:type _preload_content: :class:`bool`
:param _request_timeout: Timeout setting for this request. It needs to be a pair (list) of
[connect, read] timeouts. If invalid list length or type provided,
the timeouts will be derived from the :class:`~ntnx_iam_py_client.configuration.Configuration`.
:type _request_timeout: :class:`list`
:return: If async_req parameter is True, the request will be called asynchronously and returns the request thread.
If async_req is False or missing, then the method will return the deserialized response.
"""
if not async_req:
return self.__call_api(resource_path, method,
path_params, query_params, header_params,
body, post_params, files,
response_type, auth_settings,
_return_http_data_only, collection_formats,
_preload_content, _request_timeout)
else:
self.__initialize_threadpool()
thread = self.__pool.apply_async(self.__call_api, (resource_path,
method, path_params, query_params,
header_params, body,
post_params, files,
response_type, auth_settings,
_return_http_data_only,
collection_formats,
_preload_content, _request_timeout))
return thread
def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
client_request_timeout = self.__get_request_timeout(_request_timeout)
"""Makes the HTTP request using RESTClient."""
if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
headers=headers)
elif method == "HEAD":
return self.rest_client.HEAD(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
headers=headers)
elif method == "OPTIONS":
return self.rest_client.OPTIONS(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
body=body)
elif method == "POST":
return self.rest_client.POST(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
body=body)
elif method == "PUT":
return self.rest_client.PUT(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
body=body)
elif method == "PATCH":
return self.rest_client.PATCH(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
body=body)
elif method == "DELETE":
return self.rest_client.DELETE(url,
query_params=query_params,
headers=headers,
_preload_content=_preload_content,
_request_timeout=client_request_timeout,
body=body)
else:
error_message = "http method must be `GET`, `HEAD`, `OPTIONS`, `POST`, `PATCH`, `PUT` or `DELETE`."
logger.error(error_message)
raise ValueError(error_message)
def __parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:type params: :class:`dict`
:param collection_formats: Parameter collection formats
:type collection_formats: :class:`dict`
:returns: Parameters as list of tuples, collections formatted
""" # noqa: E501
new_params = []
if collection_formats is None:
collection_formats = {}
for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, value) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
def __prepare_post_parameters(self, post_params=None, files=None):
"""Builds form parameters.
:param post_params: Normal form parameters
:param files: File parameters
:return: Form parameters with files
""" # noqa: E501
params = []
if post_params:
params = post_params
if files:
for k, v in six.iteritems(files):
if not v:
continue
file_names = v if type(v) is list else [v]
for n in file_names:
with open(n, 'rb') as f:
filename = os.path.basename(f.name)
filedata = f.read()
mimetype = (mimetypes.guess_type(filename)[0] or
'application/octet-stream')
params.append(
tuple([k, tuple([filename, filedata, mimetype])]))
return params
def _select_header_accept(self, accepts):
"""Returns `Accept` based on an array of headers provided.
:param accepts: List of headers
:type accepts: :class:`list`
:return: If application/json is available, returns that.
Otherwise, returns a comma separated string of all the provided Accept types
""" # noqa: E501
if not accepts:
return
accepts = [x.lower() for x in accepts]
if 'application/json' in accepts:
return 'application/json'
else:
return ', '.join(accepts)
def _select_header_content_type(self, content_types):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types
:type content_types: :class:`list`
:return: Content-Type (e.g. application/json)
""" # noqa: E501
if not content_types:
return 'application/json'
content_types = [x.lower() for x in content_types]
if 'application/json' in content_types or '*/*' in content_types:
return 'application/json'
else:
return content_types[0]
def __update_params_for_auth(self, headers, queries, auth_settings):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param queries: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
""" # noqa: E501
if not auth_settings:
return
for auth in auth_settings:
auth_setting = self.configuration._auth_settings().get(auth)
if auth_setting:
headers.pop(auth_setting['key'], None)
if not auth_setting['value']:
continue
elif auth_setting['in'] == 'header':
headers[auth_setting['key']] = auth_setting['value']
elif auth_setting['in'] == 'query':
queries.append((auth_setting['key'], auth_setting['value']))
else:
error_message = 'Authentication token must be in `query` or `header`'
logger.error(error_message)
raise ValueError(error_message)
[docs] @staticmethod
def get_etag(object):
"""Method to extract ETag from an object if exists
The ETag is usually provided in the response of the GET API calls which can further be used in other HTTP operations.
:param object: The object from which etag needs to be fetched
:type object: :class:`object`
:return: ETag header value for the object
:rtype: :class:`str` or :class:`None`
""" # noqa: E501
etag = None
if object:
if hasattr(object, "get_reserved") and 'ETag' in object.get_reserved():
etag = ApiClient.get_value_for_case_insensitive_key_match(object.get_reserved(), 'ETag')
if not etag and hasattr(object, "data") and hasattr(object.data, "get_reserved") and 'ETag' in object.data.get_reserved():
etag = ApiClient.get_value_for_case_insensitive_key_match(object.data.get_reserved(), 'ETag')
return etag
@staticmethod
def get_value_for_case_insensitive_key_match(object, key):
if object and key:
key = key.lower()
for k in object:
if k.lower() == key:
return object[k]
def __add_header_to_reserved(self, response_data, return_data, key):
if hasattr(response_data, "getheaders") and key in response_data.getheaders():
header_val = response_data.getheader(key)
if "data" in return_data.keys():
if isinstance(return_data["data"], list) and len(return_data["data"])>0:
for item in return_data["data"]:
if isinstance(item, dict) and "$reserved" in item.keys():
item["$reserved"][key] = header_val
elif isinstance(return_data["data"], dict) and "$reserved" in return_data["data"].keys():
return_data["data"]["$reserved"][key] = header_val
return return_data
def __deserialize_file(self, response):
"""Deserializes body to file
Saves response body into a file in a temporary folder, using the filename from the `Content-Disposition` header if provided.
:param response: :class:`~ntnx_iam_py_client.rest.RESTResponse`
:return: File path
"""
fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
os.close(fd)
os.remove(path)
content_disposition = response.getheader("Content-Disposition")
if content_disposition:
filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?',
content_disposition).group(1)
path = os.path.join(os.path.dirname(path), filename)
with open(path, "wb") as f:
f.write(response.data)
return path
def __deserialize_primitive(self, data, klass):
"""Deserializes string to primitive type.
:param data:
:type data: :class:`str`
:param klass: class literal
:return: :class:`int` or :class:`long` or :class:`float` or :class:`str` or :class:`bool` value
"""
try:
return klass(data)
except UnicodeEncodeError:
return six.text_type(data)
except TypeError:
return data
def __deserialize_object(self, value):
"""Return a original value.
:return: :class:`object`
"""
return value
def __deserialize_date(self, string):
"""Deserializes string to date.
:param string:
:type string: str
:raises :class:`~ntnx_iam_py_client.rest.ApiException` if provided string is not a valid date
:return: date object
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason="Failed to parse `{0}` as date object".format(string)
)
def __deserialize_datatime(self, string):
"""Deserializes string to datetime.
The string should be in ISO8601 datetime format.
:param string:
:type string: :class:`str`
:raises :class:`~ntnx_iam_py_client.rest.ApiException` if provided string is not a valid datetime
:return: :class:`datetime`
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason=(
"Failed to parse `{0}` as datetime object"
.format(string)
)
)
def __hasattr(self, object, name):
return name in object.__class__.__dict__
def __getattr(self, name):
if name in self.DUPLICATE_SCHEMA_MAPPING:
return getattr(ntnx_iam_py_client.models, self.DUPLICATE_SCHEMA_MAPPING[name])
if hasattr(ntnx_iam_py_client.models, name.split('.')[-1]):
return getattr(ntnx_iam_py_client.models, name.split('.')[-1])
else:
return getattr(ntnx_iam_py_client.models, name)
def __deserialize_model(self, data, klass):
"""Deserializes list or dict to model.
:param data:
:type data: :class:`dict`, :class:`list`
:param klass: class literal
:return: Model object
"""
if not hasattr(klass, 'swagger_types') and not self.__hasattr(klass, 'get_real_child_model'):
return data
kwargs = {}
if klass.swagger_types is not None:
for attr, attr_type in six.iteritems(klass.swagger_types):
if data is not None and isinstance(data, (list, dict)):
value = None
if klass.attribute_map[attr] in data:
value = data[klass.attribute_map[attr]]
data.pop(klass.attribute_map[attr])
elif attr in data:
value = data[attr]
data.pop(attr)
one_of_value = None
if attr_type.startswith('OneOf'):
if type(value) is list:
one_of_value = []
# OneOf of list type can have list of objects or list of primitive types
for value_item in value:
if type(value_item) is not dict:
one_of_value.append(self.deserialize(value_item, type(value_item)))
else:
one_of_value.append(self.deserialize(value_item, self.__getattr(value_item['$objectType'])))
elif type(value) is dict:
# OneOf of dict type can be a single response object or a map of primitive types
if '$objectType' in value:
one_of_value = self.deserialize(value, self.__getattr(value['$objectType']))
else:
one_of_value = {}
for item_key, item_value in six.iteritems(value):
one_of_value[item_key] = self.deserialize(item_value, type(item_value))
else:
one_of_value = value
kwargs[attr] = self.deserialize(value, attr_type) if one_of_value is None else one_of_value
if "_unknown_fields" in klass.swagger_types:
if "_unknown_fields" not in kwargs or kwargs["_unknown_fields"] is None:
kwargs["_unknown_fields"] = {}
for key, value in data.items():
if value is not None:
kwargs["_unknown_fields"][key] = value
instance = klass(**kwargs)
if (isinstance(instance, dict) and
klass.swagger_types is not None and
isinstance(data, dict)):
for key, value in data.items():
if key not in klass.swagger_types:
instance[key] = value
return instance