whitson_connect is a collection of API helper functions, packaged within a WhitsonConnection class.
Most of the examples in this manual use the whitson_connect library.
You can download the Python file below and import it into your own script to run the example workflows in subsequent sections. A WhitsonConnection object runs the HTTP requests and gives you access to some of the available endpoints.
This simplifies making external API requests to whitson+: fetching data, creating wells, adding production data, adjusting settings in bulk, running calculations in the software, and extracting results and information.
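Bulk edits are usually easier on the API when a large payload is sent in smaller pieces. The following is a minimal sketch of that splitting step only, not part of whitson_connect: the helper name `split_into_batches` and the well records are made up for illustration, and the `id` and `l_w` keys mirror the edit example in the class docstrings. Each batch could then be handed to a method such as `edit_well_info`.

```python
from typing import Iterator


def split_into_batches(payload: list[dict], batch_size: int = 1000) -> Iterator[list[dict]]:
    """Yield consecutive slices of payload holding at most batch_size items each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(payload), batch_size):
        yield payload[start:start + batch_size]


# Made-up well edits, for illustration only.
well_info = [{"id": well_id, "l_w": 5000} for well_id in range(1, 8)]
batches = list(split_into_batches(well_info, batch_size=3))
batch_sizes = [len(batch) for batch in batches]
```

Slicing past the end of a list is safe in Python, so the last batch simply holds whatever remains.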
Helper Class: WhitsonConnection
What does this script, whitson_connect.py, do?
This script defines a helper class WhitsonConnection, used for example to import production data from ARIES here.
Save the script in this section as a Python file named whitson_connect.py in your working directory, then import whitson_connect in your own script and call these functions through a WhitsonConnection object. Arguments are passed as parameters and data as payloads.
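A typical session might look like the sketch below. It assumes the constructor and the `get_access_token` and `get_wells` methods from the listing in this section; the function name `list_well_names` and the credential placeholders are hypothetical. In that listing `get_access_token` returns the token without storing it, while the request helpers read `self.access_token`, so the token is assigned to the object first.

```python
def list_well_names(connection, project_id):
    """Authenticate, fetch the wells of one project and return their names."""
    # The request helpers send connection.access_token as the bearer token,
    # so the freshly requested token has to be stored on the object.
    connection.access_token = connection.get_access_token()
    wells = connection.get_wells(project_id)
    return [well["name"] for well in wells]


# Usage with the real class (all three arguments are placeholders):
# from whitson_connect import WhitsonConnection
# connection = WhitsonConnection("your_domain", "your_client_id", "your_client_secret")
# print(list_well_names(connection, project_id=1))
```

The function accepts any object exposing those two methods, which also makes it easy to try out with a stub before pointing it at a live domain.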
The class shows a few examples of the available endpoints. A complete list can be found here.
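The well lookups in the class operate on the plain list of well dictionaries returned by the API. Here is a standalone sketch of that pattern, assuming made-up wells and a hypothetical function name `find_well_id`; the keys `id`, `name`, `uwi_api` and `external_id` are the ones the class itself reads.

```python
from typing import Optional


def find_well_id(wells: list[dict], value: str, lookup_key: str = "uwi_api") -> Optional[int]:
    """Return the id of the first well whose lookup_key equals value, else None."""
    return next((well["id"] for well in wells if well.get(lookup_key) == value), None)


# Made-up wells, for illustration only.
wells = [
    {"id": 10, "name": "WELL-A", "uwi_api": "42-000-00001", "external_id": "A1"},
    {"id": 11, "name": "WELL-B", "uwi_api": "42-000-00002", "external_id": "B1"},
]
well_id = find_well_id(wells, "WELL-A", lookup_key="name")
```

Passing `None` as the default to `next` avoids a `StopIteration` when no well matches.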
import decimal
import http.client
import json
import math
import os
import random
import smtplib
import time
import warnings
from datetime import datetime
from typing import Any, Dict, List

# Third-party library imports
import numpy as np
import pandas as pd
import pyodbc
import requests
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine, text


class WhitsonConnection:
    def __init__(self, client_name=None, client_id=None, client_secret=None, audience=None):
        self.client_name = client_name
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None

    def get_access_token(self, audience=None):
        """
        Get an access token for a given work session.
        """
        conn = http.client.HTTPSConnection("whitson.eu.auth0.com")
        if audience is None:
            audience = f"https://{self.client_name}.whitson.com/"
        payload = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "audience": audience,
            "grant_type": "client_credentials",
        }
        headers = {"content-type": "application/json"}
        conn.request("POST", "/oauth/token", json.dumps(payload), headers)
        res = conn.getresponse()
        data = res.read()
        return json.loads(data.decode("utf-8")).get("access_token")

    def _read_token_from_file(self, file_path):
        if os.path.exists(file_path):
            with open(file_path, "r") as file:
                try:
                    data = json.load(file)
                    return (
                        data.get("client_id"),
                        data.get("access_token"),
                        data.get("timestamp"),
                    )
                except json.JSONDecodeError:
                    return None, None, None
        else:
            return None, None, None

    def _write_token_to_file(self, file_path, token, timestamp):
        data = {
            "client_id": self.client_id,
            "access_token": token,
            "timestamp": timestamp,
        }
        with open(file_path, "w") as file:
            json.dump(data, file)

    def get_access_token_smart(self, audience=None):
        """
        Get an access token for a given work session. Does not request a new one if the
        previous token has been requested within the last 24 hrs.
        """
        token_file_path = "access_token.txt"
        stored_client_id, stored_token, stored_timestamp = self._read_token_from_file(
            token_file_path
        )
        current_time = time.time()

        # Check if a token was previously requested within the last 24 hours
        if (
            stored_client_id == self.client_id
            and stored_token
            and (current_time - stored_timestamp) < (24 * 60 * 60)
        ):
            return stored_token
        else:
            if audience is None:
                audience = f"https://{self.client_name}.whitson.com/"
            conn = http.client.HTTPSConnection("whitson.eu.auth0.com")
            payload = {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "audience": audience,
                "grant_type": "client_credentials",
            }
            headers = {"content-type": "application/json"}
            conn.request("POST", "/oauth/token", json.dumps(payload), headers)
            res = conn.getresponse()
            data = res.read()
            new_token = json.loads(data.decode("utf-8")).get("access_token")

            # Update the stored token and timestamp
            self._write_token_to_file(token_file_path, new_token, current_time)
            return new_token

    def get_valid_or_default(self, value: float, default: float = None) -> float:
        """
        Returns the given value if it is not NaN; otherwise, returns the specified default value.

        Args:
            value (float): The value to be checked for NaN.
            default (float, optional): The value to return if the input is NaN. Defaults to None.

        Returns:
            float: The original value if it is not NaN, otherwise the default value.
        """
        return value if not pd.isna(value) else default

    def get_well_id_by_uwi_api(self, wells: list[dict], uwi_api: str):
        """
        Get the well_id of the well whose uwi_api field matches the given uwi_api.
        """
        return next(
            (well["id"] for well in wells if well.get("uwi_api") == uwi_api),
            None,  # Default value if no match is found
        )

    def get_well_id_by_propnum(
        self, wells: list[dict], propnum: str, lookup_key: str = "uwi_api"
    ):
        """
        Get the well_id of a given propnum. lookup_key is the database value in whitson+
        where the propnum is stored, typically uwi_api or external_id.
        """
        return next(
            (well["id"] for well in wells if well.get(lookup_key) == propnum),
            None,  # Default value if no match is found
        )

    def get_well_id_by_wellname(self, wells: list[dict], wellname: str):
        """
        Get the well_id of a given wellname.
        """
        return next(
            (well["id"] for well in wells if well.get("name") == wellname),
            None,  # Default value if no match is found
        )

    def get_fields(self):
        """
        Get all fields on domain.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/"
        response = requests.get(
            base_url + "fields",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing fields")
        return res

    def get_wells(self, project_id: int):
        """
        Get a list of wells in a project.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={
                "project_id": project_id,
            },
        )
        res = response.json()
        if not res:
            return []
        return res

    def get_well_from_well_id(self, well_id: int):
        """
        Get the well info, given a well ID.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={
                "well_id": well_id,
            },
        )
        res = response.json()
        if not res:
            return []
        return res

    def get_wells_from_projects(self, project_ids: list[int], page_size: int = 1000):
        """
        Get a list of wells from projects with project_id given in list.

        Example:
            whitson_wells = whitson_connection.get_wells_from_projects([1, 2, 3])

        Lower the page size if 502 Error
        """
        all_wells = []
        base_url = (
            f"http://{self.client_name}.whitson.com/api-external/v1/wells_paginated"
        )
        for project_id in project_ids:
            page = 1  # Start with the first page
            while True:
                response = requests.get(
                    base_url,
                    headers={
                        "content-type": "application/json",
                        "Authorization": f"Bearer {self.access_token}",
                    },
                    params={
                        "project_id": project_id,
                        "page": page,
                        "page_size": page_size,  # Lower this if Error 502
                    },
                )
                res = response.json()
                if response.status_code == 500:
                    print("Internal Server Error (500) encountered.")
                    break
                if (
                    not res or res == []
                ):  # If the response is empty, there are no more wells for this project
                    break
                all_wells.extend(res)  # Append the wells from this page to the list of all wells
                page += 1  # Move to the next page
        return all_wells

    def get_wells_from_projects_old(self, project_ids: list[int], page_size: int = 1000):
        """
        Get a list of wells from projects with project_id given in list.

        Example:
            whitson_wells = whitson_connection.get_wells_from_projects_old([1, 2, 3])

        Lower the page size if 502 Error
        """
        all_wells = []
        base_url = (
            f"http://{self.client_name}.whitson.com/api-external/v1/old_wells_paginated"
        )
        for project_id in project_ids:
            page = 1  # Start with the first page
            while True:
                response = requests.get(
                    base_url,
                    headers={
                        "content-type": "application/json",
                        "Authorization": f"Bearer {self.access_token}",
                    },
                    params={
                        "project_id": project_id,
                        "page": page,
                        "page_size": page_size,  # Lower this if Error 502
                    },
                )
                res = response.json()
                if response.status_code == 500:
                    print("Internal Server Error (500) encountered.")
                    break
                if (
                    not res or res == []
                ):  # If the response is empty, there are no more wells for this project
                    break
                all_wells.extend(res)  # Append the wells from this page to the list of all wells
                page += 1  # Move to the next page
        return all_wells

    def get_well_id_name_external_id_uwi_api(self, well_ids: list[int]):
        """
        Get the id, name, uwi_api and external_id of the given wells.
        """
        base_url = (
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/data_fields"
        )
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json={
                "well_ids": well_ids,
                "data_fields": ["id", "name", "uwi_api", "external_id"],
            },
        )
        return response.json()

    def get_external_id_dict_from_project(
        self,
        project_ids: list[int],
        page_size: int = 3000,
        remove_substring: str = None,
    ) -> dict[str, int]:
        """
        Retrieves a dictionary mapping well external IDs to their whitson+ IDs for wells
        in the specified projects.

        Args:
            project_ids (list[int]): A list of whitson+ project IDs to retrieve wells from.
            page_size (int, optional): The number of wells to fetch per request. Defaults to 3000.
            remove_substring (str, optional): A substring to remove from each `external_id`
                if present. Defaults to None.

        Returns:
            dict[str, int]: A dictionary where the keys are the well external IDs (non-None)
                and the values are the corresponding internal IDs (non-None).

        Raises:
            ValueError: If the `project_ids` list is empty.
        """
        if not project_ids:
            raise ValueError("The 'project_ids' list cannot be empty.")
        wells = self.get_wells_from_projects(project_ids, page_size)
        return {
            (
                item["external_id"].replace(remove_substring, "")
                if remove_substring
                else item["external_id"]
            ): item["id"]
            for item in wells
            if item["external_id"] is not None and item["id"] is not None
        }

    def get_wells_and_scenarios_from_projects(
        self, project_ids: list[int], page_size: int = 1000
    ):
        """
        Get 2 lists - one for all wells, one for all scenarios (except those created by
        @whitson.com users) - from multiple projects with each project_id given in list.

        Example payload:
            project_ids = [1,2,3]

        Example function call:
            whitson_wells, whitson_scenarios = whitson_connection.get_wells_and_scenarios_from_projects(project_ids)

        Lower the page size if 502 Error
        """
        all_wells = []
        all_scenarios = []
        base_url = (
            f"http://{self.client_name}.whitson.com/api-external/v1/wells_paginated"
        )
        # print("Collecting Wells for Projects - ", project_ids)
        for project_id in project_ids:
            page = 1  # Start with the first page
            while True:
                response = requests.get(
                    base_url,
                    headers={
                        "content-type": "application/json",
                        "Authorization": f"Bearer {self.access_token}",
                    },
                    params={
                        "project_id": project_id,
                        "page": page,
                        "page_size": page_size,  # Lower this if Error 502
                    },
                )
                res = response.json()
                if (
                    not res
                ):  # If the response is empty, there are no more wells for this project
                    break
                # print("Writing page", page, " for project id -", project_id)
                all_wells.extend(res)  # Append the wells from this page to the list of all wells
                page += 1  # Move to the next page
            # print("Found - ", len(all_wells), "wells in total for all projects", project_ids)

            # Collecting Scenarios for Project
            # print("Collecting Scenarios for Project - ", project_id)
            base_url_scenario = (
                f"http://{self.client_name}.whitson.com/api-external/v1/scenario"
            )
            response = requests.get(
                base_url_scenario,
                headers={
                    "content-type": "application/json",
                    "Authorization": f"Bearer {self.access_token}",
                },
                params={
                    "project_id": project_id,
                },
            )
            res = response.json()

            # keeping only external scenarios
            external_scenarios = []
            # print("Found ", len(res), " scenarios in project id - ", project_id)
            for i, scenario in enumerate(res.copy()):
                if "@whitson.com" not in scenario["owner"]:
                    scenario["project_id"] = project_id
                    scenario["name"] = next(
                        (
                            well["name"]
                            for well in all_wells
                            if well.get("id") == scenario["main_well_id"]
                        ),
                        None,
                    )
                    scenario["id"] = scenario["scenario_id"]  # Consistent with get wells
                    external_scenarios.append(scenario)
            # print("Found ", len(external_scenarios)," external scenarios in project", project_id)
            all_scenarios.extend(external_scenarios)  # keeping only external scenarios
        return all_wells, all_scenarios

    def get_projects(self, field_id: int):
        """
        Get a list of projects in field.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/fields/{field_id}/projects"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing projects")
        return res

    def get_surface_processes(self):
        """
        Get all processes on domain.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/processes"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing processes")
        return res

    def create_well(self, payload: dict, add_default_wellbore=True) -> requests.Response:
        """
        Create a new well on a domain.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
            params={"add_default_wellbore": add_default_wellbore},
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully created well {payload['name']}")
        else:
            print(response.text)
        return response

    def edit_well_info(self, payload: dict) -> requests.Response:
        """
        Edit well info for one or more wells at the same time.

        Example payload:
            well_info = [{'id': 10, 'l_w': 5000}, {'id': 11, 'l_w': 10000}]

        Example function call:
            whitson_connection.edit_well_info(well_info)

        More info about endpoint here:
            https://internal.whitson.com/api-external/swagger/#/Base%20Data/patch_api_external_v1_wells
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.patch(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully edited well(s).")
        else:
            print(response.text)
        return response

    def edit_well_info_batch(self, payload: list, batch_size=1000) -> list:
        """
        Edit well info for one or more wells in batches. This function splits the payload
        into batches of batch_size wells (default 1000) at a time.

        Example payload:
            well_info = [{'id': 10, 'l_w': 5000}, {'id': 11, 'l_w': 10000}, ...]

        Example function call:
            whitson_connection.edit_well_info_batch(well_info)

        More info about endpoint here:
            https://internal.whitson.com/api-external/swagger/#/Base%20Data/patch_api_external_v1_wells
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells"
        responses = []
        for i in range(0, len(payload), batch_size):
            batch_payload = payload[i : i + batch_size]
            response = requests.patch(
                base_url,
                headers={
                    "content-type": "application/json",
                    "Authorization": f"Bearer {self.access_token}",
                },
                json=batch_payload,
            )
            if 200 <= response.status_code < 300:
                print(f"Successfully edited batch {i//batch_size+1}.")
            else:
                print(f"Error in batch {i//batch_size+1}: {response.text}")
            responses.append(response)
        return responses

    def edit_well_info_by_id(self, well_id: int, payload: dict) -> requests.Response:
        """
        Edit well info for one well_id at a time.
        """
        base_url = (
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}"
        )
        response = requests.put(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully edited well(s).")
        else:
            print(response.text)
        return response

    def edit_well_info_in_chunks(self, wells_to_edit_payload_list, chunk_size=5000):
        """
        Process the wells_to_edit_payload_list in chunks and call edit_well_info for each chunk.

        Args:
            wells_to_edit_payload_list (list): The list of payloads to be processed.
            chunk_size (int): The size of each chunk. Default is 5000.
        """
        total_rows = len(wells_to_edit_payload_list)
        chunks = -(-total_rows // chunk_size)  # Alternative to math.ceil for positive numbers
        for i in range(chunks):
            start_index = i * chunk_size
            end_index = start_index + chunk_size

            # Extract the chunk
            payload_chunk = wells_to_edit_payload_list[start_index:end_index]

            # Call the function for the current chunk
            self.edit_well_info(payload=payload_chunk)
            print(f"Processed chunk {i+1} of {chunks} with {len(payload_chunk)} rows")

    def create_project(self, field_id: int, payload: dict) -> requests.Response:
        """
        Create a new project on a domain.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/fields/{field_id}/projects"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully created project {payload['name']}")
        else:
            print(response.text)
        return response

    def upload_production_to_well(
        self,
        well_id: int,
        payload: list[dict],
        append_only: bool = False,
    ) -> requests.Response:
        """
        Upload production data to well.

        Parameters:
            well_id (int): The ID of the well to update.
            payload (list[dict]): A list of dictionaries containing the production data.
            append_only (bool): Determines the behavior for handling existing data.
                - False: Replaces existing data for matching dates with payload data.
                  For a given matching date, the entire dataset will be replaced with the
                  payload data (not merged). Appends new data if the date does not exist.
                  Does not affect old data not in the payload.
                - True: Appends new data if the date does not exist. Rejects payload data
                  if the date exists. Does not affect old data not in the payload.

        Returns:
            requests.Response: The response from the API after attempting to upload the production data.
        """
        response = requests.post(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
            params={"append_only": append_only},
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully updated production data on well {well_id}")
        else:
            print(response.text)
        return response

    def bulk_upload_production_to_well(self, payload: list[dict]) -> requests.Response:
        """
        Uploads production data to a specified well in bulk.

        This function sends a POST request to the Whitson API to upload a list of production
        data records associated with a well. Each production data entry must include the
        `well_id` and the `date` of the production.

        Parameters:
        - payload (list[dict]): A list of dictionaries containing production data entries.
          Each dictionary should have the following keys:
            - well_id (int): The ID of the well that holds the production record. (required)
            - date (str): The date of the production in ISO 8601 format
              (e.g., "2024-09-14T21:07:08.556Z"). (required)
            - Additional optional fields may include:
                - qo_sc, qw_sc, qg_sc, qo_se, qw_se, qg_se, qo_sep, qw_sep, qg_sep, p_sep,
                  t_sep, p_wf_measured, p_tubing, p_casing, p_gas_lift, liquid_level,
                  choke_size, line_pressure, etc., as defined by the API.

        Returns:
        - requests.Response: The response object from the POST request.
        - If the request is successful (status code 200-299), "success" is printed.
        - If the request fails, the error response text is printed.

        Example:
        >>> payload = [
                {
                    "well_id": 123,
                    "date": "2024-09-14T21:07:08.556Z",
                    "qo_sc": 100.0,
                    "qw_sc": 200.0,
                    # Additional production data fields...
                }
            ]
        >>> response = bulk_upload_production_to_well(payload)
        >>> if response.status_code == 200:
                print("Production data uploaded successfully.")
            else:
                print("Failed to upload production data.")
        """
        response = requests.post(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("success")
        else:
            print(response.text)
        return response

    def convert_dataframe_to_prod_payload(
        self, df, columns_to_drop: list[str] = None, convert_to_float: bool = False
    ) -> list[dict]:
        """
        Converts a DataFrame into a production data payload suitable for the Whitson+ API.

        This function processes a DataFrame by dropping specified columns, removing rows
        where the well ID is not found, and converting any NaN values to None to ensure
        JSON compatibility. The 'date' column is formatted to ISO 8601 format.

        **Assumptions:**
        - The DataFrame is expected to contain the columns as shown in the provided schema,
          including: 'well_id', 'date', 'qo_sc', 'qg_sc', 'qw_sc', 'qo_sep', 'qg_sep',
          'qw_sep', 'p_sep', 't_sep', 'p_wf_measured', 'p_tubing', 'p_casing', 'qg_gas_lift',
          'liquid_level', 'choke_size', 'line_pressure'.
        - The 'date' column must be present and contain date values that can be converted
          to ISO 8601 format.
        - The DataFrame is assumed to be in the format shown in the provided image, where
          certain values may be NaN.

        Parameters:
        - df (pd.DataFrame): The input DataFrame containing production data.
        - columns_to_drop (list[str]): A list of column names to be dropped from the DataFrame.

        Returns:
        - list[dict]: A list of dictionaries where each dictionary represents a row of the
          production data payload formatted for the Whitson+ API.

        Example:
        >>> payload = convert_dataframe_to_prod_payload(df, ['insert_date'])
        >>> print(payload)
        [
            {
                "well_id": 0,
                "date": "2024-09-14T21:07:08.556Z",
                "qo_sc": None,
                "qg_sc": None,
                "qw_sc": None,
                "qo_sep": None,
                "qg_sep": None,
                "qw_sep": None,
                "p_sep": None,
                "t_sep": None,
                "p_wf_measured": None,
                "p_tubing": None,
                "p_casing": None,
                "qg_gas_lift": None,
                "liquid_level": None,
                "choke_size": None,
                "line_pressure": None
            }
        ]
        """
        # Drop the specified columns from the DataFrame
        if columns_to_drop is not None:
            df = df.drop(columns=columns_to_drop)

        # Record the original number of rows
        original_row_count = len(df)

        # Drop rows where 'well_id' is NaN after mapping
        df = df.dropna(subset=["well_id"])

        # Record the new number of rows
        new_row_count = len(df)

        # Print a message if rows were removed
        if original_row_count > new_row_count:
            print(
                f"Removed {original_row_count-new_row_count} rows where UWI was not found in the dictionary."
            )

        # Convert the 'date' column to ISO 8601 format
        df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        if convert_to_float:
            columns_to_convert = [
                "qo_sc",
                "qw_sc",
                "qg_sc",
                "qo_sep",
                "qg_sep",
                "qw_sep",
                "p_sep",
                "t_sep",
                "p_wf_measured",
                "p_tubing",
                "p_casing",
                "qg_gas_lift",
                "liquid_level",
                "choke_size",
                "line_pressure",
            ]

            # Retain only the columns that exist in the dataframe
            columns_to_convert = [col for col in columns_to_convert if col in df.columns]

            # Convert the specified columns to float
            df[columns_to_convert] = df[columns_to_convert].astype(float)

            # Set pressure columns to 14.7 if they are less than or equal to 14.7
            pressure_columns = [
                "p_sep",
                "p_wf_measured",
                "p_tubing",
                "p_casing",
                "line_pressure",
            ]
            pressure_columns = [col for col in pressure_columns if col in df.columns]
            df[pressure_columns] = df[pressure_columns].apply(
                lambda col: col.map(lambda x: max(x, 14.7))
            )

            # Set other specified columns to 0 if they are less than 0
            non_negative_columns = [
                "qo_sc",
                "qw_sc",
                "qg_sc",
                "qo_sep",
                "qg_sep",
                "qw_sep",
                "t_sep",
                "qg_gas_lift",
                "liquid_level",
                "choke_size",
            ]
            non_negative_columns = [
                col for col in non_negative_columns if col in df.columns
            ]
            df[non_negative_columns] = df[non_negative_columns].apply(
                lambda col: col.map(lambda x: max(x, 0))
            )

        # Replace NaN values with None for JSON compatibility
        df = df.replace({np.nan: None})

        # # Remove rows where all columns except 'date' are NaN
        # df = df.dropna(subset=df.columns.difference(['date']), how='all')
        # df.to_excel(f'example-{counter}.xlsx')

        # Record the original number of rows
        original_row_count = len(df)

        # Remove duplicate dates
        df_unique = df.drop_duplicates(subset=["well_id", "date"])

        # Record the new number of rows
        new_row_count = len(df_unique)

        # Print a message if rows were removed
        if original_row_count > new_row_count:
            print(
                f"Removed {original_row_count-new_row_count} rows where duplicate dates were found."
            )

        # Convert the DataFrame to a list of dictionaries for API upload
        return df_unique.to_dict("records")

    def get_production(
        self,
        well_id: int,
    ) -> requests.Response:
        """
        Get production data.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        # if not res:
        #     raise Exception("no production data")
        return res

    def delete_prod_data(self, well_id: int | None = None):
        """
        Delete production data from a well.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data"
        response = requests.delete(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully deleted production data for well {well_id}")
        else:
            print(response.text)
        return response

    def delete_prod_data_between_dates(self, well_id: int, start_date: str, end_date: str):
        """
        Deletes production data for a specified well within a given date range.

        This function sends a DELETE request to the specified API endpoint to remove
        production data associated with a well for the given start and end dates. If the
        deletion is successful, a success message is printed; otherwise, the response's
        error message is printed.

        Args:
            well_id (int): The unique identifier for the well.
            start_date (str): The start date for the range of data to delete, formatted as 'YYYY-MM-DD'.
            end_date (str): The end date for the range of data to delete, formatted as 'YYYY-MM-DD'.

        Returns:
            requests.Response: The response object returned by the DELETE request. This can
                be used for further inspection of the request's result.

        Note:
            No exception is raised when the status code indicates failure; the response
            text is printed instead.

        Example:
            delete_prod_data_between_dates(well_id=123, start_date="2023-01-01", end_date="2023-01-31")
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data"
        response = requests.delete(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"start_date": start_date, "end_date": end_date},
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully deleted production data for well {well_id}")
        else:
            print(response.text)
        return response

    def get_well_deviation_data(self, well_id: int) -> requests.Response:
        """
        Get well deviation data of a well in the database.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_deviation_survey"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res and response.status_code != 200:
            raise Exception("Something went wrong")
        return res

    def get_max_md_well_deviation_data(self, well_id: int) -> requests.Response:
        """
        Get the max md of a well deviation survey.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_deviation_survey"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("Something went wrong")
        else:
            if not res or not all("md" in item for item in res):
                return None
            else:
                return max(item["md"] for item in res)

    # ---------------------------------------------------------------------------------------------------------
    # PVT Related Functions
    # ---------------------------------------------------------------------------------------------------------
    def edit_input_quick(self, well_id: int, payload: dict) -> requests.Response:
        """
        Edit the input quick (PVT) property of a well.
        """
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/input_quick",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully edited input quick for well {well_id}")
        else:
            print(response.text)
        return response

    ### Getting mass fluid data
    def get_pvt_fluid_data(self, well_id: int) -> requests.Response:
        """
        Get the fluid properties after PVT initialization for {well_id}.

        Example payload:
            well_id = this_well_id (a number of type int)
            --> see swagger doc for additional params you can use such as well name or well id directly.

        Example usage:
            response = whitson_connection.get_pvt_fluid_data(well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/mass_fluid_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"well_id": well_id},
        )
        if response.status_code == 200:
            print(f"Fluid data successfully retrieved for well_id {well_id} ")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_pvt_mass_fluid_data(self, project_id: int) -> requests.Response:
        """
        Get the fluid properties after PVT initialization for all the wells in {project_id}.

        Example payload:
            project_id = this_project_id (a number of type int)
            --> see swagger doc for additional params you can use such as well name or well id directly.

        Example usage:
            response = whitson_connection.get_pvt_mass_fluid_data(project_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/mass_fluid_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"project_id": project_id},
        )
        if response.status_code == 200:
            print(f"Fluid data successfully retrieved for all wells in project {project_id} ")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_pvt_mass_fluid_data_from_projects(self, project_id_list: List[int]) -> List[dict]:
        """
        Get the fluid properties after PVT initialization for all wells across multiple projects.

        Parameters:
            project_id_list (List[int]): A list of project IDs for which to retrieve fluid data.

        Returns:
            List[dict]: A list of responses containing fluid data for all projects.

        Example usage:
            response = whitson_connection.get_pvt_mass_fluid_data_from_projects([project_id1, project_id2])
        """
        all_fluid_data = []  # Store fluid data from all projects
        for project_id in project_id_list:
            response = self.get_pvt_mass_fluid_data(project_id)  # Call the existing function
            if response and not (
                isinstance(response, dict) and response.get("code") == 500
            ):  # Assuming response is a dictionary or list
                all_fluid_data.extend(response)  # Add the fluid data from this project to the list
        return all_fluid_data

    ### Getting bot table
    def get_well_ids_without_pvt(
        self, PROJECT_ID_LIST, whitson_wells, unique_item: str = "external_id"
    ):
        """
        Return the ids of the wells in whitson_wells that have no PVT (mass fluid) data
        in the projects given by PROJECT_ID_LIST.
        """
        external_id_dict = {item[unique_item]: item["id"] for item in whitson_wells}
        pvt_calcs = self.get_pvt_mass_fluid_data_from_projects(PROJECT_ID_LIST)
        well_ids_with_pvt = [entry["well_id"] for entry in pvt_calcs if "well_id" in entry]
        all_well_ids = list(external_id_dict.values())
        return list(set(all_well_ids) - set(well_ids_with_pvt))

    def get_pvt_bot_table(self, well_id) -> requests.Response:
        """
        Get the black oil table generated for the {well_id} - each row of the BOT is
        returned as one element in the response.

        Example payload:
            well_id = this_well_id

        Example usage:
            response = whitson_connection.get_pvt_bot_table(well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bot/bot_table",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"well_id": well_id},
        )
        if response.status_code == 200:
            print(f"BOT successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def run_composition_calc(self, well_id: int) -> requests.Response:
        """
        Run PVT (composition) calculation on well.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_composition_calc",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"success on running composition calc on well {well_id}")
        else:
            print(response.text)
        return response

    def calc_median_gor(
        self,
        well_id: int,
        production_data: list[dict] = None,
        is_sep_rates: bool = False,
        num_of_timesteps: int = 30,
        default: float = 1000,
    ) -> float:
        """
        Calculate the median Gas-Oil Ratio (GOR) for a given well over a specified number of timesteps.

        This function retrieves production data for the specified well and calculates the
        GOR as the ratio of gas production to oil production for each timestep, using the
        separator rates (qg_sep, qo_sep) if `is_sep_rates` is True and the stock-tank rates
        (qg_sc, qo_sc) otherwise. The function filters out None values and identifies the
        first valid non-zero GOR value. It then calculates the median GOR from the
        subsequent data points, up to the specified `num_of_timesteps`. If no non-zero GOR
        value is found, the function returns a default value.

        Parameters:
            well_id (int): The unique identifier for the well.
            production_data (list[dict], optional): use production data available instead of
                getting it from whitson+.
            is_sep_rates (bool, optional): Flag to determine if separator rates should be used.
                Defaults to False.
            num_of_timesteps (int, optional): The number of data points to consider for the
                median calculation. Defaults to 30.
            default (float, optional): The value to return if no valid GOR values are found.
                Defaults to 1000.

        Returns:
            float: The median GOR value for the specified well over the selected time steps,
                or the default value if no valid data is available.
        """
        production_data = (
            self.get_production(well_id) if production_data is None else production_data
        )
        prefix = "_sep" if is_sep_rates else "_sc"
        qo_sep_series = [entry["qo" + prefix] for entry in production_data]
        qg_sep_series = [entry["qg" + prefix] for entry in production_data]
        gor = [
            (qg / qo * 1000) if qo not in [None, 0] and qg is not None else None
            for qo, qg in zip(qo_sep_series, qg_sep_series)
        ]
        filtered_gor = [value for value in gor if value is not None]
        first_non_zero_index = next(
            (i for i, value in enumerate(filtered_gor) if value != 0), None
        )
        if first_non_zero_index is not None:
            selected_gor = filtered_gor[
                first_non_zero_index : first_non_zero_index + num_of_timesteps
            ]
            return np.median(selected_gor) if selected_gor else None
        else:
            return default

    # ---------------------------------------------------------------------------------------------------------
    # Sampling Data Related Functions
    # ---------------------------------------------------------------------------------------------------------
    def get_sampling_data_for_well(self, well_id: int) -> requests.Response:
        """
        Returns all the sampling data uploaded for well.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/sampling_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"success on returning all the sampling data uploaded for well {well_id}")
        else:
            print(response.text)
        return response.json()

    def upload_sampling_data_to_well(self, well_id: int, payload: dict) -> requests.Response:
        """
        Upload sampling data to a well.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/sampling_data"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully uploaded sampling data for well {well_id}")
        else:
            print(response.text)
        return response

    # ---------------------------------------------------------------------------------------------------------
    # Common Process Conversion
    # ---------------------------------------------------------------------------------------------------------
    def get_common_process_rates_for_well(self, well_id: int) -> requests.Response:
        """
        Returns common process rates for well.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/well_monitoring/{well_id}/common_process_conversion",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"success on returning common process rates for well {well_id}")
        else:
            print(response.text)
        return response.json()

    def get_separtor_oil_shrinkage_for_well(self, well_id: int) -> requests.Response:
        """
        Returns all separator oil shrinkage data for well.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/well_monitoring/{well_id}/separator_oil_shrinkage",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"success on returning separator oil shrinkage data for well {well_id}")
        else:
            print(response.text)
        return response.json()

    def get_compositional_tracking_data(self, well_id: int) -> requests.Response:
        """
        Returns compositional tracking data for well.
"""response=requests.get(f"http://{self.client_name}.whitson.com/api-external/v1/well_monitoring/{well_id}/composition_tracking",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code>=200andresponse.status_code<300:print(f"success on returning compositional tracking data for well {well_id}")else:print(response.text)returnresponse.json()# ---------------------------------------------------------------------------------------------------------# BHP Input Related Functions# ---------------------------------------------------------------------------------------------------------defupload_well_data_to_well(self,well_id:int,payload:list[dict])->requests.Response:""" Upload a well data to a well. """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_data"response=requests.post(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"successfully updated well_data to well {well_id}")else:print(response.text)returnresponsedefedit_well_data_for_well_data_id(self,well_data_id:int,payload:list[dict])->requests.Response:""" Edit a well data to for well_data_id WELL_DATA_ID. """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_input/well_data/{well_data_id}"response=requests.put(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"successfully updated well_data to well_data_id {well_data_id}")else:print(response.text)returnresponsedefedit_gas_lift_data(self,well_data_id:int,payload:list[dict])->requests.Response:""" Edit gas lift data for well_data_id WELL_DATA_ID. 
"""base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_input/well_data/{well_data_id}/gas_lift_data"response=requests.put(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"successfully updated well_data to well_data_id {well_data_id}")else:print(response.text)returnresponsedefedit_well_deviation_data(self,well_id:int,payload:list[dict])->requests.Response:""" Edit well deviation data of a well in the database. """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_deviation_survey"response=requests.put(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"changed well deivation survey on well_id {well_id}")else:print(response.text)returnresponsedefrun_bhp_calc(self,well_id:int)->requests.Response:""" Run bhp calculation on the well specified by the provided well_id. 
        More info here: https://internal.whitson.com/api-external/swagger/#/BHP%20Data/get_api_external_v1_wells_run_bhp_calculation
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_bhp_calculation"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 202:
            print(f"successfully ran bhp calc on Well: {well_id}")
        else:
            print(response.text)
        # Return the response so the caller can inspect it, as the signature promises.
        return response

    def run_bhp_calc_in_projects(self, project_ids: list) -> requests.Response:
        """
        Bulk run bhp calculation on all the wells specified by the provided project_ids.

        Example payload:
            project_ids = [255, 94]

        Example function call:
            whitson_connection.run_bhp_calc_in_projects(project_ids)

        More info here: https://internal.whitson.com/api-external/swagger/#/BHP%20Data/get_api_external_v1_wells_run_bhp_calculation
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/run_bhp_calculation"
        for project_id in project_ids:
            response = requests.get(
                base_url,
                headers={
                    "content-type": "application/json",
                    "Authorization": f"Bearer {self.access_token}",
                },
                params={"project_id": project_id},
            )
            if response.status_code == 202:
                print(f"successfully ran bhp calc on Project: {project_id}")
            else:
                print(response.text)

    def edit_rate_type_for_well(self, well_id: int, rate_type: str):
        """
        Updates the rate type for a specified well in the Whitson+ platform.

        Parameters:
            well_id (int): The unique identifier for the well in the Whitson+ system.
            rate_type (str): The rate type to set for the well.
Valid options are: - "measured" for stock tank rate - "common" for separator rate """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/set_production_rate_type/{rate_type}"response=requests.put(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},params={"well_id":well_id,"rate_type":rate_type},)ifresponse.status_code>=200andresponse.status_code<300:print(f"Changed the rate type on well with id {well_id}")else:print(response.text)returnresponsedefrun_separator_oil_shrinkage_calc(self,well_id:int)->requests.Response:""" Run common process conversion and separator oil shrinkage on the well specified by the provided well_id. More info here: https://internal.whitson.com/api-external/swagger/#/Well%20Monitoring/get_api_external_v1_wells__well_id__run_well_monitoring """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_well_monitoring"response=requests.get(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code==202:print(f"successfully run separator oil shrinkage calc on {well_id}")else:print(response.text)returnresponsedefrun_separator_oil_shrinkage_calc_in_projects(self,project_ids:list)->requests.Response:""" Bulk separator calculation on all the wells specified by the provided project_ids Example Payload: project_ids = [255, 94] Example function call: whitson_connection.run_separator_oil_shrinkage_calc_in_projects(project_ids) """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/run_well_monitoring"forproject_idinproject_ids:response=requests.get(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},params={"project_id":project_id},)ifresponse.status_code==202:print(f"successfully ran well monitoring on Project: 
{project_id}")else:print(response.text)defget_bhp_calc(self,well_id:int,from_date:str=None)->requests.Response:""" Get bhp calculation on the well specified by the provided well_id. If from_date is specified as "YYYY-MM-DD" the BHP calcs after this date is returned. If the from_date is not specified, all BHP records are returned. More info here: https://internal.whitson.com/api-external/swagger/#/BHP%20Data/get_api_external_v1_wells__well_id__bhp_calculation """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_calculation"response=requests.get(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},params={"from_date":from_date},)ifresponse.status_code==200:print(f"successfully retrieved bhp calc on {well_id}")else:print(response.text)returnresponse.json()defget_bhp_from_projects(self,project_ids:list[int],from_date:str=None,page_size:int=1000,last_updated:str=None,return_all:bool=True,)->requests.Response:""" Get a list of well BHPs from projects with project_id given in list. Example: whitson_wells_bhp = whitson_connection.get_bhp_from_projects([1, 2, 3]) If from_date is specified as "YYYY-MM-DD" the BHP calcs after this date is returned. If the from_date is not specified, all BHP records are returned. If last_updated is specified as "YYYY-MM-DD" the BHP calcs updated after this date is returned. 
        Lower the page size if you get a 502 error.
        """
        all_wells = []
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_calculation"
        for project_id in project_ids:
            page = 1  # Start with the first page
            while True:
                try:
                    response = requests.get(
                        base_url,
                        headers={
                            "content-type": "application/json",
                            "Authorization": f"Bearer {self.access_token}",
                        },
                        params={
                            "project_id": project_id,
                            "page": page,
                            "from_date": from_date,
                            "page_size": page_size,  # Lower this if Error 502
                            "updated": last_updated,
                            "return_all": return_all,
                        },
                    )
                    res = response.json()
                    if not res:
                        # An empty response means there are no more wells for this project
                        break
                    all_wells.extend(res)  # Append the wells from this page to the list of all wells
                except (requests.exceptions.RequestException, ValueError) as e:
                    # Stop paging this project on a failed request or a non-JSON body
                    # (e.g. a 502). Moving on to the next page regardless would loop forever.
                    print(f"Something went wrong on page {page} of project {project_id}: {e}")
                    break
                page += 1  # Move to the next page
                print(page)
        return all_wells

    def get_well_data(self, well_id: int = 0):
        """
        Get the wellbore info.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_data"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        return response.json()

    def get_wellbore_data_from_well_id_list(
        self, well_id_list: list[int], updated: str = None
    ) -> dict:
        """
        Fetch BHP input data for a list of wells using their well IDs.

        Args:
            well_id_list (list[int]): A list of well IDs to retrieve BHP input data for.
            updated (str, optional): Date to filter the BHP input on and onwards in the format 'YYYY-MM-DD'.
                If not specified, all data is retrieved.

        Returns:
            dict: A dictionary containing the BHP input data for the specified wells in JSON format.

        Raises:
            requests.exceptions.RequestException: If there is an issue with the HTTP request.

        Example:
            >>> client.get_wellbore_data_from_well_id_list([101, 102], "2024-11-29")
            {
                "well_data": [...]
} """url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_inputs"params={"well_ids":well_id_list}ifupdated:params["updated"]=updatedtry:response=requests.patch(url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=params,)response.raise_for_status()returnresponse.json()exceptrequests.exceptions.RequestExceptionase:raiseException(f"Failed to fetch wellbore data: {e}")defget_wellbore_data_from_well_id_list_no_deviation(self,well_id_list:list[int],updated:str=None)->dict:""" Fetch BHP input data for a list of wells without deviation surveys. Args: well_id_list (list[int]): A list of well IDs to retrieve BHP input data for. updated (str, optional): Date to filter the BHP input on and onwards in the format 'YYYY-MM-DD'. If not specified, all data is retrieved. Returns: dict: A dictionary containing the BHP input data for the specified wells in JSON format. Raises: requests.exceptions.RequestException: If there is an issue with the HTTP request. Example: >>> client.get_wellbore_data_no_deviation([101, 102], "2024-11-29") { "well_data": [...] } """url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_inputs_no_deviation_survey"payload={"well_ids":well_id_list}ifupdated:payload["updated"]=updatedtry:response=requests.patch(url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)response.raise_for_status()returnresponse.json()exceptrequests.exceptions.RequestExceptionase:raiseException(f"Failed to fetch wellbore data (no deviation survey): {e}")defget_well_deviation_and_perf_interval(self,well_id:int=0):""" Get the bottomhole pressure input. 
"""base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input"response=requests.get(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)returnresponse.json()defedit_perf_interval(self,well_id:int,payload:list[dict]):""" Edit perf interval """base_url=f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input"response=requests.put(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"Changed perforated interval on well_id {well_id}")else:print(response.text)returnresponsedefis_wellbore_configuration_already_uploaded(self,new_wellbore_configuration,existing_wellbore_data):""" Checks if a wellbore configuration with the same 'use_from_date' as the new configuration already exists in the existing wellbore data. Args: new_wellbore_configuration (dict): The new wellbore configuration to check. existing_wellbore_data (list of dict): The list of existing wellbore configurations. Returns: bool: True if a configuration with the same 'use_from_date' already exists, False otherwise. """returnany(wellbore["use_from_date"]==new_wellbore_configuration["use_from_date"]forwellboreinexisting_wellbore_data)defis_default_well_configuration(self,wellbore_data)->bool:""" Checks if the provided wellbore data represents a default well configuration. Args: wellbore_data (list of dict): List containing dictionaries representing wellbore data. Each dictionary should have keys 'use_from_date', 'well_data_casing', and 'well_data_tubing'. Returns: bool: True if the well configuration matches default criteria: - 'bottom_md' of the first casing is 12000, - 'bottom_md' of the first tubing is 7000, - 'use_from_date' is None. False otherwise. 
"""ifnotisinstance(wellbore_data,list)orlen(wellbore_data)==0:returnFalsefirst_well_data=wellbore_data[0]# if first_well_data.get('well_data_tubing', [{}]) == []:# return Falsereturn(first_well_data.get("use_from_date")isNoneandfirst_well_data.get("well_data_casing",[{}])[0].get("bottom_md")==12000# and first_well_data.get('well_data_tubing', [{}])[0].get('bottom_md') == 7000)defis_default_deviation_survey(self,well_id:int)->bool:""" Checks if the provided deviation survey is is whitson+ default. """default_survey=[{"md":0.0,"tvd":0.0},{"md":7000.0,"tvd":7000.0},{"md":12000.0,"tvd":7000.0},]returndefault_survey==self.get_well_deviation_data(well_id)defis_default_perforated_interval(self,well_id)->bool:""" Checks if the provided perforation interval is whitson+ default. """well_and_perf=self.get_well_deviation_and_perf_interval(well_id)return(well_and_perf.get("top_perforation_md")==7100andwell_and_perf.get("bottom_perforation_md")==12000)defrun_numerical_model(self,well_id:int,include_forecast:bool=True,rate_control:str="BHP"):""" Run numerical model """base_url=f"http://{self.client_name}.whitson.com//api-external/v1/wells/{well_id}/run_history_matching"response=requests.get(base_url,headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},params={"well_id":str(well_id),"grid_refinement":"Low","rate_control":rate_control,"include_forecast":str(include_forecast).lower(),},)ifresponse.status_code>=200andresponse.status_code<300:print(f"Successfully ran numerical model for well with id {well_id}")else:print(response.text)returnresponsedefedit_numerical_model_for_many_wells(self,payload:dict):response=requests.put(f"http://{self.client_name}.whitson.com/api-external/v1/wells/history_matching_input",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"successfully edited 
well(s)")
        else:
            print(response.text)
        return response

    def get_numerical_model_input_for_well(self, well_id):
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/history_matching_input",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"well_id": str(well_id)},
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully retrieved numerical model input for well {well_id}")
        else:
            print(response.text)
        return response.json()

    def run_numerical_model_autofit(self, well_id: int, payload: dict, sleep_time: int = 0):
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_autofit_history_matching",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully ran autofit on well: {well_id}")
        else:
            print(response.text)
        time.sleep(sleep_time)
        return response

    def edit_numerical_model_forecast(self, well_id: int, payload: dict):
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/forecast_input",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully edited forecast schedule for well: {well_id}")
        else:
            print(response.text)
        return response

    def edit_numerical_model_forecast_for_many_wells(self, payload: dict):
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/forecast_inputs",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("successfully edited forecast for many wells")
        else:
            print(response.text)
        return response

    def get_numerical_model_rates_and_pressures(self, well_id: int):
        """
        Get the numerical model rates and pressures for well {well_id}.
        """
        base_url = f"http://{self.client_name}.whitson.com//api-external/v1/wells/{well_id}/history_matching"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully retrieved numerical model rates and pressures for well_id {well_id}")
        else:
            print(response.text)
        return response.json()

    def get_pwf_active(self, well_id: int, from_date: str = None) -> requests.Response:
        """
        Get active pwf from the database for the given well_id, from the start date in from_date.

        Example params:
            this_well_id = integer
            this_from_date = "YYYY-MM-DD"

        Example function call:
            active_pwf_well = whitson_connection.get_pwf_active(this_well_id, this_from_date)
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/pwf_active"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"from_date": from_date},
        )
        if response.status_code == 200:
            print(f"successfully received active pwf on {well_id}")
        else:
            print(response.text)
        return response.json()

    def get_pwf_active_multiple(self, payload: dict) -> requests.Response:
        """
        Get active pwf from the database for all the well_ids from the start date in from_date.

        Example payload:
            payload = {"from_date": "YYYY-MM-DD", "page": 0, "page_size": 10}

        Example function call:
            active_pwf_wells = whitson_connection.get_pwf_active_multiple(payload)
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/pwf_active"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params=payload,
        )
        if response.status_code == 200:
            print("successfully received active pwf")
        elif response.status_code == 404:
            print("No wells found matching the payload criteria")
        else:
            print(response.text)
        return response.json()

    def run_numerical_forecast(
        self, well_id, end_day, initial_bhp, decline_rate, abandonment_pressure
    ):
        """Run
forecast based on"""forecast_payload=[{"well_id":well_id,"forecast_control":"bhp","forecast_type":"parametric_decline","initial_forecast_type":"simulated","custom_schedule_data":{"data":[{"day":3650,"value":abandonment_pressure}]},"parametric_decline_segments":{"data":[{"forecast_end_time":end_day,"type":"Hyperbolic","initial_value":initial_bhp,"decline":decline_rate,"final_value":abandonment_pressure,}]},}]self.edit_numerical_model_forecast_for_many_wells(forecast_payload)self.run_numerical_model(well_id,rate_control="Gas")defget_available_custom_attributes(self,well_id:int)->requests.Response:""" Get available (existing) custom attributes for well {well_id}. Example params: this_well_id = integer Example function call: response = whitson_connection.get_available_custom_attributes(this_well_id) """response=requests.get(f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/available_custom_attributes",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code==200:print(f"Custom attribute(s) successfully retrieved for well {well_id}")else:print("Something went wrong - ",response)returnresponse.json()defget_custom_attribute_value(self,well_id:int)->requests.Response:""" Get values for (existing) custom attributes for well {well_id}. Example params: this_well_id = integer Example function call: response = whitson_connection.get_custom_attributes_value(this_well_id) """response=requests.get(f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/custom_attributes",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code==200:print(f"Custom attribute(s) data successfully retrieved for well {well_id}")else:print("Something went wrong - ",response)returnresponse.json()defdelete_custom_attribute_value(self,well_id:int,attribute_name:str)->requests.Response:""" Delete values for (existing) custom attributes for well {well_id}. 
        Note that this does not delete the custom attribute from the project.

        Example params:
            this_well_id = integer
            this_custom_attribute = 'MyAttribute'

        Example function call:
            response = whitson_connection.delete_custom_attribute_value(this_well_id, this_custom_attribute)
        """
        response = requests.delete(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/custom_attributes/{attribute_name}",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"Custom attribute value successfully deleted for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def edit_custom_attribute_value(self, well_id: int, payload: dict) -> requests.Response:
        """
        Set values for (existing) custom attributes for well {well_id}.

        Example payload:
            payload = {"attribute_name": "MyNumericAttribute", "number_attribute": {"attribute_value": 1250}}

        Example function call:
            response = whitson_connection.edit_custom_attribute_value(this_well_id, payload)
        """
        response = requests.post(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/custom_attributes",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code == 200:
            print(f"Custom attribute(s) successfully updated for well {well_id}")
        elif response.status_code == 403:
            print("Custom attribute does not exist")
        else:
            print("Something went wrong - ", response)
        return response

    def edit_well_deviation_survey(self, well_id: int, payload: dict) -> requests.Response:
        """
        Example payload:
            [{'md': 0, 'tvd': 0}, {'md': 95.1, 'tvd': 95.1}, {'md': 153.6, 'tvd': 153.6}]

        Endpoint: https://internal.whitson.com/api-external/swagger/#/BHP%20Data/put_api_external_v1_wells__well_id__bhp_input_well_deviation_survey
        """
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_deviation_survey",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer 
{self.access_token}",},json=payload,)ifresponse.status_code>=200andresponse.status_code<300:print(f"successfully edited well deviation survey for {well_id}.")else:print(response.text)returnresponsedefdelete_wellbore_config_by_well_data_id(self,well_data_id:int):""" Delete wellbore with wellbore id well_data_id. """response=requests.delete(f"http://{self.client_name}.whitson.com/api-external/v1/wells/bhp_input/well_data/{well_data_id}",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code==200:print(f"Wellbore config {well_data_id} successfully deleted")else:print("Something went wrong - ",response)returnresponse.json()# ---------------------------------------------------------------------------------------------------------# Analytical RTA Related Functions# ---------------------------------------------------------------------------------------------------------defget_classical_rta_interpretation(self,well_id:int)->requests.Response:""" Get (existing) classical RTA results for well {well_id}. Example params: this_well_id = integer Example function call: response = whitson_connection.get_classical_rta_interpretation(this_well_id) """response=requests.get(f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/arta_interpretations",headers={"content-type":"application/json","Authorization":f"Bearer {self.access_token}",},)ifresponse.status_code==200:print(f"Classical RTA data successfully retrieved for well {well_id}")returnresponse.json()else:print("Something went wrong - ",response)defget_analytical_rta_timeseries(self,well_id:int)->requests.Response:""" Get (existing) analytical RTA timeseries for well {well_id}. 

        Example params:
            this_well_id = integer

        Example function call:
            response = whitson_connection.get_analytical_rta_timeseries(this_well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/arta_time_series",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"Analytical RTA timeseries data successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_material_balance_timeseries(self, well_id: int) -> requests.Response:
        """
        Get (existing) material balance timeseries for well {well_id}.

        Example params:
            this_well_id = integer

        Example function call:
            response = whitson_connection.get_material_balance_timeseries(this_well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/material_balance_time",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"Material balance timeseries successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_fractional_rta_interpretations(self, well_id: int) -> requests.Response:
        """
        Get (existing) fractional RTA interpretations for well {well_id}.

        Example params:
            this_well_id = integer

        Example function call:
            response = whitson_connection.get_fractional_rta_interpretations(this_well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/fractional_rta_interpretations",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"Fractional RTA data successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    # ---------------------------------------------------------------------------------------------------------
    # Data Status (on whitson) Related Functions
    # ---------------------------------------------------------------------------------------------------------

    def get_new_data_status_by_project_id(self, project_id: int) -> requests.Response:
        """
        Get (existing) data status for the wells in project {project_id}.
        The bool flag is set to True if any of the returned fields has changed since the last BHP calc,
        and to False if the input to the BHP calc has not changed since the previous run.

        Example params:
            this_project_id = integer

        Example function call:
            response = whitson_connection.get_new_data_status_by_project_id(this_project_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/new_well_input_status",
            params={
                "project_id": project_id,
            },
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"New Data Status successfully retrieved for project {project_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_well_data_status(self, well_id: int) -> requests.Response:
        """
        Get data status (data exists/not) for well {well_id}.

        Example params:
            this_well_id = integer

        Example function call:
            response = whitson_connection.get_well_data_status(this_well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/status",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"Data Status successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    # ---------------------------------------------------------------------------------------------------------
    # Numerical RTA Related Functions
    # ---------------------------------------------------------------------------------------------------------

    def _get_nrta_report_dataframe(self):
        return pd.DataFrame(
            columns=[
                "Run",
                "swi",
                "fcd",
                "swc",
                "sorw",
                "sorg",
                "sgc",
                "nw",
                "now",
                "ng",
                "nog",
                "Error",
                "Probability_to_Accept",
                "Was_Accepted",
            ]
        )

    def _edit_nrta_weight_factors(self, payload: dict) -> requests.Response:
        """
        Edit numerical RTA weight factors.

        payload = [{
            "well_id": 1,
            "oil_cum": 0,
            "gas_cum": 0,
            "water_cum": 0,
            "gor_cum": 0,
            "oil": 0,
            "gas": 0,
            "water": 0,
            "gor": 0
        }]
        """
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/edit_nrta_weight_factors",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        return response

    def _edit_nrta_project_parameters(self, project_id: int, params):
        swi, fcd, swc, sorw, sorg, sgc, nw, now, ng, nog = params
        rel_perm_fcd = {
            "fcd": fcd,
            "swc": swc,
            "sorw": sorw,
            "sorg": sorg,
            "sgc": sgc,
            "nw": nw,
            "now": now,
            "ng": ng,
            "nog": nog,
            "krwro": 1,
            "krgro": 1,
            "krocw": 1,
            "fracture_swc": 0,
            "fracture_sorw": 0,
            "fracture_sorg": 0,
            "fracture_sgc": 0,
            "fracture_nw": 1,
            "fracture_now": 1,
            "fracture_ng": 1,
            "fracture_nog": 1,
            "fracture_krwro": 1,
            "fracture_krocw": 1,
            "fracture_krgro": 1,
        }
        swi_gamma_cr = {
            "Sw_i": swi,
            "gamma_m": 0.0000,
            "gamma_f": 0.0000,
            "cr": 0.000004,
        }
        wells = self.get_wells(project_id)
        rel_perm_fcd_payload = []
        swi_pressure_dep_payload = []
        for well in wells:
            well_id = well["id"]
            # For rel_perm_fcd_payload
            rel_perm_fcd_payload.append(
                {"well_id": well_id, **rel_perm_fcd}  # Unpack rel_perm_fcd dictionary
            )
            # For swi_pressure_dep_payload
            swi_pressure_dep_payload.append(
                {"id": well_id, **swi_gamma_cr}  # Unpack swi_gamma_cr dictionary
            )
        self.__edit_input_nrta_rel_perm_and_fcd_all_wells_project(rel_perm_fcd_payload)
        self.__edit_swi_pressure_dep_all_wells_in_project(swi_pressure_dep_payload)

    def __edit_swi_pressure_dep_all_wells_in_project(self, payload: dict) -> requests.Response:
        """
        Edit the NRTA input for all wells in project.
        """
        response = requests.patch(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        # if response.status_code >= 200 and response.status_code < 300:
        #     print(f"successfully edited info for well {well_id}")
        # else:
        #     print(response.text)
        return response

    def __edit_input_nrta_rel_perm_and_fcd_all_wells_project(self, payload: dict) -> requests.Response:
        """
        Edit the NRTA input for all wells in project.
        """
        response = requests.put(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/rta_input",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
            # params={"project_id": self.project_id},
        )
        # if response.status_code >= 200 and response.status_code < 300:
        #     print(f"successfully edited input quick for well {well_id}")
        # else:
        #     print(response.text)
        return response

    def _run_nrta_on_all_wells_in_project(
        self,
        project_id: int,
        params: dict = {"grid_refinement": "Low", "num_type_curves": "5_normal"},
        sleep_time: int = 0,
    ):
        """
        Executes numerical rate transient analysis (NRTA) on all wells in the specified project.

        This method iterates through all wells in the project and performs NRTA on each well
        using the specified parameters.

        Parameters:
        ----------
        project_id : int
            The ID of the project whose wells are processed.
        params : dict, optional
            A dictionary containing parameters for the NRTA. Defaults to:
            {
                "grid_refinement": "Low",
                "num_type_curves": "5_normal"
            }
            - "grid_refinement" (str): Specifies the level of grid refinement for the NRTA.
              Possible values are "Low", "Medium", and "High".
            - "num_type_curves" (str): Specifies the number and type of curves to be used in
              the analysis. The default value is "5_normal".
        sleep_time : int, optional
            The amount of time to wait (in seconds) between processing each well. Defaults to 0.
        """
        wells = self.get_wells(project_id)
        for well in wells:
            time.sleep(sleep_time)
            well_id = well["id"]
            self.run_numerical_rta_for_well(well_id, params)

    def run_numerical_rta_for_well(
        self,
        well_id: int,
        params: dict = {"grid_refinement": "Low", "num_type_curves": "5_normal"},
    ) -> requests.Response:
        """
        Runs numerical rate transient analysis (NRTA) for a specified well.

        This method sends a request to the Whitson API to initiate NRTA for the well
        identified by `well_id` using the given parameters.

        Parameters:
        ----------
        well_id : int
            The ID of the well for which to run the NRTA.
        params : dict, optional
            A dictionary containing parameters for the NRTA. Defaults to:
            {
                "grid_refinement": "Low",
                "num_type_curves": "5_normal"
            }
            - "grid_refinement" (str): Specifies the level of grid refinement for the NRTA.
              Possible values are "Low", "Medium", and "High".
            - "num_type_curves" (str): Specifies the number and type of curves to be used in
              the analysis. The default value is "5_normal".
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_rta",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params=params,
        )
        # if response.status_code == 202:
        #     print(f"Numerical RTA for well {well_id}")
        # else:
        #     print(response.text)
        return response

    def run_numerical_rta_autofit(self, well_id: int) -> requests.Response:
        """
        Runs the numerical rate transient analysis (NRTA) autofit for a specified well.

        This method sends a request to the Whitson API to perform an autofit NRTA on the
        well identified by `well_id`.

        Parameters:
        ----------
        well_id : int
            The ID of the well for which to run the NRTA autofit.
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/autofit_rta",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        # time.sleep(0.05)
        # if response.status_code == 200:
        #     print(f"Numerical RTA Autofit successful for well {well_id}")
        # else:
        #     print(response.text)
        return response

    def _run_nrta_autofit_on_all_wells_in_project(self, project_id: int, sleep_time: int = 0):
        """
        Runs the numerical rate transient analysis (NRTA) autofit for all wells in a
        specified project.

        This method retrieves all wells associated with the given `project_id` and performs
        an NRTA autofit on each well.

        Parameters:
        ----------
        project_id : int
            The ID of the project for which to run the NRTA autofit on all wells.
        sleep_time : int, optional
            The amount of time to wait (in seconds) between processing each well. Defaults to 0.
        """
        wells = self.get_wells(project_id)
        for well in wells:
            time.sleep(sleep_time)
            well_id = well["id"]
            self.run_numerical_rta_autofit(well_id)

    def _update_nrta_input_parameters(self, min_values, max_values, params):
        """
        Update NRTA input parameters with small random variations.

        This method adjusts each parameter by adding a random value within a defined jump
        size range, ensuring the updated parameters remain within specified minimum and
        maximum values.

        Parameters
        ----------
        min_values : list or array-like
            The minimum values for each parameter.
        max_values : list or array-like
            The maximum values for each parameter.
        params : list or array-like
            The current parameters to be updated.

        Returns
        -------
        tuple
            A tuple of updated parameters, each adjusted by a small random amount and
            constrained within the provided minimum and maximum values.
        """
        # Define the jump size (one fifth of each parameter's allowed range)
        jump_size = [
            (max_val - min_val) / 5 for min_val, max_val in zip(min_values, max_values)
        ]
        # Update each parameter with a small random amount within its specified range
        updated_parameters = [
            param + random.uniform(-jump, jump) for param, jump in zip(params, jump_size)
        ]
        # Ensure the updated values stay within the specified range
        updated_parameters = [
            max(min_val, min(max_val, updated_param))
            for updated_param, min_val, max_val in zip(updated_parameters, min_values, max_values)
        ]
        return tuple(updated_parameters)

    def _get_total_project_error(self, project_id: int, weights: dict):
        """
        Calculate the total project error and individual well errors for a given project.

        This method computes the total error for a project by aggregating individual well
        errors. It also returns the LFP (Last Flowing Pressure) and OOIP (Original Oil in
        Place) values for each well.

        Parameters
        ----------
        project_id : int
            The unique identifier of the project for which the error is being calculated.
        weights : dict
            A dictionary containing weights for different error components used in the
            error calculation.

        Returns
        -------
        tuple
            A tuple containing:
            - error (float): The average total error for the project.
            - individual_errors (list of float): The individual error components, averaged
              over the wells in the project.
            - lfps (list of float): The Last Flowing Pressure values for each well.
            - ooips (list of float): The Original Oil in Place values for each well.
        """
        wells = self.get_wells(project_id)
        error = 0
        cumulative_individual_errors = []
        lfps = []
        ooips = []
        all_nrta_data = self._get_nrta_outputs_for_project(project_id)
        all_nrta_data = sorted(all_nrta_data, key=lambda x: x["well_id"])
        lfps = [entry["lfp"] for entry in all_nrta_data]
        ooips = [entry["ooip"] for entry in all_nrta_data]
        get_all_errors_in_project = self._get_all_errors_in_project(project_id)
        for well in get_all_errors_in_project:
            this_error, individual_errors = self._get_nrta_error(well, weights)
            error += this_error
            if not cumulative_individual_errors:
                cumulative_individual_errors = individual_errors
            else:
                cumulative_individual_errors = [
                    sum(x) for x in zip(cumulative_individual_errors, individual_errors)
                ]
        error = error / len(wells)
        individual_errors = [error / len(wells) for error in cumulative_individual_errors]
        return error, individual_errors, lfps, ooips

    def _get_nrta_outputs_for_project(self, project_id: int):
        """
        Retrieve NRTA outputs for a specific project.

        This method fetches LFP (Last Flowing Pressure), OOIP (Original Oil in Place),
        OGIP (Original Gas in Place), and OGIP_a for the given project using the API.

        Parameters
        ----------
        project_id : int
            The unique identifier of the project.

        Returns
        -------
        list
            A list of dictionaries containing NRTA output data for each well in the project.

        Raises
        ------
        Exception
            If no wells are found for the given project.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/rta_calc"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"project_id": project_id},
        )
        res = response.json()
        if not res:
            raise Exception("no existing wells")
        return res

    def get_nrta_outputs_for_well(self, well_id: int):
        """
        Retrieve NRTA outputs for a specific well.

        This method fetches LFP (Last Flowing Pressure), OOIP (Original Oil in Place),
        OGIP (Original Gas in Place), and OGIP_a for the given well using the API.

        Parameters
        ----------
        well_id : int
            The unique identifier of the well.
        Returns
        -------
        list or dict
            The NRTA output data for the well, as returned by the API.

        Raises
        ------
        Exception
            If the API returns an empty response for the well.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/rta_calc"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing wells")
        return res

    def _get_all_errors_in_project(self, project_id: int):
        """
        Retrieve all errors for a specific project.

        This method fetches error data for all wells in the specified project using the API.

        Parameters
        ----------
        project_id : int
            The unique identifier of the project.

        Returns
        -------
        list
            A list of dictionaries containing error data for each well in the project.

        Raises
        ------
        Exception
            If the API returns an empty response for the project.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/rta_autofit_rms"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"project_id": project_id},
        )
        res = response.json()
        if not res:
            raise Exception("no existing wells")
        return res

    def _get_nrta_error(self, res, weights: dict):
        """
        Calculate NRTA error for a given well.

        This method computes the error for each run based on the provided results and weights.

        Parameters
        ----------
        res : dict
            A dictionary containing the results for a specific well.
        weights : dict
            A dictionary containing weights for different error components.

        Returns
        -------
        tuple
            A tuple containing:
            - tot_error (float): The total calculated error for the well (log10 of the
              weighted sum of the error components).
            - individual_errors (list of float): The individual error values for each component.
        """
        individual_errors = [value for key, value in res.items()]
        res.pop("well_id", None)
        tot_error = math.log10(sum(weights[key] * value for key, value in res.items()))
        return tot_error, individual_errors

    def _append_to_nrta_report(
        self,
        result_df,
        run,
        test_parameters,
        this_error,
        probability_to_accept,
        was_accpeted,
        individual_errors,
        lfps,
        ooips,
    ):
        """
        Append NRTA run results to the report DataFrame.

        This method adds the results of an NRTA run, including test parameters, error
        metrics, and other relevant data, to the provided result DataFrame.

        Parameters
        ----------
        result_df : pd.DataFrame
            The DataFrame to which the results will be appended.
        run : int
            The run identifier.
        test_parameters : list
            The list of test parameters used in the run.
        this_error : float
            The total error for the run.
        probability_to_accept : float
            The probability of accepting the run.
        was_accpeted : bool
            Whether the run was accepted.
        individual_errors : list of float
            The list of individual errors for each error component.
        lfps : list of float
            The Last Flowing Pressure values for the wells.
        ooips : list of float
            The Original Oil in Place values for the wells.

        Returns
        -------
        pd.DataFrame
            The updated result DataFrame with the new run results appended.
        """
        warnings.simplefilter(action="ignore", category=FutureWarning)
        row = {
            "Run": run,
            "swi": test_parameters[0],
            "fcd": test_parameters[1],
            "swc": test_parameters[2],
            "sorw": test_parameters[3],
            "sorg": test_parameters[4],
            "sgc": test_parameters[5],
            "nw": test_parameters[6],
            "now": test_parameters[7],
            "ng": test_parameters[8],
            "nog": test_parameters[9],
            "Error": this_error,
            "Probability_to_Accept": probability_to_accept,
            "Was_Accepted": was_accpeted,
            "cum_oil_error": individual_errors[4],
            "cum_gas_error": individual_errors[0],
            "cum_water_error": individual_errors[6],
            "cum_gor_error": individual_errors[2],
            "oil_error": individual_errors[5],
            "gas_error": individual_errors[1],
            "water_error": individual_errors[7],
            "gor_error": individual_errors[3],
        }
        row_string = (
            f"{run}, "
            f"{test_parameters[0]:.4f}, "
            f"{test_parameters[1]:.4f}, "
            f"{test_parameters[2]:.4f}, "
            f"{test_parameters[3]:.4f}, "
            f"{test_parameters[4]:.4f}, "
            f"{test_parameters[5]:.4f}, "
            f"{test_parameters[6]:.4f}, "
            f"{test_parameters[7]:.4f}, "
            f"{test_parameters[8]:.4f}, "
            f"{test_parameters[9]:.4f}, "
            f"{this_error:.4f}, "
            f"{probability_to_accept:.4f}, "
            f"{was_accpeted}, "
            f"{individual_errors[4]:.4f}, "
            f"{individual_errors[0]:.4f}, "
            f"{individual_errors[6]:.4f}, "
            f"{individual_errors[2]:.4f}, "
            f"{individual_errors[5]:.4f}, "
            f"{individual_errors[1]:.4f}, "
            f"{individual_errors[7]:.4f}, "
            f"{individual_errors[3]:.4f}"
        )
        print(row_string)
        # Concatenate all lfps and ooips into strings separated by commas
        lfps_str = ", ".join(map(str, lfps))
        ooips_str = ", ".join(map(str, ooips))
        # Add lfps and ooips to the row dictionary
        row["lfps"] = lfps_str
        row["ooips"] = ooips_str
        # Append the row to the result DataFrame
        row_df = pd.DataFrame([row])
        result_df = pd.concat([result_df, row_df], ignore_index=True)
        return result_df

    # ---------------------------------------------------------------------------------------------------------
    # DCA related Functions
    # ---------------------------------------------------------------------------------------------------------
    def get_dca_fits(self, well_id: int) -> Any:
        """
        Get (existing) DCA fits for well {well_id}.
        Returns the parsed JSON body of the response.

        Example params:
            this_well_id = integer

        Example function call:
            response = whitson_connection.get_dca_fits(this_well_id)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/well_dca/dca_export",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"DCA data successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_dca_saved_cases(self, well_id: int) -> Any:
        """
        Fetch saved Decline Curve Analysis (DCA) cases for a given well ID.

        Args:
            well_id (int): The unique identifier of the well.

        Returns:
            The parsed JSON body of the API response containing the saved DCA cases.

        Example:
            >>> saved_cases = whitson_connection.get_dca_saved_cases(12345)
            >>> print(saved_cases)
        """
        response = requests.get(
            f"http://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/well_dca/saved_cases",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 200:
            print(f"DCA data successfully retrieved for well {well_id}")
        else:
            print("Something went wrong - ", response)
        return response.json()

    def get_saved_dca_fits_by_well_id_list(self, well_id_list: dict) -> Any:
        """
        Retrieve saved DCA cases for the wells listed in `well_id_list`, which is sent
        as the JSON payload of a PATCH request. Returns the parsed JSON body of the response.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/dca_saved_cases"
        response = requests.patch(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=well_id_list,
            # params={
            #     "updated": last_updated,
            # },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("successfully retrieved DCA well(s).")
        else:
            print(response.text)
        return response.json()

    def get_dca_fits_by_well_id_list(self, well_id_list: dict) -> Any:
        """
        Retrieve DCA fits for one or more wells using their IDs.

        This function sends a PATCH request to retrieve Decline Curve Analysis (DCA) fits
        for wells specified in the payload. The well IDs are provided in a dictionary format.

        Parameters:
        ----------
        well_id_list : dict
            A dictionary containing the IDs of the wells. The payload structure is:
            {
                "well_ids": [
                    <well_id_1>,
                    <well_id_2>,
                    ...
                ]
            }

        Returns:
        -------
        The parsed JSON body of the API response.

        Example:
        --------
        Example payload:
            well_id_list = {
                "well_ids": [
                    123,
                    456,
                    789
                ]
            }

        Example function call:
            response = whitson_connection.get_dca_fits_by_well_id_list(well_id_list)

        More information:
        -----------------
        For details about the endpoint, visit:
        https://internal.whitson.com/api-external/swagger/#/DCA/patch_api_external_v1_wells_dca
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/dca"
        response = requests.patch(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=well_id_list,
            # params={
            #     "updated": last_updated,
            # },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("successfully retrieved DCA well(s).")
        else:
            print(response.text)
        return response.json()

    def get_dca_daily_rates_by_well_id_list(self, well_id_list: dict) -> Any:
        """
        Retrieve Decline Curve Analysis (DCA) daily rates for specified wells.

        This method sends a GET request to fetch DCA daily rates for wells identified by
        their IDs. The value of `well_id_list` is sent as the `well_ids` query parameter.

        Parameters:
        ----------
        well_id_list : dict
            A dictionary containing the well IDs in the following structure:
            {
                "well_ids": [
                    <well_id_1>,
                    <well_id_2>,
                    ...
                ]
            }

        Returns:
        -------
        The parsed JSON body of the API response.

        Example:
        --------
        Payload structure:
            well_id_list = {
                "well_ids": [
                    123,
                    456,
                    789
                ]
            }

        Function call:
            response = whitson_connection.get_dca_daily_rates_by_well_id_list(well_id_list)

        Endpoint Documentation:
        ------------------------
        Refer to the API documentation for more details:
        https://internal.whitson.com/api-external/swagger/#/DCA/patch_api_external_v1_wells_dca

        Notes:
        ------
        - Ensure the `self.access_token` is valid for authentication.
        - The method prints a success message for responses with status codes in the 200-299 range.
        - For non-successful responses, the server's response text is printed for troubleshooting.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/dca/daily_rates"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={
                "well_ids": well_id_list,
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("successfully retrieved DCA forecasted daily rates.")
        else:
            print(response.text)
        return response.json()

    def get_dca_saved_cases_daily_rates_by_well_id_list(self, well_id_list: dict) -> Any:
        """
        Retrieve daily rates of saved Decline Curve Analysis (DCA) cases for specified wells.

        This method sends a GET request to fetch the daily rates of saved DCA cases for
        wells identified by their IDs. The well IDs are passed as query parameters, taken
        from a dictionary.

        Parameters:
        ----------
        well_id_list : dict
            A dictionary containing the well IDs in the following structure:
            {
                "well_ids": [
                    <well_id_1>,
                    <well_id_2>,
                    ...
                ]
            }

        Returns:
        -------
        The parsed JSON body of the API response.
        Example:
        --------
        Payload structure:
            well_id_list = {
                "well_ids": [
                    123,
                    456,
                    789
                ]
            }

        Function call:
            response = whitson_connection.get_dca_saved_cases_daily_rates_by_well_id_list(well_id_list)

        Endpoint Documentation:
        ------------------------
        Refer to the API documentation for more details:
        https://internal.whitson.com/api-external/swagger/#/DCA/patch_api_external_v1_wells_dca

        Notes:
        ------
        - Ensure the `self.access_token` is valid for authentication.
        - The method prints a success message for responses with status codes in the 200-299 range.
        - For non-successful responses, the server's response text is printed for troubleshooting.
        """
        base_url = f"http://{self.client_name}.whitson.com/api-external/v1/wells/dca_saved_cases/daily_rates"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params=well_id_list,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("successfully retrieved DCA forecasted daily rates.")
        else:
            print(response.text)
        return response.json()

    # ---------------------------------------------------------------------------------------------------------
    # SNOWFLAKE Related Functions (https://www.snowflake.com/en/)
    # ---------------------------------------------------------------------------------------------------------
    def snowflake_connection(
        self,
        account: str,
        user: str,
        password,
        database: str,
        schema: str = "PUBLIC",
        warehouse: str = "XS",
        role: str = "ACCOUNTADMIN",
    ):
        """
        Creates a connection to Snowflake using the provided credentials and parameters.

        Parameters:
        - account (str): The Snowflake account identifier, typically in the format
          '<account_identifier>.<region>.<cloud_provider>'.
        - user (str): The Snowflake user name.
        - password (str): The password for the Snowflake user.
        - database (str): The name of the Snowflake database to connect to.
        - schema (str): The schema within the database. Default is 'PUBLIC'.
        - warehouse (str): The name of the Snowflake warehouse to use. Default is 'XS'.
        - role (str): The role to use for the connection. Default is 'ACCOUNTADMIN'.

        Returns:
        - connection: A connection object to the Snowflake database.
        """
        engine = create_engine(
            URL(
                account=account,
                user=user,
                password=password,
                database=database,
                schema=schema,
                warehouse=warehouse,
                role=role,
            )
        )
        return engine.connect()

    def snowflake_table_to_dataframe(self, snowflake_connection, snowflake_query: str) -> pd.DataFrame:
        """
        Executes a query on a Snowflake connection and returns the result as a pandas DataFrame.

        Parameters:
        - snowflake_connection: An active SQLAlchemy connection object to Snowflake.
        - snowflake_query (str): The SQL query to execute on the Snowflake database.

        Returns:
        - pd.DataFrame: A pandas DataFrame containing the results of the executed query.

        Example:
        >>> connection = whitson_connection.snowflake_connection(account='your_account', user='your_user', password='your_password', database='your_database')
        >>> query = "SELECT * FROM your_database.your_schema.your_table"
        >>> df = whitson_connection.snowflake_table_to_dataframe(connection, query)
        >>> print(df.head())

        Notes:
        - Ensure that the Snowflake connection is active and properly configured before
          calling this function.
        - The function fetches all rows from the result set, so be mindful of the query
          size to avoid memory issues.
        """
        # snowflake_connection.execute("USE WAREHOUSE XS")  # Explicitly activate the warehouse
        snowflake_query = text(snowflake_query)
        result = snowflake_connection.execute(snowflake_query)
        rows = result.fetchall()
        columns = result.keys()
        return pd.DataFrame(rows, columns=columns)

    # ---------------------------------------------------------------------------------------------------------
    # DATABRICKS Related Functions (https://www.databricks.com/)
    # ---------------------------------------------------------------------------------------------------------
    def databricks_connection(self, dsn: str, host: str, port: str, token: str, http_path: str):
        """Build a connection string and open a pyodbc connection to a Databricks cluster."""
        connection_string = (
            f"DSN={dsn};"
            f"HOST={host};"
            f"PORT={port};"
            f"OAuthMechanism=3;"  # OAuth mechanism for token-based authentication
            f"Auth_AccessToken={token};"  # Use Auth_AccessToken instead of Token
            f"HTTPPath={http_path};"
            f"SSL=1;"
        )
        return pyodbc.connect(connection_string, autocommit=True)

    def databricks_connection_oauth2(
        self,
        dsn: str,
        host: str,
        port: str,
        http_path: str,
        client_id: str,
        client_secret: str,
    ):
        """Build a connection string and open a pyodbc connection to a Databricks cluster using OAuth2 authentication."""
        connection_string = (
            f"DSN={dsn};"
            f"HOST={host};"
            f"PORT={port};"
            f"AuthMech=11;"  # Authentication mechanism for OAuth2
            f"Auth_Flow=1;"  # Specifies OAuth2 flow
            f"Auth_Client_ID={client_id};"  # Updated to match the required format
            f"Auth_Client_Secret={client_secret};"  # Updated to match the required format
            f"HTTPPath={http_path};"
            f"SSL=1;"
        )
        return pyodbc.connect(connection_string, autocommit=True)

    def connect_to_databricks_sql(self, host: str, token: str, http_path: str):
        """Create a connection to a Databricks SQL warehouse using `databricks-sql-connector`."""
        # Imported here so the rest of the class works without databricks-sql-connector installed
        from databricks import sql

        try:
            conn = sql.connect(server_hostname=host, http_path=http_path, access_token=token)
            print("✅ Connected to Databricks successfully!")
            return conn
        except Exception as e:
            print(f"❌ Failed to connect to Databricks: {e}")
            return None

    # ---------------------------------------------------------------------------------------------------------
    # PRODMAN related Functions (https://prodman.ca/)
    # ---------------------------------------------------------------------------------------------------------
    def prodman_get_wells(self, domain: str, api_key: str) -> json:
        """
        Fetches well data from the Prodman API for a given domain.

        This function sends a GET request to the Prodman API, using the provided domain and
        API key, to retrieve a list of wells in JSON format. The function returns the parsed
        JSON response if the request is successful; otherwise, it prints an error message
        and returns an empty list.

        Args:
            domain (str): The domain name to be used in the API request URL
                (e.g., 'example' for 'https://example.prodman.ca').
            api_key (str): The API key required for authentication to access the Prodman API.

        Returns:
            json: A list of well data in JSON format if the request is successful.
                If the request fails, an empty list is returned.

        More info about PRODMAN api can be found at https://YOURCOMPANYDOMAIN.prodman.ca/api/help.
        """
        params = {"api_key": api_key}
        url = f"https://{domain}.prodman.ca/api/wells/&return-type=json"
        response = requests.get(url, params=params)
        if response.status_code == 200:
            content = response.content.decode("utf-8")
            return json.loads(content)
        else:
            print(f"Error: {response.status_code}")
            return []

    def prodman_get_production(self, domain: str, api_key: str) -> json:
        """
        Fetches production data from the Prodman API for all wells, from 2000-01-01
        through today.

        This function sends a GET request to the Prodman API, using the provided domain and
        API key, to retrieve production data for all wells. The requested fields are
        `entity_id`, `date`, `oil`, `gas`, `water`, `casing`, `tubing`, `intake`, and `jtf`.
        The results are returned in JSON format. If the request fails, the function prints
        an error message and returns an empty list.

        Args:
            domain (str): The domain name to be used in the API request URL
                (e.g., 'example' for 'https://example.prodman.ca').
            api_key (str): The API key required for authentication to access the Prodman API.

        Returns:
            json: A list of production data in JSON format if the request is successful.
                If the request fails, an empty list is returned.

        More info about PRODMAN api can be found at https://YOURCOMPANYDOMAIN.prodman.ca/api/help.
        """
        params = {
            "api_key": api_key,
            "well_id": "all",
            "start": "2000-01-01",
            "end": datetime.today().strftime("%Y-%m-%d"),
            "fields": "entity_id, date, oil, gas, water, casing, tubing, intake, jtf",
            "units": "us",
            "return-type": "json",
        }
        url = f"https://{domain}.prodman.ca/api/production/"
        response = requests.get(url, params=params)
        if response.status_code == 200:
            content = response.content.decode("utf-8")
            return json.loads(content)
        else:
            print(f"Error: {response.status_code}")
            return []

    def propman_canada_uwi_cleanup(self, uwi: str) -> str:
        """
        Cleans up a UWI string for Prodman Canada.

        This function performs the following steps:
        1. Removes all occurrences of '-' and '/' from the input UWI string.
        2. Ensures the cleaned UWI string starts with '1'. If it does not, '1' is added at
           the beginning.
        3. If the cleaned UWI is exactly 15 characters long, appends a '0' to make it
           16 characters long and prints the processed UWI.
        4. Otherwise returns the cleaned UWI as-is, whether it is already 16 characters
           long or has any other length.

        Parameters:
            uwi (str): The input UWI string to be cleaned and formatted.

        Returns:
            str: The cleaned and formatted UWI string.
        """
        # Replace '/' and '-' with an empty string using str.replace
        cleaned_uwi = uwi.replace("/", "").replace("-", "")
        # Ensure the UWI starts with '1'
        if not cleaned_uwi.startswith("1"):
            cleaned_uwi = "1" + cleaned_uwi
        # If the length is exactly 15, pad with a '0' at the end
        if len(cleaned_uwi) == 15:
            cleaned_uwi = cleaned_uwi.ljust(16, "0")
            print(f"Processed UWI: {cleaned_uwi}")
            return cleaned_uwi
        elif len(cleaned_uwi) == 16:
            return cleaned_uwi
        else:
            # print("Warning: The processed UWI already meets the requirements or is longer than 16 characters.")
            return cleaned_uwi

    # ---------------------------------------------------------------------------------------------------------
    # General Functions
    # ---------------------------------------------------------------------------------------------------------
    def round_to_significant_digits(self, number: float, digits: int = 4) -> float:
        """
        Rounds a number to the specified number of significant digits.

        Parameters:
        -----------
        number : float
            The number to be rounded.
        digits : int
            The number of significant digits to round to.

        Returns:
        --------
        float
            The number rounded to the specified number of significant digits.
            Returns 0 if the input number is 0 to avoid log10 errors.

        Examples:
        ---------
        >>> round_to_significant_digits(12345.6789, 3)
        12300.0
        >>> round_to_significant_digits(0.012345, 4)
        0.01235

        Notes:
        ------
        This function uses logarithmic calculations to determine the appropriate rounding
        level for the desired number of significant digits. It handles zero input
        separately to avoid mathematical issues with logarithms of zero.
        """
        if number == 0:
            return 0  # Return 0 if the input is 0 to avoid log10 issues
        significant_digits = digits - int(math.floor(math.log10(abs(number)))) - 1
        return round(number, significant_digits)

    def run_function(self, func):
        try:
            # Print the name of the function being run
            print(f"Starting {func.__name__} process.")
            # Capture the start time
            start_time = time.time()
            # Run the function
            func()
            # Capture the end time
            end_time = time.time()
            # Calculate the duration in seconds
            elapsed_time = end_time - start_time
            # Convert the time to minutes and seconds
            minutes = int(elapsed_time // 60)
            seconds = int(elapsed_time % 60)
            # Print completion message with time
            time_string = f"{func.__name__} completed successfully in {minutes} minutes and {seconds} seconds.\n"
            print(time_string)
            return time_string
        except Exception as e:
            # Print an error message if the function fails
            error_string = f"Something went wrong while running {func.__name__}: {e}\n"
            print(error_string)
            return error_string

    def _convert_dataframe_to_timestamp_json(
        self, dataframe, primary_key="well_id", timestamp_column_name="insert_date"
    ) -> json:
        """Return a JSON string holding the latest timestamp row for each primary key."""
        dataframe = dataframe[[primary_key, timestamp_column_name]].copy()
        dataframe[timestamp_column_name] = pd.to_datetime(dataframe[timestamp_column_name])
        dataframe = dataframe.loc[dataframe.groupby(primary_key)[timestamp_column_name].idxmax()]
        return dataframe.to_json(orient="records", date_format="iso", indent=4)

    def _save_dataframe_timestamp_to_json(
        self,
        dataframe,
        filepath,
        primary_key="well_id",
        timestamp_column_name="insert_date",
    ):
        """Write the latest timestamp row for each primary key to a JSON file at `filepath`."""
        dataframe = dataframe[[primary_key, timestamp_column_name]].copy()
        dataframe[timestamp_column_name] = pd.to_datetime(dataframe[timestamp_column_name])
        dataframe = dataframe.loc[dataframe.groupby(primary_key)[timestamp_column_name].idxmax()]
        dataframe.to_json(filepath, orient="records", date_format="iso", indent=4)

    def _find_new_or_updated_well_ids(self, old_list, new_list, primary_key, timestamp_key):
        """
        Find primary keys where the timestamp differs or where new primary keys are present.
        Parameters:
            old_list (list): The old list of dictionaries containing primary keys and timestamps.
            new_list (list): The new list of dictionaries containing primary keys and timestamps.
            primary_key (str): The key representing the primary key in each dictionary.
            timestamp_key (str): The key representing the timestamp in each dictionary.

        Returns:
            list: A list of primary keys where either:
                - The primary key is present in the new list but not in the old (new record).
                - The primary key exists in both lists, but the timestamps differ (updated record).
        """
        if old_list is None:
            return [item[primary_key] for item in new_list]
        differences = []
        # Convert the lists to dictionaries for easier comparison
        old_dict = {item[primary_key]: item[timestamp_key] for item in old_list}
        new_dict = {item[primary_key]: item[timestamp_key] for item in new_list}
        # Find primary keys that are either new or have a different timestamp
        for key in new_dict:
            if key not in old_dict or old_dict[key] != new_dict[key]:
                differences.append(key)
        return differences

    def is_data_sync_needed(
        self, dataframe: pd.DataFrame, json_filename: str, client_name: str = None
    ) -> bool:
        """
        Determines if the provided DataFrame needs to be synced with the JSON file.
        If the file does not exist or if the JSON content differs, it updates the file
        and returns True (indicating syncing is necessary). Otherwise, returns False.

        Parameters:
        ----------
        dataframe : pd.DataFrame
            The DataFrame to be compared or saved.
        json_filename : str
            The name of the JSON file to compare against or save to.
        client_name : str
            The name of the client, used to create the file path dynamically.

        Returns:
        -------
        bool
            True if syncing is necessary (file does not exist or JSON is different),
            False if the data matches the existing file.

        Notes:
        -----
        - If the JSON file exists, the function reads its contents and compares it with the
          JSON representation of the provided DataFrame.
        - If the JSON file does not exist, the function creates it by dumping the provided
          DataFrame into the JSON file.
        - The JSON file is stored in the path:
          `scheduler/company/{client_name}/associated_files/{json_filename}.json`
        """
        client_name_to_use = self.client_name.lower() if client_name is None else client_name
        # Build the path with os.path.join so it also works outside Windows
        base_dir = os.path.dirname(os.path.abspath(__file__)).replace("aries_python_code", "")
        filepath = os.path.join(
            base_dir,
            "scheduler",
            "company",
            client_name_to_use,
            "associated_files",
            f"{json_filename}.json",
        )
        if os.path.exists(filepath):
            # Load JSON from a file
            with open(filepath, "r") as file:
                df_from_json = json.load(file)
            # Convert the dataframe to JSON and parse it as a list of records
            df_from_dataframe = json.loads(dataframe.to_json(orient="records"))
            # Check if they are equal
            is_equal = df_from_json == df_from_dataframe
            if is_equal:
                return False  # No need to sync if data is the same
            else:
                dataframe.to_json(filepath, orient="records", indent=4)
                return True  # Sync is needed if data is different
        else:
            dataframe.to_json(filepath, orient="records", indent=4)
            print(f"File created and data frame saved to {filepath}")
            return True  # Sync is needed if the file does not exist

    def get_dataframe_to_update(
        self,
        dataframe: pd.DataFrame,
        timestamp_filename: str,
        primary_key: str,
        timestamp_column_name: str,
        client_name: str = None,
    ) -> pd.DataFrame:
        """
        Identify well IDs from the provided DataFrame that are new or have updated timestamps
        by comparing them with records in an existing JSON timestamp file. The function returns
        the rows for those well IDs (new or modified) and updates the JSON timestamp file.

        Parameters:
            dataframe (pd.DataFrame): The current DataFrame containing well IDs and their corresponding timestamps.
            timestamp_filename (str): The name of the JSON file (excluding path) to store historical well ID timestamps.
            primary_key (str): The column name in the DataFrame that uniquely identifies each well (e.g., 'well_id').
            timestamp_column_name (str): The column name in the DataFrame representing the timestamp for each well ID.
            client_name (str, optional): The name of the client used to construct the JSON file path.
                Defaults to None, in which case the object's `client_name` attribute is used.

        Returns:
            pd.DataFrame: The rows of the DataFrame whose well IDs are either new or have an
                updated timestamp compared to the records in the JSON file.
        """
        client_name_to_use = self.client_name.lower() if client_name is None else client_name
        base_dir = os.path.dirname(os.path.abspath(__file__)).replace("aries_python_code", "")
        folder_path = os.path.join(base_dir, "scheduler", "company", client_name_to_use, "associated_files")
        os.makedirs(folder_path, exist_ok=True)
        filepath = os.path.join(folder_path, f"{timestamp_filename}.json")
        new_timestamp = json.loads(
            self._convert_dataframe_to_timestamp_json(dataframe, primary_key, timestamp_column_name)
        )
        # Read the previous snapshot, closing the file handle once it is loaded
        if os.path.exists(filepath):
            with open(filepath, "r") as file:
                old_timestamp = json.load(file)
        else:
            old_timestamp = None
        well_ids_to_update = self._find_new_or_updated_well_ids(
            old_timestamp, new_timestamp, primary_key, timestamp_column_name
        )
        self._save_dataframe_timestamp_to_json(dataframe, filepath, primary_key, timestamp_column_name)
        return dataframe[dataframe[primary_key].isin(well_ids_to_update)]

    def send_email(
        self,
        from_email: str,
        password: str,
        to_email: str,
        subject: str,
        body: str,
        smtp_server: str = "smtp.office365.com",
        smtp_port: int = 587,
    ) -> None:
        """
        Sends an email using the specified SMTP server.

        Parameters:
            from_email (str): The sender's email address.
            password (str): The sender's email account password.
            to_email (str): The recipient's email address.
            subject (str): The subject of the email.
            body (str): The body of the email, which will be formatted as HTML.
            smtp_server (str, optional): The SMTP server address. Default is "smtp.office365.com".
            smtp_port (int, optional): The SMTP server port. Default is 587.
        Returns:
            None
        """
        # Newlines in the plain-text body become HTML line breaks
        formatted_body = body.replace("\n", "<br>")
        html_body = f"""
        <html>
        <body>
        <p style="font-size:10px; font-family:Arial;">{formatted_body}</p>
        </body>
        </html>
        """
        message = f"Subject: {subject}\n"
        message += "MIME-Version: 1.0\n"
        message += "Content-Type: text/html\n\n"
        message += html_body
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(from_email, password)
            server.sendmail(from_email, to_email, message)

    def clean_deviation_survey_payload(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Cleans a list of dictionaries by removing duplicates based on the 'md' value, ensuring the list is sorted
        in increasing order, and removing any entry where 'md' or 'tvd' includes a NaN value.

        Parameters:
            data (List[Dict[str, Any]]): A list of dictionaries, where each dictionary contains 'md' and 'tvd' keys.

        Returns:
            List[Dict[str, Any]]: A cleaned and sorted list of dictionaries based on 'md'.
        """
        # Remove entries with NaN values for 'md' or 'tvd'
        data = [entry for entry in data if not (pd.isna(entry["md"]) or pd.isna(entry["tvd"]))]
        # Convert 'md' and 'tvd' to floats
        data = [{"md": float(entry["md"]), "tvd": float(entry["tvd"])} for entry in data]
        # Keep the first entry seen for each 'md' value
        unique_data = {}
        for entry in data:
            if entry["md"] not in unique_data:
                unique_data[entry["md"]] = entry
        cleaned_data = sorted(unique_data.values(), key=lambda x: x["md"])
        return cleaned_data

    def get_new_well_ids_2(
        self,
        unique_well_ids_df: pd.DataFrame,
        json_filename: str,
        client_name: str = None,
    ) -> List[dict]:
        """
        Returns a list of new well IDs not already present in the saved JSON file for the specified client.
        Overwrites the JSON file with the new set of well IDs.

        Parameters:
        - unique_well_ids_df (pd.DataFrame): DataFrame containing the queried well IDs.
        - client_name (str): The name of the client for directory construction.
        - json_filename (str): The base name of the JSON file where well IDs are stored.

        Returns:
        - list: A list of new well IDs. Returns an empty list if no new well IDs are detected.
        """
        client_name_to_use = self.client_name.lower() if client_name is None else client_name
        # Determine the full JSON filepath
        filepath = os.path.join(
            os.path.dirname(os.path.abspath(__file__)).replace("aries_python_code", ""),
            f"scheduler/company/{client_name_to_use}/associated_files/{json_filename}.json",
        )
        # Load existing well IDs from JSON if the file exists, else initialize an empty list
        if os.path.exists(filepath):
            with open(filepath, "r") as file:
                existing_well_ids = json.load(file)
        else:
            existing_well_ids = []
        # Convert the new well IDs to a list of records for comparison
        new_well_ids = json.loads(unique_well_ids_df.to_json(orient="records"))
        # Identify only new well IDs
        new_wells_only = [well for well in new_well_ids if well not in existing_well_ids]
        # Overwrite the JSON with the new well IDs (latest dataset)
        unique_well_ids_df.to_json(filepath, orient="records", indent=4)
        return new_wells_only  # Returns list of new well IDs or empty list if none

    def _get_auth0_logs_old(self, per_page: int = 50, page: int = 0) -> Any:
        """
        Retrieve log events from Auth0 Management API.
        Returns the parsed JSON body of the response, also when the request fails.

        :param per_page: Number of logs per page (default: 50, max: 100).
        :param page: Page number for pagination (default: 0).

        Example function call:
            response = whitson_connection._get_auth0_logs_old(per_page=100, page=1)
        """
        url = f"https://{self.client_name}/api/v2/logs"
        params = {"per_page": per_page, "page": page, "sort": "date:-1", "type": "ssa"}
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.access_token}",
        }
        response = requests.get(url, headers=headers, params=params)
        if response.status_code == 200:
            print(f"Successfully retrieved {len(response.json())} log entries.")
            return response.json()
        else:
            print("Failed to retrieve logs - ", response.text)
            return response.json()

    def _get_auth0_logs(self, per_page: int = 50) -> list:
        """
        Retrieve log events from Auth0 Management API with automatic pagination, filtering only logs from today.

        :param per_page: Number of logs per page (default: 50, max: 100).
        This function retrieves logs from today by filtering with the "q" parameter.

        Example function call:
            logs = whitson_connection._get_auth0_logs(per_page=100)
        """
        # Get today's date in UTC (datetime is the class imported via `from datetime import datetime`)
        today = datetime.utcnow().date()
        today_start = f"{today}T00:00:00.000Z"
        today_end = f"{today}T23:59:59.999Z"
        url = f"https://{self.client_name}/api/v2/logs"
        params = {
            "per_page": per_page,
            "sort": "date:-1",
            "type": "ssa",
            "q": f"date:[{today_start} TO {today_end}]",  # Filter logs for today
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.access_token}",
        }
        all_logs = []
        while url:
            # Send the query parameters only with the first request; a 'next' URL
            # taken from the Link header already carries its own query string.
            response = requests.get(url, headers=headers, params=params if "?" not in url else None)
            if response.status_code != 200:
                print("Failed to retrieve logs -", response.text)
                break
            logs = response.json()
            all_logs.extend(logs)
            print(f"Retrieved {len(logs)} log entries. Total: {len(all_logs)}")
            # Extract 'next' URL from Link header
            link_header = response.headers.get("Link", "")
            next_url = None
            if link_header:
                links = link_header.split(", ")
                for link in links:
                    if 'rel="next"' in link:
                        next_url = link.split(";")[0].strip("<>")
                        break
            url = next_url  # Set next URL or exit if None
        return all_logs

    def get_new_well_ids(
        self,
        unique_well_ids_df: pd.DataFrame,
        json_filename: str,
        client_name: str = None,
    ) -> List[str]:
        """
        Returns a list of new well IDs not already present in the saved JSON file for the specified client.
        Updates the JSON file with the new set of unique well IDs, storing one well ID per line.

        Parameters:
        - unique_well_ids_df (pd.DataFrame): DataFrame containing the queried well IDs (must include a 'well_id' column).
        - json_filename (str): The base name of the JSON file where well IDs are stored.
        - client_name (str, optional): The name of the client for directory construction. Defaults to `self.client_name`.

        Returns:
        - List[str]: A list of new well IDs. Returns an empty list if no new well IDs are detected.
        """
        if "well_id" not in unique_well_ids_df.columns:
            raise ValueError("The DataFrame must contain a 'well_id' column.")
        client_name_to_use = self.client_name.lower() if client_name is None else client_name
        # Determine the full JSON filepath
        filepath = os.path.join(
            os.path.dirname(os.path.abspath(__file__)).replace("aries_python_code", ""),
            f"scheduler/company/{client_name_to_use}/associated_files/{json_filename}.json",
        )
        # Load existing well IDs from JSON if the file exists
        if os.path.exists(filepath):
            with open(filepath, "r") as file:
                existing_well_ids = set(json.load(file))
        else:
            existing_well_ids = set()
        # Extract unique well IDs from the DataFrame
        new_well_ids = set(unique_well_ids_df["well_id"].astype(str))  # Ensure all well_ids are strings
        # Identify only new well IDs
        new_wells_only = list(new_well_ids - existing_well_ids)
        # Update the JSON file with the latest unique well IDs
        updated_well_ids = list(existing_well_ids.union(new_well_ids))
        with open(filepath, "w") as file:
            json.dump(updated_well_ids, file, indent=4)
        return new_wells_only

    def _find_decimals(self, data):
        """Recursively print every decimal.Decimal value found in nested dicts and lists."""
        if isinstance(data, dict):
            for key, value in data.items():
                if isinstance(value, decimal.Decimal):
                    print(f"Decimal found at key '{key}' with value {value}")
                elif isinstance(value, dict):
                    self._find_decimals(value)
                elif isinstance(value, list):
                    for item in value:
                        self._find_decimals(item)
        elif isinstance(data, list):
            for item in data:
                self._find_decimals(item)

    def _get_h_f(self, well):
        """Determine the appropriate h_f value based on the condition of h and h_f fields."""
        return well["h"] if pd.notnull(well["h"]) and well["h"] < well["h_f"] else well["h_f"]

    def _get_percentage(self, value, default=30):
        """Convert value to percentage if less than or equal to 1, otherwise use as-is. Return default if value is NaN."""
        return (
            float(value) * 100
            if pd.notna(value) and float(value) <= 1
            else (float(value) if pd.notna(value) else default)
        )

    def _normalize_porosity(self, value, default=0.05):
        """Convert porosity to decimal if greater than 1, otherwise use as-is.
        Return default if value is NaN."""
        return (
            float(value) / 100
            if pd.notna(value) and float(value) > 1
            else (float(value) if pd.notna(value) else default)
        )

    def _convert_temperature(self, value, threshold=400):
        """Convert temperature from Rankine to Fahrenheit (subtract 459.67) if above threshold.
        Return as-is if NaN or not above threshold."""
        return value - 459.67 if pd.notnull(value) and value > threshold else value

    def _is_date_in_list(self, date, list_of_dicts):
        """Checks if date exists in any dictionary in list_of_dicts."""
        return date in {d["date"] for d in list_of_dicts}


## Done!
## Last updated - 13 Jan 2025
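The incremental-sync helpers in the class (get_dataframe_to_update and _find_new_or_updated_well_ids) compare a stored snapshot of per-well timestamps with a fresh one. The sketch below restates that comparison as a standalone function so it can be tried without a database or the whitson_connect module. The function name find_new_or_updated_ids and the sample well IDs and timestamps are made up for illustration.

```python
from typing import Dict, List, Optional


def find_new_or_updated_ids(
    old_list: Optional[List[Dict[str, str]]],
    new_list: List[Dict[str, str]],
    primary_key: str = "well_id",
    timestamp_key: str = "insert_date",
) -> List[str]:
    """Return IDs that are new, or whose timestamp changed, in new_list."""
    if old_list is None:
        # No snapshot yet: every record counts as new.
        return [item[primary_key] for item in new_list]
    old = {item[primary_key]: item[timestamp_key] for item in old_list}
    new = {item[primary_key]: item[timestamp_key] for item in new_list}
    return [key for key, stamp in new.items() if key not in old or old[key] != stamp]


# Sample snapshots (hypothetical values).
previous = [
    {"well_id": "W-001", "insert_date": "2025-01-10T00:00:00.000"},
    {"well_id": "W-002", "insert_date": "2025-01-10T00:00:00.000"},
]
current = [
    {"well_id": "W-001", "insert_date": "2025-01-10T00:00:00.000"},  # unchanged
    {"well_id": "W-002", "insert_date": "2025-01-12T00:00:00.000"},  # updated
    {"well_id": "W-003", "insert_date": "2025-01-12T00:00:00.000"},  # new
]

changed = find_new_or_updated_ids(previous, current)
print(changed)  # ['W-002', 'W-003']
```

Only the wells in the returned list would need to be re-sent to whitson+, which keeps scheduled syncs small.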
We'll routinely add to this class to improve the usability of the endpoints and simplify eAPI workflows further. Feel free to send your feedback or wishlist to support@whitson.com.
Getting Started
After downloading the helper class above, you can import it into your script as shown below. The client id and client secret are available on request from us (at support@whitson.com) or from the key contact in your company who confidentially maintains the values used for your company. They are used to generate an authorization token that is valid for 24 hours, so please be mindful of how often you call the get_access_token() function. The get_access_token_smart() function does not request a new token if the previous one was requested within the last 24 hours.
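Because a token stays valid for 24 hours, it is worth caching it between runs. The sketch below shows one way such a freshness check could look. It is not the library's code: the cache fields (client_id, access_token, timestamp) follow the helper's token file, but storing the timestamp as epoch seconds, and the names load_cached_token and MAX_TOKEN_AGE_SECONDS, are assumptions made for illustration.

```python
import json
import os
import tempfile
import time
from typing import Optional

MAX_TOKEN_AGE_SECONDS = 24 * 60 * 60  # a token is valid for 24 hours


def load_cached_token(
    file_path: str, client_id: str, now: Optional[float] = None
) -> Optional[str]:
    """Return the cached token if it belongs to client_id and is under 24 hours old."""
    now = time.time() if now is None else now
    if not os.path.exists(file_path):
        return None
    try:
        with open(file_path, "r") as file:
            data = json.load(file)
    except json.JSONDecodeError:
        return None  # unreadable cache: treat it as missing
    if not isinstance(data, dict) or data.get("client_id") != client_id:
        return None  # cache was written for a different client
    timestamp = data.get("timestamp")
    if not isinstance(timestamp, (int, float)):
        return None
    if now - timestamp >= MAX_TOKEN_AGE_SECONDS:
        return None  # token has expired
    return data.get("access_token")


# Demonstration with a temporary cache file and made-up values.
issued_at = 1_000_000.0
with tempfile.TemporaryDirectory() as folder:
    cache_path = os.path.join(folder, "token.json")
    with open(cache_path, "w") as file:
        json.dump(
            {"client_id": "demo-client", "access_token": "demo-token", "timestamp": issued_at},
            file,
        )
    fresh = load_cached_token(cache_path, "demo-client", now=issued_at + 3600)
    stale = load_cached_token(cache_path, "demo-client", now=issued_at + 25 * 3600)
    other = load_cached_token(cache_path, "another-client", now=issued_at + 3600)

print(fresh, stale, other)  # demo-token None None
```

A new token would only be requested when this check returns None.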
The script below is an example of how to create a WhitsonConnection class object and use one of the functions to retrieve all the well header information stored in the database for all the wells in the project.
import whitson_connect

# To fill
CLIENT = "your_domain_here"  # This is the company suffix in your whitson urls ex. 'courses' in courses.whitson.com
CLIENT_ID = "your_client_id_here"  # Available on request
CLIENT_SECRET = "your_client_secret"  # Available on request
PROJECT_ID = "your_project_id_here"  # This is the numeric value of the project, available from the URL like '638' in https://courses.whitson.com/fields/5/projects/638/

## Creating a WhitsonConnection class object
whitson_connection = whitson_connect.WhitsonConnection(CLIENT, CLIENT_ID, CLIENT_SECRET)

## Issuing an authorization token here - automatically saves the access token in the working directory so only one token is issued every 24 hours.
whitson_connection.access_token = whitson_connection.get_access_token_smart()

## Get all the wells in the project
whitson_wells = whitson_connection.get_wells(PROJECT_ID)
print(whitson_wells)
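Since the client id and client secret are confidential, you may prefer not to hard-code them in the script. The following is a minimal sketch of reading the four values from environment variables instead. The variable names (WHITSON_CLIENT and so on) and the function load_whitson_settings are hypothetical and are not part of whitson_connect.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical variable names, chosen for this example only.
REQUIRED_VARIABLES = (
    "WHITSON_CLIENT",
    "WHITSON_CLIENT_ID",
    "WHITSON_CLIENT_SECRET",
    "WHITSON_PROJECT_ID",
)


def load_whitson_settings(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect the connection settings, failing early if any value is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARIABLES if not environ.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARIABLES}


# Demonstration with a plain dictionary standing in for os.environ.
example_environment = {
    "WHITSON_CLIENT": "courses",
    "WHITSON_CLIENT_ID": "your_client_id_here",
    "WHITSON_CLIENT_SECRET": "your_client_secret",
    "WHITSON_PROJECT_ID": "638",
}
settings = load_whitson_settings(example_environment)
print(settings["WHITSON_CLIENT"], settings["WHITSON_PROJECT_ID"])  # courses 638
```

The returned values can then be passed to whitson_connect.WhitsonConnection and get_wells in place of the constants in the script above.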