Skip to content

Commit

Permalink
Merge pull request #73 from domoinc/ds-upsert
Browse files Browse the repository at this point in the history
upsert support
  • Loading branch information
jeremydmorris authored Jan 28, 2022
2 parents 94c33af + ea0b70a commit 2914688
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
20 changes: 8 additions & 12 deletions pydomo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pandas import to_datetime
from io import StringIO
import logging
import json

DOMO = """####################################################################################################
####################################################################################################
Expand Down Expand Up @@ -195,21 +196,16 @@ def ds_get(self, dataset_id):
df[col] = to_datetime(df[col])
except ValueError:
pass
except TypeError:
pass

return df

def ds_create(self, df_up, name, description=''):
dsr = DataSetRequest()
dsr.name = name
dsr.description = description
dsr.schema = Schema([Column(ColumnType.STRING, 'tt1'),
Column(ColumnType.STRING, 'tt2')])

new_ds_info = self.datasets.create(dsr)

self.utilities.stream_upload(new_ds_info['id'],df_up,warn_schema_change=False)

return new_ds_info['id']
def ds_create(self, df_up, name, description='', update_method='REPLACE', key_column_names=''):
new_stream = self.utilities.stream_create(df_up, name, description, update_method, key_column_names)
ds_id = json.loads(new_stream.content.decode('utf-8'))['dataSet']['id']
self.utilities.stream_upload(ds_id,df_up,warn_schema_change=False)
return ds_id


def ds_update(self, ds_id, df_up):
Expand Down
13 changes: 12 additions & 1 deletion pydomo/utilities/UtilitiesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import math
import sys
import json

from pydomo.DomoAPIClient import DomoAPIClient
from pydomo.datasets import DataSetClient
Expand Down Expand Up @@ -89,4 +90,14 @@ def stream_upload(self, ds_id, df_up, warn_schema_change=True):

result = self.stream.commit_execution(stream_id, exec_id)

return result
return result

def stream_create(self, up_ds, name, description, updateMethod='REPLACE', keyColumnNames=''):
df_schema = self.data_schema(up_ds)
req_body = {'dataSet': {'name': name, 'description': description, 'schema': {'columns': df_schema}}, 'updateMethod': updateMethod}
if( updateMethod == 'UPSERT' ):
req_body['keyColumnNames'] = keyColumnNames
# return req_body
st_created = self.transport.post('/v1/streams/', req_body, {})
return(st_created)
return json.loads(st_created.content.decode('utf-8'))['dataSet']['id']

0 comments on commit 2914688

Please sign in to comment.