Sdílet prostřednictvím


Simulace vysokofrekvenčního obchodování pomocí Stream Analytics

Kombinace jazyka SQL s uživatelem definovanými funkcemi (UDF) a uživatelem definovanými agregacemi (UDA) JavaScriptu v Azure Stream Analytics umožňuje uživatelům provádět pokročilé analýzy. Pokročilá analýza může zahrnovat online trénování a vyhodnocování strojového učení a simulaci stavových procesů. Tento článek popisuje, jak provádět lineární regresi v úloze Azure Stream Analytics, která provádí průběžné trénování a vyhodnocování ve scénáři vysokofrekvenčního obchodování.

Vysokofrekvenční obchodování

Logický tok vysokofrekvenčního obchodování spočívá v:

  1. Získávání nabídek z burzy cenných papírů v reálném čase.
  2. Sestavení prediktivního modelu z těchto nabídek, abychom mohli předvídat pohyb cen.
  3. Zadávání pokynů k nákupu nebo prodeji, abychom na úspěšné predikci pohybu cen vydělali.

Ve výsledku potřebujeme:

  • Informační kanál nabídek v reálném čase.
  • Prediktivní model schopný pracovat s nabídkami v reálném čase.
  • Simulaci obchodování ukazující zisk nebo ztrátu algoritmu obchodování.

Informační kanál nabídek v reálném čase

Investor Exchange (IEX) nabízí bezplatnou nabídku v reálném čase a ptát se nabídek pomocí socket.io. Je možné napsat jednoduchý konzolový program, který přijímá nabídky v reálném čase a nabízí je jako zdroj dat do služby Azure Event Hubs. Následující kód představuje kostru programu. Kód pro zkrácení vynechává zpracování chyb. Do svého projektu také musíte zahrnout balíčky NuGet SocketIoClientDotNet a WindowsAzure.ServiceBus.

using Quobject.SocketIoClientDotNet.Client;
using Microsoft.ServiceBus.Messaging;
var symbols = "msft,fb,amzn,goog";
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var socket = IO.Socket("https://ws-api.iextrading.com/1.0/tops");
socket.On(Socket.EVENT_MESSAGE, (message) =>
{
    eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes((string)message)));
});
socket.On(Socket.EVENT_CONNECT, () =>
{
    socket.Emit("subscribe", symbols);
});

Tady je několik vygenerovaných ukázkových událostí:

{"symbol":"MSFT","marketPercent":0.03246,"bidSize":100,"bidPrice":74.8,"askSize":300,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953357170,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04825,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953357633,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"MSFT","marketPercent":0.03244,"bidSize":100,"bidPrice":74.8,"askSize":100,"askPrice":74.83,volume":70572,"lastSalePrice":74.825,"lastSaleSize":100,"lastSaleTime":1506953355123,lastUpdated":1506953359118,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.01211,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.67,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953359641,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":100,"bidPrice":959.19,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953360949,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"FB","marketPercent":0.0121,"bidSize":100,"bidPrice":169.9,"askSize":100,"askPrice":170.7,volume":39042,"lastSalePrice":170.67,"lastSaleSize":100,"lastSaleTime":1506953351912,lastUpdated":1506953362205,"sector":"softwareservices","securityType":"commonstock"}
{"symbol":"GOOG","marketPercent":0.04795,"bidSize":114,"bidPrice":870,"askSize":0,"askPrice":0,volume":11240,"lastSalePrice":959.47,"lastSaleSize":60,"lastSaleTime":1506953317571,lastUpdated":1506953362629,"sector":"softwareservices","securityType":"commonstock"}

Poznámka:

Časové razítko události je hodnota lastUpdated v unixovém čase.

Prediktivní model pro vysokofrekvenční obchodování

Pro tuto ukázku používáme lineární model popsaný v tomto dokumentu.

VOI (Volume Order Imbalance) je funkce aktuální kupní/prodejní ceny a objemu a kupní/prodejní ceny a objemu z posledního impulzu. Studie identifikuje korelaci mezi funkcí VOI a budoucím pohybem ceny. Vytvoří lineární model mezi posledních pěti hodnotami VOI a změnou ceny v dalších 10 tiscích. Model se trénuje pomocí dat z předchozího dne a lineární regrese.

Natrénovaný model se pak použije k předvídání změny cen u nabídek v aktuálním obchodním dni v reálném čase. Pokud je předpokládaná změna ceny dostatečně velká, uzavře se obchod. V závislosti na nastavení prahové hodnoty se dají během trénovacího dne očekávat tisíce obchodů jedné akcie.

Definice nevyrovnanosti pořadí svazků

Teď vyjádříme operace trénování a predikce v úloze Azure Stream Analytics.

Nejprve se vyčistí vstupy. Unixový čas se prostřednictvím funkce DATEADD převede na datový typ datetime. Funkce TRY_CAST slouží k vynucení datových typů, aniž by došlo k selhání dotazu. Vždy je vhodné přetypovat vstupní pole na očekávané datové typy, takže při manipulaci nebo porovnání polí neexistuje žádné neočekávané chování.

WITH
typeconvertedquotes AS (
    /* convert all input fields to proper types */
    SELECT
        System.Timestamp AS lastUpdated,
        symbol,
        DATEADD(millisecond, CAST(lastSaleTime as bigint), '1970-01-01T00:00:00Z') AS lastSaleTime,
        TRY_CAST(bidSize as bigint) AS bidSize,
        TRY_CAST(bidPrice as float) AS bidPrice,
        TRY_CAST(askSize as bigint) AS askSize,
        TRY_CAST(askPrice as float) AS askPrice,
        TRY_CAST(volume as bigint) AS volume,
        TRY_CAST(lastSaleSize as bigint) AS lastSaleSize,
        TRY_CAST(lastSalePrice as float) AS lastSalePrice
    FROM quotes TIMESTAMP BY DATEADD(millisecond, CAST(lastUpdated as bigint), '1970-01-01T00:00:00Z')
),
timefilteredquotes AS (
    /* filter between 7am and 1pm PST, 14:00 to 20:00 UTC */
    /* clean up invalid data points */
	SELECT * FROM typeconvertedquotes
	WHERE DATEPART(hour, lastUpdated) >= 14 AND DATEPART(hour, lastUpdated) < 20 AND bidSize > 0 AND askSize > 0 AND bidPrice > 0 AND askPrice > 0
),

Dále použijeme funkci LAG k získání hodnot z posledního impulzu. Jedna hodina pro hodnotu LIMIT DURATION je zvolena náhodně. S ohledem na frekvenci nabídek se dá předpokládat, že předchozí impulz najdete v poslední hodině.

shiftedquotes AS (
    /* get previous bid/ask price and size in order to calculate VOI */
	SELECT
		symbol,
		(bidPrice + askPrice)/2 AS midPrice,
		bidPrice,
		bidSize,
		askPrice,
		askSize,
		LAG(bidPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidPricePrev,
		LAG(bidSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidSizePrev,
		LAG(askPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askPricePrev,
		LAG(askSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askSizePrev
	FROM timefilteredquotes
),

Následně můžeme vypočítat hodnotu VOI. Pro jistotu vyfiltrujeme hodnoty null, pokud předchozí impulz neexistuje.

currentPriceAndVOI AS (
    /* calculate VOI */
	SELECT
		symbol,
		midPrice,
		(CASE WHEN (bidPrice < bidPricePrev) THEN 0
            ELSE (CASE WHEN (bidPrice = bidPricePrev) THEN (bidSize - bidSizePrev) ELSE bidSize END)
         END) -
        (CASE WHEN (askPrice < askPricePrev) THEN askSize
            ELSE (CASE WHEN (askPrice = askPricePrev) THEN (askSize - askSizePrev) ELSE 0 END)
         END) AS VOI
	FROM shiftedquotes
	WHERE
		bidPrice IS NOT NULL AND
		bidSize IS NOT NULL AND
		askPrice IS NOT NULL AND
		askSize IS NOT NULL AND
		bidPricePrev IS NOT NULL AND
		bidSizePrev IS NOT NULL AND
		askPricePrev IS NOT NULL AND
		askSizePrev IS NOT NULL
),

Teď znovu použijeme funkci LAG k vytvoření sekvence se 2 po sobě jdoucími hodnotami VOI následovanými 10 po sobě jdoucími hodnotami střední ceny.

shiftedPriceAndShiftedVOI AS (
    /* get 10 future prices and 2 previous VOIs */
    SELECT
		symbol,
		midPrice AS midPrice10,
		LAG(midPrice, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice9,
		LAG(midPrice, 2) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice8,
		LAG(midPrice, 3) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice7,
		LAG(midPrice, 4) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice6,
		LAG(midPrice, 5) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice5,
		LAG(midPrice, 6) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice4,
		LAG(midPrice, 7) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice3,
		LAG(midPrice, 8) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice2,
		LAG(midPrice, 9) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice1,
		LAG(midPrice, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice,
		LAG(VOI, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI1,
		LAG(VOI, 11) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
	FROM currentPriceAndVOI
),

Následně data strukturujeme do vstupů pro lineární model se dvěma proměnnými. Znovu vyfiltrujeme události, pro které nemáme všechna data.

modelInput AS (
    /* create feature vector, x being VOI, y being delta price */
	SELECT
		symbol,
		(midPrice1 + midPrice2 + midPrice3 + midPrice4 + midPrice5 + midPrice6 + midPrice7 + midPrice8 + midPrice9 + midPrice10)/10.0 - midPrice AS y,
		VOI1 AS x1,
		VOI2 AS x2
	FROM shiftedPriceAndShiftedVOI
	WHERE
		midPrice1 IS NOT NULL AND
		midPrice2 IS NOT NULL AND
		midPrice3 IS NOT NULL AND
		midPrice4 IS NOT NULL AND
		midPrice5 IS NOT NULL AND
		midPrice6 IS NOT NULL AND
		midPrice7 IS NOT NULL AND
		midPrice8 IS NOT NULL AND
		midPrice9 IS NOT NULL AND
		midPrice10 IS NOT NULL AND
		midPrice IS NOT NULL AND
		VOI1 IS NOT NULL AND
		VOI2 IS NOT NULL
),

Vzhledem k tomu, že Azure Stream Analytics neobsahuje integrovanou funkci lineární regrese, koeficienty pro lineární model vypočítáme pomocí agregací SUM a AVG.

Matematický vzorec lineární regrese

modelagg AS (
    /* get aggregates for linear regression calculation,
     http://faculty.cas.usf.edu/mbrannick/regression/Reg2IV.html */
	SELECT
		symbol,
		SUM(x1 * x1) AS x1x1,
		SUM(x2 * x2) AS x2x2,
		SUM(x1 * y) AS x1y,
		SUM(x2 * y) AS x2y,
		SUM(x1 * x2) AS x1x2,
		AVG(y) AS avgy,
		AVG(x1) AS avgx1,
		AVG(x2) AS avgx2
	FROM modelInput
	GROUP BY symbol, TumblingWindow(hour, 24, -4)
),
modelparambs AS (
    /* calculate b1 and b2 for the linear model */
	SELECT
		symbol,
		(x2x2 * x1y - x1x2 * x2y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b1,
		(x1x1 * x2y - x1x2 * x1y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b2,
		avgy,
		avgx1,
		avgx2
	FROM modelagg
),
model AS (
    /* calculate a for the linear model */
	SELECT
		symbol,
		avgy - b1 * avgx1 - b2 * avgx2 AS a,
		b1,
		b2
	FROM modelparambs
),

Abychom pro vyhodnocení aktuálních událostí mohli použít model z předchozího dne, chceme nabídky spojit s modelem. Ale místo použití operátoru JOIN sjednotíme události modelu a události nabídek pomocí operátoru UNION. Potom pomocí funkce LAG spárujeme události s modelem z předchozího dne, abychom získali přesně jednu shodu. Kvůli víkendu se musíme podívat o tři dny zpět. Kdybychom použili přímé spojení operátorem JOIN, dostali bychom pro každou událost nabídky tři modely.

shiftedVOI AS (
    /* get two consecutive VOIs */
	SELECT
		symbol,
		midPrice,
		VOI AS VOI1,		
		LAG(VOI, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
	FROM currentPriceAndVOI
),
VOIAndModel AS (
    /* combine VOIs and models */
	SELECT
		'voi' AS type,
		symbol,
		midPrice,
		VOI1,
		VOI2,
        0.0 AS a,
        0.0 AS b1,
        0.0 AS b2
	FROM shiftedVOI
	UNION
	SELECT
		'model' AS type,
		symbol,
        0.0 AS midPrice,
        0 AS VOI1,
        0 AS VOI2,
		a,
		b1,
		b2
	FROM model
),
VOIANDModelJoined AS (
    /* match VOIs with the latest model within 3 days (72 hours, to take the weekend into account) */
	SELECT
		symbol,
		midPrice,
		VOI1 as x1,
		VOI2 as x2,
		LAG(a, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS a,
		LAG(b1, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b1,
		LAG(b2, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b2
	FROM VOIAndModel
	WHERE type = 'voi'
),

Teď můžeme provést předpovědi a na základě modelu vygenerovat signály pro nákup nebo prodej s prahovou hodnotou 0,02. Hodnota obchodu 10 znamená nákup. Hodnota obchodu −10 znamená prodej.

prediction AS (
    /* make prediction if there is a model */
	SELECT
		symbol,
		midPrice,
		a + b1 * x1 + b2 * x2 AS efpc
	FROM VOIANDModelJoined
	WHERE
		a IS NOT NULL AND
		b1 IS NOT NULL AND
		b2 IS NOT NULL AND
        x1 IS NOT NULL AND
        x2 IS NOT NULL
),
tradeSignal AS (
    /* generate buy/sell signals */
	SELECT
        DateAdd(hour, -7, System.Timestamp) AS time,
		symbol,		
		midPrice,
        efpc,
		CASE WHEN (efpc > 0.02) THEN 10 ELSE (CASE WHEN (efpc < -0.02) THEN -10 ELSE 0 END) END AS trade,
		DATETIMEFROMPARTS(DATEPART(year, System.Timestamp), DATEPART(month, System.Timestamp), DATEPART(day, System.Timestamp), 0, 0, 0, 0) as date
	FROM prediction
),

Simulace obchodování

Jakmile budeme mít obchodní signály, chceme otestovat efektivitu této strategie obchodování, aniž bychom skutečně obchodovali.

Tento test provedeme pomocí UDA se skákajícím oknem, kdy se skok provádí každou minutu. Seskupování k datu a klauzule Having umožňují okno pouze pro události, které patří do stejného dne. V případě skákajícího okna trvajícího dva dny se pomocí seskupení podle data operátorem GROUP BY toto seskupení rozdělí na předchozí den a aktuální den. Klauzule HAVING vyfiltruje okna, která končí aktuálním dnem, ale byla seskupená předchozí den.

simulation AS
(
    /* perform trade simulation for the past 7 hours to cover an entire trading day, and generate output every minute */
	SELECT
        DateAdd(hour, -7, System.Timestamp) AS time,
		symbol,
		date,
		uda.TradeSimulation(tradeSignal) AS s
	FROM tradeSignal
	GROUP BY HoppingWindow(minute, 420, 1), symbol, date
	Having DateDiff(day, date, time) < 1 AND DATEPART(hour, time) < 13
)

UDA JavaScriptu ve funkci init inicializuje všechny průběžné součty, při každém přidání události do okna vypočítá přechod stavu a na konci okna vrátí výsledky simulace. Obecný průběh obchodování je:

  • Nákup akcií při přijetí signálu nákupu a neexistuje žádný punčochový holding.
  • Prodej akcií, když se obdrží signál k prodeji a drží se akcie.
  • Krátké, pokud se nedrží zásoby.

Pokud je otevřená krátká pozice a obdrží se signál k nákupu, provedeme nákup a uzavřeme pozici. V této simulaci držíme nebo zkrachujeme 10 akcií. Náklady na transakci jsou ploché $8.

function main() {
	var TRADE_COST = 8.0;
	var SHARES = 10;
	this.init = function () {
		this.own = false;
		this.pos = 0;
		this.pnl = 0.0;
		this.tradeCosts = 0.0;
		this.buyPrice = 0.0;
		this.sellPrice = 0.0;
		this.buySize = 0;
		this.sellSize = 0;
		this.buyTotal = 0.0;
		this.sellTotal = 0.0;
	}
	this.accumulate = function (tradeSignal, timestamp) {
		if(!this.own && tradeSignal.trade == 10) {
		  // Buy to open
		  this.own = true;
		  this.pos = 1;
		  this.buyPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.buySize += SHARES;
		  this.buyTotal += SHARES * tradeSignal.midprice;
		} else if(!this.own && tradeSignal.trade == -10) {
		  // Sell to open
		  this.own = true;
		  this.pos = -1
		  this.sellPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.sellSize += SHARES;
		  this.sellTotal += SHARES * tradeSignal.midprice;
		} else if(this.own && this.pos == 1 && tradeSignal.trade == -10) {
		  // Sell to close
		  this.own = false;
		  this.pos = 0;
		  this.sellPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
		  this.sellSize += SHARES;
		  this.sellTotal += SHARES * tradeSignal.midprice;
		  // Sell to open
		  this.own = true;
		  this.pos = -1;
		  this.sellPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.sellSize += SHARES;		  
		  this.sellTotal += SHARES * tradeSignal.midprice;
		} else if(this.own && this.pos == -1 && tradeSignal.trade == 10) {
		  // Buy to close
		  this.own = false;
		  this.pos = 0;
		  this.buyPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
		  this.buySize += SHARES;
		  this.buyTotal += SHARES * tradeSignal.midprice;
		  // Buy to open
		  this.own = true;
		  this.pos = 1;
		  this.buyPrice = tradeSignal.midprice;
		  this.tradeCosts += TRADE_COST;
		  this.buySize += SHARES;		  
		  this.buyTotal += SHARES * tradeSignal.midprice;
		}
	}
	this.computeResult = function () {
		var result = {
			"pnl": this.pnl,
			"buySize": this.buySize,
			"sellSize": this.sellSize,
			"buyTotal": this.buyTotal,
			"sellTotal": this.sellTotal,
			"tradeCost": this.tradeCost
			};
		return result;
	}
}

Nakonec odešleme výstup na řídicí panel Power BI, který zobrazí vizualizaci.

SELECT * INTO tradeSignalDashboard FROM tradeSignal /* output tradeSignal to PBI */
SELECT
    symbol,
    time,
    date,
    TRY_CAST(s.pnl as float) AS pnl,
    TRY_CAST(s.buySize as bigint) AS buySize,
    TRY_CAST(s.sellSize as bigint) AS sellSize,
    TRY_CAST(s.buyTotal as float) AS buyTotal,
    TRY_CAST(s.sellTotal as float) AS sellTotal
    INTO pnlDashboard
FROM simulation /* output trade simulation to PBI */

Obchoduje vizuál grafu Power BI

Vizuál grafu PNL Power BI

Shrnutí

Realistický model vysokofrekvenčního obchodování můžeme implementovat pomocí mírně složitého dotazu v Azure Stream Analytics. Vzhledem k chybějící integrované funkci lineární regrese musíme model zjednodušit a místo pěti vstupních proměnných použít dvě. Odhodlaný uživatel však možná dokáže jako UDA JavaScriptu implementovat i sofistikovanější algoritmy vyšších dimenzí.

Za zmínku stojí, že většinu dotazu, kromě UDA JavaScriptu, je možné testovat a ladit v sadě Visual Studio prostřednictvím nástrojů Azure Stream Analytics pro Visual Studio. Od napsání počátečního dotazu strávil autor testováním a laděním dotazu v sadě Visual Studio méně než 30 minut.

V současné době není možné UDA ladit v sadě Visual Studio. Pracujeme na tom, aby bylo možné projít kód JavaScriptu. Kromě toho pole, která dosáhnou UDA, mají malá písmena. Během testování dotazů to nebylo zřejmé chování. Díky úrovni kompatibility Azure Stream Analytics 1.1 však zachováváme velikosti písmen v názvech polí, takže je chování přirozenější.

Doufám, že tento článek poslouží jako inspirace pro všechny uživatele Azure Stream Analytics, kteří můžou naši službu využít k průběžnému provádění pokročilých analýz v reálném čase. Podělte se s námi o svůj názor, usnadníte nám tím implementaci dotazů pro scénáře pokročilých analýz.