
ARIES Example

Establishing a connection between an ARIES database and whitson+ through our external API is a standard practice. While this example involves an ARIES database, the majority of the steps are applicable to connecting any data source to whitson+, such as an SQL database.

Auto Update Prod Data to whitson+: Workflow Overview


Engineers spend a large share of their time moving data between systems. This example demonstrates an automated daily update of production data. The process follows a widely used structure: connect to the database, check whether each well already exists in whitson+, and create a new well with production data if it does not. If the well does exist, the new production data is appended to the existing entity. Let's explore the details!
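The create-or-append decision described above can be sketched with plain dictionaries standing in for the ARIES rows and the whitson+ well list (all names here are illustrative, not the actual API):

```python
# Toy stand-in for the wells already present in the whitson+ project.
existing_wells = [{"name": "WELL A", "production": [{"qo_sc": 100.0}]}]

def sync_well(wellname, new_rows):
    """Append production to an existing well, or create the well first."""
    for well in existing_wells:
        if well["name"] == wellname:
            well["production"].extend(new_rows)  # well exists: append only
            return well
    well = {"name": wellname, "production": list(new_rows)}  # new well
    existing_wells.append(well)
    return well

sync_well("WELL A", [{"qo_sc": 95.0}])  # appended to the existing well
sync_well("WELL B", [{"qo_sc": 50.0}])  # created, then data attached
```

The full scripts below implement the same decision against the real ARIES tables and whitson+ endpoints.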

Connect ARIES to whitson+: Python Example

Main Script: aries.py

What does the aries.py file do?

This file shows an example of how to connect to an ARIES database and a relevant whitson+ domain. It uses the helper class provided below.

import pyodbc
import whitson_connect

# Fill in your credentials
CLIENT = "your_domain_here"
CLIENT_ID = "your_client_id_here"
CLIENT_SECRET = "your_client_secret"
PROJECT_ID = "your_project_id_here"


whitson_connection = whitson_connect.WhitsonConnection(
    CLIENT, CLIENT_ID, CLIENT_SECRET, PROJECT_ID
)

# whitson_connection.access_token = whitson_connection.get_access_token() # ONLY RUN ONCE PER WORK SESSION
whitson_connection.access_token = "your_access_token_here"

# Set up the connection string
# Replace 'YourAccessDatabase.accdb' with the path to your Access database file
conn_str = r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=C:\Bitbucket\whitson-pvt-user-manual\docs\external_api\aries_python_code\AriesExample.mdb;"

# Note: If you're using a password-protected Access database, you need to include the UID and PWD parameters in the connection string to provide the username and password.
# conn_str = r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=YourAccessDatabase.accdb;UID=YourUsername;PWD=YourPassword;'

aries_connection = pyodbc.connect(conn_str)

cursor = aries_connection.cursor()

# In this example, the well name is stored in the LEASE column of the AC_PROPERTY table
cursor.execute(
    """
SELECT AD.propnum, AP.LEASE AS WellName
FROM AC_Daily AS AD
LEFT JOIN AC_PROPERTY AS AP ON AD.propnum = AP.propnum
UNION
SELECT AP.propnum, AP.LEASE AS WellName
FROM AC_PROPERTY AS AP
"""
)

aries_wells = cursor.fetchall()

whitson_wells = whitson_connection.get_wells()

# Create well if not in project already
for aries_well in aries_wells:
    propnum, wellname = aries_well
    if any(well["name"] == wellname for well in whitson_wells):
        print(f"Well with name '{wellname}' already exists.")
    else:
        well_info = {"project_id": PROJECT_ID, "name": wellname, "uwi_api": propnum}
        whitson_connection.create_well(payload=well_info)

# Get updated list after potential new wells are added
whitson_wells = whitson_connection.get_wells()

# Fetch daily production data from the relevant ARIES table (AC_DAILY)
cursor.execute("SELECT * FROM AC_DAILY")

# The AC_DAILY table column headers must be mapped to relevant whitson+ production data types.
# In this example the headers are: ['PROPNUM', 'D_DATE', 'OIL', 'GAS', 'WATER', 'TBG', 'CSG', 'LINE_PRES', 'CHOKESIZE']
# print("Column Headers for AC_Daily: " + str([column[0] for column in cursor.description]))  # UNCOMMENT TO SEE COLUMN HEADERS FOR YOUR CASE

aries_prod_data_rows = cursor.fetchall()
production_payload = []
prior_propnum = aries_prod_data_rows[0][0]

for aries_prod_row in aries_prod_data_rows:
    propnum, date, qo, qg, qw, p_tubing, p_casing, p_line, chokesize = aries_prod_row
    this_prod_time = {
        "date": date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if date else None,
        "qo_sc": max(0, qo) if qo is not None else None,
        "qg_sc": max(0, qg) if qg is not None else None,
        "qw_sc": max(0, qw) if qw is not None else None,
        "p_wf_measured": None,
        "p_tubing": max(0, p_tubing) if p_tubing is not None else None,
        "p_casing": max(0, p_casing) if p_casing is not None else None,
        "qg_gas_lift": None,
        "liquid_level": None,
        "choke_size": max(0, chokesize) if chokesize is not None else None,
        "line_pressure": max(0, p_line) if p_line is not None else None,
    }
    if propnum == prior_propnum:
        production_payload.append(this_prod_time)
    else:
        well_id = whitson_connection.get_well_id_by_propnum(whitson_wells, prior_propnum)
        whitson_connection.upload_production_to_well(
            well_id,
            {"production_data": production_payload},
            append_only=True,
        )
        production_payload = [this_prod_time]
        prior_propnum = propnum

# Upload data for the last well
well_id = whitson_connection.get_well_id_by_propnum(whitson_wells, prior_propnum)
whitson_connection.upload_production_to_well(
    well_id,
    {"production_data": production_payload},
    append_only=True,
)

cursor.close()
aries_connection.close()

Helper Class: whitson_connect.py

What does the whitson_connect.py file do?

This is the helper class used in the aries.py file above. These two files are the only ones required to run the outlined workflow. The class shows a few examples of the available endpoints; a complete list can be found here.

import http.client
import json
import requests


class WhitsonConnection:
    def __init__(self, client_name, client_id, client_secret, project_id):
        self.client_name = client_name
        self.client_id = client_id
        self.client_secret = client_secret
        self.project_id = project_id
        self.access_token = None

    def get_access_token(self):
        """
        Get an access token for a given work session.
        """
        conn = http.client.HTTPSConnection("whitson.eu.auth0.com")
        payload = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "audience": f"https://{self.client_name}.whitson.com/",
            "grant_type": "client_credentials",
        }

        headers = {"content-type": "application/json"}
        conn.request("POST", "/oauth/token", json.dumps(payload), headers)
        res = conn.getresponse()
        data = res.read()
        return json.loads(data.decode("utf-8")).get("access_token")

    def get_well_id_by_propnum(self, wells, propnum):
        """
        Get the well_id of a given propnum.
        """
        return next(
            (well["id"] for well in wells if well.get("uwi_api") == propnum), None
        )

    def get_fields(self):
        """
        Get all fields on domain.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/"
        response = requests.get(
            base_url + "fields",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing fields")
        return res

    def get_wells(self):
        """
        Get a list of wells in a project.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={
                "project_id": self.project_id
            },
        )
        res = response.json()
        if not res:
            return []
        return res

    def get_projects(self, field_id: int | None = None):
        """
        Get a list of projects in field.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/fields/{field_id}/projects"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no existing projects")
        return res

    def create_well(self, payload: dict) -> requests.Response:
        """
        Create a new well on a domain.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully created well {payload['name']}")
        else:
            print(response.text)
        return response

    def create_project(self, field_id: int, payload: dict) -> requests.Response:
        """
        Create a new project on a domain.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/fields/{field_id}/projects"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully created project {payload['name']}")
        else:
            print(response.text)
        return response

    def upload_production_to_well(
        self,
        well_id: int,
        payload: list[dict],
        append_only: bool = False,
    ) -> requests.Response:
        """
        Upload production data to well.
        """
        response = requests.post(
            f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
            params={"append_only": append_only},
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully updated production data on well {well_id}")
        else:
            print(response.text)
        return response

    def bulk_upload_production_to_well(self, payload: list[dict]) -> requests.Response:
        """
        Upload production to well.
        """
        response = requests.post(
            f"https://{self.client_name}.whitson.com/api-external/v1/wells/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print("success")
        else:
            print(response.text)
        return response

    def get_production(
        self,
        well_id: int,
    ):
        """
        Get production data for a well.
        """
        response = requests.get(
            f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/production_data",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        res = response.json()
        if not res:
            raise Exception("no production data on well")
        return res

    def edit_input_quick(self, well_id: int, payload: dict) -> requests.Response:
        """
        Edit the input quick (PVT) property of a well.
        """
        response = requests.put(
            f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/input_quick",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully edited input quick for well {well_id}")
        else:
            print(response.text)
        return response

    def upload_well_data_to_well(
        self, well_id: int, payload: list[dict]
    ) -> requests.Response:
        """
        Upload a well data to a well.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_data"
        response = requests.post(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"successfully updated well_data to well {well_id}")
        else:
            print(response.text)
        return response

    def edit_well_deviation_data(
        self, well_id: int, payload: list[dict]
    ) -> requests.Response:
        """
        Edit well deviation data of a well in the database.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/bhp_input/well_deviation_survey"
        response = requests.put(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            json=payload,
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"changed well deviation survey on well_id {well_id}")
        else:
            print(response.text)
        return response

    def run_composition_calc(self, well_id: int) -> requests.Response:
        """
        Run PVT (composition) calculation on well.
        """
        response = requests.get(
            f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_composition_calc",
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code >= 200 and response.status_code < 300:
            print(f"success on running composition calc on well {well_id}")
        else:
            print(response.text)
        return response

    def run_bhp_calc(self, well_id: int) -> requests.Response:
        """
        Run bhp calculation on the well specified by the provided well_id.
        """
        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/run_bhp_calculation"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
        )
        if response.status_code == 202:
            print(f"successfully ran bhp calc on {well_id}")
        else:
            print(response.text)
        return response

    def get_pwf_active(self, well_id: int, from_date: str | None = None):
        """
        Get active pwf from the database for the given well_id, from the start date in from_date.

        Example payload:
        this_well_id = integer
        this_from_date = "YYYY-MM-DD"

        Example function call:
        active_pwf_well = whitson_connection.get_pwf_active(this_well_id, this_from_date)
        """

        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells/{well_id}/pwf_active"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params={"from_date": from_date},
        )
        if response.status_code == 200:
            print(f"successfully received active pwf on {well_id}")
        else:
            print(response.text)
        return response.json()

    def get_pwf_active_multiple(self, payload: dict):
        """
        Get active pwf from the database for all the well_ids in the self.project_id from the start date in from_date

        Example payload:
        payload = {"from_date":"YYYY-MM-DD", "page": 0, "page_size": 10}

        Example function call:
        active_pwf_wells = self.get_pwf_active_multiple(payload)

        """

        base_url = f"https://{self.client_name}.whitson.com/api-external/v1/wells/pwf_active"
        response = requests.get(
            base_url,
            headers={
                "content-type": "application/json",
                "Authorization": f"Bearer {self.access_token}",
            },
            params=payload,
        )
        if response.status_code == 200:
            print("successfully received active pwf")
        elif response.status_code == 404:
            print("No wells found matching the payload criteria")
        else:
            print(response.text)
        return response.json()

Windows Task Scheduler: Automating Daily Script Execution

A scheduler, such as the Windows Task Scheduler, is a vital tool for automating dataflow in and out of whitson+. It enables regular updates of, for example, production data and the addition of new wells, ensuring the system stays current. The video above shows an example of how this is done with the Windows Task Scheduler.

Windows Task Scheduler

To run a Python script from the Windows Task Scheduler, follow these steps:

  1. Open Task Scheduler: Press Win + S, type "Task Scheduler," and hit Enter.
  2. Create a Basic Task: In the Actions pane, click on "Create Basic Task."
  3. Provide a Name and Description: Enter a name and description for your task, then click "Next."
  4. Choose Trigger: Select the trigger for your task. For example, choose "Daily" if you want to run the script every day. Click "Next."
  5. Set Trigger Details: Specify the start date, time, and recurrence pattern. Click "Next."
  6. Choose Action: Select "Start a Program" and click "Next."
  7. Specify Program/Script:
    • In the "Program/script" field, provide the path to your Python executable (e.g., C:\Path\to\python.exe).
    • In the "Add arguments" field, provide the path to your Python script (e.g., C:\Path\to\your\script.py). Click "Next."
  8. Review Settings: Review your settings and click "Finish."

Now, your Python script will be executed automatically according to the schedule you specified. Ensure that the Python executable and your script paths are correct.

If you encounter any issues, you can check the Task Scheduler's "History" tab for information about the last run result. Additionally, make sure your Python script has the necessary permissions to access resources and that the Python interpreter is in the system's PATH or specify the full path to it.
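One pitfall worth calling out: the Task Scheduler does not start your script from the script's own folder, so relative file paths (for example to the Access database or a log file) can silently point at the wrong location. A small pattern that sidesteps this, with illustrative file names:

```python
from pathlib import Path

# Resolve files relative to the script itself rather than the current
# working directory, which Task Scheduler sets unpredictably.
SCRIPT_DIR = Path(__file__).resolve().parent
DB_PATH = SCRIPT_DIR / "AriesExample.mdb"   # hypothetical database file
LOG_PATH = SCRIPT_DIR / "script_log.txt"    # hypothetical log file

print(DB_PATH)
```

Alternatively, the "Start in (optional)" field of the Task Scheduler action can be set to the script's folder.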

Are there alternatives to Windows Task Scheduler?

Several alternatives to the Windows Task Scheduler exist, offering additional features and flexibility. Here are some popular alternatives:

Cron on Linux/Unix Cron is a time-based job scheduler in Unix-like operating systems. Users can schedule tasks using a cron syntax. It is particularly prevalent in Linux environments.

Task Scheduler on macOS Similar to the Windows Task Scheduler, macOS also has a built-in scheduler called launchd. It allows users to run tasks based on events or schedules.

Automator on macOS Automator is a built-in app on macOS that allows users to create scripts to automate tasks. It provides a graphical user interface for creating automation workflows.

cronie on Linux An extension of the cron daemon for Linux systems, providing additional features and compatibility with the traditional cron syntax.

GNU at for Unix-like Systems Allows for one-time execution of commands at a specified time. Useful for ad-hoc scheduling.

Systemd Timers on Linux Systemd is a system and service manager for Linux, and it includes a timer component for scheduling tasks.

Third-party Task Scheduler Software Various third-party tools offer advanced features for task scheduling on different platforms. Examples include Task Scheduler Pro, VisualCron, and Advanced Task Scheduler.

Azure Logic Apps or Azure Functions In cloud environments like Azure, services like Logic Apps or Functions can be used for serverless task scheduling.

Choose the tool that best fits your requirements and integrates seamlessly with your workflow. The selection may depend on your operating system, specific scheduling needs, and the level of automation you seek.

Adding Logging to Python Script Run via Windows Task Scheduler

Debugging a script run via the Windows Task Scheduler can be difficult, so it is very beneficial to use Python's logging library alongside it during the testing period.

Logging progress from a Python script running in the Windows Task Scheduler can be achieved by utilizing Python's logging module. Here's a basic example of how you can incorporate logging into your script:

1. Import the logging Module:

Add the following import statement at the beginning of your Python script:

import logging

2. Configure the Logging:

Configure the logging settings. You can do this in your script's if __name__ == "__main__": block or at the beginning of your script. For example:

if __name__ == "__main__":
    logging.basicConfig(filename='script_log.txt', level=logging.DEBUG)

This configuration sets up logging to write messages to a file named script_log.txt and sets the logging level to DEBUG, which captures all messages.

3. Log Progress:

Use the logging functions to log progress at various points in your script. For example:

def main():
    logging.info("Script execution started.")
    # Your main script logic here
    logging.debug("Some progress update.")
    # More script logic
    logging.debug("Another progress update.")
    # ...

if __name__ == "__main__":
    main()

You can use different logging levels (DEBUG, INFO, WARNING, ERROR, CRITICAL) based on the importance of the logged information.
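As a quick illustration of level filtering, with the logger set to INFO, DEBUG messages are discarded while INFO and above are kept (the logger name and messages are arbitrary):

```python
import io
import logging

buf = io.StringIO()
logger = logging.getLogger("demo")
logger.addHandler(logging.StreamHandler(buf))
logger.setLevel(logging.INFO)  # DEBUG falls below this threshold

logger.debug("hidden")    # filtered out
logger.info("recorded")   # kept

print(buf.getvalue())  # only "recorded" appears
```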

4. Review the Log File:

After your script runs through the Task Scheduler, you can check the log file (script_log.txt in this example) for progress updates and any logged information.

Remember to adjust the logging configuration and levels based on your needs. You can customize the log file path, logging format, and other settings according to your preferences.

This logging approach provides a convenient way to capture progress and debug information during script execution, making it easier to diagnose any issues that may occur when the script is run through the Windows Task Scheduler.