-
-
Save lehoff/f44b776e734fa59058da3c016cd9ede9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% mrts: ad-hoc Riak TS / KV map-reduce exploration helpers (scratch module).
-module(mrts).
%% export_all is deliberate here for shell experimentation; not production style.
-compile([export_all]).
%-----------------------------------------------------------------------
% Buckets we know about
%-----------------------------------------------------------------------
%% {BucketType, Bucket} pair for the weather demo TS table.
bucket() ->
    {<<"ts_weather_demo">>, <<"ts_weather_demo">>}.
%% Location-update bucket; the mr path wants the {Type, Bucket} pair,
%% the lk (list_keys) path wants the bare bucket name.
locationbucket() ->
    locationbucket(mr).
locationbucket(mr) ->
    {<<"locationupdateevents">>, <<"locationupdateevents">>};
locationbucket(lk) ->
    <<"locationupdateevents">>.
%% GeoCheckin bucket; mr wants the {Type, Bucket} pair, lk the bare name.
geobucket() ->
    geobucket(mr).
geobucket(mr) ->
    {<<"GeoCheckin">>, <<"GeoCheckin">>};
geobucket(lk) ->
    <<"GeoCheckin">>.
%-----------------------------------------------------------------------
% Use map reduce to count all keys in the bucket
%-----------------------------------------------------------------------
%% Count keys in the default geo bucket on dev node DevNo.
count(DevNo) ->
    count(geobucket(), DevNo).
%% Count keys in an explicit Bucket via the generic MR runner.
count(Bucket, DevNo) ->
    mrtsTest(Bucket, count, DevNo).
%-----------------------------------------------------------------------
% Use map reduce to return all keys in the bucket
%-----------------------------------------------------------------------
%% Run an empty MR pipeline over the geo bucket on the default dev1 port.
anonkeys() ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:mapred_bucket(Conn, geobucket(), []).
%% Fetch all keys either via map-reduce (mr) or streaming list_keys (lk).
keys(Method, DevNo) ->
    keys(Method, geobucket(Method), DevNo).
keys(mr, Bucket, DevNo) ->
    mrtsTest(Bucket, keys, DevNo);
keys(lk, Bucket, DevNo) ->
    Conn = pb_socket(DevNo),
    riakc_ts:stream_list_keys(Conn, Bucket, []),
    receive_keys([]).
%% Collect streamed list_keys results into Keys.  Chunks arrive as
%% {ReqId, {keys, KeyList}} and the stream ends with {ReqId, done}.
%% Fix: the original's catch-all clause returned the io:format result,
%% silently discarding every key accumulated so far; now unexpected
%% messages are logged and the loop continues.
receive_keys(Keys) ->
    receive
        {_, {keys, KeyList}} ->
            %% ++ preserves arrival order; chunk counts are modest for
            %% this ad-hoc tooling, so quadratic append is acceptable.
            receive_keys(Keys ++ KeyList);
        {_, done} ->
            Keys;
        Ret ->
            io:format(user, "Ret = ~p~n", [Ret]),
            receive_keys(Keys)
    end.
%-----------------------------------------------------------------------
% General MR request
%-----------------------------------------------------------------------
%% Run a map-reduce job against Bucket on dev node DevNo.
%%   Stat = keys  -> list of object values (map_object_value, phase 0)
%%   Stat = count -> number of objects (map each to 1, reduce_sum)
%% Fix: the count qfun bound its first argument as I but never used it,
%% which triggers an unused-variable compiler warning.
mrtsTest(Bucket, Stat, DevNo) ->
    Riak = pb_socket(DevNo),
    case Stat of
        keys ->
            {ok, [{0, Ret}]} =
                riakc_pb_socket:mapred_bucket(
                  Riak, Bucket,
                  [{map, {modfun, riak_kv_mapreduce, map_object_value},
                    none, true}]),
            Ret;
        count ->
            {ok, [{1, [Nkey]}]} =
                riakc_pb_socket:mapred_bucket(
                  Riak, Bucket,
                  [{map, {qfun, fun(_Obj, _KeyData, _Arg) -> [1] end},
                    none, false},
                   {reduce, {modfun, riak_kv_mapreduce, reduce_sum},
                    none, true}]),
            Nkey
    end.
%% PB connection to dev node DevNo (crashes on connect failure).
pb_socket(DevNo) ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", dev_port(DevNo)),
    Conn.
%% devrel port layout: dev1 -> 10017, dev2 -> 10027, ...
dev_port(DevNo) ->
    10017 + (DevNo - 1) * 10.
%-----------------------------------------------------------------------
% Issue a formatted query
%-----------------------------------------------------------------------
%% Build a SELECT for the ts_weather_demo table.  Tmin/Tmax are integer
%% timestamps (inclusive bounds); Weather and Fam are strings spliced
%% into single-quoted SQL literals.
%% Fix: the original concatenated integer_to_list(Tmax) directly onto
%% "AND weather", producing "... <= 400AND weather ..." — the separating
%% space was missing, yielding malformed SQL.
query(Tmin, Tmax, Weather, Fam) ->
    "SELECT * FROM ts_weather_demo "
    "WHERE time >= " ++ integer_to_list(Tmin) ++
        " AND time <= " ++ integer_to_list(Tmax) ++
        " AND weather = '" ++ Weather ++ "' " ++
        "AND family = '" ++ Fam ++ "' ".
%-----------------------------------------------------------------------
% Issue an arbitrary query
%-----------------------------------------------------------------------
%% Canned queries are addressed by atom; anything else is treated as a
%% literal SQL string and executed over a fresh dev1 connection.
query(location) ->
    Sql =
        "SELECT * FROM locationupdateevents "
        "WHERE tstamp >= 1416321188291 AND tstamp <= 1416321228319 "
        "AND userid = 'anon:' "
        "AND eventid = 'a11fbcae-23bf-4535-b3f8-e5013b35e366' ",
    query(Sql);
query(default) ->
    Sql =
        "SELECT * FROM ts_weather_demo "
        "WHERE time >= 0 AND time <= 400 "
        "AND weather = 'crap' "
        "AND family = 'family' ",
    query(Sql);
query(native) ->
    Sql =
        "SELECT * FROM ts_weather_demo "
        "WHERE time >= 0 AND time <= 400 "
        "AND weather = 'native' "
        "AND family = 'family' ",
    query(Sql);
query(nonnative) ->
    Sql =
        "SELECT * FROM ts_weather_demo "
        "WHERE time >= 0 AND time <= 400 "
        "AND weather = 'non-native' "
        "AND family = 'family' ",
    query(Sql);
query(Query) ->
    io:format("Executing query: ~p~n", [Query]),
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_ts:query(Conn, Query).
%% Execute Query over an existing client connection C (logs the query).
query(C, Query) ->
    io:format("Executing query: ~p~n", [Query]),
    riakc_ts:query(C, Query).
%-----------------------------------------------------------------------
% List buckets
%-----------------------------------------------------------------------
%% List KV buckets via the default PB port (8087).
listBuckets() ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    riakc_pb_socket:list_buckets(Conn).
%% Open a client connection to dev1 and set the requested TS encoding.
%% UseNativeEncoding :: boolean() — true selects the TTB ("native")
%% encoding, false the protobuf default.
getClient(UseNativeEncoding) ->
    C = pb_socket(1),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    C.
%% Arity-0 convenience: the original file calls getClient() from
%% putNormal/0, putTs/0 and writeToKvPartitions/2 but never defined it,
%% so those functions failed with undef.  Defaults to protobuf encoding.
getClient() ->
    getClient(false).
%% Write one fixed demo row to ts_weather_demo over connection C.
putC(C) ->
    Row = [<<"test">>, 54.0, <<"family">>, 0.2, 100, 500.0],
    riakc_ts:put(C, <<"ts_weather_demo">>, [Row]).
%% Write one demo row, tagging the first column with the encoding that
%% was used so the two code paths can be told apart afterwards.
putTs(UseNativeEncoding) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    Tag = case UseNativeEncoding of
              true -> <<"native">>;
              _ -> <<"non-native">>
          end,
    Data = [[Tag, <<"family">>, 100, 54.0, 0.2, 500.0]],
    riakc_ts:put(C, <<"ts_weather_demo">>, Data).
%% Seed three plain KV objects into TestBucket; returns the result of
%% the last put, as before.
putTestKeys() ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    riakc_pb_socket:put(Conn, riakc_obj:new(<<"TestBucket">>, <<"myKey1">>, <<"val1">>)),
    riakc_pb_socket:put(Conn, riakc_obj:new(<<"TestBucket">>, <<"myKey2">>, <<"val2">>)),
    riakc_pb_socket:put(Conn, riakc_obj:new(<<"TestBucket">>, <<"myKey3">>, <<"val3">>)).
%% Count the elements of a list (length/1 equivalent, kept for the
%% module's original API).
listLength(L) ->
    docount(L).
docount(L) ->
    docount(L, 0).
%% The {} clause mirrors the original's tolerance for an empty tuple
%% being passed where a list was expected; kept for compatibility.
%% Fix: the recursive clause bound H without using it (unused-variable
%% warning) — now the cons cell is matched directly in the head.
docount({}, Acc) ->
    Acc;
docount([], Acc) ->
    Acc;
docount([_ | Rest], Acc) ->
    docount(Rest, Acc + 1).
%% Repeat a key count N times, asserting every run returns Nexpected
%% (the match on Nexpected crashes on a mismatch — that is intentional).
%% Fix: Method was unused in the terminal clause (compiler warning).
keycount(_Method, _DevNo, 0, _Nexpected) ->
    ok;
keycount(Method, DevNo, N, Nexpected) ->
    Nexpected = keycount(Method, DevNo),
    keycount(Method, DevNo, N - 1, Nexpected).
%% Count the keys once via keys/2 (Method is mr or lk).
keycount(Method, DevNo) ->
    Keys = keys(Method, DevNo),
    docount(Keys).
%% Dump a Userid/Tstamp line for every key to "keys.out" and return the
%% number of keys written.
%% Fix: the original never closed the log file descriptor, leaking one
%% fd per call; the try/after guarantees the close even if listkey
%% crashes on a malformed key.
keylist(Method, DevNo) ->
    Keys = keys(Method, DevNo),
    {ok, Log} = file:open("keys.out", [append]),
    try
        listkey(Keys, 0, Log, Method)
    after
        file:close(Log)
    end.
%% Walk the key list, printing one line per key.
%% mr keys look like proplists of {Field, Value}; lk keys are tuples
%% (userid, ?, tstamp) — assumption inferred from the element positions
%% used below; TODO confirm against the server's key format.
listkey({}, Acc, _Log, _Method) ->
    Acc;
listkey([], Acc, _Log, _Method) ->
    Acc;
listkey([H | R], Acc, Log, mr) ->
    {_, Userid} = lists:nth(1, H),
    {_, Tstamp} = lists:nth(3, H),
    io:format(Log, "~s~s~n", [Userid, integer_to_list(Tstamp)]),
    listkey(R, Acc + 1, Log, mr);
listkey([H | R], Acc, Log, lk) ->
    T = tuple_to_list(H),
    Userid = lists:nth(1, T),
    Tstamp = lists:nth(3, T),
    io:format(Log, "~s~s~n", [Userid, integer_to_list(Tstamp)]),
    listkey(R, Acc + 1, Log, lk).
%% Return {PbClient, TtbClient}; the clause argument selects only which
%% client is created first.  Both are unlinked so a caller crash does
%% not take the connections down.
clientTest(pb) ->
    Ttb = getClient(true),
    Pb = getClient(false),
    unlink(Ttb),
    unlink(Pb),
    {Pb, Ttb};
clientTest(ttb) ->
    Pb = getClient(false),
    Ttb = getClient(true),
    unlink(Ttb),
    unlink(Pb),
    {Pb, Ttb}.
%% Print this process's current function (debug helper); returns ok.
pt() ->
    Info = process_info(self(), current_function),
    io:format("Process info = ~p~n", [Info]).
%% Run the default GeoCheckin demo query via query/1.
dq() ->
    Sql = "SELECT * FROM GeoCheckin WHERE time >= 0 AND time < 1000 AND myfamily = 'family1' AND myseries = 'seriesX'",
    query(Sql).
%% Same demo query over a fresh dev1 connection; the UseNative argument
%% is currently ignored (the encoding call is commented out).
dq(_UseNative) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    %% riakc_pb_socket:use_native_encoding(C, UseNative),
    Sql = "SELECT * FROM GeoCheckin WHERE time >= 0 AND time < 1000 AND myfamily = 'family1' AND myseries = 'seriesX'",
    query(C, Sql).
%% Put a demo GeoCheckin row.  Arities 1 and 2 currently ignore the
%% encoding flag (the use_native_encoding call is commented out);
%% arity 3 sets the connection encoding and also forwards a per-put flag.
dp(UseNative) ->
    dp(UseNative, 1).
dp(_UseNative, Time) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    %% riakc_pb_socket:use_native_encoding(C, UseNative),
    qfPutC(C, Time).
dp(UseNativeInit, UseNativeSend, Time) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(C, UseNativeInit),
    qfPutC(C, UseNativeSend, Time).
%% Put one fixed GeoCheckin row at timestamp Time; the arity-3 form
%% forwards a per-request encoding flag to riakc_ts:put/4.
qfPutC(C, Time) ->
    Row = [<<"family1">>, <<"seriesX">>, Time, 1, <<"test1">>, 1.0, true],
    riakc_ts:put(C, <<"GeoCheckin">>, [Row]).
qfPutC(C, UseNativeSend, Time) ->
    Row = [<<"family1">>, <<"seriesX">>, Time, 1, <<"test1">>, 1.0, true],
    riakc_ts:put(C, <<"GeoCheckin">>, [Row], UseNativeSend).
%% Reproduction scenario: interleave TTB ("native") and protobuf puts of
%% the same row from two clients against the same node.  The statement
%% order is the point of the test — do not reorder.
queryfailtest() ->
    Data = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    {ok, Cttb} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(Cttb, true),
    riakc_ts:put(Cttb, <<"GeoCheckin">>, Data),
    {ok, Cpb} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(Cpb, false),
    riakc_ts:put(Cpb, <<"GeoCheckin">>, Data),
    riakc_ts:put(Cttb, <<"GeoCheckin">>, Data).
%% Append Bytes to Filename, creating the file if needed; on open
%% failure, report the reason instead of crashing.
append_file(Filename, Bytes) ->
    case file:open(Filename, [append]) of
        {ok, Fd} ->
            file:write(Fd, Bytes),
            file:close(Fd);
        {error, Reason} ->
            io:format("~s open error reason:~s~n", [Filename, Reason])
    end.
%% Map coverage-plan vnode entries to partition ids.
%% NOTE(review): the ring size 64 is hard-coded here — confirm it
%% matches the cluster being inspected before trusting the ids.
getids(CoverageVNodes) ->
    [riak_core_coverage_plan:index_to_id(X,64) || {X, _} <- CoverageVNodes].
%% Write the demo row as a plain KV object (not a TS put) under a fixed
%% key in the GeoCheckin typed bucket.
putNormal() ->
    Conn = getClient(),
    Value = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    Obj = riakc_obj:new({<<"GeoCheckin">>, <<"GeoCheckin">>}, <<"test">>, Value),
    riakc_pb_socket:put(Conn, Obj).
%% TS put of the same demo row, for comparison with putNormal/0.
putTs() ->
    Conn = getClient(),
    Row = [<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true],
    riakc_ts:put(Conn, <<"GeoCheckin">>, [Row]).
%-----------------------------------------------------------------------
% Return the node ids for the current ring
%-----------------------------------------------------------------------
%% NOTE(review): destructures the opaque term returned by
%% riak_core_ring:fresh/0 by tuple position and asserts a ring of
%% exactly 8 partitions — this breaks on any other ring size or record
%% layout; confirm against the riak_core version in use.
getringnodes() ->
    {_,_,_,Tuple,_,_,_,_,_,_,_} = riak_core_ring:fresh(),
    {8, NodeList} = Tuple,
    [X || {X, _} <- NodeList].
%-----------------------------------------------------------------------
% Get the ID of the vnode to which this Bucket,Key pair will be hashed
%-----------------------------------------------------------------------
%% Returns the primary preflist entry (n_val 1) for {Bucket, Key}.
%% NOTE(review): findkeys/4 below calls getnodeid/1, but only this
%% arity-2 form is defined in the file — the arity-1 wrapper (and the
%% bucket it should hash against) appears to be missing.
getnodeid(Bucket, Key) ->
    riak_core_apl:get_primary_apl(riak_core_util:chash_key({Bucket, Key}), 1, riak_kv).
%-----------------------------------------------------------------------
% Find unique keys for the current ring partitions
%-----------------------------------------------------------------------
findkeys() ->
    findkeys(getringnodes()).
%-----------------------------------------------------------------------
% Find unique keys for the passed vnode ids
%-----------------------------------------------------------------------
%% Generate integer keys 0,1,2,... and keep the first key that hashes to
%% each node id in Nodes, until every id has been claimed.
findkeys(Nodes) ->
    findkeys(Nodes, Nodes, [], 0).
%% NOTE(review): this calls getnodeid/1, but only getnodeid/2 is defined
%% in this file — findkeys will fail with undef as written; confirm
%% which bucket the missing arity-1 wrapper should hash against.
findkeys(_Nodes, [], Keys, _Acc) ->
    Keys;
findkeys(Nodes, RemainingNodes, Keys, Acc) ->
    Key = integer_to_binary(Acc),
    [{{NodeId, _}, primary}] = getnodeid(Key),
    %% Fix: the original used lists:filter/2 purely as a membership
    %% test; lists:member/2 states the intent directly.
    case lists:member(NodeId, RemainingNodes) of
        false ->
            findkeys(Nodes, RemainingNodes, Keys, Acc + 1);
        true ->
            findkeys(Nodes,
                     lists:delete(NodeId, RemainingNodes),
                     lists:append(Keys, [Key]),
                     Acc + 1)
    end.
%-----------------------------------------------------------------------
% Write keys to the specified bucket (KV write)
%-----------------------------------------------------------------------
%% KV-put each Key (used as both key and value) into Bucket; returns a
%% list of ok results, one per key, exactly as before.
writeToKvPartitions(Bucket, Keys) ->
    Conn = getClient(),
    PutOne =
        fun(Key) ->
                Obj = riakc_obj:new(Bucket, Key, Key),
                ok = riakc_pb_socket:put(Conn, Obj)
        end,
    lists:map(PutOne, Keys).
%-----------------------------------------------------------------------
% Write one key per partition to the specified bucket
%-----------------------------------------------------------------------
%% Derive one key per ring partition, then KV-write them all to Bucket.
writeOneKeyPerKvPartition(Bucket) ->
    writeToKvPartitions(Bucket, findkeys()).
%%=======================================================================
%% Torben's additions below
%%=======================================================================
%% adapted from riak_test ts_cluster_coverage
%% Quantum width in milliseconds (15 minutes).
quantum_ms() ->
    15 * 60 * 1000.
%% Stride between generated timestamps.
step() ->
    3124.
%% Exclusive upper bound covering QuantaTally quanta.
upper_bound_excl(QuantaTally) ->
    QuantaTally * quantum_ms().
%% Timestamps 1, 1+step, 1+2*step, ... strictly below the upper bound.
timestamps(QuantaTally) ->
    Limit = upper_bound_excl(QuantaTally),
    lists:seq(1, Limit - 1, step()).
%% Default: 100 quanta worth of timestamps.
timestamps() ->
    timestamps(100).
%% key analysis
%% Third element of a key tuple is its timestamp.
key_timestamp({_, _, T}) -> T.
%% Extract the timestamp from every key.
timestamps_of_keys(Keys) ->
    [key_timestamp(K) || K <- Keys].
%% Compare received timestamps against the expected set from
%% timestamps/0 and report counts plus the offending values.
%% "--" removes one occurrence per expected element, so leftovers are
%% extra (duplicate or unexpected) timestamps.
timestamp_analysis(TSList) ->
    Sorted = lists:sort(TSList),
    Expected = timestamps(),
    Duplicates = Sorted -- Expected,
    UniqueDuplicates = lists:usort(Duplicates),
    Missing = Expected -- Sorted,
    Unique = lists:usort(Sorted),
    [{received_count, length(Sorted)},
     {unique_count, length(Unique)},
     {missing_count, length(Missing)},
     {duplicates_count, length(Duplicates)},
     {unique_duplicates_count, length(UniqueDuplicates)},
     {missing, Missing},
     {duplicates, Duplicates},
     {unique_duplicates, UniqueDuplicates}].
%% List keys (streaming list_keys) on DevNo and analyse their timestamps.
get_keys_and_analyse(DevNo) ->
    Keys = mrts:keys(lk, DevNo),
    timestamp_analysis(mrts:timestamps_of_keys(Keys)).
%% Run the key-listing analysis repeatedly for DurationInSecs seconds,
%% cycling over dev nodes via the registered mrts_pick_dev helper
%% process, and return the accumulated stats tuple.
%% NOTE(review): if run_analysis crashes, stop_mrts_pick_dev is never
%% reached and the picker process leaks — consider a try/after.
analysis(DurationInSecs) ->
    StartTime = current_time(),
    EndTime = end_time(StartTime, DurationInSecs),
    start_mrts_pick_dev(),
    Res = run_analysis(initial_stats(), EndTime),
    stop_mrts_pick_dev(),
    Res.
%% One analysis pass: pick a dev node, analyse its keys, and return
%% {ResultHash, Result, DevNo, StartTime}.
run_once() ->
    Start = current_time(),
    DevNo = pick_dev(),
    Res = get_keys_and_analyse(DevNo),
    {erlang:phash2(Res), Res, DevNo, Start}.
%% Ask the registered mrts_pick_dev process for the next dev number.
%% NOTE(review): the receive matches *any* message, not a tagged reply,
%% and has no 'after' timeout — it can return an unrelated mailbox
%% message or hang forever if mrts_pick_dev is not running; consider
%% tagging the reply with a make_ref() and adding a timeout.
pick_dev() ->
    mrts_pick_dev ! {pick, self()},
    receive
        DevNo ->
            DevNo
    end.
%% The dev node numbers to cycle over.
devs() ->
    [1, 2, 3].
%% Spawn and register the round-robin dev picker process.
start_mrts_pick_dev() ->
    register(mrts_pick_dev, spawn(fun() -> pick_dev_loop(devs()) end)).
%% Kill the picker process if it is registered; no-op otherwise.
stop_mrts_pick_dev() ->
    case whereis(mrts_pick_dev) of
        undefined ->
            ok;
        Pid when is_pid(Pid) ->
            exit(Pid, kill)
    end.
%% Serve {pick, From} requests, handing out dev numbers one at a time;
%% reshuffles a fresh dev list whenever the current one is exhausted.
pick_dev_loop([]) ->
    pick_dev_loop(shuffle(devs()));
pick_dev_loop([Dev | Rest]) ->
    receive
        {pick, From} ->
            From ! Dev,
            pick_dev_loop(Rest)
    end.
%% Return a random permutation of L (decorate-sort-undecorate).
%% Fix: the original used the 'random' module, which was deprecated in
%% OTP 19 and removed in OTP 26; 'rand' is the supported replacement
%% and needs no explicit seeding.
shuffle(L) ->
    [X || {_, X} <- lists:sort([{rand:uniform(), N} || N <- L])].
%% Tail-recursive stats accumulation; stops after the first run whose
%% start time is past EndTime (so it always performs at least one run).
run_analysis(Stats, EndTime) ->
    {_, _, _, Start} = Res = run_once(),
    NewStats = update_stats(Stats, Res),
    if
        Start > EndTime -> NewStats;
        true -> run_analysis(NewStats, EndTime)
    end.
%% Five empty orddicts: {HashRes, DevCounts, DevTimes, Counts, Times}.
initial_stats() ->
    {orddict:new(), orddict:new(), orddict:new(), orddict:new(), orddict:new()}.
%% Fold one run result into the stats tuple: store the full result under
%% its hash, and bump counters / append start times keyed both by hash
%% and by {hash, dev}.
update_stats({HashRes, DevCounts, DevTimes, Counts, Times},
             {Hash, Res, DevNo, Start}) ->
    DevKey = {Hash, DevNo},
    {orddict:store(Hash, Res, HashRes),
     inc_count(DevKey, DevCounts),
     add_time(DevKey, Start, DevTimes),
     inc_count(Hash, Counts),
     add_time(Hash, Start, Times)}.
%% Increment the counter stored under Key (created at 1 if absent).
inc_count(Key, Counts) ->
    orddict:update_counter(Key, 1, Counts).
%% Append Start to the list of times stored under Key.
add_time(Key, Start, Times) ->
    orddict:append(Key, Start, Times).
%% Wall-clock calendar time (used only for run bookkeeping).
current_time() ->
    erlang:localtime().
%% StartTime plus DurationInSecs, as a calendar datetime.
end_time(StartTime, DurationInSecs) ->
    Secs = calendar:datetime_to_gregorian_seconds(StartTime),
    calendar:gregorian_seconds_to_datetime(Secs + DurationInSecs).
%% Write each {Hash, Res} entry of the orddict to its own "<Hash>.txt".
%% Fix: the original bound List = orddict:to_list(HashRes) and then
%% iterated HashRes instead — List was unused (compiler warning) and the
%% code only worked because an orddict happens to be a list of pairs.
write_results(HashRes) ->
    List = orddict:to_list(HashRes),
    lists:foreach(fun write_res/1, List).
%% Dump one analysis result, pretty-printed, into "<Hash>.txt".
write_res({Hash, Res}) ->
    Filename = io_lib:format("~B.txt", [Hash]),
    ResStr = io_lib:format("~p~n", [Res]),
    file:write_file(Filename, ResStr).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment