Simulace vysokofrekvenčního obchodování pomocí Stream Analytics
Kombinace jazyka SQL s uživatelem definovanými funkcemi (UDF) a uživatelem definovanými agregacemi (UDA) JavaScriptu v Azure Stream Analytics umožňuje uživatelům provádět pokročilé analýzy. Pokročilá analýza může zahrnovat online trénování a vyhodnocování strojového učení a simulaci stavových procesů. Tento článek popisuje, jak provádět lineární regresi v úloze Azure Stream Analytics, která provádí průběžné trénování a vyhodnocování ve scénáři vysokofrekvenčního obchodování.
Vysokofrekvenční obchodování
Logický tok vysokofrekvenčního obchodování spočívá v:
- Získávání nabídek z burzy cenných papírů v reálném čase.
- Sestavení prediktivního modelu z těchto nabídek, abychom mohli předvídat pohyb cen.
- Zadávání pokynů k nákupu nebo prodeji, abychom na úspěšné predikci pohybu cen vydělali.
Ve výsledku potřebujeme:
- Informační kanál nabídek v reálném čase.
- Prediktivní model schopný pracovat s nabídkami v reálném čase.
- Simulaci obchodování ukazující zisk nebo ztrátu algoritmu obchodování.
Informační kanál nabídek v reálném čase
Investor Exchange (IEX) nabízí bezplatnou nabídku v reálném čase a ptát se nabídek pomocí socket.io. Je možné napsat jednoduchý konzolový program, který přijímá nabídky v reálném čase a nabízí je jako zdroj dat do služby Azure Event Hubs. Následující kód představuje kostru programu. Kód pro zkrácení vynechává zpracování chyb. Do svého projektu také musíte zahrnout balíčky NuGet SocketIoClientDotNet a WindowsAzure.ServiceBus.
using Quobject.SocketIoClientDotNet.Client;
using Microsoft.ServiceBus.Messaging;
var symbols = "msft,fb,amzn,goog";
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var socket = IO.Socket("https://ws-api.iextrading.com/1.0/tops");
socket.On(Socket.EVENT_MESSAGE, (message) =>
{
eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes((string)message)));
});
socket.On(Socket.EVENT_CONNECT, () =>
{
socket.Emit("subscribe", symbols);
});
Tady je několik vygenerovaných ukázkových událostí:
{"symbol":"MSFT","marketPercent":0.03246,"bidSize":100,"bidPrice":74.8,"askSize":300,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953357170,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04825,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953357633,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"MSFT","marketPercent":0.03244,"bidSize":100,"bidPrice":74.8,"askSize":100,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953359118,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.01211,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.67,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953359641,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":100,"bidPrice":959.19,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953360949,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.0121,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.7,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953362205,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953362629,"sector":"softwareservices","securityType":"commonstock"}
Poznámka:
Časové razítko události je hodnota lastUpdated v unixovém čase.
Prediktivní model pro vysokofrekvenční obchodování
Pro tuto ukázku používáme lineární model popsaný v tomto dokumentu.
VOI (Volume Order Imbalance) je funkce aktuální kupní/prodejní ceny a objemu a kupní/prodejní ceny a objemu z posledního impulzu. Studie identifikuje korelaci mezi funkcí VOI a budoucím pohybem ceny. Vytvoří lineární model mezi posledních pěti hodnotami VOI a změnou ceny v dalších 10 tiscích. Model se trénuje pomocí dat z předchozího dne a lineární regrese.
Natrénovaný model se pak použije k předvídání změny cen u nabídek v aktuálním obchodním dni v reálném čase. Pokud je předpokládaná změna ceny dostatečně velká, uzavře se obchod. V závislosti na nastavení prahové hodnoty se dají během trénovacího dne očekávat tisíce obchodů jedné akcie.
Teď vyjádříme operace trénování a predikce v úloze Azure Stream Analytics.
Nejprve se vyčistí vstupy. Unixový čas se prostřednictvím funkce DATEADD převede na datový typ datetime. Funkce TRY_CAST slouží k vynucení datových typů, aniž by došlo k selhání dotazu. Vždy je vhodné přetypovat vstupní pole na očekávané datové typy, takže při manipulaci nebo porovnání polí neexistuje žádné neočekávané chování.
WITH
typeconvertedquotes AS (
/* convert all input fields to proper types */
SELECT
System.Timestamp AS lastUpdated,
symbol,
DATEADD(millisecond, CAST(lastSaleTime as bigint), '1970-01-01T00:00:00Z') AS lastSaleTime,
TRY_CAST(bidSize as bigint) AS bidSize,
TRY_CAST(bidPrice as float) AS bidPrice,
TRY_CAST(askSize as bigint) AS askSize,
TRY_CAST(askPrice as float) AS askPrice,
TRY_CAST(volume as bigint) AS volume,
TRY_CAST(lastSaleSize as bigint) AS lastSaleSize,
TRY_CAST(lastSalePrice as float) AS lastSalePrice
FROM quotes TIMESTAMP BY DATEADD(millisecond, CAST(lastUpdated as bigint), '1970-01-01T00:00:00Z')
),
timefilteredquotes AS (
/* filter between 7am and 1pm PST, 14:00 to 20:00 UTC */
/* clean up invalid data points */
SELECT * FROM typeconvertedquotes
WHERE DATEPART(hour, lastUpdated) >= 14 AND DATEPART(hour, lastUpdated) < 20 AND bidSize > 0 AND askSize > 0 AND bidPrice > 0 AND askPrice > 0
),
Dále použijeme funkci LAG k získání hodnot z posledního impulzu. Jedna hodina pro hodnotu LIMIT DURATION je zvolena náhodně. S ohledem na frekvenci nabídek se dá předpokládat, že předchozí impulz najdete v poslední hodině.
shiftedquotes AS (
/* get previous bid/ask price and size in order to calculate VOI */
SELECT
symbol,
(bidPrice + askPrice)/2 AS midPrice,
bidPrice,
bidSize,
askPrice,
askSize,
LAG(bidPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidPricePrev,
LAG(bidSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidSizePrev,
LAG(askPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askPricePrev,
LAG(askSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askSizePrev
FROM timefilteredquotes
),
Následně můžeme vypočítat hodnotu VOI. Pro jistotu vyfiltrujeme hodnoty null, pokud předchozí impulz neexistuje.
currentPriceAndVOI AS (
/* calculate VOI */
SELECT
symbol,
midPrice,
(CASE WHEN (bidPrice < bidPricePrev) THEN 0
ELSE (CASE WHEN (bidPrice = bidPricePrev) THEN (bidSize - bidSizePrev) ELSE bidSize END)
END) -
(CASE WHEN (askPrice < askPricePrev) THEN askSize
ELSE (CASE WHEN (askPrice = askPricePrev) THEN (askSize - askSizePrev) ELSE 0 END)
END) AS VOI
FROM shiftedquotes
WHERE
bidPrice IS NOT NULL AND
bidSize IS NOT NULL AND
askPrice IS NOT NULL AND
askSize IS NOT NULL AND
bidPricePrev IS NOT NULL AND
bidSizePrev IS NOT NULL AND
askPricePrev IS NOT NULL AND
askSizePrev IS NOT NULL
),
Teď znovu použijeme funkci LAG k vytvoření sekvence se 2 po sobě jdoucími hodnotami VOI následovanými 10 po sobě jdoucími hodnotami střední ceny.
shiftedPriceAndShiftedVOI AS (
/* get 10 future prices and 2 previous VOIs */
SELECT
symbol,
midPrice AS midPrice10,
LAG(midPrice, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice9,
LAG(midPrice, 2) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice8,
LAG(midPrice, 3) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice7,
LAG(midPrice, 4) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice6,
LAG(midPrice, 5) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice5,
LAG(midPrice, 6) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice4,
LAG(midPrice, 7) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice3,
LAG(midPrice, 8) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice2,
LAG(midPrice, 9) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice1,
LAG(midPrice, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice,
LAG(VOI, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI1,
LAG(VOI, 11) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
FROM currentPriceAndVOI
),
Následně data strukturujeme do vstupů pro lineární model se dvěma proměnnými. Znovu vyfiltrujeme události, pro které nemáme všechna data.
modelInput AS (
/* create feature vector, x being VOI, y being delta price */
SELECT
symbol,
(midPrice1 + midPrice2 + midPrice3 + midPrice4 + midPrice5 + midPrice6 + midPrice7 + midPrice8 + midPrice9 + midPrice10)/10.0 - midPrice AS y,
VOI1 AS x1,
VOI2 AS x2
FROM shiftedPriceAndShiftedVOI
WHERE
midPrice1 IS NOT NULL AND
midPrice2 IS NOT NULL AND
midPrice3 IS NOT NULL AND
midPrice4 IS NOT NULL AND
midPrice5 IS NOT NULL AND
midPrice6 IS NOT NULL AND
midPrice7 IS NOT NULL AND
midPrice8 IS NOT NULL AND
midPrice9 IS NOT NULL AND
midPrice10 IS NOT NULL AND
midPrice IS NOT NULL AND
VOI1 IS NOT NULL AND
VOI2 IS NOT NULL
),
Vzhledem k tomu, že Azure Stream Analytics neobsahuje integrovanou funkci lineární regrese, koeficienty pro lineární model vypočítáme pomocí agregací SUM a AVG.
modelagg AS (
/* get aggregates for linear regression calculation,
http://faculty.cas.usf.edu/mbrannick/regression/Reg2IV.html */
SELECT
symbol,
SUM(x1 * x1) AS x1x1,
SUM(x2 * x2) AS x2x2,
SUM(x1 * y) AS x1y,
SUM(x2 * y) AS x2y,
SUM(x1 * x2) AS x1x2,
AVG(y) AS avgy,
AVG(x1) AS avgx1,
AVG(x2) AS avgx2
FROM modelInput
GROUP BY symbol, TumblingWindow(hour, 24, -4)
),
modelparambs AS (
/* calculate b1 and b2 for the linear model */
SELECT
symbol,
(x2x2 * x1y - x1x2 * x2y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b1,
(x1x1 * x2y - x1x2 * x1y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b2,
avgy,
avgx1,
avgx2
FROM modelagg
),
model AS (
/* calculate a for the linear model */
SELECT
symbol,
avgy - b1 * avgx1 - b2 * avgx2 AS a,
b1,
b2
FROM modelparambs
),
Abychom pro vyhodnocení aktuálních událostí mohli použít model z předchozího dne, chceme nabídky spojit s modelem. Ale místo použití operátoru JOIN sjednotíme události modelu a události nabídek pomocí operátoru UNION. Potom pomocí funkce LAG spárujeme události s modelem z předchozího dne, abychom získali přesně jednu shodu. Kvůli víkendu se musíme podívat o tři dny zpět. Kdybychom použili přímé spojení operátorem JOIN, dostali bychom pro každou událost nabídky tři modely.
shiftedVOI AS (
/* get two consecutive VOIs */
SELECT
symbol,
midPrice,
VOI AS VOI1,
LAG(VOI, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
FROM currentPriceAndVOI
),
VOIAndModel AS (
/* combine VOIs and models */
SELECT
'voi' AS type,
symbol,
midPrice,
VOI1,
VOI2,
0.0 AS a,
0.0 AS b1,
0.0 AS b2
FROM shiftedVOI
UNION
SELECT
'model' AS type,
symbol,
0.0 AS midPrice,
0 AS VOI1,
0 AS VOI2,
a,
b1,
b2
FROM model
),
VOIANDModelJoined AS (
/* match VOIs with the latest model within 3 days (72 hours, to take the weekend into account) */
SELECT
symbol,
midPrice,
VOI1 as x1,
VOI2 as x2,
LAG(a, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS a,
LAG(b1, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b1,
LAG(b2, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b2
FROM VOIAndModel
WHERE type = 'voi'
),
Teď můžeme provést předpovědi a na základě modelu vygenerovat signály pro nákup nebo prodej s prahovou hodnotou 0,02. Hodnota obchodu 10 znamená nákup. Hodnota obchodu −10 znamená prodej.
prediction AS (
/* make prediction if there is a model */
SELECT
symbol,
midPrice,
a + b1 * x1 + b2 * x2 AS efpc
FROM VOIANDModelJoined
WHERE
a IS NOT NULL AND
b1 IS NOT NULL AND
b2 IS NOT NULL AND
x1 IS NOT NULL AND
x2 IS NOT NULL
),
tradeSignal AS (
/* generate buy/sell signals */
SELECT
DateAdd(hour, -7, System.Timestamp) AS time,
symbol,
midPrice,
efpc,
CASE WHEN (efpc > 0.02) THEN 10 ELSE (CASE WHEN (efpc < -0.02) THEN -10 ELSE 0 END) END AS trade,
DATETIMEFROMPARTS(DATEPART(year, System.Timestamp), DATEPART(month, System.Timestamp), DATEPART(day, System.Timestamp), 0, 0, 0, 0) as date
FROM prediction
),
Simulace obchodování
Jakmile budeme mít obchodní signály, chceme otestovat efektivitu této strategie obchodování, aniž bychom skutečně obchodovali.
Tento test provedeme pomocí UDA se skákajícím oknem, kdy se skok provádí každou minutu. Seskupování k datu a klauzule Having umožňují okno pouze pro události, které patří do stejného dne. V případě skákajícího okna trvajícího dva dny se pomocí seskupení podle data operátorem GROUP BY toto seskupení rozdělí na předchozí den a aktuální den. Klauzule HAVING vyfiltruje okna, která končí aktuálním dnem, ale byla seskupená předchozí den.
simulation AS
(
/* perform trade simulation for the past 7 hours to cover an entire trading day, and generate output every minute */
SELECT
DateAdd(hour, -7, System.Timestamp) AS time,
symbol,
date,
uda.TradeSimulation(tradeSignal) AS s
FROM tradeSignal
GROUP BY HoppingWindow(minute, 420, 1), symbol, date
Having DateDiff(day, date, time) < 1 AND DATEPART(hour, time) < 13
)
UDA JavaScriptu ve funkci init
inicializuje všechny průběžné součty, při každém přidání události do okna vypočítá přechod stavu a na konci okna vrátí výsledky simulace. Obecný průběh obchodování je:
- Nákup akcií při přijetí signálu nákupu a neexistuje žádný punčochový holding.
- Prodej akcií, když se obdrží signál k prodeji a drží se akcie.
- Krátké, pokud se nedrží zásoby.
Pokud je otevřená krátká pozice a obdrží se signál k nákupu, provedeme nákup a uzavřeme pozici. V této simulaci držíme nebo zkrachujeme 10 akcií. Náklady na transakci jsou ploché $8
.
function main() {
var TRADE_COST = 8.0;
var SHARES = 10;
this.init = function () {
this.own = false;
this.pos = 0;
this.pnl = 0.0;
this.tradeCosts = 0.0;
this.buyPrice = 0.0;
this.sellPrice = 0.0;
this.buySize = 0;
this.sellSize = 0;
this.buyTotal = 0.0;
this.sellTotal = 0.0;
}
this.accumulate = function (tradeSignal, timestamp) {
if(!this.own && tradeSignal.trade == 10) {
// Buy to open
this.own = true;
this.pos = 1;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
} else if(!this.own && tradeSignal.trade == -10) {
// Sell to open
this.own = true;
this.pos = -1
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
} else if(this.own && this.pos == 1 && tradeSignal.trade == -10) {
// Sell to close
this.own = false;
this.pos = 0;
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
// Sell to open
this.own = true;
this.pos = -1;
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
} else if(this.own && this.pos == -1 && tradeSignal.trade == 10) {
// Buy to close
this.own = false;
this.pos = 0;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
// Buy to open
this.own = true;
this.pos = 1;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
}
}
this.computeResult = function () {
var result = {
"pnl": this.pnl,
"buySize": this.buySize,
"sellSize": this.sellSize,
"buyTotal": this.buyTotal,
"sellTotal": this.sellTotal,
"tradeCost": this.tradeCost
};
return result;
}
}
Nakonec odešleme výstup na řídicí panel Power BI, který zobrazí vizualizaci.
SELECT * INTO tradeSignalDashboard FROM tradeSignal /* output tradeSignal to PBI */
SELECT
symbol,
time,
date,
TRY_CAST(s.pnl as float) AS pnl,
TRY_CAST(s.buySize as bigint) AS buySize,
TRY_CAST(s.sellSize as bigint) AS sellSize,
TRY_CAST(s.buyTotal as float) AS buyTotal,
TRY_CAST(s.sellTotal as float) AS sellTotal
INTO pnlDashboard
FROM simulation /* output trade simulation to PBI */
Shrnutí
Realistický model vysokofrekvenčního obchodování můžeme implementovat pomocí mírně složitého dotazu v Azure Stream Analytics. Vzhledem k chybějící integrované funkci lineární regrese musíme model zjednodušit a místo pěti vstupních proměnných použít dvě. Odhodlaný uživatel však možná dokáže jako UDA JavaScriptu implementovat i sofistikovanější algoritmy vyšších dimenzí.
Za zmínku stojí, že většinu dotazu, kromě UDA JavaScriptu, je možné testovat a ladit v sadě Visual Studio prostřednictvím nástrojů Azure Stream Analytics pro Visual Studio. Od napsání počátečního dotazu strávil autor testováním a laděním dotazu v sadě Visual Studio méně než 30 minut.
V současné době není možné UDA ladit v sadě Visual Studio. Pracujeme na tom, aby bylo možné projít kód JavaScriptu. Kromě toho pole, která dosáhnou UDA, mají malá písmena. Během testování dotazů to nebylo zřejmé chování. Díky úrovni kompatibility Azure Stream Analytics 1.1 však zachováváme velikosti písmen v názvech polí, takže je chování přirozenější.
Doufám, že tento článek poslouží jako inspirace pro všechny uživatele Azure Stream Analytics, kteří můžou naši službu využít k průběžnému provádění pokročilých analýz v reálném čase. Podělte se s námi o svůj názor, usnadníte nám tím implementaci dotazů pro scénáře pokročilých analýz.