Hi Harshita,
Sure, please adjust the code as shown below:
import pytest
from unittest.mock import MagicMock, patch
import pandas as pd
from cognite.client import CogniteClient
# Module-level client that process_data() reads as the global `c`.
# This statement runs before the same-named `CogniteClient` class further down
# in this file is defined, so at this point the name still refers to the class
# imported from `cognite.client`.
c = CogniteClient()
def process_data(database, table, key):
    """Return the ``LastUpdatedDatetime`` values stored under ``key``.

    Lists rows through the module-level client ``c`` (``limit=-1``, restricted
    to the ``key`` and ``LastUpdatedDatetime`` columns), converts the result
    to a DataFrame and selects the rows whose index label equals ``key``.

    Args:
        database: Passed as the first positional argument to
            ``c.raw.rows.list``.
        table: Passed as the second positional argument to
            ``c.raw.rows.list``.
        key: Row key to look up; it is matched against the DataFrame index.

    Returns:
        A list with the ``LastUpdatedDatetime`` value of every row whose index
        label equals ``key`` (an empty list when nothing matches), or ``None``
        when the listing produced an empty DataFrame.
    """
    rows = c.raw.rows.list(
        database, table, limit=-1, columns=["key", "LastUpdatedDatetime"]
    ).to_pandas()
    if rows.empty:
        return None
    # The match is done on the index, not on a "key" column, so the frame
    # returned by to_pandas() has to be indexed by row key for a hit.
    matches = rows[rows.index.isin([key])]
    return list(matches["LastUpdatedDatetime"])
class CogniteClient:
    """Minimal test double that exposes only a mocked ``raw`` attribute.

    NOTE: this class has the same name as the ``CogniteClient`` imported from
    ``cognite.client`` at the top of the file, so from this point on the
    module-level name refers to this stand-in rather than the imported class.
    """

    def __init__(self):
        # MagicMock creates nested attributes on demand, so ``raw.rows.list``
        # can be configured or patched without any further setup.
        self.raw = MagicMock()
@pytest.mark.asyncio
async def test_your_function():
    """Check that process_data() returns the timestamp stored for a key.

    process_data() reads the module-level global ``c``; a client created
    locally inside the test is never seen by it. The global is therefore
    swapped for a mocked client for the duration of the call, so no request
    leaves the test.

    Nothing here is awaited; the coroutine form and the asyncio marker are
    kept only so the test is collected exactly as before.
    """
    database = "your_database"
    table = "your_table"
    key = "your_key"
    last_updated_datetime = "2023-03-16T17:51:54Z"

    # process_data() matches ``key`` against the DataFrame *index*, so the
    # stubbed frame must be indexed by row key rather than carrying the key
    # as an ordinary column.
    frame = pd.DataFrame(
        {"LastUpdatedDatetime": [last_updated_datetime]},
        index=pd.Index([key], name="key"),
    )

    fake_client = CogniteClient()
    fake_client.raw.rows.list.return_value.to_pandas.return_value = frame

    # Patch the globals that process_data() actually resolves ``c`` from;
    # patch.dict restores the original binding when the block exits.
    with patch.dict(process_data.__globals__, {"c": fake_client}):
        result = process_data(database, table, key)

    fake_client.raw.rows.list.assert_called_once_with(
        database, table, limit=-1, columns=["key", "LastUpdatedDatetime"]
    )
    assert result == [last_updated_datetime]
I have tested the above code in our test environment and the mocked unit test passed. Please replace the LastUpdatedDatetime, database, table, and key values with the ones from your environment.
plugins: asyncio-0.21.0, cov-3.0.0, rerunfailures-11.1.2, xdist-3.2.1
asyncio: mode=strict
collected 1 item
test_data_processor.py . [100%]
============================================================================================================ 1 passed in 12.33s ============================================================================================================