猿问

如何使用python pandas处理传入的实时数据

哪一种是使用熊猫处理实时传入数据的最推荐/ Python方法?


每隔几秒钟,我就会收到以下格式的数据点:


{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',

 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

我想将其附加到现有的DataFrame上,然后对其进行一些分析。


问题是,仅将DataFrame.append添加到行中可能会导致所有复制的性能问题。

我尝试过的事情:

一些人建议预分配一个大的DataFrame并在数据输入时对其进行更新:


In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)


In [2]: columns = ['high', 'low', 'open', 'close']


In [3]: df = pd.DataFrame(index=t, columns=columns)


In [4]: df

Out[4]: 

                    high  low open close

2013-01-01 00:00:00  NaN  NaN  NaN   NaN

2013-01-01 00:00:01  NaN  NaN  NaN   NaN

2013-01-01 00:00:02  NaN  NaN  NaN   NaN

2013-01-01 00:00:03  NaN  NaN  NaN   NaN

2013-01-01 00:00:04  NaN  NaN  NaN   NaN


In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}


In [6]: data_ = pd.Series(data)


In [7]: df.loc[data['time']] = data_


In [8]: df

Out[8]: 

                    high  low open close

2013-01-01 00:00:00  NaN  NaN  NaN   NaN

2013-01-01 00:00:01  NaN  NaN  NaN   NaN

2013-01-01 00:00:02    4    3    2     1

2013-01-01 00:00:03  NaN  NaN  NaN   NaN

2013-01-01 00:00:04  NaN  NaN  NaN   NaN

另一种选择是建立字典列表。只需将传入的数据附加到列表中,然后将其切成较小的DataFrame,即可完成工作。


In [9]: ls = []


In [10]: for n in range(5):

   .....:     # Naive stuff ahead =)

   .....:     time = '2013-01-01 00:00:0' + str(n)

   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}

   .....:     ls.append(d)


In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')


In [12]: df

Out[12]: 

                        close      high       low      open stock

time                                                             

2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH

2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH

或类似的东西,也许要多处理一些输入。


跃然一笑
浏览 230回答 3
3回答

阿波罗的战车

我将使用HDF5 / pytables如下:将数据尽可能长地保留为python列表。将结果追加到该列表。当它变大时:使用pandas io(和一个可附加的表)推送到HDF5 Store。清除列表。重复。实际上,我定义的函数为每个“键”使用一个列表,以便您可以在同一过程中将多个DataFrame存储到HDF5存储。我们定义一个函数,您需要在每一行中调用它d:CACHE = {}STORE = 'store.h5'   # Note: another option is to keep the actual file opendef process_row(d, key, max_len=5000, _cache=CACHE):    """    Append row d to the store 'key'.    When the number of items in the key's cache reaches max_len,    append the list of rows to the HDF5 store and clear the list.    """    # keep the rows for each key separate.    lst = _cache.setdefault(key, [])    if len(lst) >= max_len:        store_and_clear(lst, key)    lst.append(d)def store_and_clear(lst, key):    """    Convert key's cache list to a DataFrame and append that to HDF5.    """    df = pd.DataFrame(lst)    with pd.HDFStore(STORE) as store:        store.append(key, df)    lst.clear()注意:我们使用with语句在每次写入后自动关闭存储。它可以更快地保持开放,但即便如此我们建议您定期刷新(收盘刷新)。还要注意,使用collection deque而不是列表可能更易读,但是列表的性能在这里会稍好一些。要使用此功能,请致电:process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},            key="df")注意:“ df”是pytables存储中使用的存储键。作业完成后,请确保您store_and_clear剩余的缓存:for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2    store_and_clear(lst, k)现在,您可以通过以下方式使用完整的DataFrame:with pd.HDFStore(STORE) as store:    df = store["df"]                    # other keys will be store[key]

慕斯王

您实际上是在尝试解决两个问题:捕获实时数据并分析该数据。第一个问题可以通过为此目的设计的Python日志记录来解决。然后可以通过读取相同的日志文件来解决另一个问题。
随时随地看视频慕课网APP

相关分类

Python
我要回答